#!/usr/bin/env python3
"""arq-gravity-audit — operational-gravity continuous-visibility primitive.

Runs the safe read-only verb (list / usage / models-list) against every
arq-<provider> primitive and produces a reachability snapshot.

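Output categories per provider:
  REACHABLE          — credential present + API responded 2xx (probe exited 0)
  NO-CRED            — no API key in env / SOPS / Nango
  AUTH-FAILED        — credential present but API returned HTTP 401 / 403
  HTTP-ERROR         — credential present but API returned another HTTP error
  UNREACHABLE        — network / DNS / timeout (probe timed out or could not be launched)
  UNKNOWN            — probe failed with an unrecognised error message
  MISSING-PRIMITIVE  — no arq-<provider> script found next to this file
  UNKNOWN-PROVIDER   — requested provider has no registered probe verb
  SKIP               — substrate-side verb (no provider API to probe)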

Substrate emission:
  arq://act/gravity_audit_completed/<ts> with per-provider status

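Usage:
  arq-gravity-audit                  # all providers
  arq-gravity-audit --provider stripe twilio   # subset
  arq-gravity-audit --json           # JSON output
  arq-gravity-audit --emit           # emit substrate act
  arq-gravity-audit --timeout 30     # per-provider probe timeout in seconds (default 15)
  arq-gravity-audit --snapshot FILE  # save provider → status snapshot to FILE
  arq-gravity-audit --diff FILE      # compare this run against a saved snapshot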
"""
from __future__ import annotations
import argparse
import json
import subprocess
import sys
import time
from datetime import UTC, datetime
from pathlib import Path

# Directory containing this script; _probe() looks up each arq-<provider>
# primitive as a sibling file in this directory.
BRIDGE_DIR = Path(__file__).parent

# Provider name -> argument list appended to the arq-<provider> command line
# when probing that provider (see _probe()).
PROBE_VERBS: dict[str, list[str]] = {
    # Per-provider primitives — safe read-only verbs
    "anthropic": ["models", "list"],
    "openai": ["key", "list"],
    "modal": ["app", "list"],
    "posthog": ["project", "list"],
    "cerebras": ["models", "list"],
    "deepseek": ["models", "list"],
    "fireworks": ["models", "list"],
    "groq": ["models", "list"],
    "huggingface": ["models", "list"],
    "mistral": ["models", "list"],
    "sambanova": ["models", "list"],
    "minimax": ["models", "list"],
    "stripe": ["balance", "get"],
    "twilio": ["message", "list"],
    "workos": ["directory", "list"],
    "sendgrid": ["stats", "get"],
    "slack": ["channel", "list"],
    "r2": ["bucket", "list"],
    "neo4j": ["health"],
    "notion": ["list"],
    "linear": ["list"],
    "hubspot": ["list"],
    "intercom": ["list"],
    "airtable": ["list"],
    "pagerduty": ["list"],
    "vercel": ["list"],
    "supabase": ["list"],
    "mixpanel": ["list"],
    "calendly": ["list"],
    "mailchimp": ["list"],
    "discord": ["list"],
    "salesforce": ["list"],
    "jira": ["list"],
    "datadog": ["list"],
    "segment": ["list"],
    "amplitude": ["list"],
    "snowflake": ["list"],
    "gitlab": ["list"],
    "zendesk": ["list"],
}

# Names that main() reports as SKIP instead of probing. None of them is a key
# of the probe tables, so they only appear when passed explicitly via
# --provider.
SUBSTRATE_SIDE: set[str] = {
    "call", "github", "kube", "config", "cli-exec",
    "audit-all", "fix-all", "nango-proxy",
}

# Additional provider -> probe-argument table. main() merges it over
# PROBE_VERBS, so an entry here would win if the same key existed in both.
CLOUD_PROVIDERS: dict[str, list[str]] = {
    "cloudflare": ["zone", "list"],
    "azure": ["resource", "list"],
    "gcp": ["gcs", "object", "list"],
    "grafana": ["dashboard", "list"],
    "sentry": ["issue", "list"],
}


def _probe(provider: str, verbs: list[str], timeout: int = 15) -> dict:
    """Execute ``arq-<provider> <verbs...>`` and classify what happened.

    The primitive is looked up in ``BRIDGE_DIR``. The returned dict always
    carries ``provider`` and ``status``; a successful probe adds
    ``elapsed_ms``, every other outcome adds a short ``detail`` string.

    Status values produced here:
      MISSING-PRIMITIVE — the script file does not exist
      UNREACHABLE       — the run timed out or could not be started
      REACHABLE         — the script exited with code 0
      NO-CRED           — stderr mentions both "no " and "key" (case-insensitive)
      AUTH-FAILED       — stderr contains "HTTP 401" or "HTTP 403"
      HTTP-ERROR        — stderr contains "HTTP"
      UNKNOWN           — any other non-zero exit
    """
    primitive = BRIDGE_DIR / f"arq-{provider}"

    def outcome(status: str, **extra) -> dict:
        # Keeps the key order provider → status → extras for every result.
        return {"provider": provider, "status": status, **extra}

    if not primitive.exists():
        return outcome("MISSING-PRIMITIVE", detail=str(primitive))

    began = time.monotonic()
    try:
        argv = [str(primitive)] + verbs
        proc = subprocess.run(
            argv,
            capture_output=True,
            text=True,
            timeout=timeout,
            check=False,
        )
        elapsed_ms = int((time.monotonic() - began) * 1000)
    except subprocess.TimeoutExpired:
        return outcome("UNREACHABLE", detail=f"timeout {timeout}s")
    except Exception as exc:
        # Anything else that prevents the run (e.g. the script cannot be
        # executed) is reported the same way as a timeout.
        return outcome("UNREACHABLE", detail=str(exc)[:100])

    if proc.returncode == 0:
        return outcome("REACHABLE", elapsed_ms=elapsed_ms)

    # Only the first 200 characters of stderr take part in classification;
    # the first 80 are reported back as the detail.
    err = (proc.stderr or "")[:200]
    lowered = err.lower()
    if "no " in lowered and "key" in lowered:
        status = "NO-CRED"
    elif any(code in err for code in ("HTTP 401", "HTTP 403")):
        status = "AUTH-FAILED"
    elif "HTTP" in err:
        status = "HTTP-ERROR"
    else:
        status = "UNKNOWN"
    return outcome(status, detail=err[:80])


def _emit_substrate(audit_id: str, results: list[dict]) -> None:
    """Best-effort emission of a ``gravity_audit_completed`` act via the twin CLI.

    Builds a JSON payload with the audit id, summary counters and the full
    per-provider results, then runs
    ``twin --use-keychain act emit --type gravity_audit_completed --ref <audit_id> --payload <json>``
    with a 10 second timeout.

    The step stays optional: a missing or non-executable ``twin``, a timeout,
    or a non-zero exit never raises. Each of those cases is reported on
    stderr so a failed emission is visible instead of silently ignored.

    Args:
        audit_id: Reference id attached to the emitted act.
        results: Probe result dicts; each must contain a ``"status"`` key.
    """
    statuses = [r["status"] for r in results]
    payload = json.dumps({
        "audit_id": audit_id,
        "total": len(results),
        "reachable": statuses.count("REACHABLE"),
        "no_cred": statuses.count("NO-CRED"),
        # SKIP rows are not failures; exclude them so this counter matches
        # the ERRORS tally printed by main().
        "errors": sum(
            1 for s in statuses if s not in ("REACHABLE", "NO-CRED", "SKIP")
        ),
        "results": results,
    })
    try:
        proc = subprocess.run(
            ["twin", "--use-keychain", "act", "emit",
             "--type", "gravity_audit_completed",
             "--ref", audit_id,
             "--payload", payload],
            capture_output=True, text=True, errors="replace",
            timeout=10, check=False,
        )
    except subprocess.TimeoutExpired:
        # str(exc) would echo the whole command line including the payload,
        # so report a fixed message instead.
        sys.stderr.write("  substrate-emit skipped: twin timed out after 10s\n")
        return
    except OSError as exc:
        # Covers a missing binary (FileNotFoundError) as well as one that
        # exists but cannot be executed (e.g. PermissionError).
        sys.stderr.write(f"  substrate-emit skipped: {exc}\n")
        return

    if proc.returncode != 0:
        detail = (proc.stderr or "").strip()[:200]
        sys.stderr.write(
            f"  substrate-emit failed (twin exit {proc.returncode}): {detail}\n"
        )

def _save_snapshot(path: Path, results: list[dict]) -> None:
    snapshot = {
        "captured_at": datetime.now(UTC).isoformat(),
        "results": {r["provider"]: r["status"] for r in results},
    }
    path.write_text(json.dumps(snapshot, indent=2))


def _print_diff(prev_path: Path, curr_results: list[dict]) -> None:
    """Print the status changes between a saved snapshot and the current run.

    *prev_path* must be a JSON file with ``results`` (provider → status) and
    ``captured_at`` keys. A provider present on only one side is shown with
    the status ``MISSING`` on the other. Each change is prefixed with ``↑``
    when the provider became REACHABLE, ``↓`` when it stopped being
    REACHABLE, and ``→`` otherwise.
    """
    snapshot = json.loads(prev_path.read_text())
    old_status = snapshot["results"]
    new_status = {entry["provider"]: entry["status"] for entry in curr_results}

    changed = []
    for name in sorted(set(old_status).union(new_status)):
        was = old_status.get(name, "MISSING")
        now = new_status.get(name, "MISSING")
        if was == now:
            continue
        changed.append((name, was, now))

    print(f"\n  === DIFF vs {prev_path.name} (captured {snapshot['captured_at']}) ===")
    if not changed:
        print("  no changes")
        return

    print(f"  {len(changed)} status changes:")
    for name, was, now in changed:
        if now == "REACHABLE":
            arrow = "↑"
        elif was == "REACHABLE":
            arrow = "↓"
        else:
            arrow = "→"
        print(f"    {arrow} {name:<14} {was:<14} → {now}")


def main() -> int:
    """CLI entry point: probe providers, report, then optionally diff / snapshot / emit.

    Steps, in order:
      1. Probe every requested provider (all registered ones by default).
         Names in ``SUBSTRATE_SIDE`` are reported as SKIP and names without a
         registered probe as UNKNOWN-PROVIDER.
      2. Print the results, as JSON with ``--json`` or as a table with a
         tally otherwise.
      3. ``--diff``: compare against the given snapshot. This runs BEFORE the
         snapshot is saved, so ``--snapshot X --diff X`` compares against the
         previous run instead of the file that was just overwritten.
      4. ``--snapshot``: save the provider → status map.
      5. ``--emit``: emit the substrate act.

    With ``--json`` the informational notes (diff, snapshot, emit) are written
    to stderr so stdout contains only the JSON document.

    Returns:
        0 on success; 1 when ``--diff`` names a file that does not exist. In
        that case the snapshot, if requested, is still saved and the emit
        step is skipped.
    """
    import contextlib  # local: only needed to route the diff output

    p = argparse.ArgumentParser(prog="arq-gravity-audit")
    p.add_argument("--provider", nargs="*", help="probe subset (default: all)")
    p.add_argument("--json", action="store_true", help="JSON output")
    p.add_argument("--emit", action="store_true", help="emit substrate act")
    p.add_argument("--timeout", type=int, default=15)
    p.add_argument("--snapshot", help="save snapshot to this path (for later --diff)")
    p.add_argument("--diff", help="compare current probe vs snapshot at this path")
    args = p.parse_args()

    all_probes = {**PROBE_VERBS, **CLOUD_PROVIDERS}
    targets = args.provider or sorted(all_probes)
    results = []

    for provider in targets:
        if provider in SUBSTRATE_SIDE:
            results.append({"provider": provider, "status": "SKIP", "detail": "substrate-side verb"})
            continue
        if provider not in all_probes:
            results.append({"provider": provider, "status": "UNKNOWN-PROVIDER"})
            continue
        results.append(_probe(provider, all_probes[provider], args.timeout))

    # In JSON mode stdout must stay a single parseable document, so every
    # human-readable note below goes to stderr instead.
    note_stream = sys.stderr if args.json else sys.stdout

    if args.json:
        print(json.dumps({"results": results}, indent=2))
    else:
        # Compact tabular output
        print(f"{'provider':<14} {'status':<18} detail")
        print("-" * 70)
        for r in results:
            detail = (r.get("detail") or "")[:40]
            elapsed = r.get("elapsed_ms")
            # Compare against None so a 0 ms measurement is still shown.
            suffix = f" ({elapsed}ms)" if elapsed is not None else ""
            print(f"{r['provider']:<14} {r['status']:<18} {detail}{suffix}")
        # Tally
        reachable = sum(1 for r in results if r["status"] == "REACHABLE")
        no_cred = sum(1 for r in results if r["status"] == "NO-CRED")
        errors = sum(1 for r in results if r["status"] not in ("REACHABLE", "NO-CRED", "SKIP"))
        print()
        print(f"  REACHABLE: {reachable}  NO-CRED: {no_cred}  ERRORS: {errors}  TOTAL: {len(results)}")

    # Read the previous snapshot before --snapshot can overwrite it.
    diff_missing = False
    if args.diff:
        prev = Path(args.diff)
        if prev.exists():
            with contextlib.redirect_stdout(note_stream):
                _print_diff(prev, results)
        else:
            sys.stderr.write(f"  --diff path does not exist: {prev}\n")
            diff_missing = True

    if args.snapshot:
        _save_snapshot(Path(args.snapshot), results)
        print(f"\n  snapshot saved: {args.snapshot}", file=note_stream)

    if diff_missing:
        return 1

    if args.emit:
        audit_id = f"gravity-audit-{datetime.now(UTC).strftime('%Y%m%dT%H%M%SZ')}"
        _emit_substrate(audit_id, results)
        print(f"\n  substrate-emit: arq://act/gravity_audit_completed/{audit_id}", file=note_stream)

    return 0


if __name__ == "__main__":
    sys.exit(main())
