# ============================================================
# SPARDA ROUTER — Auto-generated. DO NOT EDIT.
# Regenerate: npx sparda-mcp init   •   Remove: npx sparda-mcp remove
# ============================================================
from fastapi import APIRouter, Request, Response, HTTPException
from fastapi.responses import JSONResponse
import json
import urllib.request
import urllib.parse
import concurrent.futures
import re
import asyncio
import time
import datetime
import os
import zlib

# json.loads, not a Python literal: JSON true/false/null are not valid Python (E-009)
# NOTE(review): the __NAME__ tokens below are template placeholders; presumably the
# generator (npx sparda-mcp init) substitutes them before this module is imported — confirm.
# Tool registry: tool name -> spec dict (keys read in this file: method, path, pathParams, enabled).
SPARDA_TOOLS = json.loads(__TOOLS_JSON__)
# Policy table consulted by sparda_proof through its "reads", "writes" and "deletes" keys.
SPARDA_POLICIES = json.loads(__SPARDING_POLICIES__)
# Shared secret compared against the x-sparda-key request header by the endpoints below.
SPARDA_LOCAL_KEY = "__LOCAL_KEY__"
# Port on 127.0.0.1 that /invoke forwards tool calls to.
SPARDA_PORT = __PORT__

# Per-tool call statistics, created and updated by sparda_record.
SPARDA_STATS = {}
# Rolling event log; sparda_event trims it to at most 100 entries.
SPARDA_EVENTS = []
# Sequence number of the most recently recorded event (0 = none yet).
SPARDA_SEQ = 0
# Module load time (epoch seconds); /stats derives uptimeSec from it.
SPARDA_START = time.time()
# immune system: tools that keep failing are quarantined so the AI can't hammer a broken route
# tool name -> {"since": ISO timestamp, "until": epoch milliseconds, "reason": text}
SPARDA_QUARANTINE = {}
# Quarantine cooldown in milliseconds; overridable through the environment (default 60000).
SPARDA_QUARANTINE_MS = int(os.environ.get("SPARDA_QUARANTINE_MS", "60000"))
# recycling gauge: calls answered from SPARDA's own knowledge vs calls that paid the host route.
# Day 1 it reads 0% — the circle fills with usage (a measure, never a promise).
SPARDA_RECYCLE = {"servedByCircle": 0, "paidFull": 0}

def sparda_recycle_rate():
    """Return the share of calls served by the circle as a rounded percentage.

    Reads the two SPARDA_RECYCLE counters; yields 0 while nothing has been
    counted yet, so the gauge never divides by zero.
    """
    served = SPARDA_RECYCLE["servedByCircle"]
    grand_total = served + SPARDA_RECYCLE["paidFull"]
    if not grand_total:
        return 0
    return round(served * 100 / grand_total)

# thermodynamic route classification: a GET whose repeated identical args keep
# returning identical bodies is observed-pure (its result pre-exists — recyclable);
# writes erase by definition. Observation only, never a guess.
# Shape: tool name -> {"sigs": {argsig: body fingerprint}, "repeats": int, "mismatches": int};
# entries are created and updated by sparda_observe_purity.
SPARDA_PURITY = {}

def sparda_observe_purity(tool, argsig, body_bytes):
    """Fold one observed response body into the purity record of *tool*.

    tool:       tool name used as the key into SPARDA_PURITY.
    argsig:     canonical signature of the call's arguments.
    body_bytes: raw response body; only its first 64 KiB are fingerprinted.

    A repeat of a known signature with the same fingerprint counts as a
    "repeat"; a different fingerprint counts as a "mismatch" and replaces the
    stored one. At most 20 signatures are tracked per tool.
    """
    record = SPARDA_PURITY.setdefault(tool, {"sigs": {}, "repeats": 0, "mismatches": 0})
    seen = record["sigs"]
    # a fingerprint, not a checksum of megabytes
    fingerprint = zlib.crc32(body_bytes[:65536])
    previous = seen.get(argsig)

    if previous is None:
        # bounded: enough sigs to judge — new signatures are ignored once 20 are stored
        if len(seen) < 20:
            seen[argsig] = fingerprint
        return

    if previous == fingerprint:
        record["repeats"] += 1
        return

    record["mismatches"] += 1
    # the latest real answer is the truth
    seen[argsig] = fingerprint

def sparda_purity_snapshot():
    """Build the per-tool purity report exposed by /stats.

    Returns a dict mapping every tool in SPARDA_TOOLS to
    {"class", "repeats", "mismatches"}. Non-GET tools are always "erasing".
    A GET tool is "volatile" after any mismatch, "pure" after at least three
    clean repeats, and "unknown" otherwise (including when never observed).
    """
    report = {}
    for tool_name, tool_spec in SPARDA_TOOLS.items():
        if tool_spec.get("method") != "GET":
            report[tool_name] = {"class": "erasing", "repeats": 0, "mismatches": 0}
            continue

        observed = SPARDA_PURITY.get(tool_name)
        if observed:
            repeats = observed["repeats"]
            mismatches = observed["mismatches"]
            if mismatches > 0:
                label = "volatile"
            else:
                label = "pure" if repeats >= 3 else "unknown"
        else:
            repeats = 0
            mismatches = 0
            label = "unknown"

        report[tool_name] = {"class": label, "repeats": repeats, "mismatches": mismatches}
    return report

def sparda_event(source, tool, status, message):
    """Append one entry to the rolling SPARDA_EVENTS log.

    Bumps the global SPARDA_SEQ counter, stamps the entry with the current
    UTC time, and truncates the message to 500 characters (a falsy message
    becomes the empty string). The log keeps at most 100 entries; the oldest
    one is dropped when the limit is exceeded.
    """
    global SPARDA_SEQ
    SPARDA_SEQ += 1

    entry = {
        "seq": SPARDA_SEQ,
        "ts": datetime.datetime.now(datetime.timezone.utc).isoformat(),
        "source": source,
        "tool": tool,
        "status": status,
        "message": str(message or "")[:500],
    }
    SPARDA_EVENTS.append(entry)

    if len(SPARDA_EVENTS) > 100:
        del SPARDA_EVENTS[0]

def sparda_record(tool, status, ms):
    """Update the statistics of *tool* after one call and feed the immune system.

    tool:   tool name (key into SPARDA_STATS / SPARDA_QUARANTINE).
    status: HTTP status observed for the call.
    ms:     call duration in milliseconds.

    Emits an "immune" event for a latency anomaly, and quarantines the tool
    (plus a second event) once three consecutive 5xx answers have been seen.
    """
    entry = SPARDA_STATS.setdefault(tool, {"calls": 0, "errors": 0, "clientErrors": 0, "totalMs": 0, "lastStatus": None, "lastTs": None, "consecutive5xx": 0})

    # innate immunity: latency far beyond the learned baseline is an antigen.
    # The baseline is only trusted after five calls, and the bar is never below 200ms.
    prior_calls = entry["calls"]
    if prior_calls >= 5:
        baseline = entry["totalMs"] / prior_calls
        if ms > max(baseline * 10, 200):
            sparda_event("immune", tool, status, f"latency anomaly: {ms}ms vs ~{round(baseline)}ms baseline")

    entry["calls"] = prior_calls + 1
    entry["totalMs"] += ms

    # 5xx = server failure (feeds the immune system); 4xx = a valid client answer
    # (404 etc), tracked apart so the count doesn't lie. A 4xx leaves the 5xx
    # streak untouched; only an answer below 400 resets it.
    if status >= 500:
        entry["errors"] += 1
        entry["consecutive5xx"] = entry.get("consecutive5xx", 0) + 1
    elif status >= 400:
        entry["clientErrors"] += 1
    else:
        entry["consecutive5xx"] = 0

    entry["lastStatus"] = status
    entry["lastTs"] = datetime.datetime.now(datetime.timezone.utc).isoformat()

    streak = entry.get("consecutive5xx", 0)
    if streak >= 3 and tool not in SPARDA_QUARANTINE:
        SPARDA_QUARANTINE[tool] = {
            "since": datetime.datetime.now(datetime.timezone.utc).isoformat(),
            "until": time.time() * 1000 + SPARDA_QUARANTINE_MS,
            "reason": f"{streak} consecutive 5xx",
        }
        sparda_event("immune", tool, 503, f"quarantined after {streak} consecutive 5xx (cooldown {SPARDA_QUARANTINE_MS}ms)")

def sparda_proof(tool, spec, args):
    """Decide whether *tool* may run with *args* and explain the decision.

    tool: requested tool name.
    spec: the tool's entry from SPARDA_TOOLS, or None when it is unknown.
    args: dict of call arguments (path params, query params, optional "body").

    Returns a dict with the keys "version", "risk", "decision" ("allow",
    "require_human" or "block"), "reasons" (list of strings) and "checks"
    (the individual check results).

    Side effect: an expired quarantine entry for *tool* is removed here, and
    the tool's consecutive5xx counter is set to 2 so that a single further
    5xx quarantines it again.
    """
    has_spec = bool(spec)
    checks = {
        "knownTool": spec is not None,
        "enabled": spec.get("enabled", False) if has_spec else False,
        "loopSafe": (not spec.get("path", "").startswith("/mcp")) if has_spec else False,
        "methodClass": "read" if has_spec and spec.get("method") == "GET" else "write",
        "pathParamsPresent": True,
        "hasBodyForWrite": True,
        "reversibleHint": False,
        "quarantineSafe": True,
    }
    reasons = []

    def verdict(risk, decision):
        # Single place that shapes the proof document returned to the caller.
        return {
            "version": "sparding-proof/v0.1",
            "risk": risk,
            "decision": decision,
            "reasons": reasons,
            "checks": checks,
        }

    if not checks["knownTool"]:
        reasons.append("unknown tool")
        return verdict("blocked", "block")

    if not checks["enabled"]:
        reasons.append("tool disabled (write-safety)")
    if not checks["loopSafe"]:
        reasons.append("self-referential tool blocked (loop protection)")

    # Quarantine: still cooling down -> unsafe; expired -> lift it, but keep the
    # failure streak one step away from a new quarantine.
    cooldown = SPARDA_QUARANTINE.get(tool)
    if cooldown:
        if (time.time() * 1000) < cooldown["until"]:
            checks["quarantineSafe"] = False
            reasons.append(f"tool quarantined (immune system): {cooldown['reason']}")
        else:
            del SPARDA_QUARANTINE[tool]
            if tool in SPARDA_STATS:
                SPARDA_STATS[tool]["consecutive5xx"] = 2

    absent = [param for param in spec.get("pathParams", []) if args.get(param) is None]
    if absent:
        checks["pathParamsPresent"] = False
        reasons.extend(f"missing path param: {param}" for param in absent)

    # Informational only: a write without a body is recorded, not blocked.
    if checks["methodClass"] == "write" and args.get("body") is None:
        checks["hasBodyForWrite"] = False

    # A write is hinted reversible when some GET tool shares its path.
    method = spec.get("method")
    if method == "GET":
        checks["reversibleHint"] = True
    else:
        checks["reversibleHint"] = any(
            other.get("method") == "GET" and other.get("path") == spec.get("path")
            for other in SPARDA_TOOLS.values()
        )

    hard_checks_pass = (
        checks["enabled"]
        and checks["loopSafe"]
        and checks["quarantineSafe"]
        and checks["pathParamsPresent"]
    )
    if not hard_checks_pass:
        return verdict("blocked", "block")

    policies = SPARDA_POLICIES or {}

    if checks["methodClass"] == "read":
        read_policy = policies.get("reads", "allow")
        if read_policy == "block":
            reasons.append("read policy blocks execution")
            return verdict("blocked", "block")
        if read_policy == "require_human":
            reasons.append("read policy requires human confirmation")
            return verdict("medium", "require_human")
        return verdict("low", "allow")

    # Deletes follow their own policy; an unrecognised delete policy value
    # falls through to the general write policy below.
    delete_policy = policies.get("deletes", "block")
    write_policy = policies.get("writes", "require_human")
    if method == "DELETE":
        if delete_policy == "block":
            reasons.append("delete policy blocks execution")
            return verdict("blocked", "block")
        if delete_policy == "require_human":
            reasons.append("delete operation requires human confirmation")
            return verdict("high", "require_human")
        if delete_policy == "allow":
            return verdict("low", "allow")

    if write_policy == "block":
        reasons.append("write policy blocks execution")
        return verdict("blocked", "block")
    if write_policy == "require_human":
        reasons.append("write operation requires human confirmation")
        return verdict("medium", "require_human")
    return verdict("low", "allow")

# Router carrying the SPARDA endpoints defined below (/tools, /stats, /events, /invoke).
sparda_router = APIRouter()
# Thread pool for the blocking urllib call in sync_fetch; /invoke submits to it
# via run_in_executor so the request is made off the event loop.
executor = concurrent.futures.ThreadPoolExecutor(max_workers=10)

def sync_fetch(url, method, headers, body_bytes):
    """Perform one blocking HTTP request and return its outcome.

    url:        absolute URL to request.
    method:     HTTP method name.
    headers:    dict of request headers.
    body_bytes: request body as bytes, or None for no body.

    Returns a ``(status, content_type, body)`` tuple, where ``body`` is the
    raw response bytes and ``content_type`` is "" when the header is absent.
    Error statuses that urllib reports as HTTPError are returned the same
    way instead of being raised. Any other failure (connection refused,
    timeout after 30 seconds, ...) propagates to the caller.
    """
    req = urllib.request.Request(url, data=body_bytes, headers=headers, method=method)
    try:
        with urllib.request.urlopen(req, timeout=30) as response:
            return response.status, response.headers.get("content-type", ""), response.read()
    except urllib.error.HTTPError as err:
        # The HTTPError doubles as the response object: read it, then close it
        # so the underlying connection is released rather than left to the GC.
        try:
            return err.code, err.headers.get("content-type", ""), err.read()
        finally:
            err.close()

@sparda_router.get("/tools")
async def get_tools(request: Request):
    """Return the full tool registry to a caller presenting the local key.

    Responds 401 {"error": "unauthorized"} when the x-sparda-key header is
    missing or does not match SPARDA_LOCAL_KEY.
    """
    supplied_key = request.headers.get("x-sparda-key")
    if supplied_key == SPARDA_LOCAL_KEY:
        return SPARDA_TOOLS
    return JSONResponse(status_code=401, content={"error": "unauthorized"})

@sparda_router.get("/stats")
async def get_stats(request: Request):
    """Return uptime, per-tool stats, quarantine state, recycle gauge and purity report.

    Responds 401 {"error": "unauthorized"} when the x-sparda-key header is
    missing or does not match SPARDA_LOCAL_KEY.
    """
    if request.headers.get("x-sparda-key") != SPARDA_LOCAL_KEY:
        return JSONResponse(status_code=401, content={"error": "unauthorized"})

    payload = {"uptimeSec": round(time.time() - SPARDA_START)}
    payload["stats"] = SPARDA_STATS
    payload["quarantine"] = SPARDA_QUARANTINE

    # Copy of the raw counters plus the derived percentage.
    recycle = dict(SPARDA_RECYCLE)
    recycle["ratePct"] = sparda_recycle_rate()
    payload["recycle"] = recycle

    payload["purity"] = sparda_purity_snapshot()
    return payload

@sparda_router.get("/events")
async def get_events(request: Request):
    """Return the events recorded after the ``since`` sequence number.

    The ``since`` query parameter defaults to 0 and falls back to 0 when it
    is not an integer. The response carries the current SPARDA_SEQ alongside
    the matching events. Responds 401 {"error": "unauthorized"} when the
    x-sparda-key header is missing or does not match SPARDA_LOCAL_KEY.
    """
    if request.headers.get("x-sparda-key") != SPARDA_LOCAL_KEY:
        return JSONResponse(status_code=401, content={"error": "unauthorized"})

    raw_since = request.query_params.get("since", "0")
    try:
        cursor = int(raw_since)
    except ValueError:
        cursor = 0

    latest_seq = SPARDA_SEQ
    newer = [event for event in SPARDA_EVENTS if event["seq"] > cursor]
    return {"seq": latest_seq, "events": newer}

@sparda_router.post("/invoke")
async def invoke_tool(request: Request):
    """Run one registered tool against the host app and relay its answer.

    Expects a JSON body shaped like {"tool": <name>, "args": {...}}. Each
    arg is used as a path parameter (when listed in the tool's pathParams),
    as the request body (the "body" key, non-GET only), or as a query
    parameter (everything else that is not None).

    Responses:
      401  x-sparda-key header missing or wrong.
      400  "args" is present but not a JSON object.
      404 / 403 / 400 / 503  the sparding proof blocked the call (unknown
           tool, disabled tool or blocking policy, loop protection or missing
           path param, quarantined tool); the proof is included.
      200  {"upstreamStatus", "data", "spardingProof"} once the host route
           answered, whatever its status was.
      502  the request to the host app itself failed.

    NOTE(review): only a "block" decision stops execution here; a
    "require_human" decision is forwarded like "allow" and merely reported in
    spardingProof — presumably the confirmation is enforced by the caller;
    confirm.
    """
    if request.headers.get("x-sparda-key") != SPARDA_LOCAL_KEY:
        return JSONResponse(status_code=401, content={"error": "unauthorized"})

    try:
        body = await request.json()
    except Exception:
        body = {}
    # Valid JSON is not necessarily an object (a list, a number, ...); treat
    # that like an unparsable body instead of crashing on body.get below.
    if not isinstance(body, dict):
        body = {}

    tool = body.get("tool")
    args = body.get("args")
    if args is None:
        # covers both a missing key and an explicit JSON null
        args = {}
    elif not isinstance(args, dict):
        return JSONResponse(status_code=400, content={"error": "args must be a JSON object"})

    # Only a string can name a tool; this also keeps unhashable values
    # (lists, objects) away from the dict lookup.
    spec = SPARDA_TOOLS.get(tool) if isinstance(tool, str) else None
    proof = sparda_proof(tool, spec, args)

    if proof["decision"] == "block":
        checks = proof["checks"]
        if not checks["knownTool"]:
            status = 404
            error = f"unknown tool: {tool}"
        elif not checks["enabled"]:
            return JSONResponse(status_code=403, content={
                "error": f"tool disabled (write-safety): {tool}",
                "hint": "Enable it in sparda.json, then re-run: npx sparda-mcp init",
                "spardingProof": proof
            })
        elif not checks["loopSafe"]:
            status = 400
            error = "self-referential tool blocked (loop protection)"
        elif not checks["quarantineSafe"]:
            quarantined = SPARDA_QUARANTINE.get(tool)
            # answered without touching the host route
            SPARDA_RECYCLE["servedByCircle"] += 1
            return JSONResponse(status_code=503, content={
                "error": f"tool quarantined (immune system): {tool}",
                "reason": quarantined["reason"] if quarantined else "",
                "retryInMs": round(quarantined["until"] - time.time() * 1000) if quarantined else 0,
                "spardingProof": proof
            })
        else:
            status = 400
            error = proof["reasons"][0] if proof["reasons"] else "blocked"
            if any("policy blocks execution" in r for r in proof["reasons"]):
                status = 403
        return JSONResponse(status_code=status, content={"error": error, "spardingProof": proof})

    t0 = time.time()
    try:
        path_params = spec.get("pathParams", [])
        path = spec.get("path")

        def repl(match):
            name = match.group(1)
            val = args.get(name)
            # safe="" also escapes "/", so a value cannot add path segments to
            # the URL. NOTE(review): a server that percent-decodes before
            # routing may still see the slash — consider rejecting such values.
            return urllib.parse.quote(str(val), safe="")

        resolved_path = re.sub(r'\{(\w+)\}', repl, path)

        # Every remaining non-None arg travels as a query parameter.
        query_params = []
        for k, v in args.items():
            if k == 'body' or k in path_params or v is None:
                continue
            query_params.append(f"{urllib.parse.quote(k)}={urllib.parse.quote(str(v))}")

        if query_params:
            resolved_path += "?" + "&".join(query_params)

        url = f"http://127.0.0.1:{SPARDA_PORT}{resolved_path}"
        method = spec.get("method", "GET")

        headers = {}
        body_bytes = None

        if method != "GET" and args.get("body") is not None:
            headers["content-type"] = "application/json"
            body_bytes = json.dumps(args["body"]).encode("utf-8")

        SPARDA_RECYCLE["paidFull"] += 1  # the host route is about to be exercised — full price
        # urllib is blocking, so the request runs on the thread pool.
        loop = asyncio.get_running_loop()
        status, content_type, data_bytes = await loop.run_in_executor(
            executor, sync_fetch, url, method, headers, body_bytes
        )

        # Relay JSON as structured data when it parses, otherwise as plain text.
        text_data = data_bytes.decode("utf-8", errors="ignore")
        try:
            data = json.loads(text_data)
        except Exception:
            data = text_data

        sparda_record(tool, status, round((time.time() - t0) * 1000))
        if method == "GET" and status == 200:
            # canonical argsig: sorted items, so the AI's argument order never splits a signature
            argsig = json.dumps(sorted((k, str(v)) for k, v in args.items() if v is not None))
            sparda_observe_purity(tool, argsig, data_bytes)
        if status >= 500:
            sparda_event("invoke", tool, status, text_data[:200])
        return {"upstreamStatus": status, "data": data, "spardingProof": proof}
    except Exception as e:
        # Transport-level failure: recorded as a 502 so it feeds the immune system.
        sparda_record(tool, 502, round((time.time() - t0) * 1000))
        sparda_event("invoke", tool, 502, str(e))
        return JSONResponse(status_code=502, content={"error": str(e), "spardingProof": proof})
