#!/usr/bin/env python3
"""arq-fix-all — turn the capability catalog into action.

The "fix all" primitive. The catalog (~115 capabilities, queryable via
`twin capability list --tag X` / `twin capability gaps`) describes WHO
in the mesh can do WHAT. This verb composes that knowledge into a
dispatch: given a problem (address, tag, or `--all-gaps`), rank the
catalog's capable employees, invoke them per their declared
`probe_spec.kind`, compose the responses, emit a substrate outcome.

Shape (from the task spec):

    arq-fix-all <problem-address>            # specific failing thing
    arq-fix-all --tag <problem-class>        # all problems matching this tag
    arq-fix-all --all-gaps                   # enumerate twin capability gaps
    arq-fix-all --dry-run                    # show plan, don't execute
    arq-fix-all --parallelism N              # cap concurrent invocations
    arq-fix-all --law-zero-tier hard|soft|auto
    arq-fix-all --confirm                    # required when dispatching >N=10
    arq-fix-all --capability-api URL         # override catalog API (default staging)

Alignment with canonised principles:
  arq://doc/principle/substrate-is-the-exchange-v1
      — the catalog IS substrate; fix-all reads it, dispatches, writes back
  arq://doc/principle/every-platform-is-a-peer-v1
      — problem-classes route to peer employees via their declared protocol
  arq://doc/principle/one-primitive-speaks-every-protocol-v1
      — each dispatch goes through the employee's declared probe_spec.kind
        (http / command / unix-socket / mcp), all logged uniformly
  arq://doc/principle/action-owner-and-worker-v1
      — fix-all is the action-owner-as-orchestrator; each ranked employee
        is a worker with declared trust / cost / capability boundary
  arq://doc/rule/trust-graded-merge-v1
      — extended from merges to fixes: ranked by trust_score × 1/cost,
        law-zero-class dispatches always go through the hard tier

Covenantal boundaries (Law Zero — C = infinity):
  - safety-class employees (tag `law-zero-check`, `governance-evaluator`,
    `safety-gate`) are ALWAYS consulted BEFORE any fix dispatch; their
    verdict can halt the whole run
  - problems with dream_reqs.law_zero=true OR tag `law-zero-violation`
    are routed hard-tier only regardless of --law-zero-tier
  - an employee can only be routed to for a problem that matches its
    declared `dream_requirements.required_for_tasks` — the catalog's
    declared scope is the boundary, not runtime inference

Emissions per run:
  arq://act/fix_all_started/<problem>-<ts>              (pre-flight)
  arq://act/fix_all_dispatched/<problem>-<employee>-<ts> (per employee, pre)
  arq://act/fix_all_response/<problem>-<employee>-<ts>   (per employee, post)
  arq://act/fix_all_outcome/<problem>-<ts>               (final composed)
  arq://act/fix_all_halted_by_safety/<problem>-<ts>      (if safety blocks)
  arq://act/fix_all_rejected/<problem>-<ts>              (if no candidates)

Usage pattern matches arq-github / arq-cli-exec / arq-nango-proxy — this
wrapper is READ-ONLY to the catalog, ACTION-ONLY to the employees. It
does not write to the catalog itself.
"""

from __future__ import annotations

import argparse
import concurrent.futures
import hashlib
import json
import os
import subprocess
import sys
import time
import urllib.error
import urllib.request
from datetime import datetime, timezone
from pathlib import Path

# Shared primitive-invoke helper (daemon.act.emit via /v1/invoke).
sys.path.insert(0, str(Path(__file__).parent))
try:
    from _arq_primitive import primitive_invoke as _primitive_invoke  # type: ignore
except ImportError:
    _primitive_invoke = None  # type: ignore

# Path to the `twin` CLI. Only used by _emit_via_twin_subprocess() as the
# fallback emitter when the daemon path is unavailable; if the file does
# not exist that fallback returns None.
TWIN_CLI = Path.home() / ".local" / "bin" / "twin"
# Peer address this tool acts as. Passed to the daemon emit call, embedded
# in every dispatch payload ("actor_peer") and sent as the
# X-ARQ-Actor-Peer header on HTTP dispatches. Overridable through the
# ARQ_ACTOR_PEER_ADDRESS environment variable.
ACTOR_PEER = os.environ.get(
    "ARQ_ACTOR_PEER_ADDRESS",
    "arq://body/peer/578412e7b083b40e56e228779804582a",
)

# Default capability catalog endpoint. Can be overridden per-call with
# --capability-api for local dev or pre-merge dry-runs against a
# pre-prod replica.
DEFAULT_CAPABILITY_API = os.environ.get(
    "ARQ_CAPABILITY_API",
    "https://staging.arqera.io/api/capability",
)

# Confirm-required threshold. Dispatching more than this many employees
# in a single run requires --confirm. Not a runtime-risk gate (the
# dispatches are still governed per-employee), just a blast-radius
# safety catch so a typoed --all-gaps doesn't fan out to 100 probes.
CONFIRM_THRESHOLD_DEFAULT = 10

# Tags that identify safety-class employees. These are consulted before
# any fix dispatch; a veto from any of them halts the run. Matches
# convention used in other routing services (tag-based rather than
# magic-name, so the catalog owns which employees are safety-class).
SAFETY_CLASS_TAGS = frozenset({
    "law-zero-check",
    "governance-evaluator",
    "safety-gate",
})

# Tags that force hard-tier routing regardless of flag. Checked by
# _is_law_zero_forced() against both the problem tag and each employee's
# tags; a `law_zero: true` entry in dream_requirements forces hard tier
# as well but is tested separately there, not through this set.
LAW_ZERO_FORCE_HARD_TAGS = frozenset({
    "law-zero-violation",
    "harm-vector",
})


# ---------------------------------------------------------------------
# Helpers — substrate emission (copied shape from arq-cli-exec / arq-kube)
# ---------------------------------------------------------------------


def _ts_ms() -> int:
    return int(datetime.now(timezone.utc).timestamp() * 1000)


def _sha256(data: str) -> str:
    return hashlib.sha256(data.encode("utf-8", errors="replace")).hexdigest()


def _emit_via_daemon(
    class_: str, type_: str, ref: str, payload: dict
) -> tuple[str, str | None]:
    """Emit a substrate act through the daemon's `daemon.act.emit` verb.

    Returns an (outcome, address) pair:
      - ("ok", <address or None>)  the daemon accepted the emit and answered
                                   with a dict (address read from "address")
      - ("unknown_verb", None)     passed through from the primitive helper
      - ("error", None)            passed through from the primitive helper
      - ("unavailable", None)      helper not importable, the call raised,
                                   or any other / malformed answer
    """
    unavailable: tuple[str, str | None] = ("unavailable", None)
    if _primitive_invoke is None:
        return unavailable

    request = {
        "class": class_,
        "type": type_,
        "reference": ref,
        "payload": payload,
    }
    try:
        status, body = _primitive_invoke(
            "daemon.act.emit", request, ACTOR_PEER, timeout_s=10.0,
        )
    except Exception:
        # Emission is best-effort: any failure in the helper is reported
        # as "unavailable" so the caller can try the fallback path.
        return unavailable

    if status == "ok":
        # An "ok" without a dict body is treated as unavailable, not ok.
        if isinstance(body, dict):
            return ("ok", body.get("address"))
        return unavailable
    if status in ("unknown_verb", "error"):
        return (status, None)
    return unavailable


def _emit_via_twin_subprocess(
    class_: str, type_: str, ref: str, payload: dict
) -> str | None:
    """Legacy fallback emitter: shell out to `twin --use-keychain act emit`.

    Returns the first stdout line starting with "arq://" when the CLI
    exits 0. Returns None when the CLI is missing, exits non-zero, prints
    no such line, times out (5 s) or cannot be spawned.
    """
    if not TWIN_CLI.exists():
        return None

    argv = [
        str(TWIN_CLI), "--use-keychain", "act", "emit",
        class_, type_, ref,
        "--payload", json.dumps(payload),  # noqa: ARQ-NO-JSON-HOT-PATH twin CLI input boundary — twin accepts --payload <json>
    ]
    try:
        completed = subprocess.run(
            argv, capture_output=True, text=True, timeout=5, check=False,
        )
    except (subprocess.TimeoutExpired, OSError):
        return None

    if completed.returncode != 0:
        return None
    stripped_lines = (raw.strip() for raw in completed.stdout.splitlines())
    return next(
        (candidate for candidate in stripped_lines if candidate.startswith("arq://")),
        None,
    )


def _emit(class_: str, type_: str, ref: str, payload: dict) -> str | None:
    """Best-effort substrate emission; returns the act address or None.

    The daemon is tried first. A daemon "ok" returns its address and a
    daemon "error" returns None without retrying. Every other daemon
    outcome ("unavailable", "unknown_verb") falls back to the twin CLI.
    """
    status, address = _emit_via_daemon(class_, type_, ref, payload)
    if status not in ("ok", "error"):
        return _emit_via_twin_subprocess(class_, type_, ref, payload)
    return address if status == "ok" else None


# ---------------------------------------------------------------------
# Problem classification
# ---------------------------------------------------------------------


# Address-prefix -> task-class mapping. When a user passes a problem
# address like `arq://act/peer_wedged/<peer>`, classify_problem() looks
# the prefix up here to infer which task-class the problem falls under —
# and therefore which capabilities can fix it.
#
# This map is deliberately small. An address that matches no prefix is
# classified as task-class "unknown" by classify_problem() unless the
# caller supplies an explicit tag. Extend cautiously — adding a prefix
# here means every employee declaring that task in its
# `required_for_tasks` becomes a candidate.
#
# Prefixes are tested in insertion order and the first match wins, so
# keep the entries non-overlapping.
ADDRESS_PREFIX_TO_TASK_CLASS: dict[str, str] = {
    "arq://act/peer_wedged/": "compute-peer-recovery",
    "arq://act/peer_crashed/": "compute-peer-recovery",
    "arq://evidence/ci_failure/": "ci-failure-recovery",
    "arq://act/ci_red/": "ci-failure-recovery",
    "arq://act/kpi_breach/": "kpi-breach-response",
    "arq://act/cost_overrun/": "cost-overrun-response",
    "arq://act/security_alert/": "security-alert-response",
    "arq://act/law_zero_violation/": "law-zero-violation",
    "arq://issue/": "github-issue-response",
}


def classify_problem(
    problem: str,
    explicit_tag: str | None = None,
) -> tuple[str, dict]:
    """Infer (task_class, meta) from a problem address or explicit tag.

    If explicit_tag is set, we use it directly. Otherwise we scan
    ADDRESS_PREFIX_TO_TASK_CLASS for a prefix match. Meta carries any
    parsed context (e.g., the peer name parsed from a peer_wedged
    address) that ranker / dispatcher may use.
    """
    if explicit_tag:
        return (explicit_tag, {"source": "explicit_tag", "tag": explicit_tag})
    for prefix, task_class in ADDRESS_PREFIX_TO_TASK_CLASS.items():
        if problem.startswith(prefix):
            suffix = problem[len(prefix):]
            return (task_class, {"source": "address_prefix", "suffix": suffix})
    # Fallback: the problem's address itself can be the tag (catalog may
    # have explicit tags like `ci-failure`). Not an error — just a
    # low-confidence classification caller can inspect.
    return ("unknown", {"source": "unclassified", "problem": problem})


# ---------------------------------------------------------------------
# Catalog query — read-only, via the capability API
# ---------------------------------------------------------------------


def query_capabilities(
    task_class: str,
    api_base: str,
    state: str = "live",
    timeout_s: float = 15.0,
) -> list[dict]:
    """Fetch the capabilities that may be dispatched for a task-class.

    Steps:
      1. GET ``<api_base>/state`` filtered server-side by ``state`` and by
         ``tag=<task_class>``; if that request fails, retry without the
         tag filter (up to 1000 rows).
      2. Hydrate at most the first 200 summary rows into full records via
         ``<api_base>/<name>?liveness_limit=1`` (summary rows lack
         dream_requirements / probe_spec / cost_model).
      3. Keep a record when it is safety-class, when ``task_class`` is
         listed in its ``dream_requirements.required_for_tasks``, or when
         ``task_class`` is the literal "unknown" (every hydrated record
         passes in that case).

    Args:
        task_class: task class / tag to look up; URL-encoded before use.
        api_base: catalog API base URL (trailing slashes are ignored).
        state: catalog state to request (default "live").
        timeout_s: per-request timeout in seconds.

    Returns:
        The filtered list of full capability records; ``[]`` when the
        catalog is unreachable or answers with an unexpected shape.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import quote, urlencode

    base = api_base.rstrip("/")

    # Query values are user-controlled (--tag), so they must be encoded:
    # a raw space or '&' would either raise inside http.client or inject
    # extra query parameters.
    tag_query = urlencode({"state": state, "tag": task_class, "limit": 500})
    items = _http_get_json(f"{base}/state?{tag_query}", timeout_s=timeout_s)
    if items is None:
        # Fallback: full list, client-side filter.
        full_query = urlencode({"state": state, "limit": 1000})
        items = _http_get_json(f"{base}/state?{full_query}", timeout_s=timeout_s)
        if items is None:
            return []

    # /state returns shape {count, items: [{name, address, state, tags, ...}]}
    if isinstance(items, dict) and "items" in items:
        raw = items["items"]
    elif isinstance(items, list):
        raw = items
    else:
        return []
    if not isinstance(raw, list):
        # e.g. {"items": null} — nothing usable to hydrate.
        return []

    # NOTE(review): when the tag-filtered request succeeds, only rows
    # carrying `task_class` as a tag are hydrated. Safety-class employees
    # that do not also carry that tag would never reach the filter below
    # and so would not be consulted — confirm the server's tag semantics.

    # Hydrate summary rows into full records, capped at 200 round-trips.
    hydrated: list[dict] = []
    for row in raw[:200]:
        if not isinstance(row, dict):
            continue
        name = row.get("name")
        if not name:
            continue
        detail = _http_get_json(
            f"{base}/{quote(str(name))}?liveness_limit=1",
            timeout_s=timeout_s,
        )
        # Skip failed lookups and anything that is not a record object.
        if not isinstance(detail, dict):
            continue
        hydrated.append(detail)

    # Client-side filter: employee must declare this task in its
    # required_for_tasks (the catalog's declared-capability boundary).
    # Safety-class employees always pass — they are consulted for every
    # fix regardless of task.
    filtered: list[dict] = []
    for cap in hydrated:
        tags = cap.get("tags") or []
        if _is_safety_class(tags):
            filtered.append(cap)
            continue
        dream_reqs = cap.get("dream_requirements") or {}
        required_tasks = dream_reqs.get("required_for_tasks") or []
        if isinstance(required_tasks, str):
            # A bare string must match exactly; `in` on a str would do a
            # substring match and silently widen the declared scope.
            required_tasks = [required_tasks]
        if task_class in required_tasks or task_class == "unknown":
            filtered.append(cap)

    return filtered


def query_gaps(api_base: str, timeout_s: float = 15.0) -> list[dict]:
    """Return the catalog's gap queue from ``<api_base>/gaps?limit=500``.

    Accepts either a bare JSON list or an object with an "items" member.
    Any request failure or other response shape yields ``[]``.
    """
    endpoint = f"{api_base.rstrip('/')}/gaps?limit=500"
    response = _http_get_json(endpoint, timeout_s=timeout_s)
    if isinstance(response, list):
        return response
    if isinstance(response, dict) and "items" in response:
        return response["items"]
    return []


def _http_get_json(url: str, timeout_s: float = 15.0):
    """GET + parse JSON. Returns None on any failure (connection,
    non-200, non-JSON). Caller decides fallback behaviour."""
    try:
        req = urllib.request.Request(
            url,
            headers={"Accept": "application/json", "User-Agent": "arq-fix-all/0.1"},
        )
        with urllib.request.urlopen(req, timeout=timeout_s) as resp:
            if resp.status != 200:
                return None
            body = resp.read().decode("utf-8", errors="replace")
            return json.loads(body)
    except (urllib.error.URLError, json.JSONDecodeError, OSError, TimeoutError):
        return None


# ---------------------------------------------------------------------
# Ranking + gating
# ---------------------------------------------------------------------


def _is_safety_class(tags: list[str] | None) -> bool:
    if not tags:
        return False
    return any(t in SAFETY_CLASS_TAGS for t in tags)


def _is_law_zero_forced(
    tags: list[str] | None,
    dream_reqs: dict | None,
    problem_tag: str | None = None,
) -> bool:
    """Decide whether hard-tier routing is mandatory, whatever the flag says.

    Hard tier is forced when any one of these holds:
      - ``problem_tag`` is a member of LAW_ZERO_FORCE_HARD_TAGS
      - any entry of ``tags`` is a member of LAW_ZERO_FORCE_HARD_TAGS
      - ``dream_reqs["law_zero"]`` is exactly ``True``
    """
    return bool(
        problem_tag in LAW_ZERO_FORCE_HARD_TAGS
        or (tags and any(tag in LAW_ZERO_FORCE_HARD_TAGS for tag in tags))
        or (dream_reqs and dream_reqs.get("law_zero") is True)
    )


def _cost_scalar(cost_model: dict | None) -> float:
    """Reduce a cost_model dict to a ranking scalar.

    cost_model schema is free-form across capabilities, but the
    following keys are conventional:
      - cost_per_call_usd (float) — explicit dollar cost
      - latency_p50_ms (int)       — expected latency
      - score (float, [0,1])       — pre-computed cost score (lower = cheaper)

    We prefer the explicit score when present. Otherwise fall back to
    cost_per_call_usd (clamped to [0.0001, 1.0] to keep the ratio
    meaningful). If nothing is declared we assume mid-tier cost (0.5) —
    not zero, so zero-cost employees don't automatically dominate.
    """
    if not cost_model:
        return 0.5
    if isinstance(cost_model.get("score"), (int, float)):
        return max(0.0001, float(cost_model["score"]))
    per_call = cost_model.get("cost_per_call_usd")
    if isinstance(per_call, (int, float)):
        return max(0.0001, min(1.0, float(per_call)))
    return 0.5


def _trust_scalar(cap: dict) -> float:
    """Reduce trust signals on the capability to a [0,1] scalar.

    The catalog doesn't carry a dedicated trust_score column — the
    state field is the primary trust signal (live > candidate; dormant
    / reaped filtered out before we get here). We use:
      - state == 'live'      -> 1.0
      - state == 'candidate' -> 0.5 (unproven but allowed)
      - anything else        -> 0.1 (shouldn't happen post-filter)
    """
    state = cap.get("state", "candidate")
    if state == "live":
        return 1.0
    if state == "candidate":
        return 0.5
    return 0.1


def rank_candidates(candidates: list[dict]) -> list[dict]:
    """Return a new list of candidates ordered best-first.

    The ordering key is (is_safety_class, trust / cost), descending, so
    safety-class employees always come before everyone else and, within
    each group, a higher trust-to-cost ratio ranks earlier. The input
    list is not modified.
    """
    def ordering_key(employee: dict) -> tuple[int, float]:
        pinned = int(_is_safety_class(employee.get("tags") or []))
        trust = _trust_scalar(employee)
        cost = _cost_scalar(employee.get("cost_model"))
        return (pinned, trust / cost)

    ordered = list(candidates)
    ordered.sort(key=ordering_key, reverse=True)
    return ordered


# ---------------------------------------------------------------------
# Dispatch — per probe_spec.kind
# ---------------------------------------------------------------------


def dispatch_one(
    cap: dict,
    problem: str,
    problem_meta: dict,
    action_tier: str,
    timeout_s: float = 60.0,
) -> dict:
    """Invoke a single employee through the transport its probe_spec declares.

    The transport is ``probe_spec.kind`` (falling back to
    ``probe_spec.type``, then "http_get"). This function never raises:
    any exception from a transport is converted into an "error" result.

    Returns a result dict with:
      - capability: capability name ("<unknown>" when absent)
      - outcome: 'ok' / 'error' / 'timeout' / 'unsupported_kind'
      - status_code: transport status (None if not applicable)
      - latency_ms: wall-clock latency for the call
      - response_preview: tail of the response body / stdout, or a message
      - response_hash: sha256 of the full response (None if none captured)
    """
    started = time.monotonic()
    spec = cap.get("probe_spec") or {}
    kind = spec.get("kind") or spec.get("type", "http_get")
    request = {
        "problem": problem,
        "problem_meta": problem_meta,
        "action_tier": action_tier,
        "actor_peer": ACTOR_PEER,
    }

    try:
        # Pick the transport handler first, then make exactly one call.
        if kind in ("http", "http_get", "http_post"):
            handler = _dispatch_http
        elif kind == "command":
            handler = _dispatch_command
        elif kind in ("unix-socket", "unix_socket"):
            handler = _dispatch_unix_socket
        elif kind == "mcp":
            handler = _dispatch_mcp
        else:
            handler = None

        if handler is None:
            result = {
                "outcome": "unsupported_kind",
                "status_code": None,
                "response_preview": f"probe_spec.kind='{kind}' not supported",
                "response_hash": None,
            }
        else:
            result = handler(spec, request, timeout_s)
    except Exception as exc:  # noqa: BLE001 — dispatcher must never raise
        result = {
            "outcome": "error",
            "status_code": None,
            "response_preview": f"dispatcher exception: {type(exc).__name__}: {exc}"[:500],
            "response_hash": None,
        }

    result["capability"] = cap.get("name", "<unknown>")
    result["latency_ms"] = int((time.monotonic() - started) * 1000)
    return result


def _dispatch_http(
    probe_spec: dict, payload: dict, timeout_s: float
) -> dict:
    """HTTP dispatch — POST the payload to the declared URL."""
    url = probe_spec.get("url") or probe_spec.get("endpoint")
    if not url:
        return {
            "outcome": "error",
            "status_code": None,
            "response_preview": "probe_spec missing url/endpoint",
            "response_hash": None,
        }
    method = probe_spec.get("method", "POST").upper()
    body = json.dumps(payload).encode("utf-8")  # noqa: ARQ-NO-JSON-HOT-PATH vendor HTTP probe wire format (Content-Type: application/json)
    try:
        req = urllib.request.Request(
            url,
            data=body if method == "POST" else None,
            headers={
                "Content-Type": "application/json",
                "Accept": "application/json",
                "User-Agent": "arq-fix-all/0.1",
                "X-ARQ-Actor-Peer": ACTOR_PEER,
            },
            method=method,
        )
        with urllib.request.urlopen(req, timeout=timeout_s) as resp:
            raw = resp.read().decode("utf-8", errors="replace")
            return {
                "outcome": "ok" if 200 <= resp.status < 300 else "error",
                "status_code": resp.status,
                "response_preview": raw[-500:],
                "response_hash": _sha256(raw),
            }
    except urllib.error.HTTPError as e:
        raw = e.read().decode("utf-8", errors="replace") if hasattr(e, "read") else ""
        return {
            "outcome": "error",
            "status_code": e.code,
            "response_preview": raw[-500:],
            "response_hash": _sha256(raw),
        }
    except (urllib.error.URLError, OSError, TimeoutError) as e:
        return {
            "outcome": "timeout" if "timed out" in str(e).lower() else "error",
            "status_code": None,
            "response_preview": str(e)[:500],
            "response_hash": None,
        }


def _dispatch_command(
    probe_spec: dict, payload: dict, timeout_s: float
) -> dict:
    """Command dispatch — run the declared CLI via arq-cli-exec so the
    call itself is mesh-governed + whitelisted.

    probe_spec must contain:
      - cli: tool name (e.g., 'aws', 'kubectl', 'gh')
      - args: list of args (will be appended verbatim)
    Payload is JSON-serialised and passed via env var ARQ_FIX_ALL_PAYLOAD
    so the receiving command can read structured context without
    depending on its own stdin parser. (Commands without payload support
    will ignore the env var.)
    """
    cli = probe_spec.get("cli")
    args = probe_spec.get("args") or []
    if not cli or not isinstance(args, list):
        return {
            "outcome": "error",
            "status_code": None,
            "response_preview": "probe_spec command requires cli + args list",
            "response_hash": None,
        }
    arq_cli_exec = Path(__file__).parent / "arq-cli-exec"
    if not arq_cli_exec.exists():
        return {
            "outcome": "error",
            "status_code": None,
            "response_preview": "arq-cli-exec not found beside arq-fix-all",
            "response_hash": None,
        }
    env = {**os.environ, "ARQ_FIX_ALL_PAYLOAD": json.dumps(payload)}  # noqa: ARQ-NO-JSON-HOT-PATH arq-cli-exec env-var boundary — receiving CLI parses JSON from env
    try:
        r = subprocess.run(
            [str(arq_cli_exec), cli, *args],
            capture_output=True, text=True, timeout=timeout_s, check=False, env=env,
        )
        combined = (r.stdout or "") + (r.stderr or "")
        return {
            "outcome": "ok" if r.returncode == 0 else "error",
            "status_code": r.returncode,
            "response_preview": combined[-500:],
            "response_hash": _sha256(combined),
        }
    except subprocess.TimeoutExpired:
        return {
            "outcome": "timeout",
            "status_code": 124,
            "response_preview": f"arq-cli-exec timeout after {timeout_s}s",
            "response_hash": None,
        }


def _dispatch_unix_socket(
    probe_spec: dict, payload: dict, timeout_s: float
) -> dict:
    """Unix-socket dispatch — POST the payload to the declared socket +
    path via curl.

    probe_spec must contain:
      - socket: filesystem path to the unix socket
      - path: HTTP path to hit (e.g., '/v1/invoke')
    """
    socket_path = probe_spec.get("socket")
    path = probe_spec.get("path", "/")
    if not socket_path:
        return {
            "outcome": "error",
            "status_code": None,
            "response_preview": "probe_spec unix-socket requires 'socket'",
            "response_hash": None,
        }
    if not Path(socket_path).exists():
        return {
            "outcome": "error",
            "status_code": None,
            "response_preview": f"socket not found: {socket_path}",
            "response_hash": None,
        }
    body = json.dumps(payload)  # noqa: ARQ-NO-JSON-HOT-PATH curl unix-socket vendor wire format (Content-Type: application/json)
    try:
        r = subprocess.run(
            [
                "curl", "-s", "--unix-socket", socket_path,
                "-H", "Content-Type: application/json",
                "-d", body,
                f"http://local{path}",
            ],
            capture_output=True, text=True, timeout=timeout_s, check=False,
        )
        return {
            "outcome": "ok" if r.returncode == 0 else "error",
            "status_code": r.returncode,
            "response_preview": (r.stdout or r.stderr)[-500:],
            "response_hash": _sha256(r.stdout or ""),
        }
    except subprocess.TimeoutExpired:
        return {
            "outcome": "timeout",
            "status_code": 124,
            "response_preview": f"curl unix-socket timeout after {timeout_s}s",
            "response_hash": None,
        }


def _dispatch_mcp(
    probe_spec: dict, payload: dict, timeout_s: float
) -> dict:
    """MCP dispatch — forward the payload to mcp.arqera.io/jsonrpc via
    the mesh-adapter. Requires a Cloudflare Access service token env
    var (ARQ_MCP_ACCESS_TOKEN) since the MCP endpoint is Access-
    protected. Without the token we emit a clear unsupported result
    rather than silently failing."""
    access_token = os.environ.get("ARQ_MCP_ACCESS_TOKEN")
    if not access_token:
        return {
            "outcome": "unsupported_kind",
            "status_code": None,
            "response_preview": (
                "MCP dispatch requires ARQ_MCP_ACCESS_TOKEN (Cloudflare "
                "Access service token). Logged as follow-up."
            ),
            "response_hash": None,
        }
    url = probe_spec.get("url") or "https://mcp.arqera.io/jsonrpc"
    method = probe_spec.get("mcp_method", "tools/call")
    body = json.dumps({
        "jsonrpc": "2.0",
        "id": 1,
        "method": method,
        "params": payload,
    }).encode("utf-8")
    try:
        req = urllib.request.Request(
            url, data=body,
            headers={
                "Content-Type": "application/json",
                "User-Agent": "arq-fix-all/0.1",
                "CF-Access-Client-Id": access_token.split(":", 1)[0] if ":" in access_token else access_token,
                "CF-Access-Client-Secret": access_token.split(":", 1)[1] if ":" in access_token else "",
            },
            method="POST",
        )
        with urllib.request.urlopen(req, timeout=timeout_s) as resp:
            raw = resp.read().decode("utf-8", errors="replace")
            return {
                "outcome": "ok" if 200 <= resp.status < 300 else "error",
                "status_code": resp.status,
                "response_preview": raw[-500:],
                "response_hash": _sha256(raw),
            }
    except (urllib.error.URLError, OSError, TimeoutError) as e:
        return {
            "outcome": "error",
            "status_code": None,
            "response_preview": f"MCP dispatch failed: {e}"[:500],
            "response_hash": None,
        }


# ---------------------------------------------------------------------
# Orchestration — safety gate -> rank -> dispatch -> compose
# ---------------------------------------------------------------------


def run_safety_gate(
    candidates: list[dict],
    problem: str,
    problem_meta: dict,
    dispatch_timeout_s: float,
) -> tuple[bool, list[dict]]:
    """Consult safety-class employees BEFORE fan-out.

    Every candidate whose tags mark it as safety-class is dispatched in
    order, always with action_tier "hard". The gate is fail-closed: the
    run is halted on the first safety employee that

      - does not answer with outcome 'ok' (this includes 'error',
        'timeout' and 'unsupported_kind' — an employee that could not be
        consulted has not given permission), or
      - reports a status_code >= 400, or
      - returns a preview containing a veto marker ("veto",
        "law_zero_violation", "halt"; case-insensitive).

    This runs sequentially (not in parallel) — one veto is enough to
    halt, so racing the employees would gain nothing.

    Args:
        candidates: capability records; non-safety entries are ignored.
        problem: problem address forwarded to each employee.
        problem_meta: classification metadata forwarded to each employee.
        dispatch_timeout_s: per-dispatch timeout in seconds.

    Returns:
        (permitted, safety_results). ``permitted`` is True when there are
        no safety-class candidates or all of them approved;
        ``safety_results`` holds the results gathered so far, ending with
        the vetoing one when halted.
    """
    veto_markers = ("veto", "law_zero_violation", "halt")
    safety = [c for c in candidates if _is_safety_class(c.get("tags"))]
    results: list[dict] = []

    for cap in safety:
        res = dispatch_one(
            cap, problem, problem_meta,
            action_tier="hard",  # safety always runs hard-tier
            timeout_s=dispatch_timeout_s,
        )
        results.append(res)
        preview = (res.get("response_preview") or "").lower()
        vetoed = (
            # Anything short of an explicit 'ok' blocks the run.
            res.get("outcome") != "ok"
            or (res.get("status_code") or 0) >= 400
            or any(marker in preview for marker in veto_markers)
        )
        if vetoed:
            return (False, results)
    return (True, results)


def dispatch_fan_out(
    candidates: list[dict],
    problem: str,
    problem_meta: dict,
    action_tier: str,
    parallelism: int,
    dispatch_timeout_s: float,
    on_dispatch: callable = None,
) -> list[dict]:
    """Run every non-safety candidate through dispatch_one on a thread pool.

    At most ``max(1, parallelism)`` dispatches run at once. Results are
    collected in completion order, not candidate order.

    ``on_dispatch``, when given, is called as ``on_dispatch(cap, result)``
    right after each dispatch finishes, so the runner can emit
    per-employee acts as they complete. Exceptions raised by the callback
    are discarded so one bad callback cannot stop the fan-out.

    Returns the list of result dicts (empty when there is nothing to
    dispatch).
    """
    workers = [c for c in candidates if not _is_safety_class(c.get("tags"))]
    if not workers:
        return []

    collected: list[dict] = []
    pool_size = max(1, parallelism)
    with concurrent.futures.ThreadPoolExecutor(max_workers=pool_size) as pool:
        pending = {}
        for employee in workers:
            handle = pool.submit(
                dispatch_one, employee, problem, problem_meta,
                action_tier, dispatch_timeout_s,
            )
            pending[handle] = employee

        for finished in concurrent.futures.as_completed(pending):
            employee = pending[finished]
            try:
                outcome = finished.result()
            except Exception as exc:  # noqa: BLE001
                # Record the failure as a result instead of aborting the run.
                outcome = {
                    "capability": employee.get("name", "<unknown>"),
                    "outcome": "error",
                    "status_code": None,
                    "latency_ms": 0,
                    "response_preview": f"future raised: {exc}"[:500],
                    "response_hash": None,
                }
            collected.append(outcome)
            if on_dispatch is None:
                continue
            try:
                on_dispatch(employee, outcome)
            except Exception:  # noqa: BLE001 — callback MUST NOT halt fan-out
                pass
    return collected


def compose_outcome(
    problem: str,
    task_class: str,
    action_tier: str,
    safety_results: list[dict],
    dispatch_results: list[dict],
) -> dict:
    """Fold safety and dispatch results into one outcome summary.

    Counts are taken over both result lists together. The verdict is:
      - "resolved": at least one 'ok' and no 'error' or 'timeout'
      - "partial":  at least one 'ok' alongside errors or timeouts
      - "failed":   no 'ok' at all (including when nothing was dispatched)

    'unsupported_kind' results are counted but do not affect the verdict.
    """
    combined = safety_results + dispatch_results

    tally = {"ok": 0, "error": 0, "timeout": 0, "unsupported_kind": 0}
    for entry in combined:
        label = entry.get("outcome")
        for known in tally:
            if label == known:
                tally[known] += 1

    clean_run = tally["error"] == 0 and tally["timeout"] == 0
    if tally["ok"] > 0 and clean_run:
        verdict = "resolved"
    elif tally["ok"] > 0:
        verdict = "partial"
    else:
        verdict = "failed"

    return {
        "problem": problem,
        "task_class": task_class,
        "action_tier": action_tier,
        "dispatched": len(combined),
        "ok": tally["ok"],
        "errors": tally["error"],
        "timeouts": tally["timeout"],
        "unsupported_kind": tally["unsupported_kind"],
        "safety_results": safety_results,
        "dispatch_results": dispatch_results,
        "verdict": verdict,
    }


# ---------------------------------------------------------------------
# Top-level runner
# ---------------------------------------------------------------------


def run_for_problem(
    problem: str,
    explicit_tag: str | None,
    action_tier_flag: str,
    parallelism: int,
    dry_run: bool,
    confirm: bool,
    confirm_threshold: int,
    api_base: str,
    dispatch_timeout_s: float = 60.0,
    stream: callable = None,
) -> dict:
    """Run one problem through the whole pipeline and return its outcome.

    Stages, in order: classify the problem, query the catalog for
    candidates, rank them, resolve the action tier, apply the confirm
    threshold, then either return a dry-run plan or run the sequential
    safety gate followed by the parallel fan-out and compose the results.

    Every stage that ends the run early returns a small dict carrying at
    least ``problem``, ``task_class``, ``action_tier``, ``dispatched`` and
    ``verdict`` (``no_candidates``, ``confirm_required``, ``dry_run`` or
    ``halted_by_safety``); a full run returns what ``compose_outcome``
    builds. Substrate acts are emitted along the way via ``_emit``.

    ``stream``, when given, is called with human-readable progress strings
    so a frontend can display them; it is never required for correctness.
    """
    def notify(text: str):
        # Progress reporting is optional; stay silent without a sink.
        if stream:
            stream(text)

    task_class, problem_meta = classify_problem(problem, explicit_tag)
    started_ms = _ts_ms()
    ref = f"{_slug(problem)[:60]}-{started_ms}"

    def emit_exchange(cap_name: str, res: dict, tier: str, is_safety: bool):
        # One invocation is recorded as a dispatched/response act pair that
        # share a per-capability reference derived from the run reference.
        exchange_ref = f"{ref}-{_slug(cap_name)}"
        _emit("act", "fix_all_dispatched", exchange_ref, {
            "problem": problem, "capability": cap_name,
            "action_tier": tier, "safety_class": is_safety,
            "actor_peer": ACTOR_PEER, "ts_ms": _ts_ms(),
        })
        _emit("act", "fix_all_response", exchange_ref, {
            "problem": problem, "capability": cap_name,
            "outcome": res.get("outcome"),
            "status_code": res.get("status_code"),
            "latency_ms": res.get("latency_ms"),
            "response_hash": res.get("response_hash"),
            "actor_peer": ACTOR_PEER, "ts_ms": _ts_ms(),
        })

    _emit("act", "fix_all_started", ref, {
        "problem": problem,
        "task_class": task_class,
        "explicit_tag": explicit_tag,
        "action_tier_flag": action_tier_flag,
        "actor_peer": ACTOR_PEER,
        "ts_ms": started_ms,
    })

    notify(f"[classify] problem={problem} -> task_class={task_class}")

    # --- Query -----------------------------------------------------------
    candidates = query_capabilities(task_class, api_base)
    if not candidates:
        notify(f"[query] 0 candidates for task_class={task_class}")
        _emit("act", "fix_all_rejected", ref, {
            "reason": "no_candidates",
            "task_class": task_class,
            "actor_peer": ACTOR_PEER,
            "ts_ms": started_ms,
        })
        return {
            "problem": problem,
            "task_class": task_class,
            "action_tier": action_tier_flag,
            "dispatched": 0,
            "verdict": "no_candidates",
        }

    # --- Rank ------------------------------------------------------------
    ranked = rank_candidates(candidates)
    top_name = ranked[0].get("name") if ranked else None
    notify(f"[rank] {len(ranked)} candidates after rank (top={top_name})")

    # --- Action tier -----------------------------------------------------
    # A Law-Zero match on any ranked candidate (or on the explicit tag)
    # overrides whatever tier the caller asked for; otherwise "auto" is
    # resolved from whether any candidate is safety-class.
    law_zero_hit = any(
        _is_law_zero_forced(c.get("tags"), c.get("dream_requirements"), explicit_tag)
        for c in ranked
    )
    if law_zero_hit and action_tier_flag != "hard":
        notify(f"[law-zero] forcing action_tier=hard (was {action_tier_flag})")
        action_tier = "hard"
    elif action_tier_flag == "auto":
        if any(_is_safety_class(c.get("tags")) for c in ranked):
            action_tier = "hard"
        else:
            action_tier = "soft"
        notify(f"[action-tier] auto resolved to {action_tier}")
    else:
        action_tier = action_tier_flag

    # --- Blast-radius gate -----------------------------------------------
    # Large dispatches need an explicit --confirm unless this is a preview.
    candidate_count = len(ranked)
    if candidate_count > confirm_threshold and not (confirm or dry_run):
        notify(
            f"[confirm] would dispatch {candidate_count} employees — "
            f"exceeds {confirm_threshold}. Pass --confirm to proceed, "
            f"or --dry-run to preview."
        )
        _emit("act", "fix_all_rejected", ref, {
            "reason": "confirm_required",
            "candidate_count": candidate_count,
            "threshold": confirm_threshold,
            "actor_peer": ACTOR_PEER,
            "ts_ms": started_ms,
        })
        return {
            "problem": problem,
            "task_class": task_class,
            "action_tier": action_tier,
            "dispatched": 0,
            "verdict": "confirm_required",
            "candidate_count": candidate_count,
            "threshold": confirm_threshold,
        }

    # --- Dry run: describe the plan, invoke nothing ------------------------
    if dry_run:
        plan = []
        for cap in ranked:
            spec = cap.get("probe_spec") or {}
            plan.append({
                "capability": cap.get("name"),
                "address": cap.get("address"),
                # Catalog entries may declare the protocol as "kind" or "type".
                "kind": spec.get("kind") or spec.get("type"),
                "is_safety": _is_safety_class(cap.get("tags")),
                "tags": cap.get("tags"),
            })
        notify(
            f"[dry-run] would dispatch {len(plan)} employees "
            f"(action_tier={action_tier}, parallelism={parallelism})"
        )
        return {
            "problem": problem,
            "task_class": task_class,
            "action_tier": action_tier,
            "verdict": "dry_run",
            "dispatched": 0,
            "plan": plan,
        }

    # --- Safety gate (sequential) -----------------------------------------
    permitted, safety_results = run_safety_gate(
        ranked, problem, problem_meta, dispatch_timeout_s,
    )
    for gate_res in safety_results:
        # Safety-gate invocations are always recorded at the hard tier.
        emit_exchange(gate_res.get("capability", "<unknown>"), gate_res, "hard", True)

    if not permitted:
        notify("[safety-gate] HALTED — safety-class employee vetoed this run")
        _emit("act", "fix_all_halted_by_safety", ref, {
            "problem": problem,
            "safety_results": safety_results,
            "actor_peer": ACTOR_PEER, "ts_ms": _ts_ms(),
        })
        return {
            "problem": problem, "task_class": task_class,
            "action_tier": action_tier, "dispatched": len(safety_results),
            "verdict": "halted_by_safety", "safety_results": safety_results,
        }

    # --- Fan-out -----------------------------------------------------------
    def _on_dispatch(cap: dict, res: dict):
        emit_exchange(cap.get("name", "<unknown>"), res, action_tier, False)

    dispatch_results = dispatch_fan_out(
        ranked, problem, problem_meta, action_tier,
        parallelism, dispatch_timeout_s,
        on_dispatch=_on_dispatch,
    )

    # --- Compose + report --------------------------------------------------
    outcome = compose_outcome(
        problem, task_class, action_tier,
        safety_results, dispatch_results,
    )

    outcome_payload = dict(outcome)
    outcome_payload["actor_peer"] = ACTOR_PEER
    outcome_payload["ts_ms"] = _ts_ms()
    _emit("act", "fix_all_outcome", ref, outcome_payload)

    notify(
        f"[compose] verdict={outcome['verdict']} ok={outcome['ok']} "
        f"errors={outcome['errors']} timeouts={outcome['timeouts']}"
    )
    return outcome


def _slug(s: str) -> str:
    """Filesystem-safe slug for substrate-reference IDs."""
    out = []
    for ch in s:
        if ch.isalnum() or ch in "-_.":
            out.append(ch)
        else:
            out.append("-")
    return "".join(out)[:80]


# ---------------------------------------------------------------------
# CLI frontend
# ---------------------------------------------------------------------


def main(argv: list[str] | None = None) -> int:
    """CLI entry point: parse arguments, run fix-all, print a summary.

    Args:
        argv: Argument list to parse. ``None`` (the default) makes argparse
            read ``sys.argv[1:]``, so ``main()`` behaves as before; passing
            a list lets other code drive the CLI without touching
            ``sys.argv``.

    Returns:
        Process exit code:
          * 0 — no outcome has verdict ``failed`` or ``halted_by_safety``
          * 1 — at least one outcome has verdict ``failed`` or
            ``halted_by_safety``
          * 2 — no problem address, ``--tag`` or ``--all-gaps`` was given
    """
    ap = argparse.ArgumentParser(
        prog="arq-fix-all",
        description=(
            "Turn the capability catalog into action. Given a problem "
            "(address, tag, or --all-gaps), rank the catalog's capable "
            "employees, dispatch per probe_spec.kind, compose outcomes, "
            "emit substrate acts."
        ),
        epilog=(
            "Examples:\n"
            "  arq-fix-all arq://act/peer_wedged/dgx-spark --dry-run\n"
            "  arq-fix-all --tag ci-failure --parallelism 3\n"
            "  arq-fix-all --all-gaps --dry-run\n"
            "  arq-fix-all arq://act/law_zero_violation/xyz  (forces hard-tier)\n"
        ),
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    ap.add_argument(
        "problem", nargs="?",
        help="problem address (arq://...) — optional if --tag/--all-gaps",
    )
    ap.add_argument(
        "--tag", help="problem class tag (e.g., ci-failure, peer-wedge)",
    )
    ap.add_argument(
        "--all-gaps", action="store_true",
        help="enumerate gaps via twin capability gaps + attempt each",
    )
    ap.add_argument(
        "--dry-run", action="store_true",
        help="show dispatch plan without executing",
    )
    ap.add_argument(
        "--parallelism", type=int, default=5,
        help="max concurrent employee invocations (default 5)",
    )
    ap.add_argument(
        "--law-zero-tier", choices=["hard", "soft", "auto"], default="auto",
        dest="action_tier_flag",
        help="action-tier override (default auto)",
    )
    ap.add_argument(
        "--confirm", action="store_true",
        help=f"required when >{CONFIRM_THRESHOLD_DEFAULT} candidates would dispatch",
    )
    ap.add_argument(
        "--confirm-threshold", type=int, default=CONFIRM_THRESHOLD_DEFAULT,
        # Derived from the constant so the help text cannot drift from the
        # actual default (the --confirm help above already does this).
        help=f"blast-radius threshold (default {CONFIRM_THRESHOLD_DEFAULT})",
    )
    ap.add_argument(
        "--capability-api", default=DEFAULT_CAPABILITY_API,
        help=f"catalog API base (default {DEFAULT_CAPABILITY_API})",
    )
    ap.add_argument(
        "--timeout", type=float, default=60.0,
        help="per-dispatch timeout in seconds (default 60)",
    )
    ap.add_argument(
        "--json", action="store_true", dest="json_out",
        help="emit the outcome as JSON on stdout",
    )

    args = ap.parse_args(argv)

    if not args.problem and not args.tag and not args.all_gaps:
        ap.print_help(sys.stderr)
        sys.stderr.write(
            "\narq-fix-all: provide a problem address, --tag, or --all-gaps\n"
        )
        return 2

    def stream(msg: str):
        # Progress goes to stderr, and is muted in JSON mode.
        if not args.json_out:
            sys.stderr.write(f"{msg}\n")

    # Options that are identical for every run_for_problem invocation.
    shared_opts = {
        "action_tier_flag": args.action_tier_flag,
        "parallelism": args.parallelism,
        "dry_run": args.dry_run,
        "confirm": args.confirm,
        "confirm_threshold": args.confirm_threshold,
        "api_base": args.capability_api,
        "dispatch_timeout_s": args.timeout,
        "stream": stream,
    }

    outcomes: list[dict] = []

    if args.all_gaps:
        # --all-gaps takes precedence: a positional problem / --tag given
        # alongside it is not used.
        stream("[gaps] enumerating capability gaps...")
        gaps = query_gaps(args.capability_api)
        stream(f"[gaps] {len(gaps)} gaps to attempt")
        for gap in gaps:
            gap_address = gap.get("address") or f"arq://doc/capability/{gap.get('name','unknown')}"
            outcomes.append(
                run_for_problem(problem=gap_address, explicit_tag=None, **shared_opts)
            )
    else:
        # With only --tag given, synthesise a tag address as the problem.
        problem = args.problem or f"arq://tag/{args.tag}"
        outcomes.append(
            run_for_problem(problem=problem, explicit_tag=args.tag, **shared_opts)
        )

    # Output — JSON mode prints structured result; human mode prints
    # a short summary (detail is in stderr stream + substrate acts).
    if args.json_out:
        # --all-gaps always prints a list (possibly empty); single-problem
        # mode prints the one outcome object.
        print(json.dumps(outcomes if args.all_gaps else outcomes[0], indent=2))
    else:
        # Plain summary. No emoji, no decoration.
        for o in outcomes:
            print(
                f"problem={o.get('problem')} "
                f"verdict={o.get('verdict')} "
                f"dispatched={o.get('dispatched', 0)} "
                f"ok={o.get('ok', 0)} "
                f"errors={o.get('errors', 0)} "
                f"timeouts={o.get('timeouts', 0)}"
            )

    # Exit code:
    #   0 — no outcome failed or was halted (this includes dry_run,
    #       resolved, partial, no_candidates, confirm_required, and an
    #       empty --all-gaps run)
    #   1 — any outcome failed or halted_by_safety
    #   2 — invalid invocation (handled earlier)
    any_failed = any(
        o.get("verdict") in ("failed", "halted_by_safety")
        for o in outcomes
    )
    return 1 if any_failed else 0


# Script entry point: run the CLI only when executed directly, and use
# main()'s return value as the process exit status.
if __name__ == "__main__":
    sys.exit(main())
