# ============================================================
# SPARDA ROUTER — Auto-generated. DO NOT EDIT.
# Regenerate: npx sparda-mcp init   •   Remove: npx sparda-mcp remove
# ============================================================
from fastapi import APIRouter, Request, Response, HTTPException
from fastapi.responses import JSONResponse
import json
import urllib.request
import urllib.parse
import concurrent.futures
import re
import asyncio
import time
import datetime
import os

# json.loads, not a Python literal: JSON true/false/null are not valid Python (E-009)
# NOTE(review): __TOOLS_JSON__, __LOCAL_KEY__ and __PORT__ look like template placeholders
# that the generator substitutes before this file is importable — confirm against the generator.
# Tool registry: looked up with .get(tool) in /invoke; each spec is read there for
# "enabled", "path", "pathParams" and "method".
SPARDA_TOOLS = json.loads(__TOOLS_JSON__)
# shared secret every endpoint compares against the x-sparda-key request header
SPARDA_LOCAL_KEY = "__LOCAL_KEY__"
# port on 127.0.0.1 that /invoke forwards tool calls to
SPARDA_PORT = __PORT__

# per-tool call counters keyed by tool name (written by sparda_record, exposed by /stats)
SPARDA_STATS = {}
# in-memory event log (appended to by sparda_event, read by /events)
SPARDA_EVENTS = []
# sequence number stamped on each event; /events uses it as the "since" cursor
SPARDA_SEQ = 0
# import time in epoch seconds, used to compute uptimeSec in /stats
SPARDA_START = time.time()
# immune system: tools that keep failing are quarantined so the AI can't hammer a broken route
SPARDA_QUARANTINE = {}
# quarantine cooldown in milliseconds; overridable via the SPARDA_QUARANTINE_MS env var
SPARDA_QUARANTINE_MS = int(os.environ.get("SPARDA_QUARANTINE_MS", "60000"))

def sparda_event(source, tool, status, message):
    """Append one entry to the in-memory event log.

    Bumps the global sequence counter, stamps the entry with the new sequence
    number and the current UTC time, and drops the oldest entry once the log
    holds more than 100 items.

    Args:
        source: Label for what produced the event (this file uses "invoke"
            and "immune").
        tool: Name of the tool the event is about.
        status: Status code associated with the event.
        message: Free-form detail; falsy values become "" and the text is
            truncated to 500 characters.
    """
    global SPARDA_SEQ
    SPARDA_SEQ += 1

    text = str(message) if message else ""
    entry = {
        "seq": SPARDA_SEQ,
        "ts": datetime.datetime.now(datetime.timezone.utc).isoformat(),
        "source": source,
        "tool": tool,
        "status": status,
        "message": text[:500],
    }
    SPARDA_EVENTS.append(entry)

    # bounded log: one in, at most one out
    if len(SPARDA_EVENTS) > 100:
        del SPARDA_EVENTS[0]

def sparda_record(tool, status, ms):
    """Fold one finished call into the per-tool stats and run the immune checks.

    Two checks are applied:
      * latency anomaly: once a tool has at least 5 recorded calls, a call
        slower than max(10 x the running mean, 200 ms) emits an "immune" event;
      * quarantine: the third consecutive 5xx (with no 2xx/3xx in between)
        puts the tool into SPARDA_QUARANTINE for SPARDA_QUARANTINE_MS and
        emits an "immune" event. 4xx results leave the streak untouched.

    Args:
        tool: Tool name used as the stats key.
        status: Status code of the call.
        ms: Call duration in milliseconds.
    """
    entry = SPARDA_STATS.setdefault(tool, {"calls": 0, "errors": 0, "totalMs": 0, "lastStatus": None, "lastTs": None, "consecutive5xx": 0})

    # innate immunity: compare against the baseline learned *before* this call is added
    prior_calls = entry["calls"]
    if prior_calls >= 5:
        baseline = entry["totalMs"] / prior_calls
        if ms > max(baseline * 10, 200):
            sparda_event("immune", tool, status, f"latency anomaly: {ms}ms vs ~{round(baseline)}ms baseline")

    entry["calls"] = prior_calls + 1
    entry["totalMs"] += ms
    if status >= 400:
        entry["errors"] += 1
    entry["lastStatus"] = status
    entry["lastTs"] = datetime.datetime.now(datetime.timezone.utc).isoformat()

    # 5xx extends the failure streak, 2xx/3xx clears it, 4xx leaves it as is
    streak = entry.get("consecutive5xx", 0)
    if status >= 500:
        streak += 1
        entry["consecutive5xx"] = streak
    elif status < 400:
        streak = 0
        entry["consecutive5xx"] = streak

    if streak < 3 or tool in SPARDA_QUARANTINE:
        return

    SPARDA_QUARANTINE[tool] = {
        "since": datetime.datetime.now(datetime.timezone.utc).isoformat(),
        "until": time.time() * 1000 + SPARDA_QUARANTINE_MS,
        "reason": f"{entry['consecutive5xx']} consecutive 5xx",
    }
    sparda_event("immune", tool, 503, f"quarantined after {entry['consecutive5xx']} consecutive 5xx (cooldown {SPARDA_QUARANTINE_MS}ms)")

# router carrying the /tools, /stats, /events and /invoke endpoints defined below
sparda_router = APIRouter()
# worker threads for the blocking urllib call in sync_fetch (invoke_tool dispatches it via run_in_executor)
executor = concurrent.futures.ThreadPoolExecutor(max_workers=10)

def sync_fetch(url, method, headers, body_bytes):
    """Perform one blocking HTTP request and return the raw reply.

    This call blocks for up to 30 seconds, which is why invoke_tool runs it
    through ``run_in_executor`` instead of calling it on the event loop.

    Args:
        url: Absolute URL to request.
        method: HTTP method name, e.g. "GET" or "POST".
        headers: Dict of request headers.
        body_bytes: Request body as bytes, or None for no body.

    Returns:
        A ``(status, content_type, body)`` tuple. 4xx/5xx replies are returned
        the same way rather than raised; ``content_type`` is "" when the
        header is absent.

    Raises:
        urllib.error.URLError: The request could not be completed (connection
            refused, unknown scheme, ...). Timeouts and other socket errors
            propagate unchanged as well.
    """
    req = urllib.request.Request(url, data=body_bytes, headers=headers, method=method)
    try:
        with urllib.request.urlopen(req, timeout=30) as response:
            status = response.status
            content_type = response.headers.get("content-type", "")
            return status, content_type, response.read()
    except urllib.error.HTTPError as err:
        # urllib raises on 4xx/5xx, but the exception object *is* the response:
        # read it like one, then close it so the underlying socket is released.
        try:
            return err.code, err.headers.get("content-type", ""), err.read()
        finally:
            err.close()

@sparda_router.get("/tools")
async def get_tools(request: Request):
    # GET /tools — return the full tool registry.
    # Requires the x-sparda-key header to equal SPARDA_LOCAL_KEY; otherwise 401.
    supplied_key = request.headers.get("x-sparda-key")
    if supplied_key == SPARDA_LOCAL_KEY:
        return SPARDA_TOOLS
    return JSONResponse(status_code=401, content={"error": "unauthorized"})

@sparda_router.get("/stats")
async def get_stats(request: Request):
    # GET /stats — uptime in whole seconds plus the per-tool counters and the
    # current quarantine table. Requires a matching x-sparda-key header; otherwise 401.
    authorized = request.headers.get("x-sparda-key") == SPARDA_LOCAL_KEY
    if not authorized:
        return JSONResponse(status_code=401, content={"error": "unauthorized"})

    uptime_sec = round(time.time() - SPARDA_START)
    return {
        "uptimeSec": uptime_sec,
        "stats": SPARDA_STATS,
        "quarantine": SPARDA_QUARANTINE,
    }

@sparda_router.get("/events")
async def get_events(request: Request):
    # GET /events?since=<seq> — return the latest sequence number and every
    # logged event whose seq is greater than `since`.
    # Requires a matching x-sparda-key header; otherwise 401.
    if request.headers.get("x-sparda-key") != SPARDA_LOCAL_KEY:
        return JSONResponse(status_code=401, content={"error": "unauthorized"})

    raw_since = request.query_params.get("since", "0")
    try:
        cursor = int(raw_since)
    except ValueError:
        # a non-numeric cursor means "send everything still in the log"
        cursor = 0

    latest_seq = SPARDA_SEQ
    newer = [event for event in SPARDA_EVENTS if event["seq"] > cursor]
    return {"seq": latest_seq, "events": newer}

@sparda_router.post("/invoke")
async def invoke_tool(request: Request):
    # POST /invoke — forward one tool call to the app on 127.0.0.1:SPARDA_PORT
    # and wrap its reply.
    #
    # Request body (JSON object): {"tool": <name>, "args": {...}}
    #   - args matching a {placeholder} in the spec's path fill that placeholder
    #   - args["body"] becomes the JSON request body for non-GET methods
    #   - every other non-null arg not listed in pathParams is sent as a query parameter
    # Replies:
    #   200 {"upstreamStatus", "data"}  upstream answered (any status, 4xx/5xx included)
    #   400 malformed args, missing path param, or self-referential tool
    #   401 missing/wrong x-sparda-key
    #   403 tool disabled          404 unknown tool
    #   502 upstream unreachable or the call failed locally
    #   503 tool currently quarantined
    # (Comments rather than a docstring: FastAPI would publish a docstring as the
    # route description in the generated OpenAPI schema.)
    if request.headers.get("x-sparda-key") != SPARDA_LOCAL_KEY:
        return JSONResponse(status_code=401, content={"error": "unauthorized"})

    try:
        body = await request.json()
    except Exception:
        body = {}
    # valid JSON that is not an object (array, string, number) has no .get();
    # treat it like an unparseable body instead of crashing with a 500
    if not isinstance(body, dict):
        body = {}

    tool = body.get("tool")
    # a non-string name (e.g. a list) is unhashable and would crash the lookup
    spec = SPARDA_TOOLS.get(tool) if isinstance(tool, str) else None
    if not spec:
        return JSONResponse(status_code=404, content={"error": f"unknown tool: {tool}"})

    if not spec.get("enabled"):
        return JSONResponse(status_code=403, content={
            "error": f"tool disabled (write-safety): {tool}",
            "hint": "Enable it in sparda.json, then re-run: npx sparda-mcp init"
        })

    path = spec.get("path")
    if path.startswith("/mcp"):
        return JSONResponse(status_code=400, content={"error": "self-referential tool blocked (loop protection)"})

    # Validate args before touching stats or quarantine state: a malformed request
    # is the caller's mistake and must not be booked as an upstream failure (502)
    # that pushes a healthy tool toward quarantine.
    args = body.get("args")
    if args is None:
        args = {}
    elif not isinstance(args, dict):
        return JSONResponse(status_code=400, content={"error": "args must be a JSON object"})

    quarantined = SPARDA_QUARANTINE.get(tool)
    if quarantined:
        now_ms = time.time() * 1000
        if now_ms < quarantined["until"]:
            return JSONResponse(status_code=503, content={
                "error": f"tool quarantined (immune system): {tool}",
                "reason": quarantined["reason"],
                "retryInMs": round(quarantined["until"] - now_ms),
            })
        # half-open: cooldown elapsed — allow one probe; a single new 5xx re-quarantines
        del SPARDA_QUARANTINE[tool]
        if tool in SPARDA_STATS:
            SPARDA_STATS[tool]["consecutive5xx"] = 2

    t0 = time.time()
    try:
        # "pathParams": null in the spec is treated the same as a missing key
        path_params = spec.get("pathParams") or []

        def repl(match):
            name = match.group(1)
            val = args.get(name)
            if val is None:
                raise HTTPException(status_code=400, detail=f"missing path param: {name}")
            # safe="" so a "/" inside the value is escaped and cannot add path
            # segments that would route the call to a different endpoint
            return urllib.parse.quote(str(val), safe="")

        resolved_path = re.sub(r'\{(\w+)\}', repl, path)

        query_params = [
            f"{urllib.parse.quote(k)}={urllib.parse.quote(str(v))}"
            for k, v in args.items()
            if k != 'body' and k not in path_params and v is not None
        ]
        if query_params:
            resolved_path += "?" + "&".join(query_params)

        url = f"http://127.0.0.1:{SPARDA_PORT}{resolved_path}"
        method = spec.get("method", "GET")

        headers = {}
        body_bytes = None

        if method != "GET" and args.get("body") is not None:
            headers["content-type"] = "application/json"
            body_bytes = json.dumps(args["body"]).encode("utf-8")

        # urllib is blocking, so run it on the thread pool to keep the event loop free
        loop = asyncio.get_running_loop()
        status, content_type, data_bytes = await loop.run_in_executor(
            executor, sync_fetch, url, method, headers, body_bytes
        )

        # hand back parsed JSON when the upstream body is JSON, raw text otherwise
        text_data = data_bytes.decode("utf-8", errors="ignore")
        try:
            data = json.loads(text_data)
        except Exception:
            data = text_data

        sparda_record(tool, status, round((time.time() - t0) * 1000))
        if status >= 500:
            sparda_event("invoke", tool, status, text_data[:200])
        return {"upstreamStatus": status, "data": data}
    except HTTPException as e:
        # raised by repl() for a missing path parameter
        sparda_record(tool, e.status_code, round((time.time() - t0) * 1000))
        sparda_event("invoke", tool, e.status_code, e.detail)
        return JSONResponse(status_code=e.status_code, content={"error": e.detail})
    except Exception as e:
        # connection refused, timeout, or any other local failure: report as bad gateway
        sparda_record(tool, 502, round((time.time() - t0) * 1000))
        sparda_event("invoke", tool, 502, str(e))
        return JSONResponse(status_code=502, content={"error": str(e)})
