#!/usr/bin/env python3
"""arq-kube — multi-cluster kubectl adapter for the mesh.

Dispatches kubectl operations to ANY cluster the user has reach into,
starting with homebase (k3s via SSH to dgx-spark, the default) and
extending to GKE (`gke_arqera-staging`) and every other context in
`~/.kube/config` via a top-level `--context <name>` flag. Every
invocation is envelope-traced on substrate so Twin + agents dispatching
work leave the same audit trail regardless of which cluster answers.

Per arq://doc/principle/every-platform-is-a-peer-v1 — homebase AND GKE
AND any other cluster are peers. This adapter is the dial-tone: one
command shape, one envelope shape, many substrates underneath. Per
arq://doc/principle/one-primitive-speaks-every-protocol-v1 — the
primitive is the single dispatch surface; the kubectl-per-context
selector is an implementation detail of this wrapper, not a new
primitive. Per arq://doc/principle/substrate-is-the-exchange-v1 — the
envelope + benchmark emissions are the observable exchange; cluster
identity is a payload field, not a separate surface.

v0.2 verbs (v0.1 surface + `--context` threaded through every verb):
  arq-kube [--context NAME] pod list [-n NAMESPACE]
  arq-kube [--context NAME] pod run IMAGE [--cmd "CMD"] [-n NAMESPACE] [--gpu ...]
  arq-kube [--context NAME] pod logs POD [-n NAMESPACE]
  arq-kube [--context NAME] pod delete POD [-n NAMESPACE]
  arq-kube [--context NAME] node list                  (capacity snapshot)
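  arq-kube [--context NAME] raw [--timeout S] VERB [ARGS...]   (pass-through)
  arq-kube shell [--host HOST] [--timeout S] [--no-fallback] [--jump-host HOST] CMD...
                                                       (SSH-only, homebase-only)
  For `raw` and `shell`, the verb's own options must come before VERB / CMD;
  everything from VERB / CMD onward is passed through verbatim.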

Context semantics:
  --context absent OR --context homebase
      → SSH to dgx-spark (DEFAULT_SSH_HOST), run `kubectl <args>` there.
      Unchanged v0.1 behaviour. Works even when the caller's local
      kubectl context is pointed elsewhere.

  --context <any-other-name>
      → Run `kubectl --context <name> <args>` LOCALLY (no SSH). The
      local kubectl reads ~/.kube/config and dispatches to whichever
      cluster that context resolves to (GKE, remote k3s, kind, minikube,
      EKS, etc.). No secrets are read, copied, or cached by arq-kube —
      whatever auth the local kubectl already has is what the call uses.

  The `shell` verb is homebase-SSH-only by construction (it runs
  arbitrary host shell commands, not kubectl), so `--context` does not
  apply to it. Passing `--context <non-homebase>` with `shell` is rejected.

Emissions per call (unchanged shapes, `context` added to payload so the
substrate trail distinguishes homebase from GKE from other clusters):
  arq://act/kube_envelope_sent/<verb>-<ts>
  arq://act/kube_envelope_ack/<verb>-<ts>
  arq://act/benchmark_sample/kube-<verb>-<ts>

Unlock (Tier 1): every downstream GKE action — HOMEBASE_FAILOVER_ENABLED
secret patch, MCP gateway ops, celery beat ops, the ~20 GKE surfaces
currently bridged through Ayo manually — becomes one arq-kube call away
from any Claude session without a `kubectl config use-context` dance.
"""

from __future__ import annotations

import argparse
import json
import os
import subprocess
import sys
import time
from datetime import datetime, timezone
from pathlib import Path

# Layer 2 (2026-04-23): route act emission through daemon.act.emit (in-
# memory signing, zero subprocess spawns per envelope) with a fallback
# to the legacy twin-subprocess path for backwards-compat with older
# daemons. See arq-call for the full rationale + migration plan.
#
# The script's own directory is put first on sys.path so the sibling
# `_arq_primitive` helper module resolves. When that import fails,
# `_primitive_invoke` stays None and _emit_via_daemon reports
# "unavailable", which sends _emit down the twin-subprocess path.
sys.path.insert(0, str(Path(__file__).parent))
try:
    from _arq_primitive import primitive_invoke as _primitive_invoke  # type: ignore
except ImportError:
    _primitive_invoke = None  # type: ignore

# Legacy `twin` CLI, used by _emit_via_twin_subprocess when the daemon
# emission path is unavailable.
TWIN_CLI = Path.home() / ".local" / "bin" / "twin"
# SSH alias of the homebase k3s control plane (override: $ARQ_KUBE_SSH_HOST).
DEFAULT_SSH_HOST = os.environ.get("ARQ_KUBE_SSH_HOST", "dgx-spark")
# Peer address stamped as `actor_peer` on envelopes and passed as the actor
# to daemon.act.emit (override: $ARQ_ACTOR_PEER_ADDRESS).
ACTOR_PEER = os.environ.get(
    "ARQ_ACTOR_PEER_ADDRESS",
    "arq://body/peer/578412e7b083b40e56e228779804582a",
)

# Context sentinel — the absent case and any of these aliases (matched by
# _is_homebase after strip() + lower()) mean "SSH to dgx-spark and run
# kubectl there". Any other value is passed to local kubectl via --context.
HOMEBASE_CONTEXT_ALIASES = {"homebase", "dgx-spark", "k3s"}

# Mac -> DGX direct SSH is blocked at the network layer (no key pair, no
# port-forward, no LAN reach). PC-WSL CAN reach DGX over the LAN at
# 192.168.1.6. Wire 1 (arq://body/arqera_bug/
# mac-to-dgx-ssh-broken-no-fallback-2026-05-27) closes the gap: when
# direct ssh to a host fails (timeout / exit 255), arq-kube transparently
# re-dispatches through a jump host and emits
# `arq://act/dispatch_routed_via_jump/<from>-<to>-<via>-<ts>` so substrate
# sees the rerouting. Default jump = pc-wsl; --no-fallback opts out for
# debugging; --jump-host overrides the jump host.
# NOTE(review): only the `shell` verb (_ssh_shell) uses this fallback; the
# kubectl verbs' SSH path (_ssh_kubectl) has no jump retry.
DEFAULT_JUMP_HOST = os.environ.get("ARQ_KUBE_JUMP_HOST", "pc-wsl")
DEFAULT_JUMP_TARGET_IP = {
    # Logical-name -> LAN IP the jump host uses internally. Mac's known
    # SSH config points dgx-spark at a name the jump host doesn't share.
    "dgx-spark": "192.168.1.6",
}
# ssh exit codes that mean "couldn't even reach the host": 255 (any
# ssh-level error before remote shell started — connection refused,
# unreachable, key rejected) and 124 (our subprocess timeout sentinel).
# NOTE(review): a remote command that itself exits 255, or 124 (e.g.
# coreutils `timeout`), is indistinguishable from these by exit code alone
# and would make _ssh_shell re-run the command via the jump host.
SSH_DIRECT_FAILED_EXIT_CODES = {124, 255}


def _ts_ms() -> int:
    """Return the current UTC wall-clock time as whole milliseconds since the epoch."""
    now_utc = datetime.now(tz=timezone.utc)
    return int(now_utc.timestamp() * 1000)


def _is_homebase(context: str | None) -> bool:
    """Decide whether `context` is served by the SSH-to-dgx-spark path.

    A missing context (flag not given) always counts as homebase.
    Otherwise the value is whitespace-trimmed and lower-cased, then looked
    up in HOMEBASE_CONTEXT_ALIASES. Every non-alias value means "run local
    `kubectl --context <name>`".
    """
    return context is None or context.strip().lower() in HOMEBASE_CONTEXT_ALIASES


def _emit_via_daemon(class_: str, type_: str, ref: str, payload: dict) -> tuple[str, str | None]:
    """Emit one act through the daemon's `daemon.act.emit` primitive.

    Returns an (outcome, address) pair. The outcome vocabulary matches
    arq-call:
      "ok"           -- emitted; address is taken from the result dict
      "unknown_verb" -- daemon is too old to know daemon.act.emit
      "error"        -- addressing service failed; the caller must NOT fall
                        back, because masking that failure would hide bugs
      "unavailable"  -- helper missing, invoke raised, an "ok" result that
                        is not a dict, or any other outcome
    """
    invoke = _primitive_invoke
    if invoke is None:
        return ("unavailable", None)

    request = {
        "class": class_,
        "type": type_,
        "reference": ref,
        "payload": payload,
    }
    try:
        outcome, result = invoke("daemon.act.emit", request, ACTOR_PEER, timeout_s=10.0)
    except Exception:
        # Best-effort: any transport/daemon exception counts as "unavailable".
        return ("unavailable", None)

    if outcome == "ok":
        if isinstance(result, dict):
            return ("ok", result.get("address"))
        return ("unavailable", None)
    if outcome == "unknown_verb":
        return ("unknown_verb", None)
    if outcome == "error":
        return ("error", None)
    return ("unavailable", None)


def _emit_via_twin_subprocess(class_: str, type_: str, ref: str, payload: dict) -> str | None:
    """Emit an act by shelling out to the legacy `twin` CLI (soak-then-delete).

    This path touches the keychain (`--use-keychain`). It returns the first
    stdout line that starts with `arq://` (whitespace-trimmed). It returns
    None when the CLI binary is absent, exits non-zero, times out after
    5 s, fails to launch, or prints no address.
    """
    if not TWIN_CLI.exists():
        return None
    argv = [
        str(TWIN_CLI), "--use-keychain", "act", "emit", class_, type_, ref,
        "--payload", json.dumps(payload),  # noqa: ARQ-NO-JSON-HOT-PATH twin CLI input boundary — twin accepts --payload <json>
    ]
    try:
        proc = subprocess.run(argv, capture_output=True, text=True, timeout=5, check=False)
    except (subprocess.TimeoutExpired, OSError):
        return None
    if proc.returncode != 0:
        return None
    trimmed = (raw.strip() for raw in proc.stdout.splitlines())
    return next((addr for addr in trimmed if addr.startswith("arq://")), None)


def _emit(class_: str, type_: str, ref: str, payload: dict) -> str | None:
    """Best-effort substrate emission; return the act address or None.

    daemon.act.emit is tried first. A daemon "error" is treated as
    authoritative and yields None with no retry. "unavailable" and
    "unknown_verb" fall through to the legacy twin CLI subprocess.
    """
    outcome, address = _emit_via_daemon(class_, type_, ref, payload)
    if outcome not in ("ok", "error"):
        return _emit_via_twin_subprocess(class_, type_, ref, payload)
    return address if outcome == "ok" else None


def _ssh_kubectl(args: list[str], timeout: int = 30, stdin_data: str | None = None) -> tuple[int, str, str]:
    """Run kubectl on the k3s control plane via SSH. Returns (rc, stdout, stderr).

    SSH concatenates remote args into a single shell command string on
    the far end, so each kubectl arg is shlex.quote'd before the join.
    Without quoting, args with spaces or JSON (e.g. --overrides) get
    split on whitespace and kubectl rejects them. (Found 2026-04-23 while
    probing WSL2 GPU paths with a strategic-merge override payload.)

    Args:
        args: kubectl arguments (without the leading `kubectl`).
        timeout: overall subprocess timeout in seconds.
        stdin_data: optional text fed to ssh's stdin (forwarded to kubectl).

    Returns:
        (exit code, stdout, stderr). 124 + "timeout after Ns" when the
        subprocess timeout fires; 127 when `ssh` is not on PATH; 255 is
        ssh's own code for connection/auth failures.

    Unlike the `shell` verb, this path has no jump-host fallback.
    """
    import shlex
    remote = "kubectl " + " ".join(shlex.quote(a) for a in args)
    # ConnectTimeout bounds the connection setup so an unreachable host fails
    # fast (ssh exits 255) instead of consuming the whole subprocess timeout.
    # This matches the options _ssh_shell_direct uses.
    cmd = ["ssh", "-o", "BatchMode=yes", "-o", "ConnectTimeout=5", DEFAULT_SSH_HOST, remote]
    try:
        r = subprocess.run(
            cmd, capture_output=True, text=True, timeout=timeout, check=False,
            input=stdin_data,
        )
        return r.returncode, r.stdout, r.stderr
    except subprocess.TimeoutExpired:
        return 124, "", f"timeout after {timeout}s"
    except FileNotFoundError:
        return 127, "", "ssh not on PATH"


def _local_kubectl(context: str, args: list[str], timeout: int = 30,
                   stdin_data: str | None = None) -> tuple[int, str, str]:
    """Invoke the LOCAL kubectl against a named kube context.

    This is the path for every non-homebase context. kubectl resolves
    `context` from ~/.kube/config itself, so whatever credentials it
    already has (gcloud plugin, AWS IAM authenticator, plain certs) are
    used. arq-kube never reads, copies, or caches them. The call goes to
    `kubectl` directly rather than via SSH, because `--context` exists to
    reach clusters that can't be SSH'd into.

    Returns (exit code, stdout, stderr). Returns (124, "", "timeout after
    Ns") when the subprocess timeout fires, and (127, "", "kubectl not on
    PATH") when the binary is missing.
    """
    argv = ["kubectl", "--context", context] + args
    try:
        proc = subprocess.run(
            argv, input=stdin_data, capture_output=True, text=True,
            timeout=timeout, check=False,
        )
    except subprocess.TimeoutExpired:
        return 124, "", f"timeout after {timeout}s"
    except FileNotFoundError:
        return 127, "", "kubectl not on PATH"
    return proc.returncode, proc.stdout, proc.stderr


def _run_kubectl(context: str | None, args: list[str], timeout: int = 30,
                 stdin_data: str | None = None) -> tuple[int, str, str]:
    """Route one kubectl call: SSH to homebase, or local kubectl --context.

    Returns whatever the chosen transport returns: (rc, stdout, stderr).
    """
    if not _is_homebase(context):
        return _local_kubectl(context, args, timeout=timeout, stdin_data=stdin_data)
    return _ssh_kubectl(args, timeout=timeout, stdin_data=stdin_data)


def _dispatch(context: str | None, verb: str, extra_envelope: dict,
              kubectl_args: list[str], timeout: int = 30) -> int:
    """Run one traced kubectl call and return kubectl's exit code.

    All three acts share the reference `<verb with '.'→'-'>-<start ms>`:
      * kube_envelope_sent, emitted before the call. It carries the verb,
        actor, context label, target host, the first 200 chars of the args,
        and the start timestamp. `extra_envelope` is merged in last, so it
        can add or override keys.
      * kube_envelope_ack, emitted after the call. It carries the exit code,
        latency, the last 500 chars of stdout/stderr, and the sent address.
      * benchmark_sample, carrying latency and http_status (200 if rc == 0,
        else 500).
    kubectl's stdout/stderr are then echoed to this process's streams.

    NOTE(review): the args and stdout/stderr previews are emitted verbatim.
    A raw call carrying secret material (e.g. `--from-literal=`) would put
    it on substrate. Confirm that is acceptable.
    """
    started_ms = _ts_ms()
    slug = verb.replace(".", "-")
    ref = f"{slug}-{started_ms}"
    full_verb = f"kube.{verb}"
    if _is_homebase(context):
        ctx_label, target_host = "homebase", DEFAULT_SSH_HOST
    else:
        ctx_label, target_host = context, "local"

    sent = _emit("act", "kube_envelope_sent", ref, {
        "verb": full_verb,
        "actor_peer": ACTOR_PEER,
        "context": ctx_label,
        "target_host": target_host,
        "kubectl_args_preview": " ".join(kubectl_args)[:200],
        "ts_ms": started_ms,
        **extra_envelope,
    })

    clock = time.monotonic()
    rc, stdout, stderr = _run_kubectl(context, kubectl_args, timeout=timeout)
    latency_ms = int((time.monotonic() - clock) * 1000)

    _emit("act", "kube_envelope_ack", ref, {
        "verb": full_verb,
        "context": ctx_label,
        "envelope_sent": sent,
        "exit_code": rc,
        "latency_ms": latency_ms,
        "stdout_preview": stdout[-500:],
        "stderr_preview": stderr[-500:],
        "ts_ms": _ts_ms(),
    })
    _emit("act", "benchmark_sample", ref, {
        "op": f"kube-{slug}",
        "context": ctx_label,
        "latency_ms": latency_ms,
        "http_status": 200 if rc == 0 else 500,
        "ts_ms": started_ms,
    })

    for stream, text in ((sys.stdout, stdout), (sys.stderr, stderr)):
        if text:
            stream.write(text)
    return rc


def cmd_pod_list(args: argparse.Namespace) -> int:
    """List pods in wide format; all namespaces unless -n narrows it.

    The envelope records the namespace, or "*" for the all-namespaces case.
    """
    namespace = args.namespace
    scope = ["-n", namespace] if namespace else ["--all-namespaces"]
    kargs = ["get", "pods", *scope, "-o", "wide"]
    return _dispatch(args.context, "pod.list", {"namespace": namespace or "*"}, kargs)


def cmd_pod_logs(args: argparse.Namespace) -> int:
    """Print the last 200 log lines of one pod (15 s dispatch timeout).

    The envelope records the pod and its namespace. "default" is recorded
    when -n is omitted.
    """
    namespace = args.namespace
    kargs = ["logs", args.pod, "--tail=200"] + (["-n", namespace] if namespace else [])
    envelope_extra = {"pod": args.pod, "namespace": namespace or "default"}
    return _dispatch(args.context, "pod.logs", envelope_extra, kargs, timeout=15)


def cmd_pod_delete(args: argparse.Namespace) -> int:
    """Delete one pod by name (`kubectl delete pod POD [-n NAMESPACE]`).

    The envelope records the namespace next to the pod name, so the audit
    trail of this destructive call says which pod was removed. This follows
    the same convention as `pod logs`: "default" is recorded when -n is
    omitted. The effective namespace may differ if the kube context sets
    its own default.
    """
    kargs = ["delete", "pod", args.pod]
    if args.namespace:
        kargs += ["-n", args.namespace]
    return _dispatch(args.context, "pod.delete",
                     {"pod": args.pod, "namespace": args.namespace or "default"},
                     kargs)


def _default_pod_name(image: str, suffix: int) -> str:
    """Derive an RFC 1123-safe pod name from an image reference.

    Keeps the image's last path segment without its `@digest` and `:tag`,
    then lowercases it. Each run of characters outside [a-z0-9-] becomes a
    single '-', and `-<suffix>` is appended. The base is trimmed so the
    whole name fits in 63 characters, because kubectl run also uses the
    name as a `run=<name>` label value, and label values are capped at 63.
    Falls back to "pod" when nothing usable remains.

    >>> _default_pod_name("nvcr.io/nvidia/cuda:12.4.0-base", 123)
    'cuda-123'
    """
    import re

    segment = image.rsplit("/", 1)[-1]   # drop registry host + repo path
    segment = segment.split("@", 1)[0]   # drop digest (app@sha256:...)
    segment = segment.rsplit(":", 1)[0]  # drop tag (app:1.2)
    base = re.sub(r"[^a-z0-9-]+", "-", segment.lower()).strip("-")
    tail = f"-{suffix}"
    base = base[: 63 - len(tail)].rstrip("-") or "pod"
    return base + tail


def cmd_pod_run(args: argparse.Namespace) -> int:
    """Create a one-shot pod (`kubectl run --restart=Never --attach=false`).

    The name comes from --name, or else from the image (see
    _default_pod_name) plus a millisecond-derived numeric suffix.

    Works on any context, but the --gpu / --mount-wsl / --node flags are
    homebase-specific (they encode homebase's GPU node labels + WSL2
    driver mounts). Using them against a non-homebase context is allowed
    but the overrides likely won't match — kubectl will either error or
    run on the wrong node. A future PR can reject the combination; for
    now we emit the envelope and let kubectl return the real error.

    Returns kubectl's exit code (60 s dispatch timeout).
    """
    name = args.name or _default_pod_name(args.image, _ts_ms() % 100000)
    kargs = [
        "run", name,
        "--image", args.image,
        "--restart=Never",
        "--attach=false",
    ]
    if args.namespace:
        kargs += ["-n", args.namespace]
    overrides: dict = {}
    if args.gpu:
        # Tell gpu-operator which GPU via node selector, and request one GPU
        # for the container kubectl generates (matched by `name`).
        gpu_node_selectors = {
            "blackwell": {"nvidia.com/gpu.product": "GB10"},
            "rtx5090": {"nvidia.com/gpu.product": "RTX5090"},
        }
        spec = overrides.setdefault("spec", {})
        spec.setdefault("containers", [{"name": name, "image": args.image,
                                        "resources": {"limits": {"nvidia.com/gpu": "1"}}}])
        spec["nodeSelector"] = dict(gpu_node_selectors[args.gpu])
    if args.node:
        # Pin to a specific node by hostname. Added 2026-04-23 for the
        # RTX 5090 WSL2 GPU visibility fix — probing /usr/lib/wsl and
        # /dev/dxg required landing on `ayo` specifically, not whichever
        # node gpu-operator picked. Merges with any --gpu selector.
        spec = overrides.setdefault("spec", {})
        spec.setdefault("nodeSelector", {})["kubernetes.io/hostname"] = args.node
    if args.mount_wsl:
        # Mount the WSL2 host-provided driver/libs read-only. Required
        # for any GPU workload on a WSL2 node — the NVIDIA user-space
        # libraries live on the Windows host and are exposed inside the
        # WSL distro at /usr/lib/wsl. See
        # arq://doc/principle/every-platform-is-a-peer-v1 — the peer's
        # native driver surface is what we mount, we never install one.
        spec = overrides.setdefault("spec", {})
        spec.setdefault("volumes", []).append(
            {"name": "wsl-lib",
             "hostPath": {"path": "/usr/lib/wsl", "type": "Directory"}})
        containers = spec.setdefault("containers", [{"name": name, "image": args.image}])
        for container in containers:
            container.setdefault("volumeMounts", []).append(
                {"name": "wsl-lib", "mountPath": "/usr/lib/wsl", "readOnly": True})
            container.setdefault("env", []).append(
                {"name": "LD_LIBRARY_PATH",
                 "value": "/usr/lib/wsl/lib:/usr/local/nvidia/lib64"})
    if overrides:
        # Strategic merge patch. kubectl run's default --override-type is
        # "merge" (RFC 7386 JSON merge patch), which replaces list fields
        # wholesale. Our partial `containers` entry would then overwrite the
        # container kubectl generated, dropping e.g. the `sh -c CMD` args.
        # Strategic merge instead merges `containers` by `name`.
        kargs += ["--override-type=strategic", "--overrides", json.dumps(overrides)]
    if args.cmd:
        kargs += ["--", "sh", "-c", args.cmd]
    return _dispatch(args.context, "pod.run",
                     {"image": args.image, "name": name, "gpu": args.gpu,
                      "node": args.node, "mount_wsl": args.mount_wsl},
                     kargs, timeout=60)


def cmd_node_list(args: argparse.Namespace) -> int:
    """Show node capacity via `kubectl top nodes`, falling back to `get nodes`.

    `top nodes` depends on metrics-server. When it exits non-zero for a
    reason other than a transport failure, `get nodes -o wide` is
    dispatched so the caller still gets a node listing. Each attempt is its
    own _dispatch, so each leaves its own envelope/ack/benchmark trail. The
    fallback envelope records `fallback_from` and the `top` exit code.
    Returns the exit code of the last kubectl call made.
    """
    rc = _dispatch(args.context, "node.list", {}, ["top", "nodes"])
    # 124 = timeout sentinel, 127 = ssh/kubectl binary missing, 255 = ssh
    # could not reach the host. A second call would hit the same wall.
    if rc == 0 or rc in {124, 127, 255}:
        return rc
    return _dispatch(args.context, "node.list",
                     {"fallback_from": "top-nodes", "top_exit_code": rc},
                     ["get", "nodes", "-o", "wide"])


# Deny-list for raw verbs: never allow unscoped delete / drain / cordon
# through the wrapper — those are control-plane-blinking operations and
# must come in as their own first-class verb with an explicit confirmation
# contract. Raw is for read + label + patch + apply + rollout.
# cmd_raw checks the first kubectl argument against this set (exact,
# case-sensitive membership).
# NOTE(review): `delete` is named above but is NOT in this set, so
# `arq-kube raw delete ...` currently passes through. Confirm whether that
# is intended or whether `delete` should be added.
RAW_FORBIDDEN_VERBS = {"drain", "cordon", "uncordon"}


def _ssh_shell_direct(cmd: list[str], host: str = DEFAULT_SSH_HOST,
                      timeout: int = 30) -> tuple[int, str, str]:
    """Run an arbitrary host command on `host` over a direct SSH connection.

    This is the mesh-compliant path for non-kubectl host work (docker ps,
    nvidia-smi, systemctl status, apt, ...). It means `ssh <host> <command>`
    no longer has to bypass mesh-enforce with the `!` prefix.

    The `cmd` elements are handed to ssh unquoted. ssh joins them with
    spaces and the remote shell re-parses the line, so pipelines and globs
    in the arguments are interpreted remotely.

    Returns (rc, stdout, stderr):
      * ssh-level failures (connect refused/unreachable/key rejected) come
        back as ssh's own exit 255 with its diagnostic on stderr;
      * (124, "", "timeout after Ns") when the subprocess timeout fires;
      * (127, "", "ssh not on PATH") when ssh is missing.
    """
    argv = ["ssh", "-o", "BatchMode=yes", "-o", "ConnectTimeout=5", host] + cmd
    try:
        proc = subprocess.run(argv, capture_output=True, text=True, timeout=timeout, check=False)
    except subprocess.TimeoutExpired:
        return 124, "", f"timeout after {timeout}s"
    except FileNotFoundError:
        return 127, "", "ssh not on PATH"
    return proc.returncode, proc.stdout, proc.stderr


def _jump_inner_command(cmd: list[str], target: str) -> str:
    """Build the command line the jump host's shell runs to reach `target`.

    Direct ssh (see _ssh_shell_direct) hands the target `" ".join(cmd)` as
    one command line, which the target's shell re-parses. So pipelines,
    globs and pre-quoted strings in `cmd` work there. For the jump path to
    be a transparent fallback, that same line is quoted exactly ONCE here:
      1. The jump host's shell strips this layer.
      2. It passes the line to the inner ssh as a single argument.
      3. The target's shell then parses it exactly as on the direct path.
    """
    import shlex
    remote_line = " ".join(cmd)
    return (
        "ssh -o BatchMode=yes -o ConnectTimeout=5 "
        f"{shlex.quote(target)} {shlex.quote(remote_line)}"
    )


def _ssh_shell_via_jump(cmd: list[str], host: str, jump_host: str,
                        timeout: int = 30) -> tuple[int, str, str]:
    """Re-dispatch the same remote command through a jump host.

    Mac -> DGX direct SSH is blocked, but PC-WSL -> DGX (LAN) works. This
    helper runs the original `cmd` on `host` by SSHing into `jump_host`
    first and from there SSHing into `host`. The target is resolved via
    DEFAULT_JUMP_TARGET_IP when present, because jump hosts often see the
    target at a LAN address the caller doesn't share. Otherwise the logical
    name is used, which assumes the jump host's ssh-config carries the alias.

    The target receives the same command line as on the direct path (see
    _jump_inner_command), so shell semantics don't change when the fallback
    fires.

    Returns (rc, stdout, stderr). Returns (124, "", "timeout after Ns (via
    <jump>)") when the subprocess timeout fires, and (127, "", "ssh not on
    PATH") when ssh is missing.
    """
    target = DEFAULT_JUMP_TARGET_IP.get(host, host)
    full = ["ssh", "-o", "BatchMode=yes", "-o", "ConnectTimeout=5",
            jump_host, _jump_inner_command(cmd, target)]
    try:
        r = subprocess.run(full, capture_output=True, text=True, timeout=timeout, check=False)
        return r.returncode, r.stdout, r.stderr
    except subprocess.TimeoutExpired:
        return 124, "", f"timeout after {timeout}s (via {jump_host})"
    except FileNotFoundError:
        return 127, "", "ssh not on PATH"


def _ssh_shell(cmd: list[str], host: str = DEFAULT_SSH_HOST, timeout: int = 30,
               *, allow_fallback: bool = True, jump_host: str | None = None,
               envelope_ref: str | None = None) -> tuple[int, str, str, dict]:
    """Run a remote command, transparently falling back to a jump host on failure.

    Returns (rc, stdout, stderr, routing). `routing` carries the chosen
    path so the caller can emit substrate acts that reflect the actual
    dispatch:
      {"path": "direct"}                                 -- direct ssh succeeded
      {"path": "jump", "jump_host": "pc-wsl", "direct_exit": 124,
       "direct_stderr_preview": "..."}                   -- fell through
      {"path": "direct", "fallback_disabled": True}      -- --no-fallback, no retry
      {"path": "direct_only", "reason": "same_host"}     -- host == jump_host

    Fallback fires iff:
      * allow_fallback is True (Wire 1 default), AND
      * direct exit code is in SSH_DIRECT_FAILED_EXIT_CODES (124, 255), AND
      * host != effective jump host (no point re-dispatching to self).

    When fallback fires we ALSO emit a `dispatch_routed_via_jump` act,
    whether or not the jump succeeds, so substrate sees the rerouting. Its
    reference is `<envelope_ref>-via-<jump>` when the caller passes
    `envelope_ref`, which correlates it with the envelope_sent/ack pair.
    Otherwise it is `local-to-<host>-via-<jump>-<ts>`.
    """
    rc, stdout, stderr = _ssh_shell_direct(cmd, host=host, timeout=timeout)
    routing: dict = {"path": "direct"}

    # Fallback short-circuits.
    if rc == 0:
        return rc, stdout, stderr, routing
    if not allow_fallback:
        routing["fallback_disabled"] = True
        return rc, stdout, stderr, routing
    # NOTE(review): a remote command that itself exits 255 or 124 (e.g.
    # coreutils `timeout`) looks identical to an ssh-level failure here and
    # will be re-run via the jump host. A 124 sentinel can also mean the
    # command was already running when the subprocess timed out.
    if rc not in SSH_DIRECT_FAILED_EXIT_CODES:
        # Remote shell ran but the command itself failed — don't second-
        # guess the user; surface the real exit code unchanged.
        return rc, stdout, stderr, routing
    effective_jump = jump_host or DEFAULT_JUMP_HOST
    if host == effective_jump:
        return rc, stdout, stderr, {"path": "direct_only", "reason": "same_host"}

    # Direct failed at ssh-level; retry via jump.
    direct_exit = rc
    direct_stderr_preview = stderr[-200:]
    rc2, stdout2, stderr2 = _ssh_shell_via_jump(
        cmd, host=host, jump_host=effective_jump, timeout=timeout,
    )
    routing = {
        "path": "jump",
        "jump_host": effective_jump,
        "direct_exit": direct_exit,
        "direct_stderr_preview": direct_stderr_preview,
    }

    # Emit the rerouting signal regardless of the jump outcome — failures
    # are also evidence. The origin is always this machine ("local"),
    # matching `from_host` in the payload.
    ts = _ts_ms()
    if envelope_ref is None:
        act_ref = f"local-to-{host}-via-{effective_jump}-{ts}"
    else:
        act_ref = f"{envelope_ref}-via-{effective_jump}"
    _emit(
        "act", "dispatch_routed_via_jump", act_ref,
        {
            "from_host": "local",
            "to_host": host,
            "via": effective_jump,
            "direct_exit_code": direct_exit,
            "direct_stderr_preview": direct_stderr_preview,
            "jump_exit_code": rc2,
            "jump_stderr_preview": stderr2[-200:],
            "ts_ms": ts,
            "envelope_ref": envelope_ref,
        },
    )
    return rc2, stdout2, stderr2, routing


def cmd_shell(args: argparse.Namespace) -> int:
    """Pass an arbitrary host command through to a homebase host over SSH.

    This verb runs host shell commands, not kubectl, so it is SSH-only by
    design. main() rejects it when combined with a non-homebase --context.
    Direct-ssh failures may be retried through a jump host (see --no-fallback
    / --jump-host). The chosen route is recorded in the ack's `routing`
    field.

    Examples:
      arq-kube shell docker ps
      arq-kube shell --host pc-wsl nvidia-smi
      arq-kube shell --host dgx-spark systemctl status k3s

    Returns the remote exit code, or 2 when no command was given.
    """
    remote_cmd = list(args.shell_args or [])
    if not remote_cmd:
        sys.stderr.write("arq-kube shell: no command provided\n")
        return 2

    host = args.host
    started_ms = _ts_ms()
    ref = f"shell-{host}-{started_ms}"
    sent = _emit("act", "kube_envelope_sent", ref, {
        "verb": "shell",
        "actor_peer": ACTOR_PEER,
        "context": "homebase",
        "target_host": host,
        "cmd_preview": " ".join(remote_cmd[:5])[:200],
        "argc": len(remote_cmd),
        "ts_ms": started_ms,
    })

    clock = time.monotonic()
    rc, stdout, stderr, routing = _ssh_shell(
        remote_cmd,
        host=host,
        timeout=args.timeout,
        allow_fallback=not args.no_fallback,
        jump_host=args.jump_host,
        envelope_ref=ref,
    )
    latency_ms = int((time.monotonic() - clock) * 1000)

    _emit("act", "kube_envelope_ack", ref, {
        "verb": "shell",
        "context": "homebase",
        "target_host": host,
        "envelope_sent": sent,
        "exit_code": rc,
        "latency_ms": latency_ms,
        "stdout_preview": stdout[-500:],
        "stderr_preview": stderr[-500:],
        "routing": routing,
        "ts_ms": _ts_ms(),
    })
    _emit("act", "benchmark_sample", ref, {
        "op": f"kube-shell-{host}",
        "context": "homebase",
        "latency_ms": latency_ms,
        "http_status": 200 if rc == 0 else 500,
        "ts_ms": started_ms,
    })

    for stream, text in ((sys.stdout, stdout), (sys.stderr, stderr)):
        if text:
            stream.write(text)
    return rc


def cmd_raw(args: argparse.Namespace) -> int:
    """Forward a kubectl verb that arq-kube doesn't model first-class yet.

    Introduced 2026-04-23 to unblock the homebase RTX 5090 WSL2 GPU
    visibility fix, which needed ClusterPolicy inspection and node labelling
    that the v0.1 verbs lacked. Raw calls take the same traced _dispatch
    path as the pod/node verbs, so substrate still captures the kubectl
    args and stdout/stderr previews. With v0.2 they honour --context too:
      arq-kube --context gke_arqera-staging raw get pods -n arqera-staging

    Guardrails:
      * The kubectl verb is required. It becomes part of the dispatch verb
        (`raw.<verb>`), so act references stay greppable (raw-get, …).
      * Verbs in RAW_FORBIDDEN_VERBS (drain/cordon/uncordon) are refused
        with exit 2. Control-plane-blinking ops need their own first-class
        verb with an explicit confirmation contract.

    Because kube_args is an argparse REMAINDER, `--timeout` must precede the
    kubectl verb (`arq-kube raw --timeout 60 get pods`). After the verb it
    is forwarded to kubectl.
    """
    verb = args.kube_verb
    if verb in RAW_FORBIDDEN_VERBS:
        sys.stderr.write(
            f"arq-kube raw: '{verb}' is reserved — file a first-class "
            f"verb with explicit confirmation contract. Blocked.\n"
        )
        return 2
    passthrough = args.kube_args
    return _dispatch(
        args.context,
        f"raw.{verb}",
        {"kube_verb": verb, "arg_count": len(passthrough)},
        [verb, *passthrough],
        timeout=args.timeout,
    )


def main() -> int:
    """Build the CLI, parse sys.argv, and run the selected verb handler.

    Each leaf subparser binds its handler through set_defaults(func=...).
    Returns that handler's exit code, or 2 when `shell` is combined with a
    non-homebase --context (rejected before any dispatch happens).
    """
    ap = argparse.ArgumentParser(
        prog="arq-kube",
        description="Multi-cluster k8s adapter — mesh-routed kubectl dispatches.",
    )
    # Global --context flag. It is defined only on the top-level parser, so
    # on the command line it must come BEFORE the subcommand. It then
    # threads into every verb via args.context. Absent OR a homebase alias →
    # SSH-to-dgx-spark (v0.1 behaviour). Any other value → local
    # `kubectl --context <name>`.
    ap.add_argument(
        "--context",
        default=None,
        metavar="NAME",
        help=(
            "kube context to target. Absent or 'homebase' → SSH to "
            f"{DEFAULT_SSH_HOST} (homebase k3s). Any other value → local "
            "`kubectl --context <NAME>` (GKE, remote k3s, kind, etc.). "
            "Applies to every verb except `shell`."
        ),
    )

    sub = ap.add_subparsers(dest="verb", required=True)

    pod = sub.add_parser("pod", help="pod operations")
    pod_sub = pod.add_subparsers(dest="pod_verb", required=True)

    # No -n default here: cmd_pod_list lists --all-namespaces when it's absent.
    p_list = pod_sub.add_parser("list")
    p_list.add_argument("-n", "--namespace")
    p_list.set_defaults(func=cmd_pod_list)

    # Unlike the other pod verbs, -n defaults to "default" for `run`, so
    # cmd_pod_run always passes an explicit namespace to kubectl.
    p_run = pod_sub.add_parser("run")
    p_run.add_argument("image")
    p_run.add_argument("--cmd", default=None, help="command to run inside the container")
    p_run.add_argument("--name", default=None, help="pod name (auto-generated if omitted)")
    p_run.add_argument("-n", "--namespace", default="default")
    p_run.add_argument("--gpu", choices=["blackwell", "rtx5090"], default=None,
                       help="constrain to a specific homebase GPU")
    p_run.add_argument("--node", default=None,
                       help="pin pod to a specific node by hostname")
    p_run.add_argument("--mount-wsl", action="store_true",
                       help="mount /usr/lib/wsl read-only into the pod (for WSL2 nodes)")
    p_run.set_defaults(func=cmd_pod_run)

    p_logs = pod_sub.add_parser("logs")
    p_logs.add_argument("pod")
    p_logs.add_argument("-n", "--namespace")
    p_logs.set_defaults(func=cmd_pod_logs)

    p_del = pod_sub.add_parser("delete")
    p_del.add_argument("pod")
    p_del.add_argument("-n", "--namespace")
    p_del.set_defaults(func=cmd_pod_delete)

    node = sub.add_parser("node", help="node operations")
    node_sub = node.add_subparsers(dest="node_verb", required=True)
    n_list = node_sub.add_parser("list")
    n_list.set_defaults(func=cmd_node_list)

    shell = sub.add_parser(
        "shell",
        help="arbitrary shell passthrough to homebase host (docker, nvidia-smi, systemctl, etc.)",
    )
    shell.add_argument("--host", default=DEFAULT_SSH_HOST,
                       help="SSH host (dgx-spark [default] | pc-wsl | …)")
    shell.add_argument("--timeout", type=int, default=30)
    shell.add_argument(
        "--no-fallback",
        action="store_true",
        help=(
            "Disable the auto-fallback through --jump-host when direct ssh "
            "fails (timeout / exit 255). Useful when debugging the direct "
            "path itself — without this, transient direct failures are "
            "masked by the jump path silently working."
        ),
    )
    shell.add_argument(
        "--jump-host",
        default=DEFAULT_JUMP_HOST,
        metavar="HOST",
        help=(
            "Jump host to retry through when direct ssh fails (default: "
            f"{DEFAULT_JUMP_HOST}). Override via $ARQ_KUBE_JUMP_HOST. The "
            "jump host must have ssh-config for the target alias or the "
            "target must appear in DEFAULT_JUMP_TARGET_IP (e.g. "
            "dgx-spark -> 192.168.1.6 over LAN)."
        ),
    )
    # REMAINDER: everything from the first command token onward is the
    # remote command, so shell's own options should precede it.
    shell.add_argument("shell_args", nargs=argparse.REMAINDER)
    shell.set_defaults(func=cmd_shell)

    raw = sub.add_parser(
        "raw",
        help="pass-through kubectl verb (get/describe/label/patch/apply/rollout/events)",
    )
    raw.add_argument("kube_verb",
                     help="kubectl verb, e.g. get, describe, label, patch, apply, rollout")
    # REMAINDER after kube_verb: any token following the verb, including
    # `--timeout`, is forwarded to kubectl. raw's --timeout must precede
    # the verb (`raw --timeout 60 get pods`).
    raw.add_argument("kube_args", nargs=argparse.REMAINDER,
                     help="remaining kubectl args, passed through verbatim")
    raw.add_argument("--timeout", type=int, default=30)
    raw.set_defaults(func=cmd_raw)

    args = ap.parse_args()

    # `shell` is homebase-SSH-only by construction (arbitrary host shell,
    # not kubectl). Reject a non-homebase --context combined with shell so
    # the operator hears about the mismatch instead of silently ignoring it.
    if args.verb == "shell" and not _is_homebase(args.context):
        sys.stderr.write(
            "arq-kube shell: --context is not supported (shell is SSH-to-host-only). "
            "For kubectl against a non-homebase context, use "
            "`arq-kube --context <NAME> raw ...`.\n"
        )
        return 2

    return args.func(args)


if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status.
    raise SystemExit(main())
