#!/usr/bin/env python3
"""arq-mesh — JSON-RPC client for the local mesh-daemon Unix socket.

Closes arq://body/arqera_bug/no-mesh-daemon-rpc-client-2026-05-27.

DISCOVERED OPS (probed 2026-05-27 via the live socket):
  The daemon at ~/.arqera/mesh-daemon.sock is a *claim coordinator* for
  cross-session file/branch/pane coordination. It is NOT a peer mesh in
  the libp2p / addressing-service sense. The original bug report
  hypothesised ops like `peers`, `ping`, `version` — these do NOT exist.

  Authoritative op set (parsed from the daemon's own error response):
    claim · query · release · status · list_claims · list_conflicts
    pane_register · pane_load · pane_close · pane_compose_revise
    pane_share · pane_unshare · pane_fork · pane_list

  Round-trip examples (verified live):
    {"op":"status"}         -> {"decision":"status","claims":{...}}
    {"op":"list_claims"}    -> {"decision":"claim_list","claims":[...]}
    {"op":"list_conflicts"} -> {"decision":"conflict_list","conflicts":[...]}
    {"op":"pane_list"}      -> error: missing field `identity_owner`
                               (requires identity context — not exposed
                               by this CLI in v0.1)

SUBCOMMAND MAPPING (intent → daemon op):
  arq-mesh status            -> {"op":"status"}
  arq-mesh peers             -> {"op":"list_claims"}   (closest concept:
                                 who is holding what across sibling sessions —
                                 the daemon has no "peer" notion, only claim
                                 holders identified by cli_instance_id)
  arq-mesh conflicts         -> {"op":"list_conflicts"}
  arq-mesh ping              -> {"op":"status"} round-trip used as a
                                 connectivity probe (returns ok/fail + latency)
  arq-mesh raw <json>        -> escape hatch — send arbitrary payload

SUBSTRATE ATTESTATION:
  Each invocation emits arq://act/mesh_query/<op>-<ts> with the request
  payload, response decision, and a truncated response body. Failure
  modes (socket missing, parse error, timeout) are also attested so the
  audit trail captures negative space too. Emission uses the same
  daemon.act.emit primitive as arq-call, with a twin --use-keychain
  subprocess fallback.

CONTRACT:
  Exit code 0 on round-trip success (any daemon decision counts — even
  daemon-reported errors are a successful round-trip, the CLI did its
  job). Exit code 1 on transport failure (socket missing, timeout,
  malformed JSON from daemon). Exit code 2 on CLI usage error.
"""

from __future__ import annotations

import argparse
import json
import os
import socket
import sys
import time
from datetime import datetime, timezone
from pathlib import Path

# Reuse the substrate-emission primitive used by arq-call so we get
# daemon.act.emit (zero keychain prompts) with twin-subprocess fallback.
# The script's own directory is prepended to sys.path so that
# `_arq_primitive` can be found next to this file when it is run directly;
# the insert must happen before the import below.
sys.path.insert(0, str(Path(__file__).parent))
try:
    from _arq_primitive import primitive_invoke as _primitive_invoke  # type: ignore
except ImportError:  # pragma: no cover — best-effort emission only
    # With no primitive available, _emit_act skips the daemon path and only
    # attempts the twin CLI fallback.
    _primitive_invoke = None  # type: ignore

# Per-user filesystem locations (both resolved under the home directory).
DEFAULT_SOCKET = Path.home().joinpath(".arqera", "mesh-daemon.sock")
TWIN_CLI = Path.home().joinpath(".local", "bin", "twin")

# Actor address stamped onto every emitted act; the environment variable
# ARQ_ACTOR_PEER_ADDRESS overrides the built-in default.
ACTOR_PEER = os.environ.get(
    "ARQ_ACTOR_PEER_ADDRESS", "arq://body/peer/578412e7b083b40e56e228779804582a"
)

# Every op the daemon accepts, as discovered by probing the live socket
# (see the module docstring). Surfaced in the --help epilog.
DAEMON_OPS = (
    "claim", "query", "release",
    "status", "list_claims", "list_conflicts",
    "pane_register", "pane_load", "pane_close", "pane_compose_revise",
    "pane_share", "pane_unshare", "pane_fork", "pane_list",
)


def _utc_ts_ms() -> int:
    """Return the current UTC wall-clock time as integer Unix epoch milliseconds."""
    now_utc = datetime.now(tz=timezone.utc)
    return int(now_utc.timestamp() * 1000)


def _send(socket_path: Path, payload: dict, timeout_s: float = 3.0) -> tuple[str, dict | str]:
    """Send one newline-terminated JSON request to the mesh-daemon socket.

    The reply is read until a chunk containing a newline arrives, the peer
    closes the connection, or a read times out; whatever was received is
    then parsed as JSON.

    Args:
        socket_path: Path to the daemon's Unix-domain socket.
        payload: JSON-serialisable request object.
        timeout_s: Socket timeout in seconds, applied to connect/send/recv.

    Returns (outcome, body):
      - ("ok", parsed_dict)         — got a JSON object response
      - ("invalid_json", raw_text)  — daemon responded but the body is not
                                      a JSON object (unparseable, or valid
                                      JSON of another type such as a list)
      - ("no_socket", reason)       — socket file missing
      - ("transport_error", reason) — connect/send/recv failed, or nothing
                                      arrived before EOF / read timeout
    """
    if not socket_path.exists():
        return ("no_socket", f"socket missing at {socket_path}")

    chunks: list[bytes] = []
    timed_out = False
    try:
        # `with` guarantees the socket is closed on every path, including
        # failures during socket creation/connect.
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
            s.settimeout(timeout_s)
            s.connect(str(socket_path))
            s.sendall((json.dumps(payload) + "\n").encode("utf-8"))
            while True:
                try:
                    chunk = s.recv(8192)
                except socket.timeout:
                    # Keep whatever already arrived: a daemon that omits the
                    # trailing newline and holds the connection open still
                    # gets its reply parsed.
                    timed_out = True
                    break
                if not chunk:
                    break
                chunks.append(chunk)
                if b"\n" in chunk:
                    break
    except OSError as e:  # socket.error and socket.timeout are OSError subclasses
        return ("transport_error", repr(e))

    raw = b"".join(chunks).decode("utf-8", errors="replace").strip()
    if not raw:
        reason = "timed out waiting for response" if timed_out else "empty response"
        return ("transport_error", reason)
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        return ("invalid_json", raw)
    if not isinstance(parsed, dict):
        # Valid JSON but not an object: callers treat an "ok" body as a
        # dict, so report it as a malformed reply instead.
        return ("invalid_json", raw)
    return ("ok", parsed)


def _emit_act(op: str, request: dict, outcome: str, response_summary: dict) -> str | None:
    """Attest one mesh query as an ``act/mesh_query`` record (fire-and-forget).

    The act reference is ``<op>-<utc epoch ms>``. Emission goes through the
    in-process ``daemon.act.emit`` primitive when it was importable, and
    otherwise through ``twin --use-keychain act emit`` when the twin CLI
    is installed. Exceptions from either path are swallowed so auditing
    can never break the CLI.

    Returns:
        The act address reported by whichever path handled the emission,
        or None.
    """
    stamp = _utc_ts_ms()
    reference = f"{op}-{stamp}"
    act_payload = {
        "op": op,
        "request": request,
        "outcome": outcome,
        "response_summary": response_summary,
        "actor_peer": ACTOR_PEER,
        "client": "arq-mesh",
        "ts_ms": stamp,
    }

    # Preferred path: the daemon primitive (no keychain prompt).
    if _primitive_invoke is not None:
        envelope = {
            "class": "act",
            "type": "mesh_query",
            "reference": reference,
            "payload": act_payload,
        }
        try:
            status, result = _primitive_invoke(
                "daemon.act.emit", envelope, ACTOR_PEER, timeout_s=5.0
            )
            if status == "ok" and isinstance(result, dict):
                return result.get("address")
        except Exception:
            pass  # best-effort: fall through to the twin CLI

    # Fallback path: the twin CLI. Skipped when it is not installed so the
    # tool stays usable before bootstrap.
    if not TWIN_CLI.exists():
        return None
    try:
        import subprocess

        argv = [
            str(TWIN_CLI),
            "--use-keychain",
            "act", "emit", "act", "mesh_query",
            reference,
            "--payload", json.dumps(act_payload),
        ]
        completed = subprocess.run(
            argv, capture_output=True, text=True, timeout=5, check=False
        )
        if completed.returncode != 0:
            return None
        # The first stdout line that looks like an arq:// address is the act.
        candidates = (raw_line.strip() for raw_line in completed.stdout.splitlines())
        return next((ln for ln in candidates if ln.startswith("arq://")), None)
    except Exception:
        return None


def _summarise(body: dict | str) -> dict:
    """Shrink a daemon response into a small audit summary.

    Substrate payloads should stay small, so only the ``decision`` field,
    up to the first ten top-level keys, and the first 512 characters of the
    serialised body are kept. Non-dict bodies keep just a 512-character
    string preview and a ``None`` decision.
    """
    if not isinstance(body, dict):
        return {"decision": None, "body_preview": str(body)[:512]}
    return {
        "decision": body.get("decision"),
        "keys": list(body)[:10],
        "body_preview": json.dumps(body)[:512],
    }


def _print_response(outcome: str, body: dict | str, pretty: bool) -> None:
    """Render one ``_send`` result for the user.

    Args:
        outcome: Outcome tag from ``_send`` ("ok", "invalid_json",
            "no_socket" or "transport_error").
        body: Parsed response for "ok", otherwise a diagnostic string.
        pretty: Indent and key-sort the JSON written to stdout.

    Every "ok" body is written to stdout as JSON — including a JSON value
    that is not an object — so a successful round-trip is never reported
    on stderr as a transport error. Failure outcomes go to stderr.
    """
    if outcome == "ok":
        if pretty:
            print(json.dumps(body, indent=2, sort_keys=True))
        else:
            print(json.dumps(body))
    elif outcome == "invalid_json":
        sys.stderr.write(f"daemon returned non-JSON body:\n{body}\n")
    elif outcome == "no_socket":
        sys.stderr.write(f"mesh-daemon socket not available: {body}\n")
    else:
        sys.stderr.write(f"transport error: {body}\n")


def _run(op: str, request: dict, args: argparse.Namespace) -> int:
    """Round-trip ``request`` to the daemon, attest it, and print the reply.

    Args:
        op: Label used for the audit act reference.
        request: JSON object sent verbatim to the daemon.
        args: Parsed CLI namespace; reads ``socket``, ``timeout``,
            ``no_emit`` and ``pretty``.

    Returns:
        0 when ``_send`` reports "ok", otherwise 1.
    """
    target = Path(args.socket).expanduser()
    started = time.monotonic()
    outcome, body = _send(target, request, timeout_s=args.timeout)
    elapsed_ms = int((time.monotonic() - started) * 1000)

    if not args.no_emit:
        if outcome == "ok":
            summary = _summarise(body)
        else:
            summary = {"error": str(body)[:512]}
        summary["latency_ms"] = elapsed_ms
        _emit_act(op, request, outcome, summary)

    _print_response(outcome, body, pretty=args.pretty)
    if outcome == "ok":
        return 0
    return 1


def _cmd_status(args: argparse.Namespace) -> int:
    """Handle ``arq-mesh status`` by forwarding ``{"op": "status"}``."""
    request = {"op": "status"}
    return _run(request["op"], request, args)


def _cmd_peers(args: argparse.Namespace) -> int:
    """Handle ``arq-mesh peers`` by listing current claims.

    The daemon has no ``peers`` op; ``list_claims`` (who holds what across
    sibling sessions) is the closest concept, so the request and its audit
    act are both labelled ``list_claims``.
    """
    daemon_op = "list_claims"
    return _run(daemon_op, {"op": daemon_op}, args)


def _cmd_conflicts(args: argparse.Namespace) -> int:
    """Handle ``arq-mesh conflicts`` by forwarding ``{"op": "list_conflicts"}``."""
    request = {"op": "list_conflicts"}
    return _run(request["op"], request, args)


def _cmd_ping(args: argparse.Namespace) -> int:
    """Probe daemon reachability with a ``status`` round-trip.

    On success prints ``{"ok": true, "latency_ms": ..., "peer": ...}`` to
    stdout and returns 0; otherwise reports the failure on stderr and
    returns 1. The optional ``peer`` label (default ``"self"``) is only
    recorded in the audit act — the daemon receives a plain status request.
    """
    target = Path(args.socket).expanduser()
    started = time.monotonic()
    outcome, body = _send(target, {"op": "status"}, timeout_s=args.timeout)
    elapsed_ms = int((time.monotonic() - started) * 1000)
    label = args.peer or "self"

    if not args.no_emit:
        if outcome == "ok":
            summary = _summarise(body)
        else:
            summary = {"error": str(body)[:512]}
        summary.update(latency_ms=elapsed_ms, peer=label)
        _emit_act("ping", {"op": "status", "peer": label}, outcome, summary)

    if outcome != "ok":
        sys.stderr.write(f"ping failed: outcome={outcome} body={body!r}\n")
        return 1
    print(json.dumps({"ok": True, "latency_ms": elapsed_ms, "peer": label}))
    return 0


def _cmd_raw(args: argparse.Namespace) -> int:
    """Handle ``arq-mesh raw <json>``: send an arbitrary payload (escape hatch).

    The positional ``payload`` argument must be a JSON object whose ``op``
    field is a string; the object is then forwarded verbatim via ``_run``,
    using ``op`` as the audit label.

    Returns:
        2 (usage error) when the payload is not valid JSON, is not an
        object, lacks ``op``, or has a non-string ``op``; otherwise the
        exit code from ``_run``.
    """
    try:
        payload = json.loads(args.payload)
    except json.JSONDecodeError as e:
        # `payload` is a positional argument, not a --payload option.
        sys.stderr.write(f"payload is not valid JSON: {e}\n")
        return 2
    if not isinstance(payload, dict) or "op" not in payload:
        sys.stderr.write("payload must be a JSON object with an 'op' field\n")
        return 2
    op = payload["op"]
    if not isinstance(op, str):
        # A null/number op would only be stringified into a bogus act
        # reference such as "None-<ts>"; reject it up front.
        sys.stderr.write("payload 'op' field must be a string\n")
        return 2
    return _run(op, payload, args)


def _positive_float(text: str) -> float:
    """argparse ``type=`` converter for ``--timeout``: a finite number > 0.

    ``socket.settimeout`` raises ValueError for negative or NaN values and
    switches the socket to non-blocking mode for 0, so none of those are
    usable I/O timeouts. Rejecting them here turns a traceback into a
    regular argparse usage error.
    """
    try:
        value = float(text)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid float value: {text!r}") from None
    # Written so NaN (which fails every comparison) and infinity are rejected.
    if not 0.0 < value < float("inf"):
        raise argparse.ArgumentTypeError(
            f"must be a positive, finite number of seconds (got {text!r})"
        )
    return value


def _build_parser() -> argparse.ArgumentParser:
    """Build the arq-mesh argument parser.

    Global options (``--socket``, ``--timeout``, ``--pretty``, ``--no-emit``)
    precede a required subcommand; each subcommand stores its handler in
    ``func`` for ``main`` to dispatch.
    """
    parser = argparse.ArgumentParser(
        prog="arq-mesh",
        description=(
            "JSON-RPC client for the local mesh-daemon socket. "
            "The daemon is a claim-coordinator across sibling Claude "
            "sessions — it tracks file/branch/pane claims, not network peers."
        ),
        epilog=(
            "Daemon-supported ops (discovered live): "
            + ", ".join(DAEMON_OPS)
        ),
    )
    parser.add_argument(
        "--socket",
        default=str(DEFAULT_SOCKET),
        help=f"path to mesh-daemon Unix socket (default: {DEFAULT_SOCKET})",
    )
    parser.add_argument(
        "--timeout",
        type=_positive_float,
        default=3.0,
        help="socket I/O timeout in seconds, must be > 0 (default: 3.0)",
    )
    parser.add_argument(
        "--pretty",
        action="store_true",
        help="pretty-print JSON responses",
    )
    parser.add_argument(
        "--no-emit",
        action="store_true",
        help="skip substrate act emission (useful in tests + CI)",
    )

    sub = parser.add_subparsers(dest="cmd", required=True)

    p_status = sub.add_parser("status", help="dump current daemon state ({op:status})")
    p_status.set_defaults(func=_cmd_status)

    p_peers = sub.add_parser(
        "peers",
        help="list current claim-holders ({op:list_claims}) — closest analog to 'peers'",
    )
    p_peers.set_defaults(func=_cmd_peers)

    p_conflicts = sub.add_parser(
        "conflicts",
        help="list outstanding claim conflicts ({op:list_conflicts})",
    )
    p_conflicts.set_defaults(func=_cmd_conflicts)

    p_ping = sub.add_parser(
        "ping",
        help="connectivity probe (status round-trip + latency)",
    )
    p_ping.add_argument(
        "peer",
        nargs="?",
        default=None,
        help="optional peer label (recorded in the audit act; daemon has no peer op)",
    )
    p_ping.set_defaults(func=_cmd_ping)

    p_raw = sub.add_parser(
        "raw",
        help="send a raw JSON payload (must include 'op' field)",
    )
    p_raw.add_argument("payload", help="JSON string, e.g. '{\"op\":\"status\"}'")
    p_raw.set_defaults(func=_cmd_raw)

    return parser


def main(argv: list[str] | None = None) -> int:
    """CLI entry point: parse ``argv`` and dispatch to the chosen subcommand.

    ``argv=None`` lets argparse read ``sys.argv[1:]``. Returns the
    subcommand's exit code, or 130 when interrupted with Ctrl-C while the
    subcommand runs.
    """
    args = _build_parser().parse_args(argv)
    handler = args.func
    try:
        exit_code = handler(args)
    except KeyboardInterrupt:
        return 130
    return int(exit_code)


if __name__ == "__main__":
    # Equivalent to sys.exit(main()): propagate main's return value as the
    # process exit status.
    raise SystemExit(main())
