#!/usr/bin/env python3
"""arq-cli-exec — whitelisted generic CLI dispatcher for the mesh.

Generalises the `arq-github` / `arq-kube` / `arq-gcp` pattern into a
single wrapper that can front-end any CLI whose surface is large,
stable, and read-heavy — without hand-writing 450+ lines of argparse
scaffolding per tool. Instead, each CLI contributes a whitelist entry
mapping "tool + verb" to an allowed command invocation.

Shape:

    arq-cli-exec <cli> <verb> [subverb] [args...]

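Whitelist gate:
  - The leading positional args — (cli, verb) or (cli, verb, subverb) —
    must match an entry in WHITELIST (longest prefix wins). Anything after
    the matched entry is passed through to the CLI unchecked.
  - If not: emit arq://act/cli_dispatch_rejected/<cli>-<verb>-<ts>
    with reason="not whitelisted" and exit 1.
  - If yes:  resolve credentials (see below), emit cli_dispatch_sent, run
    the subprocess, emit cli_dispatch_ack with stdout/stderr hashes, print
    stdout/stderr, and exit with the CLI's exit code.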

Alignment:
  arq://doc/principle/substrate-is-the-exchange-v1  — every CLI call
      becomes an observed exchange on the ledger, not a side door
  arq://doc/principle/every-platform-is-a-peer-v1  — AWS, Modal,
      terraform, sops, gcloud are all peers; this wrapper gives them a
      dial-tone
  arq://doc/principle/one-primitive-speaks-every-protocol-v1 — once the
      daemon learns "cli.exec" as a verb, the wrapper thins further to a
      pass-through client

v0.1 whitelist (MVP):
  aws sts get-caller-identity  — canonical zero-side-effect identity read
  aws --version                — creds-free self-check (useful for
                                  verifier when ambient creds absent)

Credentials (v0.2 — wired via CREDENTIAL_RESOLVERS):
  Each whitelisted CLI may register a credential resolver in
  CREDENTIAL_RESOLVERS. On dispatch, if a resolver exists for the CLI it
  runs BEFORE the subprocess — resolving secrets from env → SOPS →
  Keychain (mirroring the source chain in credential_reader_service)
  and returning env-var overrides to inject into the child process.

  On resolve:
    emit cli_credential_resolved  — source + masked key-id (NEVER the
                                    secret); feeds the evidence chain
  On resolve-fail:
    emit cli_credential_resolution_failed — reason + sources tried;
                                            exit 1 (do NOT fall through
                                            to cred-less subprocess)

  Verbs that explicitly don't need creds (e.g. `aws --version`) can
  opt out via CREDENTIAL_OPTIONAL (set of whitelist-key strings).

  Extensibility: add a CLI by: (a) listing its allowed verbs in
  WHITELIST, (b) registering its resolver in CREDENTIAL_RESOLVERS,
  (c) declaring cred-optional verbs in CREDENTIAL_OPTIONAL. Pattern
  is deliberately thin — one dict entry per axis. No import-time
  dependency on backend app modules (those carry structlog/sqlalchemy
  and would break standalone wrapper execution).

  Why not import credential_reader_service directly: that service lives
  under backend/app/services/ and imports structlog + sqlalchemy +
  app.core.config + evidence_service, which aren't installed in a Mac
  operator's Python environment. Internal cluster callers (arq-daemon sidecars) go
  via the /internal/v1/credentials/resolve HTTP endpoint; local
  operators mirror the source chain in-wrapper per
  backend/app/api/internal_credentials.py (see the "External (Mac)
  arq-daemons do NOT use this endpoint" comment there).

Emissions per call:
  arq://act/cli_dispatch_sent/<cli>-<verb>-<ts>
  arq://act/cli_dispatch_ack/<cli>-<verb>-<ts>
  arq://act/cli_dispatch_rejected/<cli>-<verb>-<ts>            (denial path)
  arq://act/cli_credential_resolved/<cli>-<verb>-<ts>          (creds found)
  arq://act/cli_credential_resolution_failed/<cli>-<verb>-<ts> (no creds)
  arq://act/benchmark_sample/<cli>-<verb>-<ts>                 (op="cli-<cli>-<verb>"; feeds Epic B)
"""

from __future__ import annotations

import argparse
import hashlib
import json
import os
import subprocess
import sys
import time
from collections.abc import Callable
from datetime import datetime, timezone
from pathlib import Path

# Shared primitive-invoke helper (daemon.act.emit via /v1/invoke).
# Prepend this script's own directory to sys.path so a sibling
# `_arq_primitive` module is importable regardless of the caller's
# working directory. This must run before the import below.
sys.path.insert(0, str(Path(__file__).parent))
try:
    from _arq_primitive import primitive_invoke as _primitive_invoke  # type: ignore
except ImportError:
    # Helper not available: _emit_via_daemon() then reports "unavailable"
    # and _emit() falls back to the twin subprocess.
    _primitive_invoke = None  # type: ignore

# Legacy `twin` CLI used as the emission fallback when the daemon path is down.
TWIN_CLI = Path.home().joinpath(".local", "bin", "twin")

# Actor identity passed to daemon invocations and recorded in act payloads;
# ARQ_ACTOR_PEER_ADDRESS overrides the built-in default.
_DEFAULT_ACTOR_PEER = "arq://body/peer/578412e7b083b40e56e228779804582a"
ACTOR_PEER = os.environ.get("ARQ_ACTOR_PEER_ADDRESS", _DEFAULT_ACTOR_PEER)


# ---------------------------------------------------------------------
# Whitelist
# ---------------------------------------------------------------------
#
# Schema: dict[cli_name, set[command_key]]
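# command_key is the space-joined leading positional args (verb + subverb
# + ...) that together identify the allowed invocation. Matching is by
# prefix: everything AFTER the matched key (flags like `--json`,
# `--region`, `--profile`, or any other token) is passed through to the
# CLI unchecked, while anything placed BEFORE it (e.g. a global flag
# preceding the verb) makes the call fail to match.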
#
# Adding a new CLI is ~1 line per allowed verb. Destructive / mutating
# verbs must never be added here without a first-class wrapper that
# carries a confirmation contract (see arq-gcp.RAW_FORBIDDEN_FRAGMENTS
# for precedent).

WHITELIST: dict[str, set[str]] = {
    # AWS: creds-free version self-check + read-only identity probe.
    "aws": {
        "--version",
        "sts get-caller-identity",
    },
}


# ---------------------------------------------------------------------
# Credential resolution
# ---------------------------------------------------------------------
#
# Mirrors the source chain in backend/app/services/credential_reader_service.py
# but inline so the wrapper stays standalone (no structlog / sqlalchemy /
# FastAPI deps). Internal-cluster callers go via /internal/v1/credentials/resolve;
# Mac operators read local sources directly — matches the architecture
# comment in backend/app/api/internal_credentials.py lines 10-13.
#
# Shape of a resolver:
#   def resolve() -> tuple[dict[str, str], str] | None
#     - dict: env-var overrides to inject into the subprocess
#     - str:  source label (keychain / sops / env) for the substrate act
#     - None: creds unavailable; caller emits cli_credential_resolution_failed
#
# Whitelist entries that are creds-optional (e.g. `aws --version`) opt out
# via CREDENTIAL_OPTIONAL — resolver is not even invoked for those calls.

def _sops_file() -> Path:
    # Evaluated at call time (not import) so tests can rebase HOME.
    return Path.home() / "Desktop" / "Project" / ".secrets.yaml"


def _sops_age_key() -> Path:
    return Path.home() / ".config" / "sops" / "age" / "keys.txt"


def _mask(s: str | None) -> str:
    """Redaction-safe representation of a credential-ish string.

    Shows first 4 + last 2 chars for an AKIA-style access-key-id (not a
    secret), fully masks anything under 8 chars. Used only in substrate
    acts — raw secrets NEVER emit.
    """
    if not s:
        return "***"
    if len(s) < 8:
        return "***"
    return f"{s[:4]}...{s[-2:]}"


def _sops_extract(path_expr: str) -> str | None:
    """Decrypt and return one value from the SOPS secrets file.

    ``path_expr`` is a standard ``sops --extract`` expression, e.g.
    '["aws"]["access_key_id"]'. The age key file is passed to sops via
    SOPS_AGE_KEY_FILE in the child environment only.

    Returns None when the secrets file or age key is missing, when sops
    is absent / times out (10 s) / exits non-zero, or when the extracted
    value is empty.
    """
    secrets_path = _sops_file()
    key_path = _sops_age_key()
    if not (secrets_path.exists() and key_path.exists()):
        return None
    child_env = dict(os.environ)
    child_env["SOPS_AGE_KEY_FILE"] = str(key_path)
    try:
        proc = subprocess.run(
            ["sops", "-d", "--extract", path_expr, str(secrets_path)],
            capture_output=True,
            text=True,
            timeout=10,
            check=False,
            env=child_env,
        )
    except (FileNotFoundError, subprocess.TimeoutExpired, OSError):
        return None
    if proc.returncode != 0:
        return None
    return proc.stdout.strip() or None


def _keychain_generic(service: str, account: str | None = None) -> str | None:
    """Read a generic password from the macOS Keychain via `security`.

    Returns None on not-found / wrong-platform / security CLI missing.
    """
    cmd = ["security", "find-generic-password", "-s", service]
    if account:
        cmd += ["-a", account]
    cmd += ["-w"]
    try:
        r = subprocess.run(cmd, capture_output=True, text=True, timeout=5, check=False)
        if r.returncode == 0:
            v = r.stdout.strip()
            return v or None
    except (FileNotFoundError, subprocess.TimeoutExpired, OSError):
        return None
    return None


def _resolve_aws() -> tuple[dict[str, str], str] | None:
    """Resolve AWS credentials. Mirrors credential_reader_service source chain.

    Order:
      1. Ambient env vars (AWS_ACCESS_KEY_ID + AWS_SECRET_ACCESS_KEY)
      2. SOPS  ~/Desktop/Project/.secrets.yaml under ["aws"]
      3. macOS Keychain generic-password under service="aws"
         (account="access_key_id" for the ID, "secret_access_key" for
         the secret — matches `security add-generic-password -s aws
         -a <field>` convention)

    Returns env-var overrides:
      AWS_ACCESS_KEY_ID       (required)
      AWS_SECRET_ACCESS_KEY   (required)
      AWS_SESSION_TOKEN       (optional; set only if found)
      AWS_DEFAULT_REGION      (optional; set only if found)
    """
    # 1. Ambient env — if BOTH are set, already good; report as env source.
    env_id = os.environ.get("AWS_ACCESS_KEY_ID")
    env_secret = os.environ.get("AWS_SECRET_ACCESS_KEY")
    if env_id and env_secret:
        overrides: dict[str, str] = {
            "AWS_ACCESS_KEY_ID": env_id,
            "AWS_SECRET_ACCESS_KEY": env_secret,
        }
        if (tok := os.environ.get("AWS_SESSION_TOKEN")):
            overrides["AWS_SESSION_TOKEN"] = tok
        if (region := os.environ.get("AWS_DEFAULT_REGION") or os.environ.get("AWS_REGION")):
            overrides["AWS_DEFAULT_REGION"] = region
        return (overrides, "env")

    # 2. SOPS — look under ["aws"]
    sops_id = _sops_extract('["aws"]["access_key_id"]')
    sops_secret = _sops_extract('["aws"]["secret_access_key"]')
    if sops_id and sops_secret:
        overrides = {
            "AWS_ACCESS_KEY_ID": sops_id,
            "AWS_SECRET_ACCESS_KEY": sops_secret,
        }
        if (tok := _sops_extract('["aws"]["session_token"]')):
            overrides["AWS_SESSION_TOKEN"] = tok
        if (region := _sops_extract('["aws"]["default_region"]')):
            overrides["AWS_DEFAULT_REGION"] = region
        return (overrides, "sops")

    # 3. Keychain — generic-password, service=aws
    kc_id = _keychain_generic("aws", account="access_key_id")
    kc_secret = _keychain_generic("aws", account="secret_access_key")
    if kc_id and kc_secret:
        overrides = {
            "AWS_ACCESS_KEY_ID": kc_id,
            "AWS_SECRET_ACCESS_KEY": kc_secret,
        }
        if (tok := _keychain_generic("aws", account="session_token")):
            overrides["AWS_SESSION_TOKEN"] = tok
        if (region := _keychain_generic("aws", account="default_region")):
            overrides["AWS_DEFAULT_REGION"] = region
        return (overrides, "keychain")

    return None


# Registry: CLI name -> resolver. Add a new CLI by appending one entry.
# Keep the map small and explicit; prefer a separate resolver per CLI
# over a generic one-size-fits-all (each platform has its own envelope).
# A resolver takes no arguments and returns (env_overrides, source_label),
# or None when no credentials are reachable.
CREDENTIAL_RESOLVERS: dict[str, Callable[[], tuple[dict[str, str], str] | None]] = {
    "aws": _resolve_aws,
}

# Verbs that don't need creds. Skip resolver entirely for these —
# avoids spurious "secret not reachable" rejections on version checks.
# Entries must equal WHITELIST keys exactly: dispatch() compares the
# matched whitelist key against this set.
CREDENTIAL_OPTIONAL: dict[str, set[str]] = {
    "aws": {"--version"},
}


# ---------------------------------------------------------------------
# Helpers (envelope emission — copied shape from arq-kube / arq-gcp)
# ---------------------------------------------------------------------


def _ts_ms() -> int:
    return int(datetime.now(timezone.utc).timestamp() * 1000)


def _sha256(data: str) -> str:
    return hashlib.sha256(data.encode("utf-8", errors="replace")).hexdigest()


def _emit_via_daemon(class_: str, type_: str, ref: str, payload: dict) -> tuple[str, str | None]:
    """Try daemon.act.emit through the shared primitive helper.

    Returns ``(outcome, address)``:
      ("ok", address)        daemon accepted the act (address comes from the
                             result's "address" key and may be None)
      ("unknown_verb", None) / ("error", None)
                             daemon answered but did not emit
      ("unavailable", None)  helper missing, raised, or returned anything else
    """
    invoke = _primitive_invoke
    if invoke is None:
        return ("unavailable", None)
    request = {
        "class": class_,
        "type": type_,
        "reference": ref,
        "payload": payload,
    }
    try:
        outcome, result = invoke("daemon.act.emit", request, ACTOR_PEER, timeout_s=10.0)
    except Exception:
        # Best-effort transport: any failure here just means "use the fallback".
        return ("unavailable", None)
    if outcome == "ok" and isinstance(result, dict):
        return ("ok", result.get("address"))
    passthrough = outcome in ("unknown_verb", "error")
    return (outcome if passthrough else "unavailable", None)


def _emit_via_twin_subprocess(class_: str, type_: str, ref: str, payload: dict) -> str | None:
    """Legacy fallback: emit through the keychain-touching `twin` CLI.

    Returns the first stdout line that starts with ``arq://`` when twin
    exits 0. Returns None if twin is not installed, exits non-zero, times
    out (5 s), hits an OS error, or prints no address.
    """
    if not TWIN_CLI.exists():
        return None
    try:
        payload_json = json.dumps(payload)  # noqa: ARQ-NO-JSON-HOT-PATH twin CLI input boundary — twin accepts --payload <json>
        argv = [
            str(TWIN_CLI), "--use-keychain", "act", "emit",
            class_, type_, ref, "--payload", payload_json,
        ]
        proc = subprocess.run(argv, capture_output=True, text=True, timeout=5, check=False)
    except (subprocess.TimeoutExpired, OSError):
        return None
    if proc.returncode != 0:
        return None
    stripped = (raw.strip() for raw in proc.stdout.splitlines())
    return next((ln for ln in stripped if ln.startswith("arq://")), None)


def _emit(class_: str, type_: str, ref: str, payload: dict) -> str | None:
    """Fire-and-forget substrate emission — daemon first, twin subprocess fallback.

    Returns the emitted act's address, or None. A daemon "error" outcome
    is NOT retried through twin: the addressing service is authoritative,
    so its failure is surfaced as None rather than masked.
    """
    outcome, address = _emit_via_daemon(class_, type_, ref, payload)
    if outcome in ("ok", "error"):
        return address if outcome == "ok" else None
    return _emit_via_twin_subprocess(class_, type_, ref, payload)


# ---------------------------------------------------------------------
# Core dispatch
# ---------------------------------------------------------------------


def _is_whitelisted(cli: str, args: list[str]) -> tuple[bool, str]:
    """Check if (cli, args[0..n]) matches a whitelist entry.

    Returns (matched, command_key). Matches the longest-prefix entry so
    "sts get-caller-identity" wins over "sts" even if both are listed.
    """
    allowed = WHITELIST.get(cli)
    if not allowed:
        return (False, "")
    # Try progressively shorter prefixes of args until one matches.
    for n in range(len(args), 0, -1):
        key = " ".join(args[:n])
        if key in allowed:
            return (True, key)
    return (False, "")


def _run_cli(
    cli: str,
    args: list[str],
    timeout: int,
    env_overrides: dict[str, str] | None = None,
) -> tuple[int, str, str]:
    """Run the underlying CLI. Returns (rc, stdout, stderr).

    If env_overrides is set, merges them on top of os.environ before
    spawning the subprocess — used to inject resolved credentials
    (e.g. AWS_ACCESS_KEY_ID) without leaking them into the parent
    shell.
    """
    if env_overrides:
        env = {**os.environ, **env_overrides}
    else:
        env = None  # inherit
    try:
        r = subprocess.run(
            [cli, *args],
            capture_output=True, text=True, timeout=timeout, check=False,
            env=env,
        )
        return r.returncode, r.stdout, r.stderr
    except subprocess.TimeoutExpired:
        return 124, "", f"timeout after {timeout}s"
    except FileNotFoundError:
        return 127, "", f"{cli} not on PATH"


def dispatch(cli: str, args: list[str], timeout: int = 60) -> int:
    """Run the whitelist gate + envelope chain + CLI invocation.

    Flow:
      1. Gate ``cli`` + ``args`` against WHITELIST. On a miss, emit
         ``cli_dispatch_rejected``, explain on stderr, and return 1.
      2. Unless the matched key is listed in CREDENTIAL_OPTIONAL, run the
         CLI's registered credential resolver (if any). On failure emit
         ``cli_credential_resolution_failed`` and return 1 — never fall
         through to a cred-less subprocess. On success emit
         ``cli_credential_resolved`` (source + masked key id, never the
         secret) and inject the resolved env vars into the child only.
      3. Emit ``cli_dispatch_sent``, run the CLI, emit ``cli_dispatch_ack``
         (hashes, sizes, tail previews) and ``benchmark_sample``, then relay
         the child's stdout/stderr.

    Args:
        cli: executable to run (e.g. ``"aws"``).
        args: arguments passed verbatim to the CLI.
        timeout: subprocess timeout in seconds.

    Returns:
        1 on rejection or credential failure; otherwise the exit code
        reported by _run_cli (124 on timeout, 127 when the CLI is not on
        PATH).
    """
    ts = _ts_ms()
    # Reference slug: cli + args joined with "-", with every character
    # outside [A-Za-z0-9._-] ("/", spaces, quotes, brackets from e.g. a
    # `--query` expression) mapped to "-" so the reference stays
    # address-safe; bounded to 80 chars.
    slug = "".join(
        ch if (ch.isascii() and ch.isalnum()) or ch in "._-" else "-"
        for ch in "-".join([cli] + args)
    )[:80]
    ref = f"{slug}-{ts}"

    matched, command_key = _is_whitelisted(cli, args)

    if not matched:
        # Denial path — emit rejected act, exit 1.
        reject_payload = {
            "cli": cli,
            "args_preview": " ".join(args)[:200],
            "reason": "not whitelisted",
            "whitelisted_entries_for_cli": sorted(WHITELIST.get(cli, set())),
            "actor_peer": ACTOR_PEER,
            "ts_ms": ts,
        }
        rejected = _emit("act", "cli_dispatch_rejected", ref, reject_payload)
        sys.stderr.write(
            f"arq-cli-exec: '{cli} {' '.join(args)}' — not whitelisted.\n"
        )
        allowed_str = ", ".join(sorted(WHITELIST.get(cli, set()))) or "(cli not registered)"
        sys.stderr.write(f"  allowed for '{cli}': {allowed_str}\n")
        if rejected:
            sys.stderr.write(f"  substrate: {rejected}\n")
        return 1

    # Credential resolution (pre-dispatch hook).
    # Order:
    #   1. Skip entirely if the verb is in CREDENTIAL_OPTIONAL (e.g.
    #      `aws --version` — self-check, no creds needed).
    #   2. If a resolver is registered for this CLI, run it.
    #      - On success: emit cli_credential_resolved (source + masked
    #        key-id; NEVER the secret) and inject env into subprocess.
    #      - On failure: emit cli_credential_resolution_failed and
    #        exit 1. Do NOT fall through to a cred-less subprocess —
    #        that's the NoCredentials-253 path this wiring closes.
    #   3. If no resolver is registered: fall through (future CLIs
    #      that don't need creds can omit the resolver).
    env_overrides: dict[str, str] | None = None
    cred_optional = CREDENTIAL_OPTIONAL.get(cli, set())
    resolver = CREDENTIAL_RESOLVERS.get(cli)
    if command_key not in cred_optional and resolver is not None:
        resolved = resolver()
        if resolved is None:
            fail_payload = {
                "cli": cli,
                "command_key": command_key,
                "reason": "credentials not reachable",
                "sources_tried": ["env", "sops", "keychain"],
                "actor_peer": ACTOR_PEER,
                "ts_ms": ts,
            }
            _emit("act", "cli_credential_resolution_failed", ref, fail_payload)
            sys.stderr.write(
                f"arq-cli-exec: credentials not available for service={cli}. "
                f"Sources tried: env vars, SOPS ~/Desktop/Project/.secrets.yaml, "
                f"macOS Keychain (generic-password service={cli}).\n"
            )
            if cli == "aws":
                # Remediation hint is AWS-specific; don't show it for other CLIs.
                sys.stderr.write(
                    f"  For AWS: set AWS_ACCESS_KEY_ID + AWS_SECRET_ACCESS_KEY, or "
                    f"add an [\"aws\"] block to SOPS, or run:\n"
                    f"    security add-generic-password -s aws -a access_key_id -w <key>\n"
                    f"    security add-generic-password -s aws -a secret_access_key -w <secret>\n"
                )
            return 1
        env_overrides, source = resolved
        resolved_payload = {
            "cli": cli,
            "command_key": command_key,
            "source": source,
            "env_vars_injected": sorted(env_overrides.keys()),
            "access_key_masked": _mask(env_overrides.get("AWS_ACCESS_KEY_ID")),
            "actor_peer": ACTOR_PEER,
            "ts_ms": ts,
        }
        _emit("act", "cli_credential_resolved", ref, resolved_payload)

    # Happy path.
    sent_payload = {
        "cli": cli,
        "command_key": command_key,
        "args_preview": " ".join(args)[:200],
        "actor_peer": ACTOR_PEER,
        # "injected" only when a resolver actually supplied env vars; the
        # concrete source is recorded on cli_credential_resolved.
        "credential_source": "injected" if env_overrides else "none",
        "ts_ms": ts,
    }
    sent = _emit("act", "cli_dispatch_sent", ref, sent_payload)

    t0 = time.monotonic()
    rc, stdout, stderr = _run_cli(cli, args, timeout=timeout, env_overrides=env_overrides)
    latency_ms = int((time.monotonic() - t0) * 1000)

    ack_payload = {
        "cli": cli,
        "command_key": command_key,
        "envelope_sent": sent,
        "exit_code": rc,
        "latency_ms": latency_ms,
        "stdout_hash": _sha256(stdout),
        "stderr_hash": _sha256(stderr),
        "stdout_bytes": len(stdout),
        "stderr_bytes": len(stderr),
        "stdout_preview": stdout[-500:],
        "stderr_preview": stderr[-500:],
        "ts_ms": _ts_ms(),
    }
    _emit("act", "cli_dispatch_ack", ref, ack_payload)

    _emit("act", "benchmark_sample", ref, {
        "op": f"cli-{cli}-{command_key.replace(' ', '-')}",
        "latency_ms": latency_ms,
        "http_status": 200 if rc == 0 else 500,
        "ts_ms": ts,
    })

    if stdout:
        sys.stdout.write(stdout)
    if stderr:
        sys.stderr.write(stderr)
    return rc


# ---------------------------------------------------------------------
# CLI frontend
# ---------------------------------------------------------------------


def main() -> int:
    """Parse the command line and hand off to dispatch().

    Returns:
        0 after ``--list``; 2 when <cli> or its args are missing;
        otherwise whatever dispatch() returns. Invalid options, including
        a non-positive ``--timeout``, exit with status 2 via argparse's
        ``error()``.
    """
    ap = argparse.ArgumentParser(
        prog="arq-cli-exec",
        description=(
            "Whitelisted generic CLI dispatcher for the mesh. Gates every "
            "invocation against a per-CLI whitelist; emits cli_dispatch_sent "
            "+ cli_dispatch_ack envelopes on allowed calls, cli_dispatch_rejected "
            "on denied calls."
        ),
        epilog=(
            "Examples:\n"
            "  arq-cli-exec aws sts get-caller-identity\n"
            "  arq-cli-exec aws --version\n"
            "  arq-cli-exec --list\n"
        ),
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    ap.add_argument(
        "--list", action="store_true",
        help="print the whitelist and exit",
    )
    ap.add_argument(
        "--timeout", type=int, default=60,
        help="subprocess timeout in seconds (default 60)",
    )
    ap.add_argument("cli", nargs="?", help="CLI tool (e.g. 'aws')")
    # REMAINDER: everything after <cli> (including flags) goes to the CLI,
    # so wrapper options like --timeout must precede <cli>.
    ap.add_argument(
        "cli_args", nargs=argparse.REMAINDER,
        help="verb + args passed verbatim to the CLI",
    )

    args = ap.parse_args()

    if args.list:
        if not WHITELIST:
            print("(whitelist empty)")
            return 0
        for cli in sorted(WHITELIST):
            print(f"{cli}:")
            for entry in sorted(WHITELIST[cli]):
                print(f"  {entry}")
        return 0

    if not args.cli or not args.cli_args:
        ap.print_help(sys.stderr)
        sys.stderr.write("\narq-cli-exec: <cli> and at least one arg are required\n")
        return 2

    if args.timeout <= 0:
        # A zero/negative timeout makes subprocess.run expire essentially
        # at once, so every call would come back as a spurious 124.
        ap.error("--timeout must be a positive number of seconds")

    return dispatch(args.cli, args.cli_args, timeout=args.timeout)


if __name__ == "__main__":
    # Equivalent to sys.exit(main()): propagate main()'s return as the status.
    raise SystemExit(main())
