#!/usr/bin/env python3
"""arq-call — universal envelope dispatcher for any registered worker.

Reads the worker-verb catalogue (arq://doc/worker-verb-catalogue-v1),
picks the right adapter by worker name + verb, dispatches via HTTP,
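and emits the acts below. <op> is "<worker>-<verb>" with dots in the
verb replaced by dashes; <ts> is the pre-dispatch UTC time in epoch ms:

  1. arq://act/envelope_sent/<op>-<ts>        — pre-dispatch
  2. arq://act/envelope_ack/<op>-<ts>         — post-dispatch (+outcome)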
  3. arq://act/benchmark_sample/<op>-<ts>     — feeds Epic B ratchet
  4. arq://act/usage_sample/<op>-<ts>         — feeds Epic J cost router +
                                                DPO pair projector
  5. arq://act/cost_meter_emitted/<op>-<ts>   — feeds economics
                                                reconciliation (worker_address
                                                + tokens + latency, no USD
                                                computed; consumer joins with
                                                WorkerSpec.cost_profile)

Closes the Epic H.3 goal AND instruments for Epic J (data utilisation)
in one pass. Every call teaches the system more about itself.

v0.1 scope: OpenAI-compat providers (majority of the 20 LLM workers).
Native shapes (anthropic messages, minimax native) return a "use
per-platform wrapper" error in v0.1; covered in H.4 adapter breadth.

Usage:
  arq-call <worker> <verb> [--payload '<json>']
  arq-call modal-ara-agent chat.completions \\
    --payload '{"model":"ara-agent","messages":[{"role":"user","content":"hi"}]}'

Credential handling (v0.1 interim):
  Reads API keys from env vars named from the worker's control.api_key_from
  field ("vault.X" → env ARQ_VAULT_X, "settings.X" → env X, both with X
  upper-cased; the literal "gh auth token" shells out to the gh CLI).
  Version B (Epic H.2B, #3253) replaces this with the actor-scoped
  backend resolver.
"""

from __future__ import annotations

import argparse
import json
import os
import subprocess
import sys
import time
import urllib.error
import urllib.request
from datetime import datetime, timezone
from pathlib import Path

# Layer 2 (2026-04-23): daemon.act.emit routes every act through the
# daemon's in-memory Ed25519 signing key instead of spawning a twin
# subprocess (which touched the macOS keychain on every call). The
# fallback preserves backwards-compat with older daemons that don't
# register the verb — they return "unknown_verb", the wrapper falls
# back to the subprocess path. Migration path: soak on the two paths
# until daemon.act.emit is ubiquitous, then delete the subprocess
# fallback per arq://doc/principle/migrate-soak-delete-no-half-shipped-v1.
# Put this script's own directory first on the import path so the sibling
# _arq_primitive helper module can be imported when it is shipped alongside.
sys.path.insert(0, str(Path(__file__).parent))
try:
    from _arq_primitive import primitive_invoke as _primitive_invoke  # type: ignore
except ImportError:
    # Helper not importable: _emit_act_via_daemon checks for None and reports
    # "unavailable", which sends emissions down the twin subprocess path.
    _primitive_invoke = None  # type: ignore

# Expected install location of the legacy `twin` CLI used by the subprocess
# fallback; the fallback is skipped when this file does not exist.
TWIN_CLI = Path.home() / ".local" / "bin" / "twin"
# Presumably the base URL of the addressing service; overridable via the
# ARQERA_ADDRESSING_BASE env var.
# NOTE(review): not referenced anywhere else in the visible file — confirm
# whether it is still needed.
ADDRESSING_BASE = os.environ.get("ARQERA_ADDRESSING_BASE", "https://addressing.arqera.io")
# Peer address this CLI acts as: passed to daemon.act.emit and stamped into
# every telemetry payload as "actor_peer". Overridable via the
# ARQ_ACTOR_PEER_ADDRESS env var; otherwise the hard-coded default is used.
ACTOR_PEER = os.environ.get(
    "ARQ_ACTOR_PEER_ADDRESS",
    "arq://body/peer/578412e7b083b40e56e228779804582a",
)


def _utc_ts_ms() -> int:
    """Return the current UTC time as whole milliseconds since the Unix epoch."""
    now = datetime.now(tz=timezone.utc)
    epoch_seconds = now.timestamp()
    return int(epoch_seconds * 1000)


def _emit_act_via_daemon(class_: str, type_: str, reference: str, payload: dict) -> tuple[str, str | None]:
    """Ask the local daemon to sign and emit an act via ``daemon.act.emit``.

    Returns an ``(outcome, address)`` pair:

      - ``("ok", address)``       — the daemon answered "ok" with a dict
                                    result; ``address`` is its "address"
                                    entry (None if that key is absent)
      - ``("unknown_verb", None)`` — the daemon does not register the verb;
                                    the caller should fall back
      - ``("error", None)``        — the daemon reported an error; the
                                    caller should surface it rather than
                                    fall back, so the failure is not masked
      - ``("unavailable", None)``  — the invoke helper is missing, the call
                                    raised, or the reply was anything else;
                                    the caller should fall back
    """
    if _primitive_invoke is None:
        return ("unavailable", None)

    request = {
        "class": class_,
        "type": type_,
        "reference": reference,
        "payload": payload,
    }
    try:
        outcome, result = _primitive_invoke("daemon.act.emit", request, ACTOR_PEER, timeout_s=10.0)
    except Exception:
        # Best-effort by design: any failure talking to the daemon is
        # treated as "not there" so the legacy path can take over.
        return ("unavailable", None)

    if outcome == "ok":
        if isinstance(result, dict):
            return ("ok", result.get("address"))
        # "ok" without a dict result carries no address to report.
        return ("unavailable", None)

    # These two outcomes are handed back to the caller unchanged.
    for passthrough in ("unknown_verb", "error"):
        if outcome == passthrough:
            return (passthrough, None)
    return ("unavailable", None)


def _emit_act_via_twin_subprocess(class_: str, type_: str, reference: str, payload: dict) -> str | None:
    """Legacy emitter: run ``twin --use-keychain act emit`` as a child process.

    Retained only for daemons that do not yet implement daemon.act.emit.
    Every invocation touches the macOS keychain ACL — the prompt-storm
    source on ACL-mismatched (pre-Layer-1) Macs, and roughly 50-200ms of
    overhead per emission even afterwards. Slated for removal once
    daemon.act.emit is ubiquitous (soak-then-delete).

    Returns the first stdout line (stripped) that starts with ``arq://``,
    or None when the CLI is not installed, exits non-zero, times out
    (5 s), cannot be launched, or prints no such line.
    """
    if not TWIN_CLI.exists():
        return None

    argv = [
        str(TWIN_CLI),
        "--use-keychain",
        "act",
        "emit",
        class_,
        type_,
        reference,
        "--payload",
        json.dumps(payload),  # noqa: ARQ-NO-JSON-HOT-PATH twin CLI input boundary — twin accepts --payload <json>
    ]
    try:
        completed = subprocess.run(argv, capture_output=True, text=True, timeout=5, check=False)
    except (subprocess.TimeoutExpired, OSError):
        # Fire-and-forget: a slow or unlaunchable CLI just means "no address".
        return None

    if completed.returncode != 0:
        return None

    stripped_lines = (raw_line.strip() for raw_line in completed.stdout.splitlines())
    return next((text for text in stripped_lines if text.startswith("arq://")), None)


def _emit_act(class_: str, type_: str, reference: str, payload: dict) -> str | None:
    """Emit a substrate act on a best-effort basis and return its address.

    The daemon path (daemon.act.emit: in-memory signing, no subprocess
    spawn, no keychain prompt) is attempted first. When it answers
    "unknown_verb" or "unavailable", the twin subprocess path is used
    instead. A daemon-reported "error" is NOT retried through the
    fallback: the addressing service is authoritative about its own
    state, and papering over its failure would hide bugs.

    Returns the emitted act's address, or None when nothing was emitted
    (or no address was reported).
    """
    outcome, address = _emit_act_via_daemon(class_, type_, reference, payload)
    if outcome not in ("ok", "error"):
        # unknown_verb / unavailable → legacy path
        return _emit_act_via_twin_subprocess(class_, type_, reference, payload)
    return address if outcome == "ok" else None


# ── v0.1 CATALOGUE ──────────────────────────────────────────────────
#
# Addressing service serves only payload previews (truncated), not full
# bodies. The canonical catalogue lives on substrate at
# arq://doc/worker-verb-catalogue-v1/ — this local copy mirrors the
# OpenAI-compat subset arq-call can actually dispatch today. Native-
# shape workers (anthropic, minimax, k8s, ssh, etc.) route through
# per-platform wrappers (arq-github, arq-kube, …) in Epic H.4.
#
# When the addressing service grows a full-body endpoint, _load_catalogue
# switches to reading from there; this inline copy becomes a cache.
# One row per worker: (name, verbs, platform_api_shape, lowest_level_url,
# control.api_key_from). Each row is expanded into the nested-dict entry
# shape the dispatcher reads; every entry gets its own fresh "verbs" list.
_LOCAL_CATALOGUE: dict = {
    worker_name: {
        "verbs": list(verb_names),
        "platform_api_shape": api_shape,
        "lowest_level_url": base_url,
        "control": {"api_key_from": key_source},
    }
    for worker_name, verb_names, api_shape, base_url, key_source in (
        # ── Tier 0 (self-hosted Modal fine-tunes) ──────────────────────
        *(
            (
                f"modal-{model}",
                ("chat.completions", "models.list"),
                "openai-compat",
                "https://gashiru--arqera-inference-api.modal.run/v1",
                "settings.modal_inference_api_key",
            )
            for model in ("ara-agent", "ori-agent", "ore-judge", "communicator", "coder")
        ),
        # ── Tier 0.5 (free-tier OpenAI-compat) ─────────────────────────
        (
            "groq-free",
            ("chat.completions", "models.list"),
            "openai-compat",
            "https://api.groq.com/openai/v1",
            "vault.groq_api_key_free",
        ),
        (
            "cerebras",
            ("chat.completions", "models.list"),
            "openai-compat",
            "https://api.cerebras.ai/v1",
            "vault.cerebras_api_key",
        ),
        (
            "sambanova",
            ("chat.completions", "models.list"),
            "openai-compat",
            "https://api.sambanova.ai/v1",
            "vault.sambanova_api_key",
        ),
        (
            "mistral-free",
            ("chat.completions", "models.list", "embeddings.create"),
            "openai-compat",
            "https://api.mistral.ai/v1",
            "vault.mistral_api_key_free",
        ),
        # ── Tier 1 (budget paid) ───────────────────────────────────────
        (
            "groq-paid",
            ("chat.completions", "models.list"),
            "openai-compat",
            "https://api.groq.com/openai/v1",
            "vault.groq_api_key_paid",
        ),
        (
            "deepseek",
            ("chat.completions", "models.list"),
            "openai-compat",
            "https://api.deepseek.com/v1",
            "vault.deepseek_api_key",
        ),
        # ── Tier 2 ─────────────────────────────────────────────────────
        (
            "mistral-paid",
            ("chat.completions", "models.list", "embeddings.create"),
            "openai-compat",
            "https://api.mistral.ai/v1",
            "vault.mistral_api_key_paid",
        ),
        (
            "openrouter",
            ("chat.completions", "models.list"),
            "openai-compat",
            "https://openrouter.ai/api/v1",
            "vault.openrouter_api_key",
        ),
        # ── Tier 3 (shape=native; require per-platform wrapper) ────────
        (
            "claude-anthropic",
            ("messages.create", "models.list"),
            "native-anthropic",
            "https://api.anthropic.com/v1",
            "vault.anthropic_api_key_production",
        ),
        (
            "openai",
            ("chat.completions", "embeddings.create", "models.list"),
            "native-openai",
            "https://api.openai.com/v1",
            "vault.openai_api_key",
        ),
    )
}


def _load_catalogue() -> dict:
    """Return the worker catalogue used for dispatch.

    v0.1 hands back the inline ``_LOCAL_CATALOGUE`` mapping itself (not a
    copy). Follow-up: read from substrate once the addressing service
    exposes full payload bodies.
    """
    catalogue = _LOCAL_CATALOGUE
    return catalogue


def _resolve_credential(worker_meta: dict, verb: str) -> str | None:
    """v0.1 — read API key from env. Version B (#3253) will call the
    actor-scoped backend resolver instead."""
    api_key_from = (worker_meta.get("control") or {}).get("api_key_from", "")
    if not api_key_from:
        return None
    # "vault.groq_api_key_free" → env ARQ_VAULT_GROQ_API_KEY_FREE
    if api_key_from.startswith("vault."):
        env_name = "ARQ_VAULT_" + api_key_from[len("vault.") :].upper()
        return os.environ.get(env_name)
    if api_key_from == "gh auth token":
        try:
            r = subprocess.run(["gh", "auth", "token"], capture_output=True, text=True, check=True, timeout=5)
            return r.stdout.strip()
        except Exception:
            return None
    if api_key_from.startswith("settings."):
        env_name = api_key_from[len("settings.") :].upper()
        return os.environ.get(env_name)
    return None


def _dispatch_openai_compat(
    worker: str, worker_meta: dict, verb: str, payload: dict, api_key: str | None
) -> tuple[int, dict | None, str]:
    """OpenAI-compat dispatch path — the majority of registered LLM workers."""
    base = worker_meta.get("lowest_level_url", "").rstrip("/")
    path = {
        "chat.completions": "/chat/completions",
        "models.list": "/models",
        "embeddings.create": "/embeddings",
    }.get(verb)
    if path is None:
        return 0, None, f"verb {verb!r} not in OpenAI-compat mapping"

    url = base + path
    method = "GET" if verb == "models.list" else "POST"
    data = json.dumps(payload).encode("utf-8") if method == "POST" and payload else None  # noqa: ARQ-NO-JSON-HOT-PATH OpenAI-compat vendor wire format
    headers = {
        "Accept": "application/json",
        "User-Agent": f"arq-call/0.1",
    }
    if api_key:
        headers["Authorization"] = f"Bearer {api_key}"
    if data is not None:
        headers["Content-Type"] = "application/json"

    req = urllib.request.Request(url, method=method, data=data, headers=headers)
    try:
        with urllib.request.urlopen(req, timeout=60) as resp:
            body = resp.read().decode("utf-8") or "{}"
            return resp.status, json.loads(body) if body.strip() else None, ""
    except urllib.error.HTTPError as exc:
        raw = exc.read().decode("utf-8") if exc.fp else ""
        try:
            parsed = json.loads(raw) if raw.strip() else None
        except json.JSONDecodeError:
            parsed = None
        return exc.code, parsed, raw[:500]
    except (urllib.error.URLError, TimeoutError, OSError) as exc:
        return 0, None, f"{type(exc).__name__}: {exc}"


def _token_counts(response: dict | None) -> dict | None:
    """Extract prompt/completion/total token counts from a response's ``usage``.

    Returns None unless ``response`` is a dict whose "usage" value is a
    non-empty dict; counts missing from that dict come back as None.
    """
    if not isinstance(response, dict):
        return None
    usage = response.get("usage")
    # Vendor responses are untrusted: a non-mapping "usage" is ignored
    # rather than allowed to raise midway through telemetry emission.
    if not isinstance(usage, dict) or not usage:
        return None
    return {
        "prompt": usage.get("prompt_tokens"),
        "completion": usage.get("completion_tokens"),
        "total": usage.get("total_tokens"),
    }


def _emit_telemetry(
    worker: str,
    verb: str,
    ts_ms: int,
    latency_ms: int,
    status: int,
    sent_addr: str | None,
    payload: dict,
    response: dict | None,
) -> None:
    """Emit ack + benchmark + usage + cost-meter samples (J.1/J.4-ready).

    Four acts are emitted, all under the same reference
    ``<worker>-<verb with dots as dashes>-<ts_ms>``:

      - envelope_ack        — dispatch record, links back to ``sent_addr``
      - benchmark_sample    — op, latency and HTTP status
      - usage_sample        — adds success flag and token counts
      - cost_meter_emitted  — raw cost signal with worker_address

    Args:
        worker: catalogue name of the worker that was called.
        verb: verb that was dispatched.
        ts_ms: pre-dispatch timestamp in epoch milliseconds.
        latency_ms: measured dispatch latency in milliseconds.
        status: HTTP status of the call (0 when no HTTP response exists).
        sent_addr: address of the matching envelope_sent act, if any.
        payload: request payload; accepted for interface compatibility
            and not included in any emitted act.
        response: parsed response body, or None.

    "success" means a 2xx ``status``. Token counts are attached to the
    usage and cost entries only when the response carries a non-empty
    ``usage`` object.
    """
    op = f"{worker}-{verb.replace('.', '-')}"
    ack_ref = f"{op}-{ts_ms}"
    succeeded = 200 <= status < 300
    tokens = _token_counts(response)

    # Envelope ack — universal dispatch record
    _emit_act(
        "act",
        "envelope_ack",
        ack_ref,
        {
            "verb": verb,
            "worker": worker,
            "actor_peer": ACTOR_PEER,
            "envelope_sent": sent_addr,
            "http_status": status,
            "latency_ms": latency_ms,
            "ts_ms": ts_ms,
            "response_preview": str(response)[:200] if response else None,
        },
    )
    # Benchmark sample — Epic B consumer
    _emit_act(
        "act",
        "benchmark_sample",
        ack_ref,
        {"op": op, "latency_ms": latency_ms, "ts_ms": ts_ms, "http_status": status},
    )
    # Usage sample — Epic J cost-router + DPO projector
    usage_entry = {
        "op": op,
        "worker": worker,
        "verb": verb,
        "http_status": status,
        "success": succeeded,
        "latency_ms": latency_ms,
        "actor_peer": ACTOR_PEER,
        "ts_ms": ts_ms,
    }
    if tokens is not None:
        # Each act gets its own copy so the two payloads never alias.
        usage_entry["tokens"] = dict(tokens)
    _emit_act("act", "usage_sample", ack_ref, usage_entry)

    # Cost meter — fills the canonical cost_meter_emitted evidence channel
    # named by WorkerSpec.cost_profile.cost_evidence_kind. The economics
    # reconciliation on 2026-05-15 found this channel empty (zero records
    # ever); without it, substrate has no canonical signal to adjudicate
    # provider spend.
    #
    # This emitter does NOT compute USD — it carries the raw call signal
    # plus worker_address attribution so downstream consumers can join
    # with WorkerSpec.cost_profile (billing_account, cost_class,
    # monthly_ceiling_usd_cents) and external pricing tables to compute
    # cost. Compute lives at consumer-side; the emitter just makes the
    # signal substrate-visible.
    #
    # Substrate evidence:
    # arq://act/substrate_truth_reconciliation/economics-runtime-evidence-2026-05-15
    cost_entry = {
        "source": "twin-arq-call",
        "op": op,
        "worker": worker,
        "worker_address": f"arq://body/worker/{worker}",
        "verb": verb,
        "http_status": status,
        "success": succeeded,
        "latency_ms": latency_ms,
        "actor_peer": ACTOR_PEER,
        "ts_ms": ts_ms,
        "currency": "USD",
    }
    if tokens is not None:
        cost_entry["tokens"] = dict(tokens)
    _emit_act("act", "cost_meter_emitted", ack_ref, cost_entry)


def main() -> int:
    """CLI entry point: validate arguments, dispatch once, emit telemetry.

    Sequence: look the worker and verb up in the catalogue, parse
    ``--payload`` as JSON, emit ``envelope_sent``, resolve the credential,
    dispatch (OpenAI-compat shapes only in v0.1), then emit the ack /
    benchmark / usage / cost acts.

    Returns the process exit code:
      0 — HTTP 2xx; the response JSON (or "(no body)") is printed to stdout
      1 — any other dispatch outcome; details go to stderr
      2 — unknown worker, verb not exposed by the worker, or invalid JSON
      3 — the worker's platform_api_shape is not handled in v0.1
    """
    parser = argparse.ArgumentParser(
        prog="arq-call",
        description=(
            "Universal envelope dispatcher. Reads the substrate worker-verb "
            "catalogue and dispatches a verb to the named worker. Every "
            "call emits envelope_sent / envelope_ack / benchmark_sample / "
            "usage_sample / cost_meter_emitted acts for Epic B + J + "
            "economics reconciliation consumers."
        ),
    )
    parser.add_argument("worker", help="name of registered worker (e.g. groq-free)")
    parser.add_argument("verb", help="verb from worker's verbs list (e.g. chat.completions)")
    parser.add_argument("--payload", default="{}", help="JSON payload body")
    args = parser.parse_args()
    worker, verb = args.worker, args.verb

    # Catalogue lookup: the worker must exist and must expose the verb.
    worker_meta = _load_catalogue().get(worker)
    if not worker_meta:
        print(f"arq-call: worker {worker!r} not in catalogue", file=sys.stderr)
        return 2

    verbs = worker_meta.get("verbs") or []
    if verb not in verbs:
        complaint = (
            f"arq-call: worker {worker!r} does not expose verb {verb!r}; "
            f"available: {verbs}"
        )
        print(complaint, file=sys.stderr)
        return 2

    try:
        payload = json.loads(args.payload)  # noqa: ARQ-NO-JSON-HOT-PATH operator CLI input boundary — --payload arg is documented JSON
    except json.JSONDecodeError as exc:
        print(f"arq-call: --payload must be JSON ({exc})", file=sys.stderr)
        return 2

    ts_ms = _utc_ts_ms()

    # Envelope sent — recorded before anything goes over the wire.
    sent_reference = f"{worker}-{verb.replace('.', '-')}-{ts_ms}"
    sent_details = {
        "worker": worker,
        "verb": verb,
        "actor_peer": ACTOR_PEER,
        "payload_size": len(args.payload),
        "ts_ms": ts_ms,
    }
    sent_addr = _emit_act("act", "envelope_sent", sent_reference, sent_details)

    api_key = _resolve_credential(worker_meta, verb)

    # v0.1 handles OpenAI-compat only; other shapes are still acked (with
    # zero latency and status) so the attempt is visible downstream.
    shape = worker_meta.get("platform_api_shape", "")
    if "openai-compat" not in shape:
        _emit_telemetry(worker, verb, ts_ms, 0, 0, sent_addr, payload, None)
        print(
            f"arq-call: platform_api_shape {shape!r} not yet handled in v0.1. "
            f"Use per-platform wrapper (e.g. arq-github) or file Epic H.4 adapter.",
            file=sys.stderr,
        )
        return 3

    # Latency covers the dispatch call only.
    started = time.monotonic()
    status, response, err = _dispatch_openai_compat(worker, worker_meta, verb, payload, api_key)
    latency_ms = int((time.monotonic() - started) * 1000)

    _emit_telemetry(worker, verb, ts_ms, latency_ms, status, sent_addr, payload, response)

    if not 200 <= status < 300:
        print(
            f"arq-call: {worker}.{verb} → HTTP {status} ({err or 'no details'})",
            file=sys.stderr,
        )
        return 1

    print(json.dumps(response, indent=2) if response else "(no body)")
    return 0


if __name__ == "__main__":
    # Script entry point: run the CLI and use main()'s return value as the
    # process exit code.
    #
    # NOTE(review): the import below is not referenced by name anywhere in
    # the visible file (no address encoding is performed here), so it looks
    # vestigial — confirm before removing.
    import urllib.parse  # noqa: E402

    sys.exit(main())
