# Pigeon Post (2) Writeup

> From: HKCERT CTF 2024
> Tags: crypto
> Challenge author: Mystiz
> Challenge description:
> In 1861, our antagonists Alice and Byron noticed that Ozetta is actively sniffing their messages. They decided to look for you for transmitting their messages, but this time they have already come up with a key beforehand.
>
> Can you retrieve their secret this time?
>
> Solved by: Jackylkk2003

## First Pigeon vs Second Pigeon

Since there is an easier challenge, Pigeon Post (1) (with [a lovely guide](https://hackmd.io/@blackb6a/hkcert-ctf-2024-ii-en-07128acbc80dd0a4#Crypto-Pigeon-Post-1)), let's see what differs between the easy version and the hard version.

```!
$ diff pigeon1/chall.py pigeon2/chall.py
146a147,152
>     res = byron.handle_message(res)
>     print(res)
>
>     res = alice.handle_message(res)
>     print(res)
>
```

From the code and the challenge description, we know that this time we cannot interfere with the key exchange: the server relays the remaining handshake messages itself before we get to send anything.

## Code Analysis

```python!=8
# This is the parameter specified in RFC-3526. Let's assume this is safe :)
# https://datatracker.ietf.org/doc/html/rfc3526#section-3
P = 0xffffffffffffffffc90fdaa22168c234c4c6628b80dc1cd129024e088a67cc74020bbea63b139b22514a08798e3404ddef9519b3cd3a431b302b0a6df25f14374fe1356d6d51c245e485b576625e7ec6f44c42e9a637ed6b0bff5cb6f406b7edee386bfb5a899fa5ae9f24117c4b1fe649286651ece45b3dc2007cb8a163bf0598da48361c55d39a69163fa8fd24cf5f83655d23dca3ad961c62f356208552bb9ed529077096966d670c354e4abc9804f1746c08ca18217c32905e462e36ce3be39e772c180e86039b2783a2ec07a28fb5c55df06f4c52c9de2bcbf6955817183995497cea956ae515d2261898fa051015728e5a8aacaa68ffffffffffffffff
G = 0x2
```

Let's assume the parameters are safe ~~because Mystiz said it is secure in the comments~~.

Let's go through everything that happens before we can give the first input.
```python=13
class User:
    def __init__(self, id, secret=None, other_secret=None):
        self.id = id
        self.secret = secret
        self.other_secret = other_secret

        self.private_key = None
        self.session_key = None
        pass

    # Message handler

    def handle_message(self, message):
        message = json.loads(message)
        type_ = message.get('type')

        if type_ == 'init_handshake':
            res = self.init_handshake()
        elif type_ == 'receive_handshake':
            other_public_key = message.get('public_key')
            res = self.receive_handshake(other_public_key)
        elif type_ == 'finish_handshake':
            other_public_key = message.get('public_key')
            res = self.finish_handshake(other_public_key)
        elif type_ == 'communicate':
            ciphertext = bytes.fromhex(message.get('ciphertext'))
            res = self.communicate(ciphertext)
        else:
            raise Exception(f'unknown message type {type_}')

        return json.dumps(res, separators=(',', ':'))

    # ===

    # Phase 1: Handshaking

    def init_handshake(self):
        assert self.private_key is None

        self.private_key = secrets.randbelow(P)
        self.public_key = pow(G, self.private_key, P)

        return {
            "type": "receive_handshake",
            "public_key": self.public_key
        }

    def receive_handshake(self, other_public_key):
        assert self.private_key is None
        assert self.session_key is None

        self.private_key = secrets.randbelow(P)
        self.public_key = pow(G, self.private_key, P)
        self.session_key = derive_session_key(other_public_key, self.private_key)

        return {
            "type": "finish_handshake",
            "public_key": self.public_key
        }

    def finish_handshake(self, other_public_key):
        assert self.session_key is None

        self.session_key = derive_session_key(other_public_key, self.private_key)

        message = b'done!'
        ciphertext = encrypt_message(self.session_key, message)

        return {
            "type": "communicate",
            "ciphertext": ciphertext.hex()
        }
```

And also some code in the main function:

```python=137
def main():
    flag = os.environ.get('FLAG', 'hkcert24{***REDACTED***}')
    token = os.urandom(8).hex()

    alice = User('Alice', flag, token)
    byron = User('Byron', token)

    res = alice.handle_message('{"type":"init_handshake"}')
    print(res)

    res = byron.handle_message(res)
    print(res)

    res = alice.handle_message(res)
    print(res)
```

The code is a bit lengthy, but the key idea is that the handshake has already finished before we can give any input. Alice and Byron each hold their own secret and private key, and they already share a common session key. We do get to see their public keys, though.

We can also see the asserts in the handshaking functions. They ensure that these functions cannot be called more than once, so we cannot restart the handshake.

In Pigeon Post (1), we could perform a man-in-the-middle attack and establish malicious connections with Alice and Byron.

> Although the key exchange protocol is able to prevent passive attacks, it does not check whether the opposite party is the one we want to communicate with. An adversary can actively act as the opposite party to both Alice and Byron.

That is, if we trust Mystiz again, the vulnerability is not in the Diffie-Hellman key exchange. There is something else to exploit.

Now let's see what happens when we give inputs.

```python=153
    while True:
        command, *args = input('🕊️ ').strip().split(' ')
        if command == 'alice':
            # send a message to Alice
            content, = args
            print(alice.handle_message(content))
        elif command == 'byron':
            # send a message to Byron
            content, = args
            print(byron.handle_message(content))
```

This just forwards our message to Alice or Byron and prints the reply. Nothing special.
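One practical detail of this input loop: the line is split on single spaces and `content, = args` expects exactly one argument, so the JSON we send must not contain any spaces. A minimal sketch of building such a line follows; `build_command` is a hypothetical helper name, not part of the challenge, and the ciphertext is a dummy value.

```python
import json

def build_command(target: str, message: dict) -> str:
    """Format one input line for the prompt (hypothetical helper)."""
    # Compact separators: the server splits the line on ' ' and unpacks
    # exactly one argument, so the JSON must not contain any spaces.
    payload = json.dumps(message, separators=(',', ':'))
    return f'{target} {payload}'

# Dummy ciphertext, only to show the shape of the line.
line = build_command('byron', {"type": "communicate", "ciphertext": "00" * 8})

# Parse it the same way the challenge loop does.
command, *args = line.strip().split(' ')
content, = args  # would raise ValueError if the JSON contained a space
print(command, json.loads(content)["type"])
```

This is the same reason the solve scripts pass `separators=(',', ':')` to `json.dumps`.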
Each `communicate` message is then handled as follows:

```python=85
    # Phase 2: Encrypted communication

    def communicate(self, incoming_ciphertext):
        incoming_message = decrypt_message(self.session_key, incoming_ciphertext)

        # message handler
        if self.id == 'Byron':
            if incoming_message == b'done!':
                outgoing_message = f'what is the flag? I have the secret {self.secret}'.encode()
            elif incoming_message.startswith(b'the flag is '):
                flag = incoming_message[12:].strip()
                if re.match(br'hkcert24{.*}', flag):
                    outgoing_message = b'nice flag!'
                else:
                    outgoing_message = b'too bad...'
            else:
                outgoing_message = b'???'
        elif self.id == 'Alice':
            if incoming_message == f'what is the flag? I have the secret {self.other_secret}'.encode():
                outgoing_message = f'the flag is {self.secret}'.encode()
            elif incoming_message == b'nice flag!':
                outgoing_message = b':)'
            elif incoming_message == b'too bad...':
                outgoing_message = b'what happened?'
            else:
                outgoing_message = b'???'

        outgoing_ciphertext = encrypt_message(self.session_key, outgoing_message)
        return {
            "type": "communicate",
            "ciphertext": outgoing_ciphertext.hex()
        }
```

By looking at this function, we can see that both Byron and Alice reply based only on the current message and the state set up during the handshake. Anything sent between the handshake and the latest message has no effect on the latest reply. That means a replay attack, or something similar, may work.

The last thing we have to look at is the utility functions.
```python=119
# Utility functions

def derive_session_key(other_public_key, self_private_key):
    shared_key = pow(other_public_key, self_private_key, P)
    session_key = hashlib.sha256(shared_key.to_bytes(512, 'big')).digest()
    return session_key

def encrypt_message(session_key: bytes, message: bytes):
    nonce = os.urandom(8)
    cipher = AES.new(session_key, AES.MODE_CTR, nonce=nonce)
    return nonce + cipher.encrypt(message)

def decrypt_message(session_key: bytes, ciphertext: bytes):
    nonce, ciphertext = ciphertext[:8], ciphertext[8:]
    cipher = AES.new(session_key, AES.MODE_CTR, nonce=nonce)
    return cipher.decrypt(ciphertext)
```

The session key derivation is nothing special. One thing that is worth noticing, however, is how messages are encrypted and decrypted.

The messages are encrypted using AES-CTR, with the nonce prepended to the ciphertext. The receiver takes the nonce from the first 8 bytes of whatever it is given, so we control the nonce of every message we send.

Recall that the most important weakness of CTR mode is nonce reuse: the same key and nonce always produce the same keystream. If we can control the nonce, then we can reuse it.

## Vulnerability

So the vulnerability is nonce reuse in CTR mode. However, we can only control what Alice and Byron receive, not what they send.

Notice that the flag is only sent in the message `the flag is {self.secret}`, so that is the nonce we have to reuse. We know that the plaintext of that message starts with `the flag is hkcert24{`, and XORing this known prefix with the ciphertext gives us the corresponding keystream bytes. That is, under the same nonce we can send encrypted messages that start with `the flag is hkcert24{`.

## Oracle

To make use of this, we have to find an oracle. Looking at the code again, there is something interesting that can work as one.

```python=94
            elif incoming_message.startswith(b'the flag is '):
                flag = incoming_message[12:].strip()
                if re.match(br'hkcert24{.*}', flag):
                    outgoing_message = b'nice flag!'
                else:
                    outgoing_message = b'too bad...'
```

Given that our message starts with `the flag is hkcert24{`, this code only checks whether a `}` appears later in the message. If we append a single unknown byte to a prefix that contains no `}`, it becomes a check of whether the last byte decrypts to `}`.
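The keystream recovery and the resulting check can be tried offline. The sketch below is only an illustration: `toy_keystream` is a hash-based stand-in for AES-CTR (an assumption, so that the example needs nothing beyond the standard library), the key, nonce and flag are dummy values, and `byron_accepts` is a hypothetical helper that mirrors Byron's branch above. The point is the XOR relation, which holds for any stream cipher whose keystream depends only on the key and nonce.

```python
import hashlib
import re

def toy_keystream(key: bytes, nonce: bytes, length: int) -> bytes:
    # Stand-in for AES-CTR (assumption): hash (key, nonce, counter) per block.
    out = b''
    counter = 0
    while len(out) < length:
        out += hashlib.sha256(key + nonce + counter.to_bytes(8, 'big')).digest()
        counter += 1
    return out[:length]

def toy_crypt(key: bytes, nonce: bytes, data: bytes) -> bytes:
    # Encryption and decryption are the same XOR with the keystream.
    stream = toy_keystream(key, nonce, len(data))
    return bytes(d ^ s for d, s in zip(data, stream))

def byron_accepts(message: bytes) -> bool:
    # Mirrors Byron's "the flag is " branch (hypothetical helper name).
    if not message.startswith(b'the flag is '):
        return False
    flag = message[12:].strip()
    return re.match(br'hkcert24{.*}', flag) is not None

key = b'K' * 32      # dummy key, unknown to the attacker in the real challenge
nonce = b'N' * 8     # dummy nonce, visible to the attacker
ciphertext = toy_crypt(key, nonce, b'the flag is hkcert24{dummy}')

# Attacker side: known plaintext XOR ciphertext gives the keystream prefix.
known_plaintext = b'the flag is hkcert24{'
known_stream = bytes(c ^ p for c, p in zip(ciphertext, known_plaintext))

# Simulation only: peek at the real next keystream byte to build one correct
# and one incorrect guess. The real attack has to try all 256 candidates.
true_next = toy_keystream(key, nonce, len(known_plaintext) + 1)[-1]
prefix = ciphertext[:len(known_plaintext)]
good = prefix + bytes([true_next ^ ord('}')])
bad = prefix + bytes([(true_next ^ 1) ^ ord('}')])

print(byron_accepts(toy_crypt(key, nonce, good)))
print(byron_accepts(toy_crypt(key, nonce, bad)))
```

The correct guess decrypts to `the flag is hkcert24{}` and is accepted, while the wrong guess ends in a different byte and is rejected.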
Now the remaining problem is that we don't directly receive the message `nice flag!` or `too bad...`. We receive an encrypted version of it, and we cannot distinguish between the two messages without knowing the key.

Since we do not have the key, we ask Alice, who has the key, to decrypt the message for us. When Alice receives `nice flag!`, she replies with an encrypted `:)`, and when she receives `too bad...`, she replies with an encrypted `what happened?` instead.

> But we still cannot decrypt the message...

Actually, it is possible for us to distinguish between the two replies. CTR is a stream mode, so in the AES-CTR implementation of `pycryptodome` the plaintext is not padded to a multiple of 16 bytes. The ciphertext length is exactly the same as the plaintext length. In other words, the ciphertext of `what happened?` (14 bytes) is longer than the ciphertext of `:)` (2 bytes), and we can obtain information from this.

So we now have an oracle that tells us whether the decrypted message ends with `}`, given that it starts with `the flag is hkcert24{`.

## Attack

Since the oracle tells us whether the decrypted message ends with `}`, this is similar to a padding oracle, and the attack idea is the same. We leak the bytes one by one until the whole flag is decrypted.

For each position, we try all 256 candidates `k` for the next keystream byte: we send the known ciphertext prefix followed by `k ^ ord('}')` under the same nonce. The oracle says yes exactly when `k` is the real keystream byte. Once the whole keystream is known, XORing it with Alice's ciphertext gives the flag.

## Solve Script

The utility functions of the solve script are the same as those in the Pigeon Post (1) solve script, so they are skipped here. You can find the whole solve script in the Solution Files section at the end.

```python=
def experiment2():
    r = connect()

    # Alice runs `init_handshake`
    j = receive(r)

    # Byron runs `receive_handshake`
    j = receive(r)

    # Alice runs `finish_handshake`
    j = receive(r)
    send(r, 'byron', j)

    # Byron sends what is the flag?
    j = receive(r)
    send(r, 'alice', j)

    # Alice sends the flag
    j = receive(r)
    nonce, ciphertext = bytes.fromhex(j['ciphertext'][:16]), bytes.fromhex(j['ciphertext'][16:])

    known_plaintext = b'the flag is hkcert24{'
    known_stream = xor(ciphertext, known_plaintext, cut='min')

    def check_payload(payload):
        send(r, 'byron', {
            "type": "communicate",
            "ciphertext": (nonce + payload).hex()
        })
        j = receive(r)
        send(r, 'alice', j)
        j = receive(r)
        if len(j['ciphertext']) == (8 + len(b':)')) * 2: # * 2 because hex
            # print("Flag format OK")
            return True
        elif len(j['ciphertext']) == (8 + len(b'what happened?')) * 2:
            # print("Flag format NOT OK")
            return False
        else:
            raise Exception('unexpected length')

    for _ in tqdm(range(len(known_plaintext), len(ciphertext))):
        for i in range(256):
            # i is the guess for the next keystream byte
            payload = xor(known_stream, known_plaintext) + bytes([i ^ ord('}')])
            test_stream = known_stream + bytes([i])
            if check_payload(payload):
                known_stream = test_stream
                break
        known_plaintext = xor(known_stream, ciphertext, cut='min')
        print(known_plaintext.decode())
```

Now we can get the flag `hkcert24{0n3_c4n_4ls0_l34k_1nf0rm4710n_fr0m_th3_l3n9th}` after waiting for around 20 minutes. Yay. It is not an [extremely long wait](https://hackmd.io/@blackb6a/hkcert-ctf-2024-i-en-8381451153faac4a#Solution14) anyway.

## How come we do it so slowly

The script works and we have the flag. Let's optimize the code to make it run faster.

The idea is to use batching to reduce the communication delay, which is a common practice for crypto challenges. Batching means sending many messages at once and then reading all the replies at once, instead of waiting for each reply before sending the next message. This reduces the time spent waiting on the network significantly.
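To get a rough feel for why batching helps, we can count network exchanges. This is only a back-of-the-envelope sketch: it assumes every keystream byte is uniformly random, so a sequential search that stops at the first hit needs 128.5 guesses on average, and the function names are made up for this illustration.

```python
def sequential_round_trips(unknown_bytes: int, avg_guesses: float = 128.5) -> float:
    # Each guess costs two round trips: one to Byron, then one to Alice.
    return unknown_bytes * avg_guesses * 2

def batched_exchanges(unknown_bytes: int) -> int:
    # Per byte: one batch of 256 messages to Byron, one batch of 256 to Alice.
    return unknown_bytes * 2

flag = 'hkcert24{0n3_c4n_4ls0_l34k_1nf0rm4710n_fr0m_th3_l3n9th}'
unknown_bytes = len(flag) - len('hkcert24{')  # bytes after the known prefix

slow = sequential_round_trips(unknown_bytes)
fast = batched_exchanges(unknown_bytes)
print(f'{unknown_bytes} unknown bytes: ~{slow:.0f} round trips vs {fast} batched exchanges')
```

A batched exchange carries 256 messages, so it costs more than a single round trip, and the real speed-up is smaller than the ratio of these two counts.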
To facilitate the batching, we write some helper functions to make our lives easier:

```python=
def quick_send(r, target, req):
    j = json.dumps(req, separators=(',', ':'))
    r.sendline(f'{target} {j}'.encode())

def quick_receive(r):
    res = r.recvuntil(b'\n' + f'🕊️'.encode()).strip(b'\n' + f'🕊️'.encode()).decode()
    return json.loads(res)
```

For the actual attack logic, we also replace the `check_payload` verification function with a function `next_byte` that returns the correct keystream byte for the next position.

```python=
def next_byte(ciphertext_prefix, bytes_to_test):
    # send all 256 candidates to Byron, then collect his 256 replies
    for b in bytes_to_test:
        quick_send(r, 'byron', {
            "type": "communicate",
            "ciphertext": (ciphertext_prefix + bytes([b])).hex()
        })
    replies = [quick_receive(r) for _ in range(256)]
    # forward all of Byron's replies to Alice, then collect her 256 replies
    for i in range(256):
        quick_send(r, 'alice', replies[i])
    replies = [quick_receive(r) for _ in range(256)]
    # the candidate that makes Alice answer ":)" is the keystream byte
    for i in range(256):
        if len(replies[i]['ciphertext']) == (8 + len(b':)')) * 2:
            return bytes([i])
    raise Exception('no valid byte found')
```

And lastly we have our main attack loop:

```python=
for _ in tqdm(range(len(known_plaintext), len(ciphertext))):
    bytes_to_test = bytes([i ^ ord('}') for i in range(256)])
    ciphertext_prefix = nonce + xor(known_stream, known_plaintext)
    known_stream += next_byte(ciphertext_prefix, bytes_to_test)
    known_plaintext = xor(known_stream, ciphertext, cut='min')
    print(known_plaintext.decode())
```

With this batching optimization, the time to get the flag drops from around 20 minutes to around 20 seconds. 60x speed up!!!

And here is our lovely flag again: `hkcert24{0n3_c4n_4ls0_l34k_1nf0rm4710n_fr0m_th3_l3n9th}`. As it says, we can infer data from the length of ciphertexts.
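The whole attack can also be rehearsed offline before touching the server. The following is a minimal local simulation and not the challenge code: the keystream is a hash-based stand-in for AES-CTR (an assumption, to avoid extra dependencies), the flag is a dummy, and `make_parties` and `recover_plaintext` are hypothetical names. Alice and Byron only implement the branches the attack relies on.

```python
import hashlib
import os
import re

def xor_bytes(a: bytes, b: bytes) -> bytes:
    return bytes(x ^ y for x, y in zip(a, b))  # truncates to the shorter input

def keystream(key: bytes, nonce: bytes, length: int) -> bytes:
    # Stand-in for AES-CTR (assumption): hash (key, nonce, counter) per block.
    out, counter = b'', 0
    while len(out) < length:
        out += hashlib.sha256(key + nonce + counter.to_bytes(8, 'big')).digest()
        counter += 1
    return out[:length]

def encrypt_message(key: bytes, message: bytes) -> bytes:
    nonce = os.urandom(8)
    return nonce + xor_bytes(message, keystream(key, nonce, len(message)))

def decrypt_message(key: bytes, ciphertext: bytes) -> bytes:
    nonce, body = ciphertext[:8], ciphertext[8:]  # receiver trusts the nonce
    return xor_bytes(body, keystream(key, nonce, len(body)))

def make_parties(key: bytes):
    def byron(ciphertext: bytes) -> bytes:
        message = decrypt_message(key, ciphertext)
        if message.startswith(b'the flag is '):
            flag = message[12:].strip()
            ok = re.match(br'hkcert24{.*}', flag)
            reply = b'nice flag!' if ok else b'too bad...'
        else:
            reply = b'???'
        return encrypt_message(key, reply)

    def alice(ciphertext: bytes) -> bytes:
        message = decrypt_message(key, ciphertext)
        reply = {b'nice flag!': b':)', b'too bad...': b'what happened?'}.get(message, b'???')
        return encrypt_message(key, reply)

    return alice, byron

def recover_plaintext(flag_ciphertext: bytes, alice, byron) -> bytes:
    nonce, body = flag_ciphertext[:8], flag_ciphertext[8:]
    stream = xor_bytes(body, b'the flag is hkcert24{')  # known keystream prefix
    while len(stream) < len(body):
        prefix = body[:len(stream)]  # decrypts to the real plaintext prefix
        for guess in range(256):
            reply = alice(byron(nonce + prefix + bytes([guess ^ ord('}')])))
            if len(reply) == 8 + len(b':)'):  # length oracle: Alice said ":)"
                stream += bytes([guess])
                break
        else:
            raise RuntimeError('no valid byte found')
    return xor_bytes(body, stream)

key = os.urandom(32)
alice, byron = make_parties(key)
dummy_flag = b'hkcert24{dummy_flag_for_local_test}'
flag_ciphertext = encrypt_message(key, b'the flag is ' + dummy_flag)
recovered = recover_plaintext(flag_ciphertext, alice, byron)
print(recovered.decode())
```

The attacker code never reads `key`; it only sees ciphertexts and their lengths, just like in the real challenge.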
## Solution Files

### `solve_slow.py`

```python=
from Crypto.Cipher import AES
from pwn import *  # provides remote, process and xor
import hashlib
import json
import os
from tqdm import tqdm

# interact with users

def receive(r):
    res = r.recvline().decode().strip()
    return json.loads(res)

def send(r, target, req):
    j = json.dumps(req, separators=(',', ':'))
    r.sendlineafter(f'🕊️'.encode(), f'{target} {j}'.encode())

# cryptographic toolbox

P = 0xffffffffffffffffc90fdaa22168c234c4c6628b80dc1cd129024e088a67cc74020bbea63b139b22514a08798e3404ddef9519b3cd3a431b302b0a6df25f14374fe1356d6d51c245e485b576625e7ec6f44c42e9a637ed6b0bff5cb6f406b7edee386bfb5a899fa5ae9f24117c4b1fe649286651ece45b3dc2007cb8a163bf0598da48361c55d39a69163fa8fd24cf5f83655d23dca3ad961c62f356208552bb9ed529077096966d670c354e4abc9804f1746c08ca18217c32905e462e36ce3be39e772c180e86039b2783a2ec07a28fb5c55df06f4c52c9de2bcbf6955817183995497cea956ae515d2261898fa051015728e5a8aacaa68ffffffffffffffff
G = 0x2

def derive_public_key(private_key: int):
    return pow(G, private_key, P)

def derive_session_key(other_public_key: int, self_private_key: int):
    shared_key = pow(other_public_key, self_private_key, P)
    session_key = hashlib.sha256(shared_key.to_bytes(512, 'big')).digest()
    return session_key

def encrypt(session_key: bytes, message: bytes) -> str:
    nonce = os.urandom(8)
    cipher = AES.new(session_key, AES.MODE_CTR, nonce=nonce)
    ciphertext = nonce + cipher.encrypt(message)
    return ciphertext.hex()

def decrypt(session_key: bytes, ciphertext: str) -> bytes:
    ciphertext = bytes.fromhex(ciphertext)
    nonce, ciphertext = ciphertext[:8], ciphertext[8:]
    cipher = AES.new(session_key, AES.MODE_CTR, nonce=nonce)
    return cipher.decrypt(ciphertext)

# ===

def connect():
    # return process(["python3", "chall.py"])
    return remote("c24b-pigeon-2.hkcert24.pwnable.hk", 1337, ssl=True)

def experiment1():
    r = connect()

    # Alice runs `init_handshake`
    j = receive(r)
    print(f'Alice->Byron: {j}')
    send(r, 'byron', j)

    # Byron runs `receive_handshake`
    j = receive(r)
    print(f'Byron->Alice: {j}')
    send(r, 'alice', j)

    # Alice runs `finish_handshake`
    j = receive(r)
    print(f'Alice->Byron: {j}')
    send(r, 'byron', j)

    # Byron receives the message "done!"
    j = receive(r)
    print(f'Byron->Alice: {j}')
    send(r, 'alice', j)

    # Alice receives the message "what is the flag?"
    j = receive(r)
    print(f'Alice->Byron: {j}')
    send(r, 'byron', j)

    # Byron receives the message "the flag is..."
    j = receive(r)
    print(f'Byron->Alice: {j}')
    send(r, 'alice', j)

    # Alice receives the message "nice flag!"
    j = receive(r)
    print(f'Alice->Byron: {j}')
    send(r, 'byron', j)

    # Byron receives the message ":)"

def experiment2():
    r = connect()

    # Alice runs `init_handshake`
    j = receive(r)

    # Byron runs `receive_handshake`
    j = receive(r)

    # Alice runs `finish_handshake`
    j = receive(r)
    send(r, 'byron', j)

    # Byron sends what is the flag?
    j = receive(r)
    send(r, 'alice', j)

    # Alice sends the flag
    j = receive(r)
    nonce, ciphertext = bytes.fromhex(j['ciphertext'][:16]), bytes.fromhex(j['ciphertext'][16:])

    known_plaintext = b'the flag is hkcert24{'
    known_stream = xor(ciphertext, known_plaintext, cut='min')

    def check_payload(payload):
        send(r, 'byron', {
            "type": "communicate",
            "ciphertext": (nonce + payload).hex()
        })
        j = receive(r)
        send(r, 'alice', j)
        j = receive(r)
        if len(j['ciphertext']) == (8 + len(b':)')) * 2: # * 2 because hex
            # print("Flag format OK")
            return True
        elif len(j['ciphertext']) == (8 + len(b'what happened?')) * 2:
            # print("Flag format NOT OK")
            return False
        else:
            raise Exception('unexpected length')

    for _ in tqdm(range(len(known_plaintext), len(ciphertext))):
        for i in range(256):
            payload = xor(known_stream, known_plaintext) + bytes([i ^ ord('}')])
            test_stream = known_stream + bytes([i])
            if check_payload(payload):
                known_stream = test_stream
                break
        known_plaintext = xor(known_stream, ciphertext, cut='min')
        print(known_plaintext.decode())

if __name__ == '__main__':
    # experiment1()
    experiment2()
```

### `solve_fast.py`

```python=
from Crypto.Cipher import AES
from pwn import *  # provides remote, process and xor
import hashlib
import json
import os
from tqdm import tqdm

# interact with users

def receive(r):
    res = r.recvline().decode().strip()
    return json.loads(res)

def send(r, target, req):
    j = json.dumps(req, separators=(',', ':'))
    r.sendlineafter(f'🕊️'.encode(), f'{target} {j}'.encode())

def quick_send(r, target, req):
    j = json.dumps(req, separators=(',', ':'))
    r.sendline(f'{target} {j}'.encode())

def quick_receive(r):
    res = r.recvuntil(b'\n' + f'🕊️'.encode()).strip(b'\n' + f'🕊️'.encode()).decode()
    return json.loads(res)

# cryptographic toolbox

P = 0xffffffffffffffffc90fdaa22168c234c4c6628b80dc1cd129024e088a67cc74020bbea63b139b22514a08798e3404ddef9519b3cd3a431b302b0a6df25f14374fe1356d6d51c245e485b576625e7ec6f44c42e9a637ed6b0bff5cb6f406b7edee386bfb5a899fa5ae9f24117c4b1fe649286651ece45b3dc2007cb8a163bf0598da48361c55d39a69163fa8fd24cf5f83655d23dca3ad961c62f356208552bb9ed529077096966d670c354e4abc9804f1746c08ca18217c32905e462e36ce3be39e772c180e86039b2783a2ec07a28fb5c55df06f4c52c9de2bcbf6955817183995497cea956ae515d2261898fa051015728e5a8aacaa68ffffffffffffffff
G = 0x2

def derive_public_key(private_key: int):
    return pow(G, private_key, P)

def derive_session_key(other_public_key: int, self_private_key: int):
    shared_key = pow(other_public_key, self_private_key, P)
    session_key = hashlib.sha256(shared_key.to_bytes(512, 'big')).digest()
    return session_key

def encrypt(session_key: bytes, message: bytes) -> str:
    nonce = os.urandom(8)
    cipher = AES.new(session_key, AES.MODE_CTR, nonce=nonce)
    ciphertext = nonce + cipher.encrypt(message)
    return ciphertext.hex()

def decrypt(session_key: bytes, ciphertext: str) -> bytes:
    ciphertext = bytes.fromhex(ciphertext)
    nonce, ciphertext = ciphertext[:8], ciphertext[8:]
    cipher = AES.new(session_key, AES.MODE_CTR, nonce=nonce)
    return cipher.decrypt(ciphertext)

# ===

def connect():
    # return process(["python3", "chall.py"])
    return remote("c24b-pigeon-2.hkcert24.pwnable.hk", 1337, ssl=True)

def experiment1():
    r = connect()

    # Alice runs `init_handshake`
    j = receive(r)
    print(f'Alice->Byron: {j}')
    send(r, 'byron', j)

    # Byron runs `receive_handshake`
    j = receive(r)
    print(f'Byron->Alice: {j}')
    send(r, 'alice', j)

    # Alice runs `finish_handshake`
    j = receive(r)
    print(f'Alice->Byron: {j}')
    send(r, 'byron', j)

    # Byron receives the message "done!"
    j = receive(r)
    print(f'Byron->Alice: {j}')
    send(r, 'alice', j)

    # Alice receives the message "what is the flag?"
    j = receive(r)
    print(f'Alice->Byron: {j}')
    send(r, 'byron', j)

    # Byron receives the message "the flag is..."
    j = receive(r)
    print(f'Byron->Alice: {j}')
    send(r, 'alice', j)

    # Alice receives the message "nice flag!"
    j = receive(r)
    print(f'Alice->Byron: {j}')
    send(r, 'byron', j)

    # Byron receives the message ":)"

def experiment2():
    r = connect()

    # Alice runs `init_handshake`
    j = receive(r)

    # Byron runs `receive_handshake`
    j = receive(r)

    # Alice runs `finish_handshake`
    j = receive(r)
    send(r, 'byron', j)

    # Byron sends what is the flag?
    j = receive(r)
    send(r, 'alice', j)

    # Alice sends the flag
    j = receive(r)
    nonce, ciphertext = bytes.fromhex(j['ciphertext'][:16]), bytes.fromhex(j['ciphertext'][16:])

    known_plaintext = b'the flag is hkcert24{'
    known_stream = xor(ciphertext, known_plaintext, cut='min')

    def next_byte(ciphertext_prefix, bytes_to_test):
        for b in bytes_to_test:
            quick_send(r, 'byron', {
                "type": "communicate",
                "ciphertext": (ciphertext_prefix + bytes([b])).hex()
            })
        replies = [quick_receive(r) for _ in range(256)]
        for i in range(256):
            quick_send(r, 'alice', replies[i])
        replies = [quick_receive(r) for _ in range(256)]
        for i in range(256):
            if len(replies[i]['ciphertext']) == (8 + len(b':)')) * 2:
                return bytes([i])
        raise Exception('no valid byte found')

    for _ in tqdm(range(len(known_plaintext), len(ciphertext))):
        bytes_to_test = bytes([i ^ ord('}') for i in range(256)])
        ciphertext_prefix = nonce + xor(known_stream, known_plaintext)
        known_stream += next_byte(ciphertext_prefix, bytes_to_test)
        known_plaintext = xor(known_stream, ciphertext, cut='min')
        print(known_plaintext.decode())

if __name__ == '__main__':
    # experiment1()
    experiment2()
```