<h1 style="text-align:center; color:#8868fb">
PicoCTF Walkthrough: Part 5
</h1>
Dưới đây là các lời giải cho một số thử thách của **picoCTF** mà mình cảm thấy thú vị và hữu ích.
Đây **chắc chắn** là phần cuối cùng trong năm nay khi mà chỉ còn hai bài nữa là kết thúc tất cả các thử thách về mật mã học trên **picoCTF**.
## Ricochet

:::spoiler <b>crypto.py</b>
```python=
import hashlib
import monocypher
import os
def compute_hmac(message, nonce, key):
    """Derive the authentication tag for ``message`` bound to ``nonce``.

    The seed is ``message``, ``str(nonce)`` and the hex form of ``key``
    concatenated and encoded to bytes.  It is hashed 32 times with ``key``
    appended to the input of every round, then hashed once more without the
    key.  Returns the output of that final ``monocypher.blake2b`` call.
    """
    prefix = message + str(nonce)
    digest = (prefix + key.hex()).encode()
    rounds_left = 32
    while rounds_left:
        digest = monocypher.blake2b(digest + key)
        rounds_left -= 1
    return monocypher.blake2b(digest)
def add_hmac(message, nonce, key):
    """Wrap ``message`` in a dict that also carries ``nonce`` and the hex tag."""
    tag_hex = compute_hmac(message, nonce, key).hex()
    envelope = {"message": message, "nonce": nonce}
    envelope["hmac"] = tag_hex
    return envelope
def validate_hmac(message, nonce, key):
    """Check an envelope produced by ``add_hmac``.

    ``message`` is the envelope dict.  Its payload is returned when both the
    stored tag matches the tag recomputed for the expected ``nonce`` and the
    stored nonce equals ``nonce``; otherwise ``None`` is returned.
    """
    expected_hex = compute_hmac(message["message"], nonce, key).hex()
    # Guard-clause form of the original check; the tag is compared first and
    # the nonce is only looked at when the tag matched.
    if message["hmac"] != expected_hex or message["nonce"] != nonce:
        return None
    return message["message"]
def encrypt(message, key):
    """Encrypt the string ``message`` and return ``ciphertext;tag;nonce`` in hex.

    The lock key is the first 32 bytes of ``monocypher.blake2b(key)`` and a
    fresh 24-byte random nonce is drawn for every call.
    """
    lock_key = monocypher.blake2b(key)[:32]
    iv = os.urandom(24)
    mac, sealed = monocypher.lock(lock_key, iv, message.encode())
    return ";".join((sealed.hex(), mac.hex(), iv.hex()))
def decrypt(message, key):
    """Reverse ``encrypt``: parse ``ciphertext;tag;nonce`` and unlock it.

    Returns whatever ``monocypher.unlock`` returns; callers in this write-up
    treat ``None`` as "authentication failed".
    """
    unlock_key = monocypher.blake2b(key)[:32]
    ciphertext_hex, tag_hex, nonce_hex = message.split(";")
    nonce = bytes.fromhex(nonce_hex)
    tag = bytes.fromhex(tag_hex)
    ciphertext = bytes.fromhex(ciphertext_hex)
    return monocypher.unlock(unlock_key, nonce, tag, ciphertext)
```
:::
:::spoiler <b>radio_interface.py</b>
```python=
import requests
# TODO: you will need to fill this in with the URL of the challenge
# Keep it without a trailing slash: the helpers in this file append paths
# that already start with "/" (e.g. "/radio_rx").
SERVER_URL = "http://127.0.0.1:5000"
def receive_radio_messages():
    """Return the messages seen on the air since the previous call.

    Each message is also echoed to stdout with a DEBUG prefix.
    """
    reply = requests.get(SERVER_URL + "/radio_rx")
    on_air = reply.json()
    for radio_msg in on_air:
        print("DEBUG: Received message", radio_msg)
    return on_air
def inject_radio_message(message):
    """Transmit ``message`` (a Python dict) by POSTing it as JSON to /radio_tx."""
    endpoint = SERVER_URL + "/radio_tx"
    requests.post(endpoint, json=message)
def start_robot():
    """Issue a GET to the server's /start endpoint."""
    endpoint = SERVER_URL + "/start"
    requests.get(endpoint)
def stop_robot():
    """Issue a GET to the server's /stop endpoint."""
    endpoint = SERVER_URL + "/stop"
    requests.get(endpoint)
def get_board_state():
    """Return the decoded JSON from /state (robot and obstacle locations)."""
    reply = requests.get(SERVER_URL + "/state")
    return reply.json()
```
:::
:::spoiler <b>robot.py</b>
```python=
from radio_base import RadioDevice
import asyncio
from keys import keys
import traceback
import os
import crypto
import json
import robot_low_level
import monocypher
# NOTE(review): indentation below was reconstructed from a flattened listing;
# confirm nesting against the original challenge file.
# Destination address used for every message the Robot sends.  Despite the
# name, the Robot's own address is 0x20 (see __init__); 0x10 is the address
# RobotController starts with.
ROBOT_ADDRESS = 0x10
class Robot(RadioDevice):
    """Radio device that authenticates its peer, then fetches and runs moves.

    After being started it sends a ``validate`` challenge, performs a key
    exchange, and then requests up to 20 movement commands over the
    HMAC-plus-encryption layer implemented by ``send_secure_data`` and
    ``recv_secure_data``.
    """
    def __init__(self, network):
        super().__init__("Robot", network)
        # Flags read by run(); nothing in this class sets them to True.
        self.start = False
        self.stop = False
        self.address = 0x20
        # Messages addressed to us that are not pings, oldest first.
        self.receive_buffer = []
        self.dh_key_priv = os.urandom(32)
        # Initialize to a random key so we don't accidentally send unencrypted data
        self.dh_key_shared = os.urandom(32)
        # Message counter bound into every HMAC; reset to 0 at the top of run().
        self.nonce = 0
    def debug(self, msg):
        """Send ``str(msg)`` as a ``debug`` message to address 0xFF."""
        msg = str(msg)
        self.send_message({
            "msg_type": "debug",
            "src": self.address,
            "dst": 0xFF,
            "message": msg
        })
    def message_callback(self, msg):
        """Handle one incoming radio message.

        Messages whose ``dst`` is not our address are ignored, pings are
        answered with a pong, and everything else is queued for
        ``wait_message``.  Any exception is printed and swallowed.
        """
        try:
            msg_type = msg["msg_type"]
            msg_dst = int(msg["dst"])
            if msg_dst != self.address:
                return
            if msg_type == "ping":
                self.send_message({
                    "msg_type": "pong",
                    "src": self.address,
                    "dst": msg["src"]
                })
            else:
                # No check of msg["src"]: any sender can queue a message here.
                self.receive_buffer.append(msg)
        except:
            traceback.print_exc()
    async def wait_message(self, msg_type):
        """Wait for a queued message of type ``msg_type``.

        Polls up to 100 times with a 0.1 s sleep between polls.  Each poll
        pops the oldest queued message; a popped message of another type is
        dropped, not re-queued.  Returns the matching message, or ``{}`` when
        the polls are exhausted.
        """
        for _ in range(100):
            try:
                if len(self.receive_buffer):
                    msg = self.receive_buffer.pop(0)
                    if msg["msg_type"] == msg_type:
                        return msg
            except:
                traceback.print_exc()
            await asyncio.sleep(0.1)
        return {}
    def hmac_and_encrypt(self, msg, nonce):
        """Return ``msg`` wrapped with an HMAC (as JSON) and encrypted.

        The HMAC uses ``keys["shared_hmac_key"]``; the encryption uses the
        current ``self.dh_key_shared``.
        """
        msg_with_hmac = json.dumps(crypto.add_hmac(msg, nonce, keys["shared_hmac_key"]))
        msg_encrypted = crypto.encrypt(msg_with_hmac, self.dh_key_shared)
        return msg_encrypted
    def decrypt_and_check_hmac(self, msg, nonce):
        """Decrypt ``msg`` and validate its HMAC for ``nonce``.

        Returns ``None`` when decryption yields ``None``; otherwise returns
        the result of ``crypto.validate_hmac`` on the parsed JSON.
        """
        msg = crypto.decrypt(msg, self.dh_key_shared)
        if msg is None: return None
        msg_with_hmac = json.loads(msg)
        return crypto.validate_hmac(msg_with_hmac, nonce, keys["shared_hmac_key"])
    async def send_secure_data(self, msg):
        """Send ``msg`` as ``secure_data`` and wait for a valid empty ack.

        ``self.nonce`` is advanced only when the ack validates and is empty.
        On failure a debug message is sent and the method returns without
        raising and without a return value, so callers cannot tell the two
        outcomes apart.
        """
        msg_encrypted = self.hmac_and_encrypt(msg, self.nonce)
        self.send_message({
            "msg_type": "secure_data",
            "src": self.address,
            "dst": ROBOT_ADDRESS,
            "encrypted": msg_encrypted
        })
        response = await self.wait_message("secure_data_ack")
        # If wait_message timed out, response is {} and this lookup raises KeyError.
        ack_decrypted = self.decrypt_and_check_hmac(response["encrypted"], self.nonce)
        if ack_decrypted is None:
            self.debug("acknowledgement failed to validate")
            return
        # Ack packets are zero length
        if len(ack_decrypted) != 0:
            self.debug("acknowledgement mismatch")
            return
        self.nonce += 1
    async def recv_secure_data(self):
        """Request data with an empty ``secure_data_request`` and return the reply.

        Returns the validated payload and advances ``self.nonce``; returns
        ``None`` (after a debug message) when the reply fails validation.
        """
        # Zero length packet as a secure data request
        msg_encrypted = self.hmac_and_encrypt("", self.nonce)
        self.send_message({
            "msg_type": "secure_data_request",
            "src": self.address,
            "dst": ROBOT_ADDRESS,
            "encrypted": msg_encrypted
        })
        response = await self.wait_message("secure_data_response")
        response_decrypted = self.decrypt_and_check_hmac(response["encrypted"], self.nonce)
        if response_decrypted is None:
            self.debug("response failed to validate")
            return
        self.nonce += 1
        return response_decrypted
    async def read_command(self):
        """Ask for a movement and poll until a non-empty answer arrives.

        The outcome of ``send_secure_data`` is not checked, so polling
        proceeds even when the ack was rejected and the nonce did not advance.
        A ``None`` from ``recv_secure_data`` also ends the loop and is
        returned, because ``None != ""``.
        """
        await self.send_secure_data("get_movement")
        await asyncio.sleep(0.2)
        while True:
            resp = await self.recv_secure_data()
            if resp != "":
                return resp
    async def run(self):
        """Main loop: wait for start, authenticate, exchange keys, run commands."""
        while self.running:
            while not self.start:
                await asyncio.sleep(0.1)
            self.nonce = 0
            # move the robot back to its origin/home location
            robot_low_level.reset()
            # Validate the robot's authenticity
            challenge = os.urandom(16).hex()
            self.send_message({
                "src": self.address,
                "dst": ROBOT_ADDRESS,
                "msg_type": "validate",
                "challenge": challenge
            })
            response = await self.wait_message("ack_validate")
            expected_response = crypto.compute_hmac(challenge, 0, keys["authenticity_key"])
            # .get() so that the {} returned on timeout counts as a failed validation.
            if response.get("response") != expected_response.hex():
                self.debug("Robot validation failed")
                self.start = False
                continue
            self.debug("Robot successfully validated")
            # Diffie-Hellman key exchange to secure future communications
            self.send_message({
                "src": self.address,
                "dst": ROBOT_ADDRESS,
                "msg_type": "key_exchange",
                "key": monocypher.compute_key_exchange_public_key(self.dh_key_priv).hex()
            })
            response = await self.wait_message("ack_key_exchange")
            # The received public key is used as-is; it is not tied to the
            # validation step above.
            self.dh_key_shared = monocypher.key_exchange(self.dh_key_priv, bytes.fromhex(response["key"]))
            # Once validated, read some commands from the controller and execute them
            for _ in range(20):
                cmd = (await self.read_command())
                if cmd is None:
                    self.debug("timed out trying to read a robot command")
                    break
                if len(cmd) != 0:
                    await robot_low_level.move(cmd)
                if self.stop:
                    break
            if self.stop:
                await robot_low_level.move("stop")
            else:
                await robot_low_level.move("done")
            self.stop = False
            self.start = False
```
:::
:::spoiler <b>robotcontroller.py</b>
```python=
from radio_base import RadioDevice
import asyncio
import traceback
from keys import keys
import os
import crypto
import json
import monocypher
# NOTE(review): indentation below was reconstructed from a flattened listing;
# confirm nesting against the original challenge file.
class RobotController(RadioDevice):
    """Radio device that answers the Robot's requests with movement commands.

    All protocol handling happens synchronously in ``message_callback``;
    ``run`` only turns a received ``"get_movement"`` into the next command.
    """
    def __init__(self, network):
        super().__init__("RobotController", network)
        self.address = 0x10
        self.dh_key_priv = os.urandom(32)
        # Initialize to a random key so we don't accidentally send unencrypted data
        self.dh_key_shared = os.urandom(32)
        # Payload returned by the next valid secure_data_request.
        self.send_buffer_secure = ""
        # Last payload accepted by process_secure_data.
        self.recv_buffer_secure = ""
        # Message counter bound into every HMAC; only reset() sets it back to 0.
        self.nonce = 0
        self.movement_counter = 0
    def reset(self):
        """Reset the nonce and the movement counter to 0."""
        self.nonce = 0
        self.movement_counter = 0
    def debug(self, msg):
        """Send ``str(msg)`` as a ``debug`` message to address 0xFF."""
        msg = str(msg)
        self.send_message({
            "msg_type": "debug",
            "src": self.address,
            "dst": 0xFF,
            "message": msg
        })
    def message_callback(self, msg):
        """Dispatch one incoming radio message by its ``msg_type``.

        Messages whose ``dst`` is not our current address are ignored.  None
        of the handlers checks ``msg["src"]``; replies go back to whatever
        ``src`` the message claims.  Any exception is printed and swallowed.
        """
        try:
            msg_type = msg["msg_type"]
            msg_dst = int(msg["dst"])
            if msg_dst != self.address:
                return
            if msg_type == "ping":
                self.send_message({
                    "msg_type": "pong",
                    "src": self.address,
                    "dst": msg["src"]
                })
            # To avoid the bootstrapping problem, changing the address needs to
            # be an unauthenticated operation
            if msg_type == "set_addr":
                self.address = int(msg["new_addr"]) & 0xFF
                self.send_message({
                    "msg_type": "ack_set_addr",
                    "src": self.address,
                    "dst": msg["src"]
                })
            if msg_type == "validate":
                # The HMAC is computed over the challenge supplied in the message.
                response = crypto.compute_hmac(msg["challenge"], 0, keys["authenticity_key"])
                self.send_message({
                    "msg_type": "ack_validate",
                    "src": self.address,
                    "dst": msg["src"],
                    "response": response.hex()
                })
            if msg_type == "key_exchange":
                self.send_message({
                    "msg_type": "ack_key_exchange",
                    "src": self.address,
                    "dst": msg["src"],
                    "key": monocypher.compute_key_exchange_public_key(self.dh_key_priv).hex()
                })
                # Replaces the shared key; self.nonce is left untouched here.
                self.dh_key_shared = monocypher.key_exchange(self.dh_key_priv, bytes.fromhex(msg["key"]))
            if msg_type == "secure_data_request":
                self.process_secure_data_request(msg)
            if msg_type == "secure_data":
                self.process_secure_data(msg)
        except:
            traceback.print_exc()
    def hmac_and_encrypt(self, msg, nonce):
        """Return ``msg`` wrapped with an HMAC (as JSON) and encrypted.

        The HMAC uses ``keys["shared_hmac_key"]``; the encryption uses the
        current ``self.dh_key_shared``.
        """
        msg_with_hmac = json.dumps(crypto.add_hmac(msg, nonce, keys["shared_hmac_key"]))
        msg_encrypted = crypto.encrypt(msg_with_hmac, self.dh_key_shared)
        return msg_encrypted
    def decrypt_and_check_hmac(self, msg, nonce):
        """Decrypt ``msg`` and validate its HMAC for ``nonce``.

        Returns ``None`` when decryption yields ``None``; otherwise returns
        the result of ``crypto.validate_hmac`` on the parsed JSON.
        """
        msg = crypto.decrypt(msg, self.dh_key_shared)
        if msg is None: return None
        msg_with_hmac = json.loads(msg)
        return crypto.validate_hmac(msg_with_hmac, nonce, keys["shared_hmac_key"])
    def process_secure_data_request(self, request):
        """Answer a valid, empty ``secure_data_request`` with the send buffer.

        The send buffer is cleared and ``self.nonce`` advanced once the
        request validates; invalid or non-empty requests only produce a debug
        message.
        """
        request_decrypted = self.decrypt_and_check_hmac(request["encrypted"], self.nonce)
        if request_decrypted is None:
            self.debug("message failed to validate")
            return
        # Avoid type confusion by ensuring a secure data request should always
        # be of length zero
        if len(request_decrypted) != 0:
            self.debug("message mismatch")
            return
        # If the request is valid, send our current send buffer in response
        msg = self.send_buffer_secure
        self.send_buffer_secure = ""
        msg_encrypted = self.hmac_and_encrypt(msg, self.nonce)
        self.nonce += 1
        self.send_message({
            "msg_type": "secure_data_response",
            "src": self.address,
            "dst": request["src"],
            "encrypted": msg_encrypted
        })
    def process_secure_data(self, msg):
        """Accept a valid ``secure_data`` message and acknowledge it.

        The ack is an empty payload protected with the current nonce, after
        which ``self.nonce`` is advanced and the payload is stored in
        ``self.recv_buffer_secure`` for ``run`` to pick up.
        """
        msg_decrypted = self.decrypt_and_check_hmac(msg["encrypted"], self.nonce)
        if msg_decrypted is None:
            self.debug("message failed to validate")
            return
        # If the request is valid, send an empty message as an acknowledgement
        msg_encrypted = self.hmac_and_encrypt("", self.nonce)
        self.nonce += 1
        self.send_message({
            "msg_type": "secure_data_ack",
            "src": self.address,
            "dst": msg["src"],
            "encrypted": msg_encrypted
        })
        self.recv_buffer_secure = msg_decrypted
    async def run(self):
        """Every 0.05 s, turn a pending ``"get_movement"`` into the next command."""
        while self.running:
            await asyncio.sleep(0.05)
            # Wait for the secure messaging layer to provide a secure command
            if self.recv_buffer_secure == "get_movement":
                self.recv_buffer_secure = ""
                # Demo version sends the same four commands repeatedly
                # Full control is available in the licensed version of this system
                self.send_buffer_secure = ["east", "south", "west", "north"][self.movement_counter % 4]
                self.movement_counter += 1
```
:::
Bài này khó kinh khủng, đợi khi nào trình độ mình cao lên thì mình sẽ giải thích cho các bạn lời giải, còn tạm thời bây giờ các bạn chịu khó đọc write-up [**ở đây**](https://systemweakness.com/ricochet-picoctf-2025-mitm-replay-attack-bb47375fb7db).
Dưới đây là toàn bộ lời giải cho thử thách này:
:::spoiler <b style="color:#e1829c">collecting_HMAC_1.py</b>
```python=
# Python program to gather HMAC values of secure_data and secure_data_response message types with corresponding nonce values.
import requests
import time
import hashlib
import monocypher
import os
import json
import time
def receive_radio_messages(SERVER_URL):
    """Return the messages seen on the air since the previous call.

    Each message is echoed to stdout with a DEBUG prefix; the result is a new
    list holding the messages in the order the server returned them.
    """
    reply = requests.get(SERVER_URL + "/radio_rx")
    on_air = reply.json()
    for radio_msg in on_air:
        print("\nDEBUG: Received message", radio_msg)
    return list(on_air)
def inject_radio_message(SERVER_URL, message):
    """Transmit ``message`` (a Python dict) by POSTing it as JSON to /radio_tx."""
    endpoint = SERVER_URL + "/radio_tx"
    requests.post(endpoint, json=message)
def start_robot(SERVER_URL):
    """Issue a GET to the server's /start endpoint."""
    endpoint = SERVER_URL + "/start"
    requests.get(endpoint)
def stop_robot(SERVER_URL):
    """Issue a GET to the server's /stop endpoint."""
    endpoint = SERVER_URL + "/stop"
    requests.get(endpoint)
def get_board_state(SERVER_URL):
    """Return the decoded JSON from /state (robot and obstacle locations)."""
    reply = requests.get(SERVER_URL + "/state")
    return reply.json()
def compute_hmac(message, nonce, key):
    """Derive the authentication tag for ``message`` bound to ``nonce``.

    Local copy of the challenge's routine: the encoded concatenation of
    ``message``, ``str(nonce)`` and ``key.hex()`` is hashed 32 times with
    ``key`` appended each round, then hashed once more without it.
    """
    prefix = message + str(nonce)
    digest = (prefix + key.hex()).encode()
    rounds_left = 32
    while rounds_left:
        digest = monocypher.blake2b(digest + key)
        rounds_left -= 1
    return monocypher.blake2b(digest)
def add_hmac(message, nonce, key):
    """Wrap ``message`` in a dict that also carries ``nonce`` and the hex tag."""
    tag_hex = compute_hmac(message, nonce, key).hex()
    envelope = {"message": message, "nonce": nonce}
    envelope["hmac"] = tag_hex
    return envelope
def encrypt(message, key):
    """Encrypt the string ``message`` and return ``ciphertext;tag;nonce`` in hex.

    The lock key is the first 32 bytes of ``monocypher.blake2b(key)`` and a
    fresh 24-byte random nonce is drawn for every call.
    """
    lock_key = monocypher.blake2b(key)[:32]
    iv = os.urandom(24)
    mac, sealed = monocypher.lock(lock_key, iv, message.encode())
    return ";".join((sealed.hex(), mac.hex(), iv.hex()))
def decrypt(message, key):
    """Reverse ``encrypt``: parse ``ciphertext;tag;nonce`` and unlock it.

    Returns whatever ``monocypher.unlock`` returns.
    """
    unlock_key = monocypher.blake2b(key)[:32]
    ciphertext_hex, tag_hex, nonce_hex = message.split(";")
    nonce = bytes.fromhex(nonce_hex)
    tag = bytes.fromhex(tag_hex)
    ciphertext = bytes.fromhex(ciphertext_hex)
    return monocypher.unlock(unlock_key, nonce, tag, ciphertext)
def _first_message_of_type(messages, msg_type):
    """Return the first dict in ``messages`` whose ``msg_type`` matches.

    Raises RuntimeError naming the missing type, so a failed step is easier to
    diagnose than an IndexError/KeyError from positional indexing.
    """
    for candidate in messages:
        if isinstance(candidate, dict) and candidate.get("msg_type") == msg_type:
            return candidate
    raise RuntimeError("no %r message among %r" % (msg_type, messages))


def initialization(server, robot_addr, controller_addr, new_controller_addr, my_priv_key):
    """Take the Controller's place towards the Robot.

    Steps: ping both devices, move the Controller to ``new_controller_addr``,
    start the Robot, forward its ``validate`` challenge to the Controller
    (whose answer is addressed to the Robot), then answer the Robot's
    ``key_exchange`` with our own public key.

    Returns ``(msg_robot_dec, shared_key_robot)``: the decrypted first
    ``secure_data`` envelope from the Robot and the key shared with it.
    Raises RuntimeError when an expected message type is not received.
    """
    # Checking whether Robot and Controller are online.
    inject_radio_message(server, {
        "msg_type": "ping",
        "src": controller_addr,
        "dst": robot_addr,
    })
    time.sleep(1)
    inject_radio_message(server, {
        "msg_type": "ping",
        "src": robot_addr,
        "dst": controller_addr,
    })
    time.sleep(1)
    print("Confirming that both Robot and Controller are online.")
    receive_radio_messages(server)

    # Set new addr for controller to conduct MITM attack.
    inject_radio_message(server, {
        "msg_type": "set_addr",
        "src": robot_addr,
        "dst": controller_addr,
        "new_addr": new_controller_addr,
    })
    time.sleep(1)
    # Report the address actually requested instead of a hard-coded "0x30".
    print("\nChanging Controller's address to " + hex(new_controller_addr) + ".")
    receive_radio_messages(server)

    # Checking the new address of Controller.
    inject_radio_message(server, {
        "msg_type": "ping",
        "src": robot_addr,
        "dst": new_controller_addr,
    })
    time.sleep(1)
    print("\nConfirming that Controller's address is changed.")
    receive_radio_messages(server)

    # Start the Robot and pick up the challenge it broadcasts.
    print("\nStarting robot...")
    start_robot(server)
    time.sleep(1)
    msg_recv = receive_radio_messages(server)
    challenge = _first_message_of_type(msg_recv, "validate")["challenge"]
    print("\nChallenge validation key sent from Robot:", challenge)
    print()

    # Validating robot authenticity: let the real Controller sign the challenge.
    inject_radio_message(server, {
        "src": robot_addr,
        "dst": new_controller_addr,
        "msg_type": "validate",
        "challenge": challenge
    })
    time.sleep(1)
    msg_recv = receive_radio_messages(server)
    # Select by type rather than by position in the batch.
    robot_dh_pub_key = _first_message_of_type(msg_recv, "key_exchange")["key"]
    print("\nRobot's public key:", robot_dh_pub_key)

    # The "ack_key_exchange" message for Robot.
    my_dh_pub_key_robot = monocypher.compute_key_exchange_public_key(my_priv_key).hex()
    ack_key_exchange = {
        "msg_type": "ack_key_exchange",
        "src": new_controller_addr,
        "dst": robot_addr,
        "key": my_dh_pub_key_robot
    }
    print("\nMy public key sent to Robot:", my_dh_pub_key_robot)
    # Construct my shared key with Robot.
    shared_key_robot = monocypher.key_exchange(my_priv_key, bytes.fromhex(robot_dh_pub_key))
    print("\nMy shared key with Robot:", shared_key_robot.hex())
    inject_radio_message(server, ack_key_exchange)
    time.sleep(1)

    # Decrypting the secure_data message from Robot.
    msg_recv = receive_radio_messages(server)
    msg_robot_enc = _first_message_of_type(msg_recv, "secure_data")["encrypted"]
    msg_robot_dec = json.loads(decrypt(msg_robot_enc, shared_key_robot).decode())
    print("\nDecrypted message from Robot:", msg_robot_dec)
    return msg_robot_dec, shared_key_robot
def get_shared_key_controller(server, dummy_robot_addr, new_controller_addr, my_priv_key):
    """Run a key exchange with the Controller while posing as a robot.

    Sends our public key from ``dummy_robot_addr`` and derives the shared key
    from the ``key`` field of the first message received afterwards.
    Returns that shared key.
    """
    my_public_hex = monocypher.compute_key_exchange_public_key(my_priv_key).hex()
    print("\nMy public key sent to Controller:", my_public_hex)
    inject_radio_message(server, {
        "msg_type": "key_exchange",
        "src": dummy_robot_addr,
        "dst": new_controller_addr,
        "key": my_public_hex
    })
    time.sleep(1)
    replies = receive_radio_messages(server)
    controller_public_hex = replies[0]['key']
    print("\nController's public key:", controller_public_hex)
    secret = monocypher.key_exchange(my_priv_key, bytes.fromhex(controller_public_hex))
    print("\nMy shared key with Controller:", secret.hex())
    return secret
def _poll_radio(server, timeout=None):
    """Poll the radio until at least one message arrives.

    Returns the non-empty batch, or None when ``timeout`` seconds pass
    without any message.  With ``timeout=None`` the wait is unbounded.
    """
    started = time.time()
    while True:
        batch = receive_radio_messages(server)
        if batch:
            # A batch that arrives on the poll crossing the deadline is kept.
            return batch
        if timeout is not None and time.time() - started > timeout:
            return None


def _unwrap(radio_msg, shared_key):
    """Decrypt the ``encrypted`` field of ``radio_msg`` and parse it as JSON."""
    return json.loads(decrypt(radio_msg["encrypted"], shared_key).decode())


def _relay(server, banner, msg_type, src, dst, envelope, shared_key):
    """Re-encrypt ``envelope`` under ``shared_key`` and inject it as ``msg_type``."""
    sealed = encrypt(json.dumps(envelope), shared_key)
    print(banner)
    inject_radio_message(server, {
        "msg_type": msg_type,
        "src": src,
        "dst": dst,
        "encrypted": sealed
    })


def gather_secure_data(msg_robot_dec, server, shared_key_controller, shared_key_robot, robot_addr, dummy_robot_addr, new_controller_addr, commands):
    """Relay one full command cycle between Robot and Controller.

    The cycle is secure_data -> secure_data_ack -> secure_data_request ->
    secure_data_response, each message decrypted with one shared key and
    re-encrypted with the other.  The Robot's secure_data envelope and the
    Controller's secure_data_response envelope are appended to ``commands``.

    Returns ``(next_envelope, commands)`` where ``next_envelope`` is the
    Robot's next decrypted message, or the secure_data_request envelope of
    this cycle when nothing arrives within 10 seconds.  The first three waits
    have no time limit.
    """
    to_controller = "\nSending re-encrypted message from Robot to Controller..."
    to_robot = "\nSending re-encrypted message from Controller to Robot..."

    # Stage 1: Robot's secure_data goes to the Controller.
    commands.append(msg_robot_dec)
    _relay(server, to_controller, "secure_data", dummy_robot_addr,
           new_controller_addr, msg_robot_dec, shared_key_controller)
    time.sleep(1)

    # Stage 2: Controller's ack goes back to the Robot.
    ack = _unwrap(_poll_radio(server)[0], shared_key_controller)
    print("\nDecrypted message from Controller:", ack)
    _relay(server, to_robot, "secure_data_ack", new_controller_addr,
           robot_addr, ack, shared_key_robot)
    time.sleep(1)

    # Stage 3: Robot's secure_data_request goes to the Controller.
    request = _unwrap(_poll_radio(server)[0], shared_key_robot)
    print("\nDecrypted message from Robot:", request)
    _relay(server, to_controller, "secure_data_request", dummy_robot_addr,
           new_controller_addr, request, shared_key_controller)
    time.sleep(1)

    # Stage 4: Controller's secure_data_response goes back to the Robot.
    response = _unwrap(_poll_radio(server)[0], shared_key_controller)
    print("\nDecrypted message from Controller:", response)
    commands.append(response)
    _relay(server, to_robot, "secure_data_response", new_controller_addr,
           robot_addr, response, shared_key_robot)

    # The Robot stops sending after its last command, so this wait is bounded.
    final_batch = _poll_radio(server, timeout=10)
    if final_batch is None:
        return request, commands
    next_envelope = _unwrap(final_batch[0], shared_key_robot)
    print("\nDecrypted message from Robot:", next_envelope)
    return next_envelope, commands
def main():
    """Collect the HMAC envelopes of 20 relayed command cycles and print them."""
    # Initialization of SERVER_URL and PORT.
    PORT = int(input("Enter PORT: "))
    # No trailing slash: every request helper appends a path that already
    # starts with "/", so a trailing one produced URLs like ".../​/radio_rx".
    server = "http://activist-birds.picoctf.net:" + str(PORT)
    # Initialization of variables.
    robot_addr = 0x20
    controller_addr = 0x10
    new_controller_addr = 0x30
    dummy_robot_addr = 0x40
    commands = []
    my_priv_key = os.urandom(32)
    # Initialization process for validating robot and exchanging keys.
    msg_robot_dec, shared_key_robot = initialization(server, robot_addr, controller_addr, new_controller_addr, my_priv_key)
    # Establishing a shared key with controller.
    shared_key_controller = get_shared_key_controller(server, dummy_robot_addr, new_controller_addr, my_priv_key)
    # Collecting data.
    for _ in range(20):
        msg_robot_dec, commands = gather_secure_data(msg_robot_dec, server, shared_key_controller, shared_key_robot, robot_addr, dummy_robot_addr, new_controller_addr, commands)
    print("\nCollected messages:", commands)


if __name__ == "__main__":
    main()
```
:::
:::spoiler <b style="color:#e1829c">collecting_HMAC_2.py</b>
```python=
# Program to gather HMAC values of blank messages with corresponding nonce values.
import requests
import time
import hashlib
import monocypher
import os
import json
import time
def receive_radio_messages(SERVER_URL):
    """Return the messages seen on the air since the previous call.

    Each message is echoed to stdout with a DEBUG prefix; the result is a new
    list holding the messages in the order the server returned them.
    """
    reply = requests.get(SERVER_URL + "/radio_rx")
    on_air = reply.json()
    for radio_msg in on_air:
        print("\nDEBUG: Received message", radio_msg)
    return list(on_air)
def inject_radio_message(SERVER_URL, message):
    """Transmit ``message`` (a Python dict) by POSTing it as JSON to /radio_tx."""
    endpoint = SERVER_URL + "/radio_tx"
    requests.post(endpoint, json=message)
def start_robot(SERVER_URL):
    """Issue a GET to the server's /start endpoint."""
    endpoint = SERVER_URL + "/start"
    requests.get(endpoint)
def stop_robot(SERVER_URL):
    """Issue a GET to the server's /stop endpoint."""
    endpoint = SERVER_URL + "/stop"
    requests.get(endpoint)
def get_board_state(SERVER_URL):
    """Return the decoded JSON from /state (robot and obstacle locations)."""
    reply = requests.get(SERVER_URL + "/state")
    return reply.json()
def compute_hmac(message, nonce, key):
    """Derive the authentication tag for ``message`` bound to ``nonce``.

    Local copy of the challenge's routine: the encoded concatenation of
    ``message``, ``str(nonce)`` and ``key.hex()`` is hashed 32 times with
    ``key`` appended each round, then hashed once more without it.
    """
    prefix = message + str(nonce)
    digest = (prefix + key.hex()).encode()
    rounds_left = 32
    while rounds_left:
        digest = monocypher.blake2b(digest + key)
        rounds_left -= 1
    return monocypher.blake2b(digest)
def add_hmac(message, nonce, key):
    """Wrap ``message`` in a dict that also carries ``nonce`` and the hex tag."""
    tag_hex = compute_hmac(message, nonce, key).hex()
    envelope = {"message": message, "nonce": nonce}
    envelope["hmac"] = tag_hex
    return envelope
def encrypt(message, key):
    """Encrypt the string ``message`` and return ``ciphertext;tag;nonce`` in hex.

    The lock key is the first 32 bytes of ``monocypher.blake2b(key)`` and a
    fresh 24-byte random nonce is drawn for every call.
    """
    lock_key = monocypher.blake2b(key)[:32]
    iv = os.urandom(24)
    mac, sealed = monocypher.lock(lock_key, iv, message.encode())
    return ";".join((sealed.hex(), mac.hex(), iv.hex()))
def decrypt(message, key):
    """Reverse ``encrypt``: parse ``ciphertext;tag;nonce`` and unlock it.

    Returns whatever ``monocypher.unlock`` returns.
    """
    unlock_key = monocypher.blake2b(key)[:32]
    ciphertext_hex, tag_hex, nonce_hex = message.split(";")
    nonce = bytes.fromhex(nonce_hex)
    tag = bytes.fromhex(tag_hex)
    ciphertext = bytes.fromhex(ciphertext_hex)
    return monocypher.unlock(unlock_key, nonce, tag, ciphertext)
def _first_message_of_type(messages, msg_type):
    """Return the first dict in ``messages`` whose ``msg_type`` matches.

    Raises RuntimeError naming the missing type, so a failed step is easier to
    diagnose than an IndexError/KeyError from positional indexing.
    """
    for candidate in messages:
        if isinstance(candidate, dict) and candidate.get("msg_type") == msg_type:
            return candidate
    raise RuntimeError("no %r message among %r" % (msg_type, messages))


def initialization(server, robot_addr, controller_addr, new_controller_addr, my_priv_key):
    """Take the Controller's place towards the Robot.

    Steps: ping both devices, move the Controller to ``new_controller_addr``,
    start the Robot, forward its ``validate`` challenge to the Controller
    (whose answer is addressed to the Robot), then answer the Robot's
    ``key_exchange`` with our own public key.

    Returns ``(msg_robot_dec, shared_key_robot)``: the decrypted first
    ``secure_data`` envelope from the Robot and the key shared with it.
    Raises RuntimeError when an expected message type is not received.
    """
    # Checking whether Robot and Controller are online.
    inject_radio_message(server, {
        "msg_type": "ping",
        "src": controller_addr,
        "dst": robot_addr,
    })
    time.sleep(1)
    inject_radio_message(server, {
        "msg_type": "ping",
        "src": robot_addr,
        "dst": controller_addr,
    })
    time.sleep(1)
    print("Confirming that both Robot and Controller are online.")
    receive_radio_messages(server)

    # Set new addr for controller to conduct MITM attack.
    inject_radio_message(server, {
        "msg_type": "set_addr",
        "src": robot_addr,
        "dst": controller_addr,
        "new_addr": new_controller_addr,
    })
    time.sleep(1)
    # Report the address actually requested instead of a hard-coded "0x30".
    print("\nChanging Controller's address to " + hex(new_controller_addr) + ".")
    receive_radio_messages(server)

    # Checking the new address of Controller.
    inject_radio_message(server, {
        "msg_type": "ping",
        "src": robot_addr,
        "dst": new_controller_addr,
    })
    time.sleep(1)
    print("\nConfirming that Controller's address is changed.")
    receive_radio_messages(server)

    # Start the Robot and pick up the challenge it broadcasts.
    print("\nStarting robot...")
    start_robot(server)
    time.sleep(1)
    msg_recv = receive_radio_messages(server)
    challenge = _first_message_of_type(msg_recv, "validate")["challenge"]
    print("\nChallenge validation key sent from Robot:", challenge)
    print()

    # Validating robot authenticity: let the real Controller sign the challenge.
    inject_radio_message(server, {
        "src": robot_addr,
        "dst": new_controller_addr,
        "msg_type": "validate",
        "challenge": challenge
    })
    time.sleep(1)
    msg_recv = receive_radio_messages(server)
    # Select by type rather than by position in the batch.
    robot_dh_pub_key = _first_message_of_type(msg_recv, "key_exchange")["key"]
    print("\nRobot's public key:", robot_dh_pub_key)

    # The "ack_key_exchange" message for Robot.
    my_dh_pub_key_robot = monocypher.compute_key_exchange_public_key(my_priv_key).hex()
    ack_key_exchange = {
        "msg_type": "ack_key_exchange",
        "src": new_controller_addr,
        "dst": robot_addr,
        "key": my_dh_pub_key_robot
    }
    print("\nMy public key sent to Robot:", my_dh_pub_key_robot)
    # Construct my shared key with Robot.
    shared_key_robot = monocypher.key_exchange(my_priv_key, bytes.fromhex(robot_dh_pub_key))
    print("\nMy shared key with Robot:", shared_key_robot.hex())
    inject_radio_message(server, ack_key_exchange)
    time.sleep(1)

    # Decrypting the secure_data message from Robot.
    msg_recv = receive_radio_messages(server)
    msg_robot_enc = _first_message_of_type(msg_recv, "secure_data")["encrypted"]
    msg_robot_dec = json.loads(decrypt(msg_robot_enc, shared_key_robot).decode())
    print("\nDecrypted message from Robot:", msg_robot_dec)
    return msg_robot_dec, shared_key_robot
def get_shared_key_controller(server, dummy_robot_addr, new_controller_addr, my_priv_key):
    """Run a key exchange with the Controller while posing as a robot.

    Sends our public key from ``dummy_robot_addr`` and derives the shared key
    from the ``key`` field of the first message received afterwards.
    Returns that shared key.
    """
    my_public_hex = monocypher.compute_key_exchange_public_key(my_priv_key).hex()
    print("\nMy public key sent to Controller:", my_public_hex)
    inject_radio_message(server, {
        "msg_type": "key_exchange",
        "src": dummy_robot_addr,
        "dst": new_controller_addr,
        "key": my_public_hex
    })
    time.sleep(1)
    replies = receive_radio_messages(server)
    controller_public_hex = replies[0]['key']
    print("\nController's public key:", controller_public_hex)
    secret = monocypher.key_exchange(my_priv_key, bytes.fromhex(controller_public_hex))
    print("\nMy shared key with Controller:", secret.hex())
    return secret
def invalid_msg(server, shared_key_robot, robot_addr, new_controller_addr):
    """Send the Robot a ``secure_data_ack`` that fails HMAC validation.

    The envelope is encrypted with the key shared with the Robot but its HMAC
    is computed with nonce 99 and that shared key instead of the real HMAC
    key.  The Robot rejects the ack, so it does not advance its nonce, and
    then sends its next ``secure_data_request``.

    Returns the decrypted envelope of that ``secure_data_request``.  The wait
    for it has no time limit.
    """
    bogus_envelope = add_hmac("", 99, shared_key_robot)
    bogus_encrypted = encrypt(json.dumps(bogus_envelope), shared_key_robot)
    secure_data_ack = {
        "msg_type": "secure_data_ack",
        "src": new_controller_addr,
        "dst": robot_addr,
        "encrypted": bogus_encrypted
    }
    # Sending message.
    print("\nSending invalid message to Robot...")
    inject_radio_message(server, secure_data_ack)
    time.sleep(1)
    # The Robot first emits a debug message and only then its request, so keep
    # accumulating batches until the request itself shows up instead of
    # assuming it sits at index 1 of the first non-empty batch.
    seen = []
    request = None
    while request is None:
        seen.extend(receive_radio_messages(server))
        for candidate in seen:
            if isinstance(candidate, dict) and candidate.get("msg_type") == "secure_data_request":
                request = candidate
                break
    # Decrypt message back from Robot.
    msg_robot_dec = json.loads(decrypt(request["encrypted"], shared_key_robot).decode())
    print("\nDecrypted message from Robot:", msg_robot_dec)
    return msg_robot_dec
def gather_blank_secure_data_response(msg_robot_dec, server, shared_key_controller, shared_key_robot, robot_addr, dummy_robot_addr, new_controller_addr, commands):
    """Relay one secure_data_request / secure_data_response round trip.

    The Robot's request envelope is re-encrypted for the Controller, the
    Controller's response envelope is appended to ``commands`` and
    re-encrypted for the Robot, and the Robot's next decrypted envelope is
    returned together with ``commands``.  Both waits have no time limit.
    """
    # Robot -> Controller.
    request_sealed = encrypt(json.dumps(msg_robot_dec), shared_key_controller)
    print("\nSending re-encrypted message from Robot to Controller...")
    inject_radio_message(server, {
        "msg_type": "secure_data_request",
        "src": dummy_robot_addr,
        "dst": new_controller_addr,
        "encrypted": request_sealed
    })
    time.sleep(1)
    # Poll until something arrives; there is sometimes latency.
    batch = receive_radio_messages(server)
    while not batch:
        batch = receive_radio_messages(server)
    controller_plain = decrypt(batch[0]['encrypted'], shared_key_controller)
    controller_envelope = json.loads(controller_plain.decode())
    print("\nDecrypted message from Controller:", controller_envelope)
    # Collect secure_data_response message.
    commands.append(controller_envelope)

    # Controller -> Robot.
    response_sealed = encrypt(json.dumps(controller_envelope), shared_key_robot)
    print("\nSending re-encrypted message from Controller to Robot...")
    inject_radio_message(server, {
        "msg_type": "secure_data_response",
        "src": new_controller_addr,
        "dst": robot_addr,
        "encrypted": response_sealed
    })
    time.sleep(1)
    batch = receive_radio_messages(server)
    while not batch:
        batch = receive_radio_messages(server)
    robot_plain = decrypt(batch[0]['encrypted'], shared_key_robot)
    next_envelope = json.loads(robot_plain.decode())
    print("\nDecrypted message from Robot:", next_envelope)
    return next_envelope, commands
def main():
    """Collect the HMAC envelopes of 30 blank responses and print them."""
    # Initialization of SERVER_URL and PORT.
    PORT = int(input("Enter PORT: "))
    # No trailing slash: every request helper appends a path that already
    # starts with "/", so a trailing one produced URLs with a double slash.
    server = "http://activist-birds.picoctf.net:" + str(PORT)
    # Initialization of variables.
    robot_addr = 0x20
    controller_addr = 0x10
    new_controller_addr = 0x30
    dummy_robot_addr = 0x40
    commands = []
    my_priv_key = os.urandom(32)
    # Initialization process for validating robot and exchanging keys.
    msg_robot_dec, shared_key_robot = initialization(server, robot_addr, controller_addr, new_controller_addr, my_priv_key)
    # Establishing a shared key with controller.
    shared_key_controller = get_shared_key_controller(server, dummy_robot_addr, new_controller_addr, my_priv_key)
    # Sending an invalid secure_data_ack message to Robot to begin tampering nonce count.
    msg_robot_dec = invalid_msg(server, shared_key_robot, robot_addr, new_controller_addr)
    # Collecting data.
    for _ in range(30):
        msg_robot_dec, commands = gather_blank_secure_data_response(msg_robot_dec, server, shared_key_controller, shared_key_robot, robot_addr, dummy_robot_addr, new_controller_addr, commands)
    print("\nCollected messages (blank response):", commands)


if __name__ == "__main__":
    main()
```
:::
:::spoiler <b style="color:#e1829c">solution.py</b>
```python=
# The Python program for carrying out the exploit.
import requests
import time
import hashlib
import monocypher
import os
import json
def receive_radio_messages(SERVER_URL):
    """Return the messages seen on the air since the previous call.

    Each message is echoed to stdout with a DEBUG prefix; the result is a new
    list holding the messages in the order the server returned them.
    """
    reply = requests.get(SERVER_URL + "/radio_rx")
    on_air = reply.json()
    for radio_msg in on_air:
        print("\nDEBUG: Received message", radio_msg)
    return list(on_air)
def inject_radio_message(SERVER_URL, message):
    """Transmit ``message`` (a Python dict) by POSTing it as JSON to /radio_tx."""
    endpoint = SERVER_URL + "/radio_tx"
    requests.post(endpoint, json=message)
def start_robot(SERVER_URL):
    """Issue a GET to the server's /start endpoint."""
    endpoint = SERVER_URL + "/start"
    requests.get(endpoint)
def stop_robot(SERVER_URL):
    """Issue a GET to the server's /stop endpoint."""
    endpoint = SERVER_URL + "/stop"
    requests.get(endpoint)
def get_board_state(SERVER_URL):
    """Return the decoded JSON from /state (robot and obstacle locations)."""
    reply = requests.get(SERVER_URL + "/state")
    return reply.json()
def compute_hmac(message, nonce, key):
    """Derive the authentication tag for ``message`` bound to ``nonce``.

    Local copy of the challenge's routine: the encoded concatenation of
    ``message``, ``str(nonce)`` and ``key.hex()`` is hashed 32 times with
    ``key`` appended each round, then hashed once more without it.
    """
    prefix = message + str(nonce)
    digest = (prefix + key.hex()).encode()
    rounds_left = 32
    while rounds_left:
        digest = monocypher.blake2b(digest + key)
        rounds_left -= 1
    return monocypher.blake2b(digest)
def add_hmac(message, nonce, key):
    """Wrap ``message`` in a dict that also carries ``nonce`` and the hex tag."""
    tag_hex = compute_hmac(message, nonce, key).hex()
    envelope = {"message": message, "nonce": nonce}
    envelope["hmac"] = tag_hex
    return envelope
def encrypt(message, key):
    """Encrypt the string ``message`` and return ``"ciphertext;tag;nonce"`` in hex.

    The working key is the first 32 bytes of ``blake2b(key)``; a fresh random
    24-byte nonce is drawn for every call.
    """
    box_key = monocypher.blake2b(key)[:32]
    nonce = os.urandom(24)
    tag, ciphertext = monocypher.lock(box_key, nonce, message.encode())
    # Wire format order is ciphertext, then tag, then nonce.
    return ";".join(field.hex() for field in (ciphertext, tag, nonce))
def decrypt(message, key):
    """Decrypt a ``"ciphertext;tag;nonce"`` hex string produced by ``encrypt``.

    The working key is derived exactly as in ``encrypt`` (first 32 bytes of
    ``blake2b(key)``). Returns whatever ``monocypher.unlock`` returns for the
    decoded fields.
    """
    box_key = monocypher.blake2b(key)[:32]
    ciphertext_hex, tag_hex, nonce_hex = message.split(";")
    return monocypher.unlock(
        box_key,
        bytes.fromhex(nonce_hex),
        bytes.fromhex(tag_hex),
        bytes.fromhex(ciphertext_hex),
    )
def initialization(server, robot_addr, controller_addr, new_controller_addr, my_priv_key):
    """Set up the man-in-the-middle position and key-exchange with the Robot.

    Steps, in order:
      1. Ping Robot and Controller (spoofing each as the other's source).
      2. Send ``set_addr`` to move the Controller to ``new_controller_addr``.
      3. Ping the Controller on its new address.
      4. Start the Robot and read the ``challenge`` it broadcasts.
      5. Forward that challenge in a ``validate`` message to the Controller's
         new address, then read the Robot's public key from the traffic.
      6. Answer the Robot with ``ack_key_exchange`` carrying our own public key
         and derive the shared key.
      7. Decrypt the first encrypted message the Robot sends afterwards.

    Args:
        server: Base URL of the challenge server.
        robot_addr: Radio address of the Robot.
        controller_addr: Original radio address of the Controller.
        new_controller_addr: Address the Controller is moved to (expected to
            be an int, since it is logged with ``hex()``).
        my_priv_key: Our private key bytes for the key exchange.

    Returns:
        A ``(msg_robot_dec, shared_key_robot)`` tuple: the Robot's first
        decrypted (JSON-decoded) message and the key shared with the Robot.

    Raises:
        IndexError / KeyError: if the expected messages are not present at the
            hard-coded positions in the polled traffic.
    """
    # Checking whether Robot and Controller are online.
    ping_robot = {
        "msg_type": "ping",
        "src": controller_addr,
        "dst": robot_addr,
    }
    ping_controller = {
        "msg_type": "ping",
        "src": robot_addr,
        "dst": controller_addr,
    }
    inject_radio_message(server, ping_robot)
    time.sleep(1)
    inject_radio_message(server, ping_controller)
    time.sleep(1)
    print("Confirming that both Robot and Controller are online.")
    # The poll both prints the replies and drains them from the receive queue.
    receive_radio_messages(server)

    # Set new addr for controller to conduct MITM attack.
    set_new_addr = {
        "msg_type": "set_addr",
        "src": robot_addr,
        "dst": controller_addr,
        "new_addr": new_controller_addr,
    }
    inject_radio_message(server, set_new_addr)
    time.sleep(1)
    # Report the address actually requested rather than a hard-coded value.
    print("\nChanging Controller's address to " + hex(new_controller_addr) + ".")
    receive_radio_messages(server)

    # Checking the new address of Controller.
    ping_controller = {
        "msg_type": "ping",
        "src": robot_addr,
        "dst": new_controller_addr,
    }
    inject_radio_message(server, ping_controller)
    time.sleep(1)
    print("\nConfirming that Controller's address is changed.")
    receive_radio_messages(server)

    # Start the Robot.
    print("\nStarting robot...")
    start_robot(server)
    time.sleep(1)
    # NOTE(review): assumes the challenge is carried by the first message polled
    # after start-up.
    msg_recv = receive_radio_messages(server)
    challenge = msg_recv[0]['challenge']
    print("\nChallenge validation key sent from Robot:", challenge)
    print()

    # Validating robot authenticity.
    validate_msg = {
        "src": robot_addr,
        "dst": new_controller_addr,
        "msg_type": "validate",
        "challenge": challenge
    }
    inject_radio_message(server, validate_msg)
    time.sleep(1)
    # NOTE(review): assumes the Robot's key-exchange message is the third entry
    # of this poll.
    msg_recv = receive_radio_messages(server)
    robot_dh_pub_key = msg_recv[2]['key']
    print("\nRobot's public key:", robot_dh_pub_key)

    # The "ack_key_exchange" message for Robot.
    ack_key_exchange = {
        "msg_type": "ack_key_exchange",
        "src": new_controller_addr,
        "dst": robot_addr,
        "key": monocypher.compute_key_exchange_public_key(my_priv_key).hex()
    }
    my_dh_pub_key_robot = ack_key_exchange['key']
    print("\nMy public key sent to Robot:", my_dh_pub_key_robot)

    # Construct my shared key with Robot.
    shared_key_robot = monocypher.key_exchange(my_priv_key, bytes.fromhex(robot_dh_pub_key))
    print("\nMy shared key with Robot:", shared_key_robot.hex())

    inject_radio_message(server, ack_key_exchange)
    time.sleep(1)
    msg_recv = receive_radio_messages(server)

    # Decrypting the secure_data message from Robot.
    msg_robot_enc = msg_recv[0]['encrypted']
    msg_robot_dec = json.loads(decrypt(msg_robot_enc, shared_key_robot).decode())
    print("\nDecrypted message from Robot:", msg_robot_dec)
    return msg_robot_dec, shared_key_robot
def get_shared_key_controller(server, dummy_robot_addr, new_controller_addr, my_priv_key):
    """Run a key exchange with the Controller while posing as ``dummy_robot_addr``.

    Sends a ``key_exchange`` message carrying our public key to
    ``new_controller_addr``, waits one second, reads the Controller's public
    key from the first polled message, and returns the derived shared key.
    """
    my_pub_hex = monocypher.compute_key_exchange_public_key(my_priv_key).hex()
    print("\nMy public key sent to Controller:", my_pub_hex)

    inject_radio_message(
        server,
        {
            "msg_type": "key_exchange",
            "src": dummy_robot_addr,
            "dst": new_controller_addr,
            "key": my_pub_hex,
        },
    )
    time.sleep(1)

    # The Controller's reply is taken to be the first message of this poll.
    replies = receive_radio_messages(server)
    peer_pub_hex = replies[0]['key']
    print("\nController's public key:", peer_pub_hex)

    shared = monocypher.key_exchange(my_priv_key, bytes.fromhex(peer_pub_hex))
    print("\nMy shared key with Controller:", shared.hex())
    return shared
def send_recv_msg(msg_robot_dec, server, shared_key_controller, shared_key_robot, robot_addr, dummy_robot_addr, new_controller_addr):
    """Relay one full Robot/Controller exchange through the attacker.

    Four hops are performed, each re-encrypting the previous plaintext for the
    next recipient and decrypting the first reply that is polled:

      1. ``secure_data``          Robot -> Controller
      2. ``secure_data_ack``      Controller -> Robot
      3. ``secure_data_request``  Robot -> Controller
      4. ``secure_data_response`` Controller -> Robot (no pause before polling)

    Returns the Robot's decrypted (JSON-decoded) reply to the final hop.
    """

    def relay(msg_type, payload, to_controller, pause=True):
        # Pick the key, addresses and log wording for the requested direction.
        if to_controller:
            key, src, dst = shared_key_controller, dummy_robot_addr, new_controller_addr
            sender, receiver = "Robot", "Controller"
        else:
            key, src, dst = shared_key_robot, new_controller_addr, robot_addr
            sender, receiver = "Controller", "Robot"

        packet = {
            "msg_type": msg_type,
            "src": src,
            "dst": dst,
            "encrypted": encrypt(json.dumps(payload), key),
        }
        print(f"\nSending re-encrypted message from {sender} to {receiver}...")
        inject_radio_message(server, packet)
        if pause:
            time.sleep(1)

        # Keep polling until something arrives, as there is sometimes latency.
        replies = []
        while not replies:
            replies = receive_radio_messages(server)

        # The reply comes back under the same key that was used to send.
        reply = json.loads(decrypt(replies[0]['encrypted'], key).decode())
        print(f"\nDecrypted message from {receiver}:", reply)
        return reply

    controller_reply = relay("secure_data", msg_robot_dec, to_controller=True)
    robot_reply = relay("secure_data_ack", controller_reply, to_controller=False)
    controller_reply = relay("secure_data_request", robot_reply, to_controller=True)
    return relay("secure_data_response", controller_reply, to_controller=False, pause=False)
def send_recv_response_msg(msg_controller_dec, server, shared_key_controller, shared_key_robot, robot_addr, dummy_robot_addr, new_controller_addr):
    """Send a single ``secure_data_response`` to the Robot and return its reply.

    ``msg_controller_dec`` is encrypted under ``shared_key_robot`` and sent
    from ``new_controller_addr`` to ``robot_addr``. The first polled reply is
    decrypted with the same key and JSON-decoded. ``shared_key_controller``
    and ``dummy_robot_addr`` are accepted for signature parity with the other
    relay helpers but are not used.
    """
    packet = {
        "msg_type": "secure_data_response",
        "src": new_controller_addr,
        "dst": robot_addr,
        "encrypted": encrypt(json.dumps(msg_controller_dec), shared_key_robot),
    }
    print("\nSending re-encrypted message from Controller to Robot...")
    inject_radio_message(server, packet)

    # Poll straight away and keep polling until the Robot has answered.
    while True:
        replies = receive_radio_messages(server)
        if replies:
            break

    robot_reply = json.loads(decrypt(replies[0]['encrypted'], shared_key_robot).decode())
    print("\nDecrypted message from Robot:", robot_reply)
    return robot_reply
def send_recv_blank_msg(msg_robot_dec, server, shared_key_controller, shared_key_robot, robot_addr, dummy_robot_addr, new_controller_addr):
    """Relay one request/response round trip between Robot and Controller.

    The Robot's plaintext is forwarded to the Controller as a
    ``secure_data_request``; the Controller's decrypted answer is then
    forwarded to the Robot as a ``secure_data_response``. Each hop waits one
    second and then polls until a reply is available.

    Returns the Robot's decrypted (JSON-decoded) reply.
    """
    # (message type, key, source, destination, sender label, receiver label)
    hops = (
        ("secure_data_request", shared_key_controller, dummy_robot_addr, new_controller_addr, "Robot", "Controller"),
        ("secure_data_response", shared_key_robot, new_controller_addr, robot_addr, "Controller", "Robot"),
    )

    plaintext = msg_robot_dec
    for msg_type, key, src, dst, sender, receiver in hops:
        packet = {
            "msg_type": msg_type,
            "src": src,
            "dst": dst,
            "encrypted": encrypt(json.dumps(plaintext), key),
        }
        print(f"\nSending re-encrypted message from {sender} to {receiver}...")
        inject_radio_message(server, packet)
        time.sleep(1)

        # Running a loop until a message is available as, sometimes, there is latency.
        replies = []
        while not replies:
            replies = receive_radio_messages(server)

        plaintext = json.loads(decrypt(replies[0]['encrypted'], key).decode())
        print(f"\nDecrypted message from {receiver}:", plaintext)

    return plaintext
def invalid_msg(server, shared_key_robot, robot_addr, new_controller_addr):
    """Send a forged ``secure_data_ack`` to the Robot and return its follow-up.

    The forged payload is an empty message with nonce 99 whose HMAC is
    computed with ``shared_key_robot``; the write-up uses it to knock the
    Robot's nonce counter out of step with the Controller's.

    Args:
        server: Base URL of the challenge server.
        shared_key_robot: Key shared with the Robot, used for both the HMAC
            and the encryption of the forged message.
        robot_addr: Radio address of the Robot (destination).
        new_controller_addr: Address we impersonate as the Controller (source).

    Returns:
        The decrypted (JSON-decoded) content of the second message received
        after the injection.
    """
    forged_payload = encrypt(json.dumps(add_hmac("", 99, shared_key_robot)), shared_key_robot)
    secure_data_ack = {
        "msg_type": "secure_data_ack",
        "src": new_controller_addr,
        "dst": robot_addr,
        "encrypted": forged_payload
    }
    print("\nSending invalid message to Robot...")
    inject_radio_message(server, secure_data_ack)
    time.sleep(1)

    # The reply of interest is the SECOND message, so at least two must be
    # collected before indexing. Each poll only returns messages seen since the
    # previous one, hence results are accumulated rather than overwritten.
    msg_recv = []
    while len(msg_recv) < 2:
        msg_recv.extend(receive_radio_messages(server))

    # Decrypt message back from Robot.
    msg_robot_enc = msg_recv[1]['encrypted']
    msg_robot_dec = json.loads(decrypt(msg_robot_enc, shared_key_robot).decode())
    print("\nDecrypted message from Robot:", msg_robot_dec)
    return msg_robot_dec
def send_secure_data_msg(msg_robot_dec, server, shared_key_controller, dummy_robot_addr, new_controller_addr):
    """Send one ``secure_data`` message to the Controller and print its reply.

    ``msg_robot_dec`` is encrypted under ``shared_key_controller`` and sent
    from ``dummy_robot_addr`` to ``new_controller_addr``. The Controller's
    first reply is decrypted and printed only; nothing is returned.
    """
    ciphertext = encrypt(json.dumps(msg_robot_dec), shared_key_controller)
    packet = dict(
        msg_type="secure_data",
        src=dummy_robot_addr,
        dst=new_controller_addr,
        encrypted=ciphertext,
    )
    print("\nSending re-encrypted message from Robot to Controller...")
    inject_radio_message(server, packet)
    time.sleep(1)

    # Running a loop until a message is available as, sometimes, there is latency.
    replies = receive_radio_messages(server)
    while not replies:
        replies = receive_radio_messages(server)

    controller_reply = json.loads(decrypt(replies[0]['encrypted'], shared_key_controller).decode())
    print("\nDecrypted message from Controller:", controller_reply)
def send_secure_data_request_msg(msg_robot_dec, server, shared_key_controller, dummy_robot_addr, new_controller_addr):
    """Send one ``secure_data_request`` to the Controller and print its reply.

    ``msg_robot_dec`` is encrypted under ``shared_key_controller`` and sent
    from ``dummy_robot_addr`` to ``new_controller_addr``. The Controller's
    first reply is decrypted and printed only; nothing is returned.
    """
    ciphertext = encrypt(json.dumps(msg_robot_dec), shared_key_controller)
    packet = dict(
        msg_type="secure_data_request",
        src=dummy_robot_addr,
        dst=new_controller_addr,
        encrypted=ciphertext,
    )
    print("\nSending re-encrypted message from Robot to Controller...")
    inject_radio_message(server, packet)
    time.sleep(1)

    # Running a loop until a message is available as, sometimes, there is latency.
    replies = receive_radio_messages(server)
    while not replies:
        replies = receive_radio_messages(server)

    controller_reply = json.loads(decrypt(replies[0]['encrypted'], shared_key_controller).decode())
    print("\nDecrypted message from Controller:", controller_reply)
def main():
    """Run the complete exploit against a challenge instance.

    Prompts for the instance port, takes the man-in-the-middle position,
    exchanges keys with both Robot and Controller, and then steers the Robot
    by mixing three kinds of traffic:

      * genuine exchanges relayed unchanged (``send_recv_msg``),
      * forged messages plus blank request/response rounds that shift the
        nonce counters (``invalid_msg``, ``send_recv_blank_msg``),
      * replays of the hard-coded signed Controller responses below, each sent
        once the nonce has been advanced to the value it was signed for.

    Finally prints the board state reported by the server.
    """
    # Initialization of SERVER_URL and PORT.
    PORT = int(input("Enter PORT: "))
    # No trailing slash: every helper appends its endpoint as "/<name>", so a
    # trailing "/" here would yield URLs such as "...:PORT//radio_rx".
    server = "http://activist-birds.picoctf.net:" + str(PORT)

    # Initialization of variables.
    robot_addr = 0x20
    controller_addr = 0x10
    new_controller_addr = 0x30
    dummy_robot_addr = 0x40
    my_priv_key = os.urandom(32)

    # Signed movement commands to replay (presumably captured from the
    # Controller in an earlier data-collection run).
    secure_data_response_msg = [
        {'message': 'east', 'nonce': 9, 'hmac': 'd28df9bdfb9eeb33ddec908bf1af7bb808f5fc169e07f6159c5293fed6e1f2181595a99cf8a14fc981e6dbb9a4d7dfcd6adda095fd6d56327c01afdfcb39144a'},
        {'message': 'north', 'nonce': 15, 'hmac': '147cf90985b80904dda9b91318fb39017c80f67c6d04c43c3da33248c67a1942444ec3ea4ed90e58d41322918d45570cf3de06da547600be8537da8b6d51ea8c'},
        {'message': 'west', 'nonce': 21, 'hmac': '55d18b806a961e747553c889ff0d209442df1c6bbd28c120f49c6d3ba27ab12234f5b928883b65b572e7b79c76c832e3cecb0919d4977a3e2270b1335e9eb7de'},
        {'message': 'south', 'nonce': 27, 'hmac': '0e9ca7c11bf838a8d08c434c96333a20d05c4bbb40077eddf820c6236fc67f839e37bd0c0bed550ea344a4f87b64ef09f1c731538cb31bf1cd1d273b2777ca2b'},
        {'message': 'east', 'nonce': 33, 'hmac': 'd0836278113fe603e43a169bb503ed34b7597955a70ac6e64e5bdc6359c4841d68cf3d8ac0a32c09d4f29a81ac68f258efed6ab10d90ce2b2e3c6e5183755438'},
    ]
    # Signed empty responses, replayed only to advance the nonce.
    blank_msg = [
        {'message': '', 'nonce': 10, 'hmac': '77127f22c314ecea77eef5f0a5bd9fd4c558a33aeb0cf057798ad4ebfdec457672ae5c5bf7afc78268a38c632e24d7fe953fc748cac6a614560d1854f2629c17'},
        {'message': '', 'nonce': 11, 'hmac': '1f2231f4dbe97869ee5b1c95e032913aa2990080d9216279eacc6e9a3d55ecd59f6af4357ad682eb036d74c22c7f8249ebcb1d3e21ec75cd012034098dbb21dc'},
    ]

    # Initialization process for validating robot and exchanging keys.
    msg_robot_dec, shared_key_robot = initialization(server, robot_addr, controller_addr, new_controller_addr, my_priv_key)
    # Establishing a shared key with controller.
    shared_key_controller = get_shared_key_controller(server, dummy_robot_addr, new_controller_addr, my_priv_key)

    # Argument bundles shared by the helper calls below.
    relay_args = (server, shared_key_controller, shared_key_robot, robot_addr, dummy_robot_addr, new_controller_addr)
    controller_args = (server, shared_key_controller, dummy_robot_addr, new_controller_addr)
    robot_args = (server, shared_key_robot, robot_addr, new_controller_addr)

    def desync_then_replay(replayed_msg):
        # Forge an invalid message to tamper with the nonce, run five blank
        # rounds to increase it, send one extra request so the Controller
        # matches the Robot, then replay the captured response.
        state = invalid_msg(*robot_args)
        for _ in range(5):
            state = send_recv_blank_msg(state, *relay_args)
        send_secure_data_request_msg(state, *controller_args)
        return send_recv_response_msg(replayed_msg, *relay_args)

    # First go east then south.
    for _ in range(2):
        msg_robot_dec = send_recv_msg(msg_robot_dec, *relay_args)

    # Replay 'east'.
    msg_robot_dec = desync_then_replay(secure_data_response_msg[0])

    # Send one secure_data message to the Controller to match its nonce with
    # the Robot's and skip its 'west' command.
    send_secure_data_msg(msg_robot_dec, *controller_args)
    # Sending invalid message to Robot to tamper nonce values.
    msg_robot_dec = invalid_msg(*robot_args)
    # Two blank secure_data_response replays, with a request to the Controller
    # in between, to increase the nonce.
    msg_robot_dec = send_recv_response_msg(blank_msg[0], *relay_args)
    send_secure_data_request_msg(msg_robot_dec, *controller_args)
    msg_robot_dec = send_recv_response_msg(blank_msg[1], *relay_args)
    # Sending and receiving blank messages to increase nonce.
    for _ in range(3):
        msg_robot_dec = send_recv_blank_msg(msg_robot_dec, *relay_args)
    # Sending one more secure_data_request to Controller to match nonce with Robot.
    send_secure_data_request_msg(msg_robot_dec, *controller_args)
    # Replay 'north'.
    msg_robot_dec = send_recv_response_msg(secure_data_response_msg[1], *relay_args)

    # Replay 'west', 'south' and 'east', each after a fresh desync phase.
    for replayed_msg in secure_data_response_msg[2:]:
        msg_robot_dec = desync_then_replay(replayed_msg)

    # Finally, go 'north', 'east', 'south'.
    for _ in range(6):
        msg_robot_dec = send_recv_msg(msg_robot_dec, *relay_args)

    # Print Robot state.
    result = get_board_state(server)
    print("\nResult:", result)


if __name__ == "__main__":
    main()
```
:::
:::success
**Flag: picoCTF{r1gh7_84ck_47_y4_2e0fd8d6}**
:::
## Compress and Attack (Under Maintenance)
## Summary
Cuối cùng sau bao ngày nỗ lực thì mình cũng đã dứt điểm được trang web quái quỷ này rồi 🥳🥳🥳.
Chúc các bạn một ngày vui vẻ và hạnh phúc cùng bạn bè và người thân 😊.
Không còn phần sau trong năm nay để mà mong chờ nữa đâu 😆.