#!/usr/bin/env python3
"""arq-audit-all — the diagnostic-surface counterpart to arq-fix-all.

Where arq-fix-all is the ACTION verb for the capability catalog,
arq-audit-all is the AUDIT verb. Given a class (`ai-provider` for this
seed), it walks every capability in the class, runs a two-layer audit,
and emits per-capability + per-sweep substrate acts so the catalog's
declared state can be reconciled against its observed state.

Layer 1 — Mechanical probe (cheap, deterministic):
    Runs each capability's declared probe_spec (kind = http / command /
    unix-socket / mcp). Measures: reachable? latency p50/p99? explicit
    status-code matches `expect_status`? Returns structured result.

Layer 2 — LLM interrogation (judgment, only if Layer 1 is green):
    Composes a prompt from (a) the capability's declared dream_requirements
    + its employee-type KPI spec (arq://doc/kpi-spec/ai-provider-v1 for
    this seed) + (b) the observed Layer-1 signal + live metadata. Dispatches
    the prompt to a Tier-0.5 free-tier LLM via arq-call. Returns a verdict:
    `satisfies-dream` / `partial` / `breach` + reasoning.

Composite scoring:
    Per capability, combine (mechanical_green=1.0, yellow=0.5, red=0.0)
    with LLM readiness (yes=1.0, partial=0.5, no=0.0) into a composite
    [0, 1]. Law-Zero floor-breaches (safety-class capability failed probe,
    OR LLM flagged law-zero breach) are pinned to priority 0 (highest)
    regardless of composite score.

Substrate emission:
    Per capability:       arq://act/capability_audited/<cap>-<ts>
    Per sweep (summary):  arq://act/arq_audit_all_sweep/<class>-<ts>

Alignment with canonised + drafted principles:
  arq://doc/principle/mesh-diagnostic-surface-keeps-pace-with-action-surface-v1
      — the principle this verb exists to satisfy (Fork-2 seed)
  arq://doc/principle/substrate-is-the-exchange-v1
      — audit reads the catalog, writes results back as signed acts
  arq://doc/principle/one-primitive-speaks-every-protocol-v1
      — probe dispatch uses each capability's declared probe_spec.kind
  arq://doc/principle/worker-output-is-substrate-mediated-v1
      — the sweep's findings land as substrate acts first, stdout second
  arq://doc/principle/claim-on-address-v1
      — each capability is claimed before audit to prevent sibling duplicates
  arq://doc/principle/action-owner-and-worker-v1
      — arq-audit-all is action-owner-as-orchestrator; probes + LLMs
        are workers with declared trigger/strength/weakness

Covenantal boundaries (Law Zero — C = infinity):
  - safety-class capabilities (tag `law-zero-check` / `governance-evaluator`
    / `safety-gate`) that fail their probe get floor-breach priority
  - audit-time LLM interrogation NEVER routes through Anthropic / paid
    providers in this first cut — audit must not inflate the bill it
    measures. Only Tier-0.5 free-tier providers are used for interrogation.
  - an LLM verdict that flags any law-zero concern bypasses composite
    ranking and is pinned as floor-breach

Shape:
    arq-audit-all --class ai-provider                   # full sweep, emit acts
    arq-audit-all --class ai-provider --dry-run         # no substrate emission
    arq-audit-all --class ai-provider --limit 3         # only first N
    arq-audit-all --class ai-provider --no-llm          # probe only, skip LLM
    arq-audit-all --class ai-provider --json            # JSON output
    arq-audit-all --class ai-provider --capability-api URL
    arq-audit-all --class ai-provider --parallelism N
"""

from __future__ import annotations

import argparse
import concurrent.futures
import hashlib
import http.client
import json
import os
import subprocess
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from datetime import datetime, timezone
from pathlib import Path

# Make sibling modules that live next to this script (e.g. _arq_primitive)
# importable regardless of the caller's working directory.
sys.path.insert(0, str(Path(__file__).parent))
try:
    from _arq_primitive import primitive_invoke as _primitive_invoke  # type: ignore
except ImportError:
    # Without the primitive module there is no daemon path; helpers that
    # use it test for None and take their non-daemon fallback instead.
    _primitive_invoke = None  # type: ignore

# `twin` CLI used as the fallback substrate-emission path.
TWIN_CLI: Path = Path.home() / ".local" / "bin" / "twin"
# Actor identity passed on every daemon primitive call; override with the
# ARQ_ACTOR_PEER_ADDRESS environment variable.
ACTOR_PEER: str = os.environ.get(
    "ARQ_ACTOR_PEER_ADDRESS",
    "arq://body/peer/578412e7b083b40e56e228779804582a",
)

# Base URL of the capability catalog HTTP API; override with the
# ARQ_CAPABILITY_API environment variable.
# NOTE(review): the default is the staging host — confirm that is the
# intended default for real sweeps.
DEFAULT_CAPABILITY_API: str = os.environ.get(
    "ARQ_CAPABILITY_API",
    "https://staging.arqera.io/api/capability",
)

# Tags identifying safety-class capabilities. Failing probe on any of
# these forces floor-breach priority regardless of composite score.
SAFETY_CLASS_TAGS: frozenset[str] = frozenset({
    "law-zero-check",
    "governance-evaluator",
    "safety-gate",
})

# Tier-0.5 free providers acceptable for audit-interrogation dispatch.
# Order matters — first in the list is tried first; on any dispatch
# failure (non-zero arq-call exit, e.g. 5xx / rate-limit, empty output) or
# an unparseable reply we fall through to the next. Paid providers are
# EXCLUDED on purpose: audit must not inflate the bill it measures.
TIER_0_5_INTERROGATORS: tuple[str, ...] = (
    "cerebras",       # wafer-scale, free-tier 30 RPM
    "groq-free",      # fastest, 30 RPM free
    "sambanova",      # llama free-tier
    "mistral-free",   # eu-region free
)

# Capability classes the employee-type KPI spec map recognises. Add here
# as new specs land; the LLM prompt composer reads the spec URL from
# this map so audit can compose against the canonical KPI contract.
# Classes missing from this map get a "(no kpi spec registered)" anchor.
CLASS_TO_KPI_SPEC: dict[str, str] = {
    "ai-provider": "arq://doc/kpi-spec/ai-provider-v1",
    # Future seeds (compute-peer, mesh, payments, ...) plug in here.
}

# Capability class -> summary of the "dream" for LLM prompts. Short,
# canonical. When the substrate projection of a kpi-spec is fetchable
# server-side, this inline copy becomes a fallback.
CLASS_DREAM_SUMMARY: dict[str, str] = {
    "ai-provider": (
        "Every ai-provider is expected to (1) accept completion requests "
        "via HTTP, (2) respond within bounded latency, (3) produce quality "
        "proportional to tier/cost, (4) respect declared rate limits, "
        "(5) honour declared context window, (6) return structured errors, "
        "(7) authenticate via declared mechanism, (8) bill at declared rate, "
        "(9) log official uptime. KPIs: latency_p50_ms, latency_p99_ms, "
        "error_rate_30d, cost_per_1k_tokens_gbp, quality_score, "
        "context_window_tokens, uptime_percent_30d, rate_limit_hit_rate, "
        "auth_failures_30d. Trust = weighted sum. Law Zero: trust cannot "
        "silence a legitimate safety gate."
    ),
}


# ---------------------------------------------------------------------
# Helpers — ts, hashing, slug
# ---------------------------------------------------------------------


def _ts_ms() -> int:
    return int(datetime.now(timezone.utc).timestamp() * 1000)


def _sha256(data: str) -> str:
    return hashlib.sha256(data.encode("utf-8", errors="replace")).hexdigest()


def _slug(s: str) -> str:
    """Filesystem-safe slug for substrate-reference IDs."""
    out = []
    for ch in s:
        if ch.isalnum() or ch in "-_.":
            out.append(ch)
        else:
            out.append("-")
    return "".join(out)[:80]


# ---------------------------------------------------------------------
# Substrate emission — daemon first, twin subprocess fallback
# ---------------------------------------------------------------------


def _emit_via_daemon(
    class_: str, type_: str, ref: str, payload: dict
) -> tuple[str, str | None]:
    """Emit one act through the daemon's ``daemon.act.emit`` primitive.

    Returns ``(outcome, address)``:
      - ("ok", address) when the daemon answers ok with a dict result;
      - ("unknown_verb", None) / ("error", None) passed through verbatim;
      - ("unavailable", None) when the primitive module is missing, the
        call raises, or the daemon returns anything else.
    """
    invoke = _primitive_invoke
    if invoke is None:
        return ("unavailable", None)
    request = {
        "class": class_,
        "type": type_,
        "reference": ref,
        "payload": payload,
    }
    try:
        outcome, result = invoke(
            "daemon.act.emit", request, ACTOR_PEER, timeout_s=10.0,
        )
    except Exception:
        return ("unavailable", None)
    if outcome == "ok" and isinstance(result, dict):
        return ("ok", result.get("address"))
    passthrough = outcome in ("unknown_verb", "error")
    return (outcome, None) if passthrough else ("unavailable", None)


def _emit_via_twin_subprocess(
    class_: str, type_: str, ref: str, payload: dict
) -> str | None:
    """Emit one act through the `twin` CLI and return its arq:// address.

    Returns None when the CLI is absent, the call times out (5 s) or cannot
    start, it exits non-zero, or stdout has no line starting with
    ``arq://``. Otherwise the first such line, whitespace-stripped, is
    returned.
    """
    if not TWIN_CLI.exists():
        return None
    argv = [
        str(TWIN_CLI), "--use-keychain", "act", "emit",
        class_, type_, ref, "--payload", json.dumps(payload),  # noqa: ARQ-NO-JSON-HOT-PATH twin CLI input boundary — twin accepts --payload <json>
    ]
    try:
        proc = subprocess.run(
            argv, capture_output=True, text=True, timeout=5, check=False,
        )
    except (subprocess.TimeoutExpired, OSError):
        return None
    if proc.returncode != 0:
        return None
    stripped = (raw_line.strip() for raw_line in proc.stdout.splitlines())
    return next((addr for addr in stripped if addr.startswith("arq://")), None)


def _emit(class_: str, type_: str, ref: str, payload: dict) -> str | None:
    """Fire-and-forget emission — daemon first, twin fallback.

    Returns the act address, or None. An explicit daemon "error" is final;
    only unavailable / unknown-verb daemon outcomes fall back to twin.
    """
    outcome, address = _emit_via_daemon(class_, type_, ref, payload)
    if outcome == "ok":
        return address
    if outcome != "error":
        return _emit_via_twin_subprocess(class_, type_, ref, payload)
    return None


# ---------------------------------------------------------------------
# Claim-on-address — prevent sibling duplicate audits
# ---------------------------------------------------------------------


def claim_capability(
    address: str, task: str = "audit", ttl_s: int = 300,
) -> tuple[bool, str | None]:
    """Best-effort claim of *address* for *task* via the daemon primitive.

    Returns ``(claimed, claim_token)``. In this first cut ``claimed`` is
    always True — a failed or unsupported claim never halts the audit.
    ``claim_token`` is the daemon's token when ``capability.claim``
    answered ok with a dict result, otherwise None.

    When the primitive module is not importable there is no daemon to race
    against, so we proceed silently. Every other non-ok result — including
    an exception raised by the primitive call — is recorded as a
    ``capability_audit_claim_miss`` act so substrate sees the miss, and the
    audit proceeds best-effort. Full enforcement (returning
    ``claimed=False`` when a sibling holds the claim) lands when the daemon
    exposes capability.claim as a blocking verb.
    """
    if _primitive_invoke is None:
        return (True, None)  # no daemon, no race, proceed
    try:
        outcome, result = _primitive_invoke(
            "capability.claim",
            {"address": address, "task": task, "ttl_s": ttl_s},
            ACTOR_PEER,
            timeout_s=5.0,
        )
    except Exception as exc:  # noqa: BLE001 — claim must never halt the audit
        # Record the failure like any other miss instead of swallowing it.
        outcome, result = f"exception:{type(exc).__name__}", None
    if outcome == "ok" and isinstance(result, dict):
        return (True, result.get("claim_token"))
    # unknown_verb / error / unavailable / exception — substrate-log the
    # miss and proceed. We don't want the audit to become less reliable
    # than arq-fix-all just because the claim verb isn't live yet.
    _emit("act", "capability_audit_claim_miss", _slug(address)[:60], {
        "address": address,
        "task": task,
        "outcome": outcome,
        "actor_peer": ACTOR_PEER,
        "ts_ms": _ts_ms(),
    })
    return (True, None)


# ---------------------------------------------------------------------
# HTTP helpers — read-only
# ---------------------------------------------------------------------


def _http_get_json(url: str, timeout_s: float = 15.0):
    try:
        req = urllib.request.Request(
            url,
            headers={"Accept": "application/json", "User-Agent": "arq-audit-all/0.1"},
        )
        with urllib.request.urlopen(req, timeout=timeout_s) as resp:
            if resp.status != 200:
                return None
            body = resp.read().decode("utf-8", errors="replace")
            return json.loads(body)
    except (urllib.error.URLError, json.JSONDecodeError, OSError, TimeoutError):
        return None


# ---------------------------------------------------------------------
# Catalog query
# ---------------------------------------------------------------------


def query_class_capabilities(
    class_tag: str,
    api_base: str,
    state: str = "live",
    timeout_s: float = 15.0,
    limit: int = 200,
) -> list[dict]:
    """Fetch catalog capabilities with the given class tag and state.

    Lists ``<api_base>/state?state=..&tag=..&limit=..`` and then fetches each
    listed name's detail record from ``<api_base>/<name>?liveness_limit=5``.
    The detail records are expected to carry probe_spec, dream_requirements
    and cost_model. Query parameters and the name path segment are
    URL-encoded. Rows without a name, non-object rows, and details that
    fail to fetch or are not JSON objects are skipped. Returns [] when the
    listing itself cannot be fetched or has an unexpected shape.
    """
    base = api_base.rstrip("/")
    query = urllib.parse.urlencode({"state": state, "tag": class_tag, "limit": limit})
    listing = _http_get_json(f"{base}/state?{query}", timeout_s=timeout_s)
    if isinstance(listing, dict) and "items" in listing:
        raw = listing["items"]
    elif isinstance(listing, list):
        raw = listing
    else:
        return []
    if not isinstance(raw, list):
        return []

    hydrated: list[dict] = []
    for row in raw[:limit]:
        if not isinstance(row, dict):
            continue
        name = row.get("name")
        if not name:
            continue
        # quote() keeps '/' (its default safe char) so slug-style names map
        # to the same path as before while '?', '#', spaces etc. are escaped.
        detail = _http_get_json(
            f"{base}/{urllib.parse.quote(str(name))}?liveness_limit=5",
            timeout_s=timeout_s,
        )
        if isinstance(detail, dict):
            hydrated.append(detail)
    return hydrated


# ---------------------------------------------------------------------
# Layer 1 — Mechanical probes
# ---------------------------------------------------------------------


def _is_safety_class(tags: list[str] | None) -> bool:
    if not tags:
        return False
    return any(t in SAFETY_CLASS_TAGS for t in tags)


def _expand_env_in_header(auth_spec: str | None) -> dict:
    """Expand ${ENV_VAR} placeholders in an `auth` string like
    'header:Authorization:Bearer ${GROQ_API_KEY}'.

    Returns a {header_name: header_value} dict. Returns empty dict if
    the placeholder resolves to nothing (we'll probe unauthenticated
    and let the server respond — 401/403 is a valid reachability signal
    for many providers).
    """
    if not auth_spec or not auth_spec.startswith("header:"):
        return {}
    parts = auth_spec.split(":", 2)
    if len(parts) != 3:
        return {}
    _, hname, hval = parts
    # Expand ${VAR}
    if "${" in hval and "}" in hval:
        start = hval.index("${")
        end = hval.index("}", start)
        var = hval[start + 2:end]
        val = os.environ.get(var, "")
        hval = hval[:start] + val + hval[end + 1:]
    if not hval or "${" in hval:
        return {}
    return {hname: hval}


def probe_http(probe_spec: dict, timeout_s: float = 10.0) -> dict:
    """HTTP reachability probe.

    Returns: {outcome: ok|yellow|red|skipped, status_code, latency_ms,
    detail, response_hash}.
    - ok: HTTP 2xx/3xx, OR matches explicit expect_status
    - yellow: auth-required response (401/403) — endpoint is reachable
    - red: connection error, 5xx, or timeout
    """
    url = probe_spec.get("url") or probe_spec.get("endpoint")
    if not url:
        return {
            "outcome": "skipped",
            "status_code": None,
            "latency_ms": 0,
            "detail": "probe_spec missing url/endpoint",
            "response_hash": None,
        }
    method = probe_spec.get("method", "GET").upper()
    expect = probe_spec.get("expect_status", 200)
    headers = {"User-Agent": "arq-audit-all/0.1", "Accept": "application/json"}
    headers.update(_expand_env_in_header(probe_spec.get("auth")))

    t0 = time.monotonic()
    try:
        req = urllib.request.Request(url, headers=headers, method=method)
        with urllib.request.urlopen(req, timeout=timeout_s) as resp:
            raw = resp.read(2048).decode("utf-8", errors="replace")
            latency_ms = int((time.monotonic() - t0) * 1000)
            if resp.status == expect or 200 <= resp.status < 400:
                return {
                    "outcome": "ok",
                    "status_code": resp.status,
                    "latency_ms": latency_ms,
                    "detail": f"reachable, status {resp.status}",
                    "response_hash": _sha256(raw),
                }
            return {
                "outcome": "yellow",
                "status_code": resp.status,
                "latency_ms": latency_ms,
                "detail": f"unexpected status {resp.status}",
                "response_hash": _sha256(raw),
            }
    except urllib.error.HTTPError as e:
        latency_ms = int((time.monotonic() - t0) * 1000)
        # 401/403 means reachable + auth-gated. Good reachability signal.
        if e.code in (401, 403):
            return {
                "outcome": "yellow",
                "status_code": e.code,
                "latency_ms": latency_ms,
                "detail": f"auth-required (status {e.code})",
                "response_hash": None,
            }
        if e.code == expect:
            return {
                "outcome": "ok",
                "status_code": e.code,
                "latency_ms": latency_ms,
                "detail": f"expected status {e.code}",
                "response_hash": None,
            }
        return {
            "outcome": "red" if e.code >= 500 else "yellow",
            "status_code": e.code,
            "latency_ms": latency_ms,
            "detail": f"HTTP error {e.code}",
            "response_hash": None,
        }
    except (urllib.error.URLError, TimeoutError, OSError) as e:
        latency_ms = int((time.monotonic() - t0) * 1000)
        msg = str(e).lower()
        is_timeout = "timed out" in msg or "timeout" in msg
        return {
            "outcome": "red",
            "status_code": None,
            "latency_ms": latency_ms,
            "detail": f"{'timeout' if is_timeout else 'unreachable'}: {str(e)[:200]}",
            "response_hash": None,
        }


def probe_command(probe_spec: dict, timeout_s: float = 30.0) -> dict:
    """Command-kind probe — run the declared CLI via arq-cli-exec.

    ``cli`` names the command and ``args`` must be a list. Scalar args
    (e.g. numbers decoded from JSON) are passed as their ``str()`` form,
    since subprocess only accepts strings.

    Outcomes:
    - ok: arq-cli-exec exits 0.
    - red: non-zero exit, a timeout (status 124), or arq-cli-exec cannot
      be started.
    - skipped: the spec is incomplete or arq-cli-exec is not beside this
      script.

    ``detail`` carries the last 300 chars of stdout+stderr.
    """
    cli = probe_spec.get("cli")
    args = probe_spec.get("args") or []
    if not cli or not isinstance(args, list):
        return {
            "outcome": "skipped",
            "status_code": None,
            "latency_ms": 0,
            "detail": "probe_spec command requires cli + args list",
            "response_hash": None,
        }
    arq_cli_exec = Path(__file__).parent / "arq-cli-exec"
    if not arq_cli_exec.exists():
        return {
            "outcome": "skipped",
            "status_code": None,
            "latency_ms": 0,
            "detail": "arq-cli-exec not found beside arq-audit-all",
            "response_hash": None,
        }
    argv = [str(arq_cli_exec), str(cli), *(str(arg) for arg in args)]
    t0 = time.monotonic()
    try:
        r = subprocess.run(
            argv, capture_output=True, text=True, timeout=timeout_s, check=False,
        )
    except subprocess.TimeoutExpired:
        latency_ms = int((time.monotonic() - t0) * 1000)
        return {
            "outcome": "red",
            "status_code": 124,
            "latency_ms": latency_ms,
            "detail": f"arq-cli-exec timeout after {timeout_s}s",
            "response_hash": None,
        }
    except OSError as exc:
        # e.g. arq-cli-exec exists but is not executable.
        latency_ms = int((time.monotonic() - t0) * 1000)
        return {
            "outcome": "red",
            "status_code": None,
            "latency_ms": latency_ms,
            "detail": f"arq-cli-exec failed to start: {exc}"[:300],
            "response_hash": None,
        }
    latency_ms = int((time.monotonic() - t0) * 1000)
    combined = (r.stdout or "") + (r.stderr or "")
    return {
        "outcome": "ok" if r.returncode == 0 else "red",
        "status_code": r.returncode,
        "latency_ms": latency_ms,
        "detail": combined[-300:],
        "response_hash": _sha256(combined),
    }


def probe_unix_socket(probe_spec: dict, timeout_s: float = 10.0) -> dict:
    """Unix-socket reachability probe via ``curl --unix-socket``.

    Issues a GET for ``path`` (default "/") over the socket file named by
    ``socket``. A path without a leading slash (e.g. "health") is
    normalised to "/health". Without that slash it would be folded into
    the placeholder host of ``http://local...`` and curl would request "/".

    Outcomes:
    - ok: HTTP 200-499.
    - red: any other status, including 0 when curl got no HTTP response;
      a missing socket file; a timeout (status 124); or curl cannot be
      started.
    - skipped: ``socket`` is absent.
    """
    socket_path = probe_spec.get("socket")
    path = str(probe_spec.get("path") or "/")
    if not path.startswith("/"):
        path = "/" + path
    if not socket_path:
        return {
            "outcome": "skipped",
            "status_code": None,
            "latency_ms": 0,
            "detail": "probe_spec unix-socket requires 'socket'",
            "response_hash": None,
        }
    if not Path(socket_path).exists():
        return {
            "outcome": "red",
            "status_code": None,
            "latency_ms": 0,
            "detail": f"socket not found: {socket_path}",
            "response_hash": None,
        }
    t0 = time.monotonic()
    try:
        r = subprocess.run(
            [
                "curl", "-s", "-o", "/dev/null", "-w", "%{http_code}",
                "--unix-socket", socket_path, f"http://local{path}",
            ],
            capture_output=True, text=True, timeout=timeout_s, check=False,
        )
    except subprocess.TimeoutExpired:
        latency_ms = int((time.monotonic() - t0) * 1000)
        return {
            "outcome": "red",
            "status_code": 124,
            "latency_ms": latency_ms,
            "detail": f"curl unix-socket timeout after {timeout_s}s",
            "response_hash": None,
        }
    except OSError as exc:
        # curl missing or not executable on this host.
        latency_ms = int((time.monotonic() - t0) * 1000)
        return {
            "outcome": "red",
            "status_code": None,
            "latency_ms": latency_ms,
            "detail": f"curl unavailable: {exc}"[:300],
            "response_hash": None,
        }
    latency_ms = int((time.monotonic() - t0) * 1000)
    code = (r.stdout or "").strip()
    try:
        code_int = int(code)
    except ValueError:
        code_int = 0
    return {
        "outcome": "ok" if 200 <= code_int < 500 else "red",
        "status_code": code_int,
        "latency_ms": latency_ms,
        "detail": f"unix-socket status {code_int}",
        "response_hash": None,
    }


def probe_mcp(probe_spec: dict, timeout_s: float = 10.0) -> dict:
    """MCP JSON-RPC reachability probe.

    Requires ARQ_MCP_ACCESS_TOKEN (Cloudflare Access service token) in the
    form "<client-id>:<client-secret>". A token without ':' is sent as the
    client id with an empty secret. POSTs a ``tools/list`` request to
    ``probe_spec.url``, defaulting to https://mcp.arqera.io/jsonrpc.

    Outcomes:
    - ok: 2xx response.
    - yellow: any other non-error response.
    - red: HTTP error statuses (status_code is recorded), connection
      failures and timeouts.
    - skipped: no access token.
    """
    access_token = os.environ.get("ARQ_MCP_ACCESS_TOKEN")
    if not access_token:
        return {
            "outcome": "skipped",
            "status_code": None,
            "latency_ms": 0,
            "detail": (
                "MCP probe requires ARQ_MCP_ACCESS_TOKEN "
                "(Cloudflare Access service token)"
            ),
            "response_hash": None,
        }
    url = probe_spec.get("url") or "https://mcp.arqera.io/jsonrpc"
    body = json.dumps({
        "jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {},
    }).encode("utf-8")
    client_id, _, client_secret = access_token.partition(":")
    t0 = time.monotonic()
    try:
        req = urllib.request.Request(
            url, data=body,
            headers={
                "Content-Type": "application/json",
                "User-Agent": "arq-audit-all/0.1",
                "CF-Access-Client-Id": client_id,
                "CF-Access-Client-Secret": client_secret,
            },
            method="POST",
        )
        with urllib.request.urlopen(req, timeout=timeout_s) as resp:
            latency_ms = int((time.monotonic() - t0) * 1000)
            raw = resp.read(1024).decode("utf-8", errors="replace")
            return {
                "outcome": "ok" if 200 <= resp.status < 300 else "yellow",
                "status_code": resp.status,
                "latency_ms": latency_ms,
                "detail": f"MCP status {resp.status}",
                "response_hash": _sha256(raw),
            }
    except urllib.error.HTTPError as e:
        # HTTPError is a URLError subclass; handled first only to keep the
        # status code. The outcome stays red like other transport failures.
        latency_ms = int((time.monotonic() - t0) * 1000)
        return {
            "outcome": "red",
            "status_code": e.code,
            "latency_ms": latency_ms,
            "detail": f"MCP probe failed: {str(e)[:200]}",
            "response_hash": None,
        }
    except (urllib.error.URLError, OSError, TimeoutError) as e:
        latency_ms = int((time.monotonic() - t0) * 1000)
        return {
            "outcome": "red",
            "status_code": None,
            "latency_ms": latency_ms,
            "detail": f"MCP probe failed: {str(e)[:200]}",
            "response_hash": None,
        }


def run_mechanical_probe(cap: dict, timeout_s: float = 10.0) -> dict:
    """Run the capability's declared probe_spec; never raises on a bad spec.

    Dispatches on ``probe_spec.kind`` (falling back to ``type``, then
    "http"). Returns a dict shaped:
    {outcome, status_code, latency_ms, detail, response_hash, kind}.

    - A probe_spec that is present but not a JSON object yields a skipped
      result with kind "invalid".
    - An unsupported kind yields a skipped result.
    - Any exception raised by a probe becomes a red result.
    """
    probe_spec = cap.get("probe_spec") or {}
    if not isinstance(probe_spec, dict):
        # Malformed catalog entry (e.g. a bare string): treat like a
        # missing spec instead of crashing the sweep on `.get`.
        return {
            "outcome": "skipped",
            "status_code": None,
            "latency_ms": 0,
            "detail": f"probe_spec is {type(probe_spec).__name__}, expected an object",
            "response_hash": None,
            "kind": "invalid",
        }
    kind = probe_spec.get("kind") or probe_spec.get("type", "http")
    try:
        if kind in ("http", "http_get", "http_post"):
            result = probe_http(probe_spec, timeout_s=timeout_s)
        elif kind == "command":
            result = probe_command(probe_spec, timeout_s=max(timeout_s, 30.0))
        elif kind in ("unix-socket", "unix_socket"):
            result = probe_unix_socket(probe_spec, timeout_s=timeout_s)
        elif kind == "mcp":
            result = probe_mcp(probe_spec, timeout_s=timeout_s)
        else:
            result = {
                "outcome": "skipped",
                "status_code": None,
                "latency_ms": 0,
                "detail": f"probe_spec.kind='{kind}' not supported",
                "response_hash": None,
            }
    except Exception as exc:  # noqa: BLE001 — probe must never raise
        result = {
            "outcome": "red",
            "status_code": None,
            "latency_ms": 0,
            "detail": f"probe exception: {type(exc).__name__}: {exc}"[:300],
            "response_hash": None,
        }
    result["kind"] = kind
    return result


# ---------------------------------------------------------------------
# Layer 2 — LLM interrogation
# ---------------------------------------------------------------------


def _compose_interrogation_prompt(
    cap: dict, class_tag: str, probe_result: dict,
) -> str:
    """Build the single-turn prompt sent to a Tier-0.5 interrogator.

    Sections, in order:
    - a fixed auditor preamble;
    - the class, its KPI-spec anchor (CLASS_TO_KPI_SPEC) and dream summary
      (CLASS_DREAM_SUMMARY, with a generic fallback);
    - the capability's declared metadata and the observed probe signal,
      each serialised as one JSON line;
    - the exact JSON schema the reply must follow.
    """
    kpi_anchor = CLASS_TO_KPI_SPEC.get(class_tag, "(no kpi spec registered)")
    dream = CLASS_DREAM_SUMMARY.get(class_tag, f"Expected behaviour for class '{class_tag}'.")

    declared_fields = ("name", "tags", "state", "cost_model", "dream_requirements")
    declared = {field: cap.get(field) for field in declared_fields}

    # (prompt key, probe_result key) pairs, in prompt order.
    observed_fields = (
        ("probe_kind", "kind"),
        ("probe_outcome", "outcome"),
        ("status_code", "status_code"),
        ("latency_ms", "latency_ms"),
        ("detail", "detail"),
    )
    observed = {out_key: probe_result.get(src_key) for out_key, src_key in observed_fields}
    observed["recent_liveness_count"] = len(cap.get("recent_liveness") or [])

    lines = [
        (
            "You are auditing one capability in the ARQERA substrate. Given "
            "the dream (what this class is expected to do), the KPI spec, "
            "the capability's own declared metadata, and the live probe "
            "signal, render a short structured verdict. No hedging."
        ),
        "",
        f"CLASS: {class_tag}",
        f"KPI_SPEC: {kpi_anchor}",
        f"DREAM: {dream}",
        "",
        f"DECLARED: {json.dumps(declared, default=str)}",
        f"OBSERVED: {json.dumps(observed, default=str)}",
        "",
        "Respond with ONLY a single JSON object matching this schema:",
        "{",
        '  "verdict": "satisfies-dream" | "partial" | "breach",',
        '  "confidence": 0.0-1.0,',
        '  "reasoning": "one sentence, <200 chars",',
        '  "law_zero_concern": true | false,',
        '  "top_kpi_risk": "<one KPI name from the spec>"',
        "}",
    ]
    return "\n".join(lines)


def _parse_interrogation_response(raw: str) -> dict | None:
    """Parse the LLM's JSON response. Tolerates fenced blocks + prose."""
    if not raw:
        return None
    raw = raw.strip()
    # Strip markdown fences
    if raw.startswith("```"):
        lines = raw.splitlines()
        # Drop first fence line + optional language, drop last fence.
        lines = lines[1:]
        if lines and lines[-1].startswith("```"):
            lines = lines[:-1]
        raw = "\n".join(lines).strip()
    # Find first JSON object.
    start = raw.find("{")
    end = raw.rfind("}")
    if start < 0 or end < 0 or end <= start:
        return None
    try:
        obj = json.loads(raw[start:end + 1])
    except json.JSONDecodeError:
        return None
    if not isinstance(obj, dict):
        return None
    verdict = obj.get("verdict")
    if verdict not in ("satisfies-dream", "partial", "breach"):
        return None
    return obj


def _call_arq_call(worker: str, verb: str, payload: dict, timeout_s: float = 30.0) -> tuple[int, str]:
    """Dispatch via the arq-call wrapper. Returns (returncode, stdout)."""
    arq_call = Path(__file__).parent / "arq-call"
    if not arq_call.exists():
        return (127, "")
    try:
        r = subprocess.run(
            [str(arq_call), worker, verb, "--payload", json.dumps(payload)],  # noqa: ARQ-NO-JSON-HOT-PATH arq-call CLI input boundary — accepts --payload <json>
            capture_output=True, text=True, timeout=timeout_s, check=False,
        )
        return (r.returncode, r.stdout or "")
    except subprocess.TimeoutExpired:
        return (124, "")
    except OSError:
        return (127, "")


def _coerce_law_zero_flag(value: object) -> bool:
    """Interpret an LLM-supplied ``law_zero_concern`` value as a bool.

    Booleans and numbers use normal truthiness. Strings are parsed so that
    "false" / "no" / "0" / "none" / "null" / "" are False, whereas a bare
    ``bool("false")`` would be True. Any other string counts as a concern,
    so an ambiguous answer fails safe toward floor-breach.
    """
    if isinstance(value, str):
        return value.strip().lower() not in {"", "false", "no", "0", "none", "null"}
    return bool(value)


def _coerce_confidence(value: object) -> float | None:
    """Return *value* as a float clamped to [0, 1], or None if not numeric."""
    if isinstance(value, bool):
        return None
    try:
        number = float(value)  # type: ignore[arg-type]
    except (TypeError, ValueError):
        return None
    if number != number:  # NaN never equals itself
        return None
    return min(1.0, max(0.0, number))


def _interrogation_result(outcome: str, reasoning: str) -> dict:
    """Non-ok interrogation result carrying every key of the ok shape."""
    return {
        "outcome": outcome,
        "interrogator": None,
        "verdict": None,
        "confidence": None,
        "reasoning": reasoning,
        "law_zero_concern": False,
        "top_kpi_risk": None,
        "latency_ms": 0,
        "raw_preview": None,
    }


def run_llm_interrogation(
    cap: dict,
    class_tag: str,
    probe_result: dict,
    interrogators: tuple[str, ...] = TIER_0_5_INTERROGATORS,
    timeout_s: float = 30.0,
) -> dict:
    """Dispatch an interrogation prompt to a Tier-0.5 free-tier LLM.

    Skipped outright when the mechanical probe is red. Otherwise iterates
    `interrogators` in order and falls through to the next on:
    - a non-zero arq-call exit or empty output (5xx / rate-limit / timeout);
    - no OpenAI-compat content in the reply;
    - an unparseable verdict.

    Every outcome returns the same keys:
        {outcome: ok|skipped|error,
         interrogator: <worker name or None>,
         verdict, confidence (float in [0, 1] or None), reasoning,
         law_zero_concern (bool), top_kpi_risk, latency_ms, raw_preview}
    """
    if probe_result.get("outcome") == "red":
        return _interrogation_result(
            "skipped", "mechanical probe red — skipping LLM interrogation",
        )

    prompt = _compose_interrogation_prompt(cap, class_tag, probe_result)

    # Sensible default model per worker. These match arq-call's catalogue
    # expectations — unknown models get rejected by upstream, at which point
    # we fall through to the next worker.
    default_models = {
        "cerebras": "llama-3.3-70b",
        "groq-free": "llama-3.3-70b-versatile",
        "sambanova": "Meta-Llama-3.1-70B-Instruct",
        "mistral-free": "mistral-small-latest",
    }
    system_message = {
        "role": "system",
        "content": (
            "You are a capability auditor. Respond with ONLY "
            "the requested JSON object, no prose."
        ),
    }

    for worker in interrogators:
        payload = {
            "model": default_models.get(worker, "llama-3.3-70b"),
            "messages": [system_message, {"role": "user", "content": prompt}],
            "max_tokens": 400,
            "temperature": 0.0,
        }
        t0 = time.monotonic()
        rc, stdout = _call_arq_call(
            worker, "chat.completions", payload, timeout_s=timeout_s,
        )
        latency_ms = int((time.monotonic() - t0) * 1000)
        if rc != 0 or not stdout:
            continue
        # arq-call prints the provider response body on success. Parse the
        # OpenAI-compat envelope to extract the assistant message.
        content = _extract_openai_compat_content(stdout)
        if content is None:
            continue
        parsed = _parse_interrogation_response(content)
        if parsed is None:
            continue
        return {
            "outcome": "ok",
            "interrogator": worker,
            "verdict": parsed.get("verdict"),
            "confidence": _coerce_confidence(parsed.get("confidence")),
            "reasoning": str(parsed.get("reasoning") or "")[:200],
            "law_zero_concern": _coerce_law_zero_flag(parsed.get("law_zero_concern")),
            "top_kpi_risk": parsed.get("top_kpi_risk"),
            "latency_ms": latency_ms,
            "raw_preview": content[-500:],
        }

    # All interrogators exhausted.
    return _interrogation_result(
        "error", "all Tier-0.5 interrogators failed or rate-limited",
    )


def _extract_openai_compat_content(raw_stdout: str) -> str | None:
    """arq-call prints the provider response as JSON. Extract the
    assistant's message.content from an OpenAI-compat envelope."""
    # The provider response may be emitted alongside envelope metadata.
    # Try to find the first JSON object that has choices[].message.content.
    candidates = []
    start = 0
    while True:
        idx = raw_stdout.find("{", start)
        if idx < 0:
            break
        candidates.append(idx)
        start = idx + 1
    for c in candidates:
        # Greedy: take from c to end, let json.JSONDecoder find the balanced object.
        try:
            dec = json.JSONDecoder()
            obj, _ = dec.raw_decode(raw_stdout[c:])
        except (json.JSONDecodeError, ValueError):
            continue
        if not isinstance(obj, dict):
            continue
        choices = obj.get("choices")
        if isinstance(choices, list) and choices:
            msg = (choices[0] or {}).get("message") or {}
            content = msg.get("content")
            if isinstance(content, str) and content.strip():
                return content
    return None


# ---------------------------------------------------------------------
# Composite scoring
# ---------------------------------------------------------------------


def compute_composite(
    cap: dict, probe_result: dict, llm_result: dict,
) -> dict:
    """Blend the mechanical probe and the LLM verdict into one readiness score.

    The score is ``0.6 * mechanical + 0.4 * llm`` in [0, 1]. A floor breach
    pins priority to 0 regardless of score. A floor breach is any of:
    a safety-class capability whose probe is red, an LLM law-zero concern,
    or a ``dream_requirements.law_zero`` capability whose probe is red.

    Returns:
        ``{score, priority, floor_breach, reason}``. Priority 0 is a floor
        breach (most urgent); 1 means score < 0.3, 2 means score < 0.7, and
        3 means healthy.
    """
    probe_outcome = probe_result.get("outcome")
    llm_outcome = llm_result.get("outcome")

    mech_scalar = {
        "ok": 1.0, "yellow": 0.5, "red": 0.0, "skipped": 0.5,
    }.get(probe_outcome, 0.0)

    if llm_outcome == "ok":
        # Unknown verdict strings count as "partial".
        llm_scalar = {
            "satisfies-dream": 1.0, "partial": 0.5, "breach": 0.0,
        }.get(llm_result.get("verdict"), 0.5)
    elif llm_outcome == "skipped":
        if "--no-llm" in (llm_result.get("reasoning") or ""):
            # Operator disabled Layer 2: the probe is the only signal, so
            # mirror it rather than capping a healthy run below the 0.7
            # "healthy" threshold (surfaced by Sentry on PR #3454).
            llm_scalar = mech_scalar
        else:
            # Skipped for any other reason (a red probe): the probe already
            # carries the failure, so do not inflate the composite.
            llm_scalar = 0.0
    else:
        # LLM error: unknown. Use neutral 0.5 so the probe signal is not
        # double-counted.
        llm_scalar = 0.5

    # 60/40 split favours the cheap, deterministic probe over LLM judgment;
    # the weights are a rough seed, with per-KPI normalisation to follow.
    score = 0.6 * mech_scalar + 0.4 * llm_scalar

    safety_class = _is_safety_class(cap.get("tags"))
    probe_red = probe_outcome == "red"
    declared_law_zero = (
        (cap.get("dream_requirements") or {}).get("law_zero") is True
    )
    breach_checks = (
        (safety_class and probe_red, "safety-class probe red"),
        (llm_result.get("law_zero_concern"), "LLM flagged law-zero concern"),
        (declared_law_zero and probe_red,
         "law-zero-declared capability unreachable"),
    )
    reasons = [label for tripped, label in breach_checks if tripped]
    floor_breach = bool(reasons)

    # Floor breaches bypass score-based ranking entirely.
    if floor_breach:
        priority = 0
    else:
        priority = 1 if score < 0.3 else 2 if score < 0.7 else 3

    if reasons:
        reason = "; ".join(reasons)
    else:
        verdict_or_outcome = llm_result.get("verdict") or llm_outcome
        reason = f"mech={probe_outcome} llm={verdict_or_outcome}"

    return {
        "score": round(score, 3),
        "priority": priority,
        "floor_breach": floor_breach,
        "reason": reason,
    }


# ---------------------------------------------------------------------
# Per-capability audit
# ---------------------------------------------------------------------


def audit_one(
    cap: dict,
    class_tag: str,
    sweep_ref: str,
    run_llm: bool = True,
    probe_timeout_s: float = 10.0,
    llm_timeout_s: float = 30.0,
    dry_run: bool = False,
) -> dict:
    """Claim → probe → interrogate → score → emit, for one capability.

    Args:
        cap: Catalog record for the capability. This block reads ``name`` and
            ``address``; the whole record is handed to the probe, interrogation
            and scoring helpers.
        class_tag: Capability class being swept (e.g. ``ai-provider``).
        sweep_ref: Identifier of the enclosing sweep, recorded in the act.
        run_llm: When False, Layer 2 is replaced by a ``skipped`` result
            whose reasoning is ``--no-llm flag``.
        probe_timeout_s: Timeout passed to the mechanical probe.
        llm_timeout_s: Timeout passed to the LLM interrogation.
        dry_run: When True, no ``capability_audited`` act is emitted.

    Returns:
        Dict with ``capability``, ``address``, ``probe``, ``llm``,
        ``composite``, ``emitted_address`` (None on dry runs) and ``claimed``.
    """
    name = cap.get("name")
    # Catalog rows without an explicit address fall back to the doc URI.
    address = cap.get("address") or f"arq://doc/capability/{name}"

    # Claim (non-blocking on miss — see claim_capability).
    # NOTE(review): claim_token is recorded but never released in this
    # block — confirm the claim expires on its own.
    claimed, claim_token = claim_capability(address, task="audit")

    # Layer 1 — mechanical probe
    probe_result = run_mechanical_probe(cap, timeout_s=probe_timeout_s)

    # Layer 2 — LLM interrogation, gated here only on run_llm. Any gating
    # on a red probe presumably lives inside run_llm_interrogation
    # (compute_composite treats a "skipped" result not caused by --no-llm as
    # a red-probe skip) — confirm there.
    if run_llm:
        llm_result = run_llm_interrogation(
            cap, class_tag, probe_result, timeout_s=llm_timeout_s,
        )
    else:
        llm_result = {
            "outcome": "skipped",
            "interrogator": None,
            "verdict": None,
            "reasoning": "--no-llm flag",
            "law_zero_concern": False,
            "latency_ms": 0,
        }

    # Composite
    composite = compute_composite(cap, probe_result, llm_result)

    # Per-capability substrate act
    ts = _ts_ms()
    payload = {
        "address": address,
        "capability": name,
        "class": class_tag,
        "sweep_ref": sweep_ref,
        "claimed": claimed,
        "claim_token": claim_token,
        "probe": probe_result,
        "llm": llm_result,
        "composite": composite,
        "actor_peer": ACTOR_PEER,
        "ts_ms": ts,
    }
    emitted_address: str | None = None
    if not dry_run:
        # `or`, not a .get() default: a row with "name": null or "" must
        # still produce a meaningful act slug.
        emitted_address = _emit(
            "act", "capability_audited",
            f"{_slug(name or 'unknown')}-{ts}",
            payload,
        )

    return {
        "capability": name,
        "address": address,
        "probe": probe_result,
        "llm": llm_result,
        "composite": composite,
        "emitted_address": emitted_address,
        "claimed": claimed,
    }


# ---------------------------------------------------------------------
# Orchestration
# ---------------------------------------------------------------------


def run_sweep(
    class_tag: str,
    api_base: str,
    limit: int | None,
    parallelism: int,
    run_llm: bool,
    dry_run: bool,
    probe_timeout_s: float = 10.0,
    llm_timeout_s: float = 30.0,
    stream: callable = None,
) -> dict:
    """Sweep every capability of the given class, emit summary act.

    Emits ``arq_audit_all_started`` up front and ``arq_audit_all_sweep`` at
    the end (both suppressed when ``dry_run``). Capabilities are audited
    concurrently with ``audit_one``.

    Args:
        class_tag: Capability class to audit.
        api_base: Catalog API base passed to ``query_class_capabilities``.
        limit: Audit only the first ``limit`` capabilities; None means all.
            Negative values are treated as 0.
        parallelism: Max concurrent audits (values below 1 are treated as 1).
        run_llm: Whether to run Layer-2 LLM interrogation.
        dry_run: Run the audit without emitting substrate acts.
        probe_timeout_s: Per-capability mechanical probe timeout.
        llm_timeout_s: Per-capability LLM interrogation timeout.
        stream: Optional callable receiving one progress line per call.

    Returns:
        Summary dict: ``class``, ``sweep_ref``, ``count``, ``verdict`` and
        ``results``. Non-empty sweeps also report ``floor_breaches``,
        ``healthy`` and ``degraded``.
    """

    def log(msg: str):
        if stream:
            stream(msg)

    ts = _ts_ms()
    sweep_ref = f"{_slug(class_tag)}-{ts}"

    if class_tag not in CLASS_TO_KPI_SPEC:
        log(f"[audit] warning: class '{class_tag}' has no KPI spec registered; "
            "LLM prompts will use inline defaults only.")

    # Started act
    if not dry_run:
        _emit("act", "arq_audit_all_started", sweep_ref, {
            "class": class_tag,
            "kpi_spec": CLASS_TO_KPI_SPEC.get(class_tag),
            "actor_peer": ACTOR_PEER,
            "ts_ms": ts,
            "limit": limit,
            "run_llm": run_llm,
        })

    capabilities = query_class_capabilities(class_tag, api_base)
    log(f"[query] {len(capabilities)} live capabilities in class={class_tag}")
    if limit is not None:
        # A negative slice bound would silently drop capabilities from the
        # END of the list ([:-1] audits all but one); clamp to 0 instead.
        capabilities = capabilities[:max(0, limit)]
        log(f"[query] limited to {len(capabilities)}")

    if not capabilities:
        summary = {
            "class": class_tag,
            "sweep_ref": sweep_ref,
            "count": 0,
            "verdict": "no_capabilities",
            "results": [],
        }
        if not dry_run:
            _emit("act", "arq_audit_all_sweep", sweep_ref, {
                **summary,
                "actor_peer": ACTOR_PEER,
                "ts_ms": _ts_ms(),
            })
        return summary

    results: list[dict] = []
    with concurrent.futures.ThreadPoolExecutor(
        max_workers=max(1, parallelism)
    ) as executor:
        futures = {
            executor.submit(
                audit_one, cap, class_tag, sweep_ref,
                run_llm, probe_timeout_s, llm_timeout_s, dry_run,
            ): cap
            for cap in capabilities
        }
        for fut in concurrent.futures.as_completed(futures):
            cap = futures[fut]
            try:
                res = fut.result()
            except Exception as exc:  # noqa: BLE001
                # One capability's crash must not abort the sweep: record it
                # as a red probe so it surfaces in the summary.
                detail = f"audit future raised: {exc}"[:200]
                res = {
                    "capability": cap.get("name"),
                    # Same fallback address audit_one would have used.
                    "address": (
                        cap.get("address")
                        or f"arq://doc/capability/{cap.get('name')}"
                    ),
                    "probe": {
                        "outcome": "red",
                        "detail": detail,
                    },
                    "llm": {"outcome": "error", "reasoning": "audit raised"},
                    "composite": {
                        "score": 0.0,
                        "priority": 1,
                        "floor_breach": False,
                        "reason": detail,
                    },
                    "emitted_address": None,
                }
            results.append(res)
            log(
                f"[audit] {res['capability']} "
                f"probe={res['probe'].get('outcome')} "
                f"llm={res['llm'].get('verdict') or res['llm'].get('outcome')} "
                f"score={res['composite'].get('score')} "
                f"priority={res['composite'].get('priority')}"
                f"{' FLOOR-BREACH' if res['composite'].get('floor_breach') else ''}"
            )

    # Sort: floor-breaches first (priority 0), then ascending priority;
    # within a priority, higher score first.
    results.sort(key=lambda r: (
        r["composite"].get("priority", 3),
        -r["composite"].get("score", 0.0),
    ))

    floor_breaches = sum(
        1 for r in results if r["composite"].get("floor_breach")
    )
    healthy = sum(1 for r in results if r["composite"].get("priority") == 3)
    degraded = sum(1 for r in results if r["composite"].get("priority") == 1)

    summary = {
        "class": class_tag,
        "sweep_ref": sweep_ref,
        "count": len(results),
        "floor_breaches": floor_breaches,
        "healthy": healthy,
        "degraded": degraded,
        "verdict": (
            "law_zero_breach" if floor_breaches > 0
            else "healthy" if healthy == len(results)
            else "mixed"
        ),
        "results": [
            {
                "capability": r["capability"],
                "address": r["address"],
                "probe_outcome": r["probe"].get("outcome"),
                "probe_latency_ms": r["probe"].get("latency_ms"),
                "llm_verdict": r["llm"].get("verdict"),
                "llm_interrogator": r["llm"].get("interrogator"),
                "composite_score": r["composite"].get("score"),
                "priority": r["composite"].get("priority"),
                "floor_breach": r["composite"].get("floor_breach"),
                "reason": r["composite"].get("reason"),
                "emitted_address": r.get("emitted_address"),
            }
            for r in results
        ],
    }

    if not dry_run:
        _emit("act", "arq_audit_all_sweep", sweep_ref, {
            **summary,
            "actor_peer": ACTOR_PEER,
            "ts_ms": _ts_ms(),
        })

    log(
        f"[sweep] verdict={summary['verdict']} "
        f"floor_breaches={floor_breaches} healthy={healthy} "
        f"degraded={degraded} count={len(results)}"
    )

    return summary


# ---------------------------------------------------------------------
# CLI frontend
# ---------------------------------------------------------------------


def main() -> int:
    """CLI entry point: parse arguments, run one sweep, report the result.

    Progress lines go to stderr unless ``--json`` is given. The summary is
    printed to stdout, either as JSON or as a one-line header plus one row
    per capability.

    Returns:
        0 when the sweep is healthy or mixed, 1 when any floor breach was
        found. Invalid arguments exit with status 2 via argparse.
    """
    ap = argparse.ArgumentParser(
        prog="arq-audit-all",
        description=(
            "Diagnostic-surface counterpart to arq-fix-all. Walks every "
            "capability in the given class, runs mechanical probe + "
            "(if green) LLM interrogation against the KPI spec, emits "
            "per-capability + per-sweep substrate acts."
        ),
        epilog=(
            "Examples:\n"
            "  arq-audit-all --class ai-provider --dry-run\n"
            "  arq-audit-all --class ai-provider --limit 3\n"
            "  arq-audit-all --class ai-provider --no-llm\n"
            "  arq-audit-all --class ai-provider --json\n"
        ),
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    ap.add_argument(
        "--class", dest="class_tag", required=True,
        help="capability class to audit (e.g., ai-provider)",
    )
    ap.add_argument(
        "--limit", type=int, default=None,
        help="audit only the first N capabilities (default: all)",
    )
    ap.add_argument(
        "--parallelism", type=int, default=3,
        help="max concurrent capability audits (default 3)",
    )
    ap.add_argument(
        "--no-llm", action="store_true",
        help="skip Layer-2 LLM interrogation (mechanical probe only)",
    )
    ap.add_argument(
        "--dry-run", action="store_true",
        help="run audit but do not emit substrate acts",
    )
    ap.add_argument(
        "--capability-api", default=DEFAULT_CAPABILITY_API,
        help=f"catalog API base (default {DEFAULT_CAPABILITY_API})",
    )
    ap.add_argument(
        "--probe-timeout", type=float, default=10.0,
        help="mechanical probe timeout per capability (default 10s)",
    )
    ap.add_argument(
        "--llm-timeout", type=float, default=30.0,
        help="LLM interrogation timeout per capability (default 30s)",
    )
    ap.add_argument(
        "--json", action="store_true", dest="json_out",
        help="emit sweep summary as JSON on stdout",
    )

    args = ap.parse_args()
    if args.limit is not None and args.limit < 0:
        # A negative limit would otherwise slice from the end of the list and
        # silently skip capabilities. ap.error prints usage and exits with 2.
        ap.error("--limit must be >= 0")

    def stream(msg: str):
        if not args.json_out:
            sys.stderr.write(f"{msg}\n")

    summary = run_sweep(
        class_tag=args.class_tag,
        api_base=args.capability_api,
        limit=args.limit,
        parallelism=args.parallelism,
        run_llm=not args.no_llm,
        dry_run=args.dry_run,
        probe_timeout_s=args.probe_timeout,
        llm_timeout_s=args.llm_timeout,
        stream=stream,
    )

    if args.json_out:
        print(json.dumps(summary, indent=2))
    else:
        print(
            f"class={summary['class']} "
            f"count={summary.get('count', 0)} "
            f"verdict={summary.get('verdict')} "
            f"floor_breaches={summary.get('floor_breaches', 0)} "
            f"healthy={summary.get('healthy', 0)} "
            f"degraded={summary.get('degraded', 0)}"
        )
        for r in summary.get("results", []):
            mark = "!!" if r.get("floor_breach") else (
                "??" if r.get("priority", 3) <= 2 else "ok"
            )
            # str() before width specs: format(None, "30s") raises TypeError,
            # and capability names / probe outcomes can be None.
            print(
                f"  [{mark}] {str(r['capability']):30s} "
                f"probe={str(r.get('probe_outcome')):7s} "
                f"llm={str(r.get('llm_verdict')):18s} "
                f"score={r.get('composite_score')} "
                f"({r.get('reason')})"
            )

    # Exit code: 0 = healthy / mixed, 1 = any floor-breach, 2 = invalid.
    return 1 if summary.get("floor_breaches", 0) > 0 else 0


if __name__ == "__main__":
    sys.exit(main())
