<h1 style="text-align:center; color:#8868fb"> PicoCTF Walkthrough: Part 5 </h1> Dưới đây là các lời giải cho một số thử thách của **picoCTF** mà mình cảm thấy thú vị và hữu ích. Đây **chắc chắn** là phần cuối cùng trong năm nay khi mà chỉ còn hai bài nữa là kết thúc tất cả các thử thách về mật mã học trên **picoCTF**. ## Ricochet ![image](https://hackmd.io/_uploads/SJ7rIh1kWx.png) :::spoiler <b>crypto.py</b> ```python= import hashlib import monocypher import os def compute_hmac(message, nonce, key): hmac = (message + str(nonce) + key.hex()).encode() for _ in range(32): hmac = monocypher.blake2b(hmac + key) return monocypher.blake2b(hmac) def add_hmac(message, nonce, key): hmac = compute_hmac(message, nonce, key) return {"message": message, "nonce": nonce, "hmac": hmac.hex()} def validate_hmac(message, nonce, key): hmac = compute_hmac(message["message"], nonce, key) if message["hmac"] == hmac.hex() and message["nonce"] == nonce: return message["message"] return None def encrypt(message, key): key = monocypher.blake2b(key)[:32] nonce = os.urandom(24) tag, ciphertext = monocypher.lock(key, nonce, message.encode()) return ciphertext.hex() + ";" + tag.hex() + ";" + nonce.hex() def decrypt(message, key): key = monocypher.blake2b(key)[:32] ciphertext, tag, nonce = message.split(";") plaintext = monocypher.unlock(key, bytes.fromhex(nonce), bytes.fromhex(tag), bytes.fromhex(ciphertext)) return plaintext ``` ::: :::spoiler <b>radio_interface.py</b> ```python= import requests # TODO: you will need to fill this in with the URL of the challenge SERVER_URL = "http://127.0.0.1:5000" # Returns a list of messages seen on the air since the last time this function # was called def receive_radio_messages(): messages = requests.get(SERVER_URL+"/radio_rx").json() for msg in messages: print("DEBUG: Received message", msg) return messages # [message] argument should be a Python dict object def inject_radio_message(message): requests.post(SERVER_URL+"/radio_tx", json=message) def start_robot(): requests.get(SERVER_URL+"/start") def stop_robot(): requests.get(SERVER_URL+"/stop") # Get the location of the robot and the obstacles on the grid def get_board_state(): return requests.get(SERVER_URL+"/state").json() ``` ::: :::spoiler <b>robot.py</b> ```python= from radio_base import RadioDevice import asyncio from keys import keys import traceback import os import crypto import json import robot_low_level import monocypher ROBOT_ADDRESS = 0x10 class Robot(RadioDevice): def __init__(self, network): super().__init__("Robot", network) self.start = False self.stop = False self.address = 0x20 self.receive_buffer = [] self.dh_key_priv = os.urandom(32) # Initialize to a random key so we don't accidentally send unencrypted data self.dh_key_shared = os.urandom(32) self.nonce = 0 def debug(self, msg): msg = str(msg) self.send_message({ "msg_type": "debug", "src": self.address, "dst": 0xFF, "message": msg }) def message_callback(self, msg): try: msg_type = msg["msg_type"] msg_dst = int(msg["dst"]) if msg_dst != self.address: return if msg_type == "ping": self.send_message({ "msg_type": "pong", "src": self.address, "dst": msg["src"] }) else: self.receive_buffer.append(msg) except: traceback.print_exc() async def wait_message(self, msg_type): for _ in range(100): try: if len(self.receive_buffer): msg = self.receive_buffer.pop(0) if msg["msg_type"] == msg_type: return msg except: traceback.print_exc() await asyncio.sleep(0.1) return {} def hmac_and_encrypt(self, msg, nonce): msg_with_hmac = json.dumps(crypto.add_hmac(msg, nonce, keys["shared_hmac_key"])) msg_encrypted = crypto.encrypt(msg_with_hmac, self.dh_key_shared) return msg_encrypted def decrypt_and_check_hmac(self, msg, nonce): msg = crypto.decrypt(msg, self.dh_key_shared) if msg is None: return None msg_with_hmac = json.loads(msg) return crypto.validate_hmac(msg_with_hmac, nonce, keys["shared_hmac_key"]) async def send_secure_data(self, msg): msg_encrypted = self.hmac_and_encrypt(msg, self.nonce) self.send_message({ "msg_type": "secure_data", "src": self.address, "dst": ROBOT_ADDRESS, "encrypted": msg_encrypted }) response = await self.wait_message("secure_data_ack") ack_decrypted = self.decrypt_and_check_hmac(response["encrypted"], self.nonce) if ack_decrypted is None: self.debug("acknowledgement failed to validate") return # Ack packets are zero length if len(ack_decrypted) != 0: self.debug("acknowledgement mismatch") return self.nonce += 1 async def recv_secure_data(self): # Zero length packet as a secure data request msg_encrypted = self.hmac_and_encrypt("", self.nonce) self.send_message({ "msg_type": "secure_data_request", "src": self.address, "dst": ROBOT_ADDRESS, "encrypted": msg_encrypted }) response = await self.wait_message("secure_data_response") response_decrypted = self.decrypt_and_check_hmac(response["encrypted"], self.nonce) if response_decrypted is None: self.debug("response failed to validate") return self.nonce += 1 return response_decrypted async def read_command(self): await self.send_secure_data("get_movement") await asyncio.sleep(0.2) while True: resp = await self.recv_secure_data() if resp != "": return resp async def run(self): while self.running: while not self.start: await asyncio.sleep(0.1) self.nonce = 0 # move the robot back to its origin/home location robot_low_level.reset() # Validate the robot's authenticity challenge = os.urandom(16).hex() self.send_message({ "src": self.address, "dst": ROBOT_ADDRESS, "msg_type": "validate", "challenge": challenge }) response = await self.wait_message("ack_validate") expected_response = crypto.compute_hmac(challenge, 0, keys["authenticity_key"]) if response.get("response") != expected_response.hex(): self.debug("Robot validation failed") self.start = False continue self.debug("Robot successfully validated") # Diffie-Hellman key exchange to secure future communications self.send_message({ "src": self.address, "dst": ROBOT_ADDRESS, "msg_type": "key_exchange", "key": monocypher.compute_key_exchange_public_key(self.dh_key_priv).hex() }) response = await self.wait_message("ack_key_exchange") self.dh_key_shared = monocypher.key_exchange(self.dh_key_priv, bytes.fromhex(response["key"])) # Once validated, read some commands from the controller and execute them for _ in range(20): cmd = (await self.read_command()) if cmd is None: self.debug("timed out trying to read a robot command") break if len(cmd) != 0: await robot_low_level.move(cmd) if self.stop: break if self.stop: await robot_low_level.move("stop") else: await robot_low_level.move("done") self.stop = False self.start = False ``` ::: :::spoiler <b>robotcontroller.py</b> ```python= from radio_base import RadioDevice import asyncio import traceback from keys import keys import os import crypto import json import monocypher class RobotController(RadioDevice): def __init__(self, network): super().__init__("RobotController", network) self.address = 0x10 self.dh_key_priv = os.urandom(32) # Initialize to a random key so we don't accidentally send unencrypted data self.dh_key_shared = os.urandom(32) self.send_buffer_secure = "" self.recv_buffer_secure = "" self.nonce = 0 self.movement_counter = 0 def reset(self): self.nonce = 0 self.movement_counter = 0 def debug(self, msg): msg = str(msg) self.send_message({ "msg_type": "debug", "src": self.address, "dst": 0xFF, "message": msg }) def message_callback(self, msg): try: msg_type = msg["msg_type"] msg_dst = int(msg["dst"]) if msg_dst != self.address: return if msg_type == "ping": self.send_message({ "msg_type": "pong", "src": self.address, "dst": msg["src"] }) # To avoid the bootstrapping problem, changing the address needs to # be an unauthenticated operation if msg_type == "set_addr": self.address = int(msg["new_addr"]) & 0xFF self.send_message({ "msg_type": "ack_set_addr", "src": self.address, "dst": msg["src"] }) if msg_type == "validate": response = crypto.compute_hmac(msg["challenge"], 0, keys["authenticity_key"]) self.send_message({ "msg_type": "ack_validate", "src": self.address, "dst": msg["src"], "response": response.hex() }) if msg_type == "key_exchange": self.send_message({ "msg_type": "ack_key_exchange", "src": self.address, "dst": msg["src"], "key": monocypher.compute_key_exchange_public_key(self.dh_key_priv).hex() }) self.dh_key_shared = monocypher.key_exchange(self.dh_key_priv, bytes.fromhex(msg["key"])) if msg_type == "secure_data_request": self.process_secure_data_request(msg) if msg_type == "secure_data": self.process_secure_data(msg) except: traceback.print_exc() def hmac_and_encrypt(self, msg, nonce): msg_with_hmac = json.dumps(crypto.add_hmac(msg, nonce, keys["shared_hmac_key"])) msg_encrypted = crypto.encrypt(msg_with_hmac, self.dh_key_shared) return msg_encrypted def decrypt_and_check_hmac(self, msg, nonce): msg = crypto.decrypt(msg, self.dh_key_shared) if msg is None: return None msg_with_hmac = json.loads(msg) return crypto.validate_hmac(msg_with_hmac, nonce, keys["shared_hmac_key"]) def process_secure_data_request(self, request): request_decrypted = self.decrypt_and_check_hmac(request["encrypted"], self.nonce) if request_decrypted is None: self.debug("message failed to validate") return # Avoid type confusion by ensuring a secure data request should always # be of length zero if len(request_decrypted) != 0: self.debug("message mismatch") return # If the request is valid, send our current send buffer in response msg = self.send_buffer_secure self.send_buffer_secure = "" msg_encrypted = self.hmac_and_encrypt(msg, self.nonce) self.nonce += 1 self.send_message({ "msg_type": "secure_data_response", "src": self.address, "dst": request["src"], "encrypted": msg_encrypted }) def process_secure_data(self, msg): msg_decrypted = self.decrypt_and_check_hmac(msg["encrypted"], self.nonce) if msg_decrypted is None: self.debug("message failed to validate") return # If the request is valid, send an empty message as an acknowledgement msg_encrypted = self.hmac_and_encrypt("", self.nonce) self.nonce += 1 self.send_message({ "msg_type": "secure_data_ack", "src": self.address, "dst": msg["src"], "encrypted": msg_encrypted }) self.recv_buffer_secure = msg_decrypted async def run(self): while self.running: await asyncio.sleep(0.05) # Wait for the secure messaging layer to provide a secure command if self.recv_buffer_secure == "get_movement": self.recv_buffer_secure = "" # Demo version sends the same four commands repeatedly # Full control is available in the licensed version of this system self.send_buffer_secure = ["east", "south", "west", "north"][self.movement_counter % 4] self.movement_counter += 1 ``` ::: Bài này khó kinh khủng, đợi khi nào trình độ mình cao lên thì mình sẽ giải thích cho các bạn lời giải, còn tạm thời bây giờ các bạn chịu khó đọc write-up [**ở đây**](https://systemweakness.com/ricochet-picoctf-2025-mitm-replay-attack-bb47375fb7db). Dưới đây là toàn bộ lời giải cho thử thách này: :::spoiler <b style="color:#e1829c">collecting_HMAC_1.py</b> ```python= # Python program to gather HMAC values of secure_data and secure_data_response message types with corresponding nonce values. import requests import time import hashlib import monocypher import os import json import time # Returns a list of messages seen on the air since the last time this function # was called def receive_radio_messages(SERVER_URL): msg_group = [] messages = requests.get(SERVER_URL+"/radio_rx").json() for msg in messages: print("\nDEBUG: Received message", msg) msg_group.append(msg) return msg_group # [message] argument should be a Python dict object def inject_radio_message(SERVER_URL, message): requests.post(SERVER_URL+"/radio_tx", json=message) def start_robot(SERVER_URL): requests.get(SERVER_URL+"/start") def stop_robot(SERVER_URL): requests.get(SERVER_URL+"/stop") # Get the location of the robot and the obstacles on the grid def get_board_state(SERVER_URL): return requests.get(SERVER_URL+"/state").json() def compute_hmac(message, nonce, key): hmac = (message + str(nonce) + key.hex()).encode() for _ in range(32): hmac = monocypher.blake2b(hmac + key) return monocypher.blake2b(hmac) def add_hmac(message, nonce, key): hmac = compute_hmac(message, nonce, key) return {"message": message, "nonce": nonce, "hmac": hmac.hex()} def encrypt(message, key): key = monocypher.blake2b(key)[:32] nonce = os.urandom(24) tag, ciphertext = monocypher.lock(key, nonce, message.encode()) return ciphertext.hex() + ";" + tag.hex() + ";" + nonce.hex() def decrypt(message, key): key = monocypher.blake2b(key)[:32] ciphertext, tag, nonce = message.split(";") plaintext = monocypher.unlock(key, bytes.fromhex(nonce), bytes.fromhex(tag), bytes.fromhex(ciphertext)) return plaintext def initialization(server, robot_addr, controller_addr, new_controller_addr, my_priv_key): # Checking whether Robot and Controller are online. ping_robot = { "msg_type": "ping", "src": controller_addr, "dst": robot_addr, } ping_controller = { "msg_type": "ping", "src": robot_addr, "dst": controller_addr, } # Sending messages. inject = inject_radio_message(server, ping_robot) time.sleep(1) inject = inject_radio_message(server, ping_controller) time.sleep(1) # Reading output. print("Confirming that both Robot and Controller are online.") msg_recv = receive_radio_messages(server) # Set new addr for controller to conduct MITM attack. set_new_addr = { "msg_type": "set_addr", "src": robot_addr, "dst": controller_addr, "new_addr": new_controller_addr, } # Sending message. inject = inject_radio_message(server, set_new_addr) time.sleep(1) # Reading output. print("\nChanging Controller's address to 0x30.") msg_recv = receive_radio_messages(server) # Checking the new address of Controller. ping_controller = { "msg_type": "ping", "src": robot_addr, "dst": new_controller_addr, } # Sending message. inject = inject_radio_message(server, ping_controller) time.sleep(1) # Reading output. print("\nConfirming that Controller's address is changed.") msg_recv = receive_radio_messages(server) # Start the Robot. print("\nStarting robot...") start_robot(server) time.sleep(1) # Reading output. Extracting challenge key. msg_recv = receive_radio_messages(server) # Extracting challenge validation key. challenge = msg_recv[0]['challenge'] print("\nChallenge validation key sent from Robot:", challenge) print() # Validating robot authenticty. validate_msg = { "src": robot_addr, "dst": new_controller_addr, "msg_type": "validate", "challenge": challenge } # Sending message. inject = inject_radio_message(server, validate_msg) time.sleep(1) # Reading output. Extracting Robot's public key. msg_recv = receive_radio_messages(server) # Robot's public key. robot_dh_pub_key = msg_recv[2]['key'] print("\nRobot's public key:", robot_dh_pub_key) # The "ack_key_exchange" message for Robot. ack_key_exchange = { "msg_type": "ack_key_exchange", "src": new_controller_addr, "dst": robot_addr, "key": monocypher.compute_key_exchange_public_key(my_priv_key).hex() } my_dh_pub_key_robot = ack_key_exchange['key'] print("\nMy public key sent to Robot:", my_dh_pub_key_robot) # Construct my shared key with Robot. shared_key_robot = monocypher.key_exchange(my_priv_key, bytes.fromhex(robot_dh_pub_key)) print("\nMy shared key with Robot:", shared_key_robot.hex()) # Sending message. inject = inject_radio_message(server, ack_key_exchange) time.sleep(1) # Reading output. msg_recv = receive_radio_messages(server) # Decrypting the secure_data message from Robot. msg_robot_enc = msg_recv[0]['encrypted'] msg_robot_dec = json.loads(decrypt(msg_robot_enc, shared_key_robot).decode()) print("\nDecrypted message from Robot:", msg_robot_dec) return msg_robot_dec, shared_key_robot def get_shared_key_controller(server, dummy_robot_addr, new_controller_addr, my_priv_key): # The "key_exchange" message for Controller. key_exchange = { "msg_type": "key_exchange", "src": dummy_robot_addr, "dst": new_controller_addr, "key": monocypher.compute_key_exchange_public_key(my_priv_key).hex() } my_dh_pub_key_controller = key_exchange['key'] print("\nMy public key sent to Controller:", my_dh_pub_key_controller) # Sending message. inject = inject_radio_message(server, key_exchange) time.sleep(1) # Reading output. msg_recv = receive_radio_messages(server) # Controller's public key. controller_dh_pub_key = msg_recv[0]['key'] print("\nController's public key:", controller_dh_pub_key) # Construct my shared key with Controller. shared_key_controller = monocypher.key_exchange(my_priv_key, bytes.fromhex(controller_dh_pub_key)) print("\nMy shared key with Controller:", shared_key_controller.hex()) return shared_key_controller def gather_secure_data(msg_robot_dec, server, shared_key_controller, shared_key_robot, robot_addr, dummy_robot_addr, new_controller_addr, commands): # Collecting secure_data. commands.append(msg_robot_dec) # Re-encrypt the decrypted message from Robot to send it to Controller. msg_robot_reenc = encrypt(json.dumps(msg_robot_dec), shared_key_controller) # Crafting secure_data from Robot to Controller. secure_data = { "msg_type": "secure_data", "src": dummy_robot_addr, "dst": new_controller_addr, "encrypted": msg_robot_reenc } # Sending message. print("\nSending re-encrypted message from Robot to Controller...") inject = inject_radio_message(server, secure_data) time.sleep(1) # Reading output. Running a while loop until msg is available as, sometimes, there is latency. msg_recv = [] while len(msg_recv) < 1: msg_recv = receive_radio_messages(server) # Decrypt message back from Controller. msg_controller_enc = msg_recv[0]['encrypted'] msg_controller_dec = json.loads(decrypt(msg_controller_enc, shared_key_controller).decode()) print("\nDecrypted message from Controller:", msg_controller_dec) # Re-encrypt the decrypted message from Controller to send it to Robot. msg_controller_reenc = encrypt(json.dumps(msg_controller_dec), shared_key_robot) secure_data_ack = { "msg_type": "secure_data_ack", "src": new_controller_addr, "dst": robot_addr, "encrypted": msg_controller_reenc } # Sending message. print("\nSending re-encrypted message from Controller to Robot...") inject = inject_radio_message(server, secure_data_ack) time.sleep(1) # Reading output. msg_recv = [] while len(msg_recv) < 1: msg_recv = receive_radio_messages(server) # Decrypt message back from Robot. msg_robot_enc = msg_recv[0]['encrypted'] msg_robot_dec = json.loads(decrypt(msg_robot_enc, shared_key_robot).decode()) print("\nDecrypted message from Robot:", msg_robot_dec) # Re-encrypt the decrypted message from Robot to send it to Controller. msg_robot_reenc = encrypt(json.dumps(msg_robot_dec), shared_key_controller) secure_data_request = { "msg_type": "secure_data_request", "src": dummy_robot_addr, "dst": new_controller_addr, "encrypted": msg_robot_reenc } # Sending message. print("\nSending re-encrypted message from Robot to Controller...") inject = inject_radio_message(server, secure_data_request) time.sleep(1) # Reading output. msg_recv = [] while len(msg_recv) < 1: msg_recv = receive_radio_messages(server) # Decrypt message back from Controller. msg_controller_enc = msg_recv[0]['encrypted'] msg_controller_dec = json.loads(decrypt(msg_controller_enc, shared_key_controller).decode()) print("\nDecrypted message from Controller:", msg_controller_dec) # Collecting secure_data_response. commands.append(msg_controller_dec) # Re-encrypt the decrypted message from Controller to send it to Robot. msg_controller_reenc = encrypt(json.dumps(msg_controller_dec), shared_key_robot) secure_data_response = { "msg_type": "secure_data_response", "src": new_controller_addr, "dst": robot_addr, "encrypted": msg_controller_reenc } # Sending message. print("\nSending re-encrypted message from Controller to Robot...") inject = inject_radio_message(server, secure_data_response) # Reading output. Added a timeout to break loop if no more data is flowing in. msg_recv = [] timeout = False start_time = time.time() while len(msg_recv) < 1: msg_recv = receive_radio_messages(server) if time.time() - start_time > 10: timeout = True break if timeout == False: # Decrypt message back from Robot. msg_robot_enc = msg_recv[0]['encrypted'] msg_robot_dec = json.loads(decrypt(msg_robot_enc, shared_key_robot).decode()) print("\nDecrypted message from Robot:", msg_robot_dec) else: pass return msg_robot_dec, commands def main(): # Initialization of SERVER_URL and PORT. PORT = int(input("Enter PORT: ")) server = "http://activist-birds.picoctf.net:" + str(PORT) + "/" # Initialization of variables. robot_addr = 0x20 controller_addr = 0x10 new_controller_addr = 0x30 dummy_robot_addr = 0x40 commands = [] my_priv_key = os.urandom(32) # Initilization process for validating robot and exchanging keys. msg_robot_dec, shared_key_robot = initialization(server, robot_addr, controller_addr, new_controller_addr, my_priv_key) # Establishing a shared key with controller. shared_key_controller = get_shared_key_controller(server, dummy_robot_addr, new_controller_addr, my_priv_key) # Collecting data. for i in range(20): msg_robot_dec, commands = gather_secure_data(msg_robot_dec, server, shared_key_controller, shared_key_robot, robot_addr, dummy_robot_addr, new_controller_addr, commands) print("\nCollected messages:", commands) if __name__ == "__main__": main() ``` ::: :::spoiler <b style="color:#e1829c">collecting_HMAC_2.py</b> ```python= # Program to gather HMAC values of blank messages with corresponding nonce values. import requests import time import hashlib import monocypher import os import json import time # Returns a list of messages seen on the air since the last time this function # was called def receive_radio_messages(SERVER_URL): msg_group = [] messages = requests.get(SERVER_URL+"/radio_rx").json() for msg in messages: print("\nDEBUG: Received message", msg) msg_group.append(msg) return msg_group # [message] argument should be a Python dict object def inject_radio_message(SERVER_URL, message): requests.post(SERVER_URL+"/radio_tx", json=message) def start_robot(SERVER_URL): requests.get(SERVER_URL+"/start") def stop_robot(SERVER_URL): requests.get(SERVER_URL+"/stop") # Get the location of the robot and the obstacles on the grid def get_board_state(SERVER_URL): return requests.get(SERVER_URL+"/state").json() def compute_hmac(message, nonce, key): hmac = (message + str(nonce) + key.hex()).encode() for _ in range(32): hmac = monocypher.blake2b(hmac + key) return monocypher.blake2b(hmac) def add_hmac(message, nonce, key): hmac = compute_hmac(message, nonce, key) return {"message": message, "nonce": nonce, "hmac": hmac.hex()} def encrypt(message, key): key = monocypher.blake2b(key)[:32] nonce = os.urandom(24) tag, ciphertext = monocypher.lock(key, nonce, message.encode()) return ciphertext.hex() + ";" + tag.hex() + ";" + nonce.hex() def decrypt(message, key): key = monocypher.blake2b(key)[:32] ciphertext, tag, nonce = message.split(";") plaintext = monocypher.unlock(key, bytes.fromhex(nonce), bytes.fromhex(tag), bytes.fromhex(ciphertext)) return plaintext def initialization(server, robot_addr, controller_addr, new_controller_addr, my_priv_key): # Checking whether Robot and Controller are online. ping_robot = { "msg_type": "ping", "src": controller_addr, "dst": robot_addr, } ping_controller = { "msg_type": "ping", "src": robot_addr, "dst": controller_addr, } # Sending messages. inject = inject_radio_message(server, ping_robot) time.sleep(1) inject = inject_radio_message(server, ping_controller) time.sleep(1) # Reading output. print("Confirming that both Robot and Controller are online.") msg_recv = receive_radio_messages(server) # Set new addr for controller to conduct MITM attack. set_new_addr = { "msg_type": "set_addr", "src": robot_addr, "dst": controller_addr, "new_addr": new_controller_addr, } # Sending message. inject = inject_radio_message(server, set_new_addr) time.sleep(1) # Reading output. print("\nChanging Controller's address to 0x30.") msg_recv = receive_radio_messages(server) # Checking the new address of Controller. ping_controller = { "msg_type": "ping", "src": robot_addr, "dst": new_controller_addr, } # Sending message. inject = inject_radio_message(server, ping_controller) time.sleep(1) # Reading output. print("\nConfirming that Controller's address is changed.") msg_recv = receive_radio_messages(server) # Start the Robot. print("\nStarting robot...") start_robot(server) time.sleep(1) # Reading output. Extracting challenge key. msg_recv = receive_radio_messages(server) # Extracting challenge validation key. challenge = msg_recv[0]['challenge'] print("\nChallenge validation key sent from Robot:", challenge) print() # Validating robot authenticty. validate_msg = { "src": robot_addr, "dst": new_controller_addr, "msg_type": "validate", "challenge": challenge } # Sending message. inject = inject_radio_message(server, validate_msg) time.sleep(1) # Reading output. Extracting Robot's public key. msg_recv = receive_radio_messages(server) # Robot's public key. robot_dh_pub_key = msg_recv[2]['key'] print("\nRobot's public key:", robot_dh_pub_key) # The "ack_key_exchange" message for Robot. ack_key_exchange = { "msg_type": "ack_key_exchange", "src": new_controller_addr, "dst": robot_addr, "key": monocypher.compute_key_exchange_public_key(my_priv_key).hex() } my_dh_pub_key_robot = ack_key_exchange['key'] print("\nMy public key sent to Robot:", my_dh_pub_key_robot) # Construct my shared key with Robot. shared_key_robot = monocypher.key_exchange(my_priv_key, bytes.fromhex(robot_dh_pub_key)) print("\nMy shared key with Robot:", shared_key_robot.hex()) # Sending message. inject = inject_radio_message(server, ack_key_exchange) time.sleep(1) # Reading output. msg_recv = receive_radio_messages(server) # Decrypting the secure_data message from Robot. msg_robot_enc = msg_recv[0]['encrypted'] msg_robot_dec = json.loads(decrypt(msg_robot_enc, shared_key_robot).decode()) print("\nDecrypted message from Robot:", msg_robot_dec) return msg_robot_dec, shared_key_robot def get_shared_key_controller(server, dummy_robot_addr, new_controller_addr, my_priv_key): # The "key_exchange" message for Controller. key_exchange = { "msg_type": "key_exchange", "src": dummy_robot_addr, "dst": new_controller_addr, "key": monocypher.compute_key_exchange_public_key(my_priv_key).hex() } my_dh_pub_key_controller = key_exchange['key'] print("\nMy public key sent to Controller:", my_dh_pub_key_controller) # Sending message. inject = inject_radio_message(server, key_exchange) time.sleep(1) # Reading output. msg_recv = receive_radio_messages(server) # Controller's public key. controller_dh_pub_key = msg_recv[0]['key'] print("\nController's public key:", controller_dh_pub_key) # Construct my shared key with Controller. shared_key_controller = monocypher.key_exchange(my_priv_key, bytes.fromhex(controller_dh_pub_key)) print("\nMy shared key with Controller:", shared_key_controller.hex()) return shared_key_controller def invalid_msg(server, shared_key_robot, robot_addr, new_controller_addr): # This function sends an invalid secure_data_ack message to Robot to tamper nonce count. invalid_msg = encrypt(json.dumps(add_hmac("", 99, shared_key_robot)), shared_key_robot) secure_data_ack = { "msg_type": "secure_data_ack", "src": new_controller_addr, "dst": robot_addr, "encrypted": invalid_msg } # Sending message. print("\nSending invalid message to Robot...") inject = inject_radio_message(server, secure_data_ack) time.sleep(1) # Reading output. Running a while loop until msg is available as, sometimes, there is latency. msg_recv = [] while len(msg_recv) < 1: msg_recv = receive_radio_messages(server) # Decrypt message back from Robot. msg_robot_enc = msg_recv[1]['encrypted'] msg_robot_dec = json.loads(decrypt(msg_robot_enc, shared_key_robot).decode()) print("\nDecrypted message from Robot:", msg_robot_dec) return msg_robot_dec def gather_blank_secure_data_response(msg_robot_dec, server, shared_key_controller, shared_key_robot, robot_addr, dummy_robot_addr, new_controller_addr, commands): # Re-encrypt the decrypted message from Robot to send it to Controller. msg_robot_reenc = encrypt(json.dumps(msg_robot_dec), shared_key_controller) secure_data_request = { "msg_type": "secure_data_request", "src": dummy_robot_addr, "dst": new_controller_addr, "encrypted": msg_robot_reenc } # Sending message. print("\nSending re-encrypted message from Robot to Controller...") inject = inject_radio_message(server, secure_data_request) time.sleep(1) # Reading output. Running a while loop until msg is available as, sometimes, there is latency. msg_recv = [] while len(msg_recv) < 1: msg_recv = receive_radio_messages(server) # Decrypt message back from Controller. msg_controller_enc = msg_recv[0]['encrypted'] msg_controller_dec = json.loads(decrypt(msg_controller_enc, shared_key_controller).decode()) print("\nDecrypted message from Controller:", msg_controller_dec) # Collect secure_data_response message. commands.append(msg_controller_dec) # Re-encrypt the decrypted message from Controller to send it to Robot. msg_controller_reenc = encrypt(json.dumps(msg_controller_dec), shared_key_robot) secure_data_response = { "msg_type": "secure_data_response", "src": new_controller_addr, "dst": robot_addr, "encrypted": msg_controller_reenc } # Sending message. print("\nSending re-encrypted message from Controller to Robot...") inject = inject_radio_message(server, secure_data_response) time.sleep(1) # Reading output. msg_recv = [] while len(msg_recv) < 1: msg_recv = receive_radio_messages(server) # Decrypt message back from Robot. msg_robot_enc = msg_recv[0]['encrypted'] msg_robot_dec = json.loads(decrypt(msg_robot_enc, shared_key_robot).decode()) print("\nDecrypted message from Robot:", msg_robot_dec) return msg_robot_dec, commands def main(): # Initialization of SERVER_URL and PORT. PORT = int(input("Enter PORT: ")) server = "http://activist-birds.picoctf.net:" + str(PORT) + "/" # Initialization of variables. robot_addr = 0x20 controller_addr = 0x10 new_controller_addr = 0x30 dummy_robot_addr = 0x40 commands = [] my_priv_key = os.urandom(32) # Initilization process for validating robot and exchanging keys. msg_robot_dec, shared_key_robot = initialization(server, robot_addr, controller_addr, new_controller_addr, my_priv_key) # Establishing a shared key with controller. shared_key_controller = get_shared_key_controller(server, dummy_robot_addr, new_controller_addr, my_priv_key) # Sending an invalid secure_data_ack message to Robot to begin tampering nonce count. msg_robot_dec = invalid_msg(server, shared_key_robot, robot_addr, new_controller_addr) # Collecting data. for i in range(30): msg_robot_dec, commands = gather_blank_secure_data_response(msg_robot_dec, server, shared_key_controller, shared_key_robot, robot_addr, dummy_robot_addr, new_controller_addr, commands) print("\nCollected messages (blank response):", commands) if __name__ == "__main__": main() ``` ::: :::spoiler <b style="color:#e1829c">solution.py</b> ```python= # The Python program for carrying out the exploit. import requests import time import hashlib import monocypher import os import json # Returns a list of messages seen on the air since the last time this function # was called def receive_radio_messages(SERVER_URL): msg_group = [] messages = requests.get(SERVER_URL+"/radio_rx").json() for msg in messages: print("\nDEBUG: Received message", msg) msg_group.append(msg) return msg_group # [message] argument should be a Python dict object def inject_radio_message(SERVER_URL, message): requests.post(SERVER_URL+"/radio_tx", json=message) def start_robot(SERVER_URL): requests.get(SERVER_URL+"/start") def stop_robot(SERVER_URL): requests.get(SERVER_URL+"/stop") # Get the location of the robot and the obstacles on the grid def get_board_state(SERVER_URL): return requests.get(SERVER_URL+"/state").json() def compute_hmac(message, nonce, key): hmac = (message + str(nonce) + key.hex()).encode() for _ in range(32): hmac = monocypher.blake2b(hmac + key) return monocypher.blake2b(hmac) def add_hmac(message, nonce, key): hmac = compute_hmac(message, nonce, key) return {"message": message, "nonce": nonce, "hmac": hmac.hex()} def encrypt(message, key): key = monocypher.blake2b(key)[:32] nonce = os.urandom(24) tag, ciphertext = monocypher.lock(key, nonce, message.encode()) return ciphertext.hex() + ";" + tag.hex() + ";" + nonce.hex() def decrypt(message, key): key = monocypher.blake2b(key)[:32] ciphertext, tag, nonce = message.split(";") plaintext = monocypher.unlock(key, bytes.fromhex(nonce), bytes.fromhex(tag), bytes.fromhex(ciphertext)) return plaintext def initialization(server, robot_addr, controller_addr, new_controller_addr, my_priv_key): # Checking whether Robot and Controller are online. ping_robot = { "msg_type": "ping", "src": controller_addr, "dst": robot_addr, } ping_controller = { "msg_type": "ping", "src": robot_addr, "dst": controller_addr, } # Sending messages. inject = inject_radio_message(server, ping_robot) time.sleep(1) inject = inject_radio_message(server, ping_controller) time.sleep(1) # Reading output. print("Confirming that both Robot and Controller are online.") msg_recv = receive_radio_messages(server) # Set new addr for controller to conduct MITM attack. set_new_addr = { "msg_type": "set_addr", "src": robot_addr, "dst": controller_addr, "new_addr": new_controller_addr, } # Sending message. inject = inject_radio_message(server, set_new_addr) time.sleep(1) # Reading output. print("\nChanging Controller's address to 0x30.") msg_recv = receive_radio_messages(server) # Checking the new address of Controller. ping_controller = { "msg_type": "ping", "src": robot_addr, "dst": new_controller_addr, } # Sending message. inject = inject_radio_message(server, ping_controller) time.sleep(1) # Reading output. print("\nConfirming that Controller's address is changed.") msg_recv = receive_radio_messages(server) # Start the Robot. print("\nStarting robot...") start_robot(server) time.sleep(1) # Reading output. Extracting challenge key. msg_recv = receive_radio_messages(server) # Extracting challenge validation key. challenge = msg_recv[0]['challenge'] print("\nChallenge validation key sent from Robot:", challenge) print() # Validating robot authenticty. validate_msg = { "src": robot_addr, "dst": new_controller_addr, "msg_type": "validate", "challenge": challenge } # Sending message. inject = inject_radio_message(server, validate_msg) time.sleep(1) # Reading output. Extracting Robot's public key. msg_recv = receive_radio_messages(server) # Robot's public key. robot_dh_pub_key = msg_recv[2]['key'] print("\nRobot's public key:", robot_dh_pub_key) # The "ack_key_exchange" message for Robot. ack_key_exchange = { "msg_type": "ack_key_exchange", "src": new_controller_addr, "dst": robot_addr, "key": monocypher.compute_key_exchange_public_key(my_priv_key).hex() } my_dh_pub_key_robot = ack_key_exchange['key'] print("\nMy public key sent to Robot:", my_dh_pub_key_robot) # Construct my shared key with Robot. shared_key_robot = monocypher.key_exchange(my_priv_key, bytes.fromhex(robot_dh_pub_key)) print("\nMy shared key with Robot:", shared_key_robot.hex()) # Sending message. inject = inject_radio_message(server, ack_key_exchange) time.sleep(1) # Reading output. msg_recv = receive_radio_messages(server) # Decrypting the secure_data message from Robot. msg_robot_enc = msg_recv[0]['encrypted'] msg_robot_dec = json.loads(decrypt(msg_robot_enc, shared_key_robot).decode()) print("\nDecrypted message from Robot:", msg_robot_dec) return msg_robot_dec, shared_key_robot def get_shared_key_controller(server, dummy_robot_addr, new_controller_addr, my_priv_key): # The "key_exchange" message for Controller. key_exchange = { "msg_type": "key_exchange", "src": dummy_robot_addr, "dst": new_controller_addr, "key": monocypher.compute_key_exchange_public_key(my_priv_key).hex() } my_dh_pub_key_controller = key_exchange['key'] print("\nMy public key sent to Controller:", my_dh_pub_key_controller) # Sending message. inject = inject_radio_message(server, key_exchange) time.sleep(1) # Reading output. msg_recv = receive_radio_messages(server) # Controller's public key. controller_dh_pub_key = msg_recv[0]['key'] print("\nController's public key:", controller_dh_pub_key) # Construct my shared key with Controller. shared_key_controller = monocypher.key_exchange(my_priv_key, bytes.fromhex(controller_dh_pub_key)) print("\nMy shared key with Controller:", shared_key_controller.hex()) return shared_key_controller def send_recv_msg(msg_robot_dec, server, shared_key_controller, shared_key_robot, robot_addr, dummy_robot_addr, new_controller_addr): # Re-encrypt the decrypted message from Robot to send it to Controller. msg_robot_reenc = encrypt(json.dumps(msg_robot_dec), shared_key_controller) # Sending secure_data from Robot to Controller. secure_data = { "msg_type": "secure_data", "src": dummy_robot_addr, "dst": new_controller_addr, "encrypted": msg_robot_reenc } # Sending message. print("\nSending re-encrypted message from Robot to Controller...") inject = inject_radio_message(server, secure_data) time.sleep(1) # Reading output. Running a while loop until msg is available as, sometimes, there is latency. msg_recv = [] while len(msg_recv) < 1: msg_recv = receive_radio_messages(server) # Decrypt message back from Controller. msg_controller_enc = msg_recv[0]['encrypted'] msg_controller_dec = json.loads(decrypt(msg_controller_enc, shared_key_controller).decode()) print("\nDecrypted message from Controller:", msg_controller_dec) # Re-encrypt the decrypted message from Controller to send it to Robot. msg_controller_reenc = encrypt(json.dumps(msg_controller_dec), shared_key_robot) secure_data_ack = { "msg_type": "secure_data_ack", "src": new_controller_addr, "dst": robot_addr, "encrypted": msg_controller_reenc } # Sending message. print("\nSending re-encrypted message from Controller to Robot...") inject = inject_radio_message(server, secure_data_ack) time.sleep(1) # Reading output. msg_recv = [] while len(msg_recv) < 1: msg_recv = receive_radio_messages(server) # Decrypt message back from Robot. msg_robot_enc = msg_recv[0]['encrypted'] msg_robot_dec = json.loads(decrypt(msg_robot_enc, shared_key_robot).decode()) print("\nDecrypted message from Robot:", msg_robot_dec) # Re-encrypt the decrypted message from Robot to send it to Controller. msg_robot_reenc = encrypt(json.dumps(msg_robot_dec), shared_key_controller) secure_data_request = { "msg_type": "secure_data_request", "src": dummy_robot_addr, "dst": new_controller_addr, "encrypted": msg_robot_reenc } # Sending message. print("\nSending re-encrypted message from Robot to Controller...") inject = inject_radio_message(server, secure_data_request) time.sleep(1) # Reading output. msg_recv = [] while len(msg_recv) < 1: msg_recv = receive_radio_messages(server) # Decrypt message back from Controller. msg_controller_enc = msg_recv[0]['encrypted'] msg_controller_dec = json.loads(decrypt(msg_controller_enc, shared_key_controller).decode()) print("\nDecrypted message from Controller:", msg_controller_dec) # Re-encrypt the decrypted message from Controller to send it to Robot. msg_controller_reenc = encrypt(json.dumps(msg_controller_dec), shared_key_robot) secure_data_response = { "msg_type": "secure_data_response", "src": new_controller_addr, "dst": robot_addr, "encrypted": msg_controller_reenc } # Sending message. print("\nSending re-encrypted message from Controller to Robot...") inject = inject_radio_message(server, secure_data_response) # Reading output. msg_recv = [] while len(msg_recv) < 1: msg_recv = receive_radio_messages(server) # Decrypt message back from Robot. msg_robot_enc = msg_recv[0]['encrypted'] msg_robot_dec = json.loads(decrypt(msg_robot_enc, shared_key_robot).decode()) print("\nDecrypted message from Robot:", msg_robot_dec) return msg_robot_dec def send_recv_response_msg(msg_controller_dec, server, shared_key_controller, shared_key_robot, robot_addr, dummy_robot_addr, new_controller_addr): # This function sends only secure_data_response messages. # Encrypting decrypted secure_data_response message to send to Robot. msg_controller_reenc = encrypt(json.dumps(msg_controller_dec), shared_key_robot) secure_data_response = { "msg_type": "secure_data_response", "src": new_controller_addr, "dst": robot_addr, "encrypted": msg_controller_reenc } # Sending message. print("\nSending re-encrypted message from Controller to Robot...") inject = inject_radio_message(server, secure_data_response) # Reading output. msg_recv = [] while len(msg_recv) < 1: msg_recv = receive_radio_messages(server) # Decrypt message back from Robot. msg_robot_enc = msg_recv[0]['encrypted'] msg_robot_dec = json.loads(decrypt(msg_robot_enc, shared_key_robot).decode()) print("\nDecrypted message from Robot:", msg_robot_dec) return msg_robot_dec def send_recv_blank_msg(msg_robot_dec, server, shared_key_controller, shared_key_robot, robot_addr, dummy_robot_addr, new_controller_addr): # Re-encrypt the decrypted message from Robot to send it to Controller. msg_robot_reenc = encrypt(json.dumps(msg_robot_dec), shared_key_controller) secure_data_request = { "msg_type": "secure_data_request", "src": dummy_robot_addr, "dst": new_controller_addr, "encrypted": msg_robot_reenc } # Sending message. print("\nSending re-encrypted message from Robot to Controller...") inject = inject_radio_message(server, secure_data_request) time.sleep(1) # Reading output. Running a while loop until msg is available as, sometimes, there is latency. msg_recv = [] while len(msg_recv) < 1: msg_recv = receive_radio_messages(server) # Decrypt message back from Controller. msg_controller_enc = msg_recv[0]['encrypted'] msg_controller_dec = json.loads(decrypt(msg_controller_enc, shared_key_controller).decode()) print("\nDecrypted message from Controller:", msg_controller_dec) # Re-encrypt the decrypted message from Controller to send it to Robot. msg_controller_reenc = encrypt(json.dumps(msg_controller_dec), shared_key_robot) secure_data_response = { "msg_type": "secure_data_response", "src": new_controller_addr, "dst": robot_addr, "encrypted": msg_controller_reenc } # Sending message. print("\nSending re-encrypted message from Controller to Robot...") inject = inject_radio_message(server, secure_data_response) time.sleep(1) # Reading output. msg_recv = [] while len(msg_recv) < 1: msg_recv = receive_radio_messages(server) # Decrypt message back from Robot. msg_robot_enc = msg_recv[0]['encrypted'] msg_robot_dec = json.loads(decrypt(msg_robot_enc, shared_key_robot).decode()) print("\nDecrypted message from Robot:", msg_robot_dec) return msg_robot_dec def invalid_msg(server, shared_key_robot, robot_addr, new_controller_addr): # This function sends an invalid secure_data_ack message to Robot to tamper nonce count. invalid_msg = encrypt(json.dumps(add_hmac("", 99, shared_key_robot)), shared_key_robot) secure_data_ack = { "msg_type": "secure_data_ack", "src": new_controller_addr, "dst": robot_addr, "encrypted": invalid_msg } # Sending message. print("\nSending invalid message to Robot...") inject = inject_radio_message(server, secure_data_ack) time.sleep(1) # Reading output. Running a while loop until msg is available as, sometimes, there is latency. msg_recv = [] while len(msg_recv) < 1: msg_recv = receive_radio_messages(server) # Decrypt message back from Robot. msg_robot_enc = msg_recv[1]['encrypted'] msg_robot_dec = json.loads(decrypt(msg_robot_enc, shared_key_robot).decode()) print("\nDecrypted message from Robot:", msg_robot_dec) return msg_robot_dec def send_secure_data_msg(msg_robot_dec, server, shared_key_controller, dummy_robot_addr, new_controller_addr): # This function just sends secure_data message. # Re-encrypt the decrypted message from Robot to send it to Controller. msg_robot_reenc = encrypt(json.dumps(msg_robot_dec), shared_key_controller) # Sending secure_data from Robot to Controller. secure_data = { "msg_type": "secure_data", "src": dummy_robot_addr, "dst": new_controller_addr, "encrypted": msg_robot_reenc } # Sending message. print("\nSending re-encrypted message from Robot to Controller...") inject = inject_radio_message(server, secure_data) time.sleep(1) # Reading output. Running a while loop until msg is available as, sometimes, there is latency. msg_recv = [] while len(msg_recv) < 1: msg_recv = receive_radio_messages(server) # Decrypt message back from Controller. msg_controller_enc = msg_recv[0]['encrypted'] msg_controller_dec = json.loads(decrypt(msg_controller_enc, shared_key_controller).decode()) print("\nDecrypted message from Controller:", msg_controller_dec) def send_secure_data_request_msg(msg_robot_dec, server, shared_key_controller, dummy_robot_addr, new_controller_addr): # This function just sends secure_data_request message. # Re-encrypt the decrypted message from Robot to send it to Controller. msg_robot_reenc = encrypt(json.dumps(msg_robot_dec), shared_key_controller) secure_data_request = { "msg_type": "secure_data_request", "src": dummy_robot_addr, "dst": new_controller_addr, "encrypted": msg_robot_reenc } # Sending message. print("\nSending re-encrypted message from Robot to Controller...") inject = inject_radio_message(server, secure_data_request) time.sleep(1) # Reading output. Running a while loop until msg is available as, sometimes, there is latency. msg_recv = [] while len(msg_recv) < 1: msg_recv = receive_radio_messages(server) # Decrypt message back from Controller. msg_controller_enc = msg_recv[0]['encrypted'] msg_controller_dec = json.loads(decrypt(msg_controller_enc, shared_key_controller).decode()) print("\nDecrypted message from Controller:", msg_controller_dec) def main(): # Initialization of SERVER_URL and PORT. PORT = int(input("Enter PORT: ")) server = "http://activist-birds.picoctf.net:" + str(PORT) + "/" # Initialization of variables. robot_addr = 0x20 controller_addr = 0x10 new_controller_addr = 0x30 dummy_robot_addr = 0x40 my_priv_key = os.urandom(32) secure_data_response_msg = [ {'message': 'east', 'nonce': 9, 'hmac': 'd28df9bdfb9eeb33ddec908bf1af7bb808f5fc169e07f6159c5293fed6e1f2181595a99cf8a14fc981e6dbb9a4d7dfcd6adda095fd6d56327c01afdfcb39144a'}, {'message': 'north', 'nonce': 15, 'hmac': '147cf90985b80904dda9b91318fb39017c80f67c6d04c43c3da33248c67a1942444ec3ea4ed90e58d41322918d45570cf3de06da547600be8537da8b6d51ea8c'}, {'message': 'west', 'nonce': 21, 'hmac': '55d18b806a961e747553c889ff0d209442df1c6bbd28c120f49c6d3ba27ab12234f5b928883b65b572e7b79c76c832e3cecb0919d4977a3e2270b1335e9eb7de'}, {'message': 'south', 'nonce': 27, 'hmac': '0e9ca7c11bf838a8d08c434c96333a20d05c4bbb40077eddf820c6236fc67f839e37bd0c0bed550ea344a4f87b64ef09f1c731538cb31bf1cd1d273b2777ca2b'}, {'message': 'east', 'nonce': 33, 'hmac': 'd0836278113fe603e43a169bb503ed34b7597955a70ac6e64e5bdc6359c4841d68cf3d8ac0a32c09d4f29a81ac68f258efed6ab10d90ce2b2e3c6e5183755438'} ] blank_msg = [ {'message': '', 'nonce': 10, 'hmac': '77127f22c314ecea77eef5f0a5bd9fd4c558a33aeb0cf057798ad4ebfdec457672ae5c5bf7afc78268a38c632e24d7fe953fc748cac6a614560d1854f2629c17'}, {'message': '', 'nonce': 11, 'hmac': '1f2231f4dbe97869ee5b1c95e032913aa2990080d9216279eacc6e9a3d55ecd59f6af4357ad682eb036d74c22c7f8249ebcb1d3e21ec75cd012034098dbb21dc'} ] # Initilization process for validating robot and exchanging keys. msg_robot_dec, shared_key_robot = initialization(server, robot_addr, controller_addr, new_controller_addr, my_priv_key) # Establishing a shared key with controller. shared_key_controller = get_shared_key_controller(server, dummy_robot_addr, new_controller_addr, my_priv_key) # First go east then south. for i in range(2): msg_robot_dec = send_recv_msg(msg_robot_dec, server, shared_key_controller, shared_key_robot, robot_addr, dummy_robot_addr, new_controller_addr) # Sending invalid message to Robot to tamper nonce values. msg_robot_dec = invalid_msg(server, shared_key_robot, robot_addr, new_controller_addr) # Sending and receiving blank messages to increase nonce. for i in range(5): msg_robot_dec = send_recv_blank_msg(msg_robot_dec, server, shared_key_controller, shared_key_robot, robot_addr, dummy_robot_addr, new_controller_addr) # Sending one more secure_data_request to Controller to match nonce with Robot. send_secure_data_request_msg(msg_robot_dec, server, shared_key_controller, dummy_robot_addr, new_controller_addr) # Sending send_data_response to move robot 'east'. msg_controller_dec = secure_data_response_msg[0] msg_robot_dec = send_recv_response_msg(msg_controller_dec, server, shared_key_controller, shared_key_robot, robot_addr, dummy_robot_addr, new_controller_addr) # Sending one secure_data_ack to Controller to match nonce with Robot and pass 'west'. send_secure_data_msg(msg_robot_dec, server, shared_key_controller, dummy_robot_addr, new_controller_addr) # Sending invalid message to Robot to tamper nonce values. msg_robot_dec = invalid_msg(server, shared_key_robot, robot_addr, new_controller_addr) # Sending blank secure_data_response to increase nonce. msg_controller_dec = blank_msg[0] msg_robot_dec = send_recv_response_msg(msg_controller_dec, server, shared_key_controller, shared_key_robot, robot_addr, dummy_robot_addr, new_controller_addr) # Sending one more secure_data_request to Controller to match nonce with Robot. send_secure_data_request_msg(msg_robot_dec, server, shared_key_controller, dummy_robot_addr, new_controller_addr) # Sending blank secure_data_response to increase nonce. msg_controller_dec = blank_msg[1] msg_robot_dec = send_recv_response_msg(msg_controller_dec, server, shared_key_controller, shared_key_robot, robot_addr, dummy_robot_addr, new_controller_addr) # Sending and receiving blank messages to increase nonce. for i in range(3): msg_robot_dec = send_recv_blank_msg(msg_robot_dec, server, shared_key_controller, shared_key_robot, robot_addr, dummy_robot_addr, new_controller_addr) # Sending one more secure_data_request to Controller to match nonce with Robot. send_secure_data_request_msg(msg_robot_dec, server, shared_key_controller, dummy_robot_addr, new_controller_addr) # Sending send_data_response to move robot 'north'. msg_controller_dec = secure_data_response_msg[1] msg_robot_dec = send_recv_response_msg(msg_controller_dec, server, shared_key_controller, shared_key_robot, robot_addr, dummy_robot_addr, new_controller_addr) # Sending invalid message to Robot to tamper nonce values. msg_robot_dec = invalid_msg(server, shared_key_robot, robot_addr, new_controller_addr) # Sending and receiving blank messages to increase nonce. for i in range(5): msg_robot_dec = send_recv_blank_msg(msg_robot_dec, server, shared_key_controller, shared_key_robot, robot_addr, dummy_robot_addr, new_controller_addr) # Sending one more secure_data_request to Controller to match nonce with Robot. send_secure_data_request_msg(msg_robot_dec, server, shared_key_controller, dummy_robot_addr, new_controller_addr) # Sending send_data_response to move robot 'west'. msg_controller_dec = secure_data_response_msg[2] msg_robot_dec = send_recv_response_msg(msg_controller_dec, server, shared_key_controller, shared_key_robot, robot_addr, dummy_robot_addr, new_controller_addr) # Sending invalid message to Robot to tamper nonce values. msg_robot_dec = invalid_msg(server, shared_key_robot, robot_addr, new_controller_addr) # Sending and receiving blank messages to increase nonce. for i in range(5): msg_robot_dec = send_recv_blank_msg(msg_robot_dec, server, shared_key_controller, shared_key_robot, robot_addr, dummy_robot_addr, new_controller_addr) # Sending one more secure_data_request to Controller to match nonce with Robot. send_secure_data_request_msg(msg_robot_dec, server, shared_key_controller, dummy_robot_addr, new_controller_addr) # Sending send_data_response to move robot 'south'. msg_controller_dec = secure_data_response_msg[3] msg_robot_dec = send_recv_response_msg(msg_controller_dec, server, shared_key_controller, shared_key_robot, robot_addr, dummy_robot_addr, new_controller_addr) # Sending invalid message to Robot to tamper nonce values. msg_robot_dec = invalid_msg(server, shared_key_robot, robot_addr, new_controller_addr) # Sending and receiving blank messages to increase nonce. for i in range(5): msg_robot_dec = send_recv_blank_msg(msg_robot_dec, server, shared_key_controller, shared_key_robot, robot_addr, dummy_robot_addr, new_controller_addr) # Sending one more secure_data_request to Controller to match nonce with Robot. send_secure_data_request_msg(msg_robot_dec, server, shared_key_controller, dummy_robot_addr, new_controller_addr) # Sending send_data_response to move robot 'east'. msg_controller_dec = secure_data_response_msg[4] msg_robot_dec = send_recv_response_msg(msg_controller_dec, server, shared_key_controller, shared_key_robot, robot_addr, dummy_robot_addr, new_controller_addr) # Finally, go 'north', 'east', 'south'. for i in range(6): msg_robot_dec = send_recv_msg(msg_robot_dec, server, shared_key_controller, shared_key_robot, robot_addr, dummy_robot_addr, new_controller_addr) # Print Robot state. result = get_board_state(server) print("\nResult:", result) if __name__ == "__main__": main() ``` ::: :::success **Flag: picoCTF{r1gh7_84ck_47_y4_2e0fd8d6}** ::: ## Compress and Attack (Under Maintenance) ## Summary Cuối cùng sau bao ngày nỗ lực thì mình cũng đã dứt điểm được trang web quái quỷ này rồi 🥳🥳🥳. Chúc các bạn một ngày vui vẻ và hạnh phúc cùng bạn bè và người thân 😊. Không còn phần sau trong năm nay để mà mong chờ nữa đâu 😆.