#!/usr/bin/env python3
"""AgentBridge — Claude↔Codex realtime bridge CLI.

Local file-based message bus. Subcommands:
  init                  bootstrap .agent-bus/ layout
  send                  append message to messages.jsonl
  watch                 tail messages for an agent, persist cursor
  ack                   reply to a message (sets reply_to)
  lock acquire          claim per-file edit lock
  lock heartbeat        refresh lock liveness
  lock release          drop lock
  lock reap             force-delete stale lock
"""
from __future__ import annotations

import argparse
import errno
import fcntl
import json
import os
import re
import subprocess
import sys
import time
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Iterable

# ----- Constants -----------------------------------------------------------

# Closed vocabularies checked by the validators in this file.
AGENT_ENUM = {"claude", "codex"}
MSG_TYPE_ENUM = {"task", "question", "answer", "handoff", "status", "artifact", "warning"}
PRIORITY_ENUM = {"low", "normal", "high"}
ARTIFACT_KIND_ENUM = {"file", "diff", "command-output", "url"}
# Event names accepted by validate_event().
EVENT_ENUM = {
    "lock_acquired", "lock_released", "lock_heartbeat",
    "watch_started", "watch_crashed", "ack_sent",
    "stale_lock_reaped", "send_completed", "corrupt_line_skipped",
    "override_recorded",
}
# Canonical 8-4-4-4-12 lowercase-hex UUID text. The pattern does not check the
# version or variant nibbles, so any lowercase UUID-shaped string matches.
UUID_RE = re.compile(r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$")
# Timestamp with a literal trailing "Z" and optional fractional seconds.
ISO_RE = re.compile(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(\.\d+)?Z$")


# ----- Paths ---------------------------------------------------------------

def bus_root() -> Path:
    """Return the bus directory.

    ``$AGENT_BUS_DIR`` wins when it is set to a non-empty value; otherwise
    the bus lives in ``.agent-bus`` under the current working directory.
    """
    override = os.environ.get("AGENT_BUS_DIR")
    if override:
        return Path(override)
    return Path.cwd() / ".agent-bus"


def msg_path() -> Path:
    """Return the path of the message log (``messages.jsonl``) inside the bus root."""
    root = bus_root()
    return root / "messages.jsonl"


def evt_path() -> Path:
    """Return the path of the event log (``events.jsonl``) inside the bus root."""
    root = bus_root()
    return root / "events.jsonl"


def locks_dir() -> Path:
    """Return the ``locks`` directory inside the bus root."""
    root = bus_root()
    return root / "locks"


def state_dir() -> Path:
    """Return the ``state`` directory inside the bus root."""
    root = bus_root()
    return root / "state"


def sessions_dir() -> Path:
    """Return the ``sessions`` directory inside the bus root."""
    root = bus_root()
    return root / "sessions"


def cursor_path(agent: str) -> Path:
    """Return the file that stores *agent*'s read cursor (``state/<agent>.cursor``)."""
    filename = f"{agent}.cursor"
    return state_dir() / filename


def lock_key(repo_relative_path: str) -> str:
    """Normalize a repo-relative path into a unique, filesystem-safe lock key.

    Rules:
      - Surrounding whitespace is stripped, the path is lowercased and
        backslashes become forward slashes.
      - Empty segments and `.` segments are dropped wherever they occur,
        so `./a//b`, `a/./b` and `a/b` all name the same lock.
      - Any `..` segment is rejected to keep the key space rooted in the
        working tree.
      - Literal `_` in a path component is escaped as `_5f` so that `/`
        (encoded as `__`) is an unambiguous separator.
        Result: `a/b__c` → `a__b_5f_5fc`, distinct from `a__b__c` → `a_5f_5fb_5f_5fc`.

    Raises:
        ValidationError: if the path contains a `..` segment.
    """
    norm = repo_relative_path.strip().lower().replace("\\", "/")
    # "." segments do not change which file is addressed, so they must not
    # change the key either; otherwise two spellings of one file could be
    # locked independently.
    parts = [p for p in norm.split("/") if p not in ("", ".")]
    if ".." in parts:
        raise ValidationError(
            f"lock path {repo_relative_path!r} contains '..' — must stay inside repo"
        )
    return "__".join(p.replace("_", "_5f") for p in parts)


def lock_path(repo_relative_path: str) -> Path:
    """Return the lock file for a repo-relative path: ``locks/<lock_key>.lock``."""
    directory = locks_dir()
    key = lock_key(repo_relative_path)
    return directory / f"{key}.lock"


# ----- Env defaults --------------------------------------------------------

# Version string for this tool.
VERSION = "0.2.4"
# Names of the environment variables consulted by env_identity() and
# env_session() when the corresponding command-line option is absent.
ENV_IDENTITY = "AGENT_BRIDGE_IDENTITY"
ENV_SESSION = "AGENT_BRIDGE_SESSION"


def env_identity() -> str | None:
    """Return the identity from the environment, or None if unset or not a known agent."""
    candidate = os.environ.get(ENV_IDENTITY)
    if candidate in AGENT_ENUM:
        return candidate
    return None


def env_session() -> str | None:
    """Return the session id from the environment; unset and empty both yield None."""
    value = os.environ.get(ENV_SESSION)
    return value if value else None


def require_identity(args: argparse.Namespace, attr: str) -> str:
    """Resolve the acting identity from ``args.<attr>``, falling back to the environment.

    Raises:
        ValidationError: when neither source provides an identity. The
            message names the CLI flag derived from *attr* (``as_agent``
            becomes ``--as``) and the environment variable to set.
    """
    identity = getattr(args, attr, None) or env_identity()
    if identity:
        return identity
    flag = attr.replace('_agent', '').replace('_', '-')
    choices = '|'.join(sorted(AGENT_ENUM))
    raise ValidationError(
        f"missing identity: pass --{flag} or set {ENV_IDENTITY}={choices}"
    )


def require_session(args: argparse.Namespace) -> str:
    """Resolve the session id from ``args.session``, falling back to the environment.

    Raises:
        ValidationError: when neither source provides a session id.
    """
    session = getattr(args, "session", None) or env_session()
    if not session:
        raise ValidationError(
            f"missing session: pass --session <id> or set {ENV_SESSION}=<id>"
        )
    return session


def resolve_path_arg(p: str) -> str:
    """Return *p* unchanged when it is a relative path.

    Raises:
        ValidationError: when *p* is absolute; lock paths must be repo-relative.
    """
    if not os.path.isabs(p):
        return p
    raise ValidationError(
        f"lock path must be repo-relative, got absolute path {p!r}. "
        f"Strip the leading '/' or use a path relative to the repo root."
    )


# ----- Time / IDs ----------------------------------------------------------

def now_iso() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SS.mmmZ`` (millisecond precision)."""
    moment = datetime.now(timezone.utc)
    millis = moment.microsecond // 1000
    seconds_part = moment.strftime("%Y-%m-%dT%H:%M:%S.")
    return f"{seconds_part}{millis:03d}Z"


def new_uuid() -> str:
    """Return a fresh random (version 4) UUID in its canonical text form."""
    identifier = uuid.uuid4()
    return str(identifier)


def parse_iso(s: str) -> datetime:
    """Parse an ISO-8601 timestamp, accepting a trailing ``Z`` as UTC.

    The ``Z`` suffix is rewritten to ``+00:00`` before the string is handed
    to ``datetime.fromisoformat``.
    """
    normalized = s[:-1] + "+00:00" if s.endswith("Z") else s
    return datetime.fromisoformat(normalized)


# ----- Validation ----------------------------------------------------------

class ValidationError(Exception):
    """Raised when a message, event, or command-line input fails validation."""


def _require(obj: dict, key: str, typ) -> Any:
    """Return ``obj[key]`` after checking it is present and an instance of *typ*.

    Raises:
        ValidationError: if the key is missing or the value has the wrong type.
    """
    if key in obj:
        value = obj[key]
        if isinstance(value, typ):
            return value
        raise ValidationError(f"field {key} must be {typ.__name__}, got {type(value).__name__}")
    raise ValidationError(f"missing field: {key}")


def _require_enum(obj: dict, key: str, enum: set) -> str:
    """Return the string at ``obj[key]`` after checking it is a member of *enum*.

    Raises:
        ValidationError: if the field is missing, not a string, or not in *enum*.
    """
    value = _require(obj, key, str)
    if value in enum:
        return value
    raise ValidationError(f"field {key}={value!r} not in {sorted(enum)}")


def _require_uuid(obj: dict, key: str) -> str:
    """Return the string at ``obj[key]`` after checking it matches ``UUID_RE``.

    Raises:
        ValidationError: if the field is missing, not a string, or malformed.
    """
    value = _require(obj, key, str)
    if UUID_RE.match(value):
        return value
    raise ValidationError(f"field {key}={value!r} not a UUIDv4")


def _require_iso(obj: dict, key: str) -> str:
    """Return the string at ``obj[key]`` after checking it matches ``ISO_RE``.

    Raises:
        ValidationError: if the field is missing, not a string, or malformed.
    """
    value = _require(obj, key, str)
    if ISO_RE.match(value):
        return value
    raise ValidationError(f"field {key}={value!r} not ISO-8601 UTC (YYYY-MM-DDTHH:MM:SS[.sss]Z)")


def validate_message(m: dict) -> None:
    """Validate a message record, raising ValidationError on the first problem.

    Required fields: ``id`` (UUID text), ``ts`` (UTC timestamp), ``session_id``,
    ``from``/``to`` (agent names), ``type``, ``subject``, ``body`` and
    ``requires_ack`` (bool). Optional fields, checked only when present and
    not None: ``priority``, ``reply_to``, ``tags`` (list of str) and
    ``artifacts`` (list of objects with a valid ``kind`` and a string ``ref``).

    Every malformed input is reported as ValidationError, including a
    payload that is not a dict and optional fields of the wrong type, so
    callers that parse untrusted lines only need to catch one exception type.

    Raises:
        ValidationError: if *m* does not satisfy the message schema.
    """
    if not isinstance(m, dict):
        raise ValidationError(f"message must be object, got {type(m).__name__}")
    _require_uuid(m, "id")
    _require_iso(m, "ts")
    _require(m, "session_id", str)
    _require_enum(m, "from", AGENT_ENUM)
    _require_enum(m, "to", AGENT_ENUM)
    _require_enum(m, "type", MSG_TYPE_ENUM)
    _require(m, "subject", str)
    _require(m, "body", str)
    _require(m, "requires_ack", bool)

    priority = m.get("priority")
    if priority is not None:
        # The isinstance guard keeps unhashable values (list/dict) from
        # raising TypeError in the set membership test.
        if not isinstance(priority, str) or priority not in PRIORITY_ENUM:
            raise ValidationError(f"priority={priority!r} not in {sorted(PRIORITY_ENUM)}")

    reply_to = m.get("reply_to")
    if reply_to is not None:
        # re.match raises TypeError on non-strings; reject them explicitly.
        if not isinstance(reply_to, str) or not UUID_RE.match(reply_to):
            raise ValidationError(f"reply_to={reply_to!r} not a UUIDv4")

    tags = m.get("tags")
    if tags is not None:
        if not isinstance(tags, list) or not all(isinstance(t, str) for t in tags):
            raise ValidationError("tags must be list[str]")

    artifacts = m.get("artifacts")
    if artifacts is not None:
        if not isinstance(artifacts, list):
            raise ValidationError("artifacts must be list")
        for i, a in enumerate(artifacts):
            if not isinstance(a, dict):
                raise ValidationError(f"artifacts[{i}] must be object")
            kind = a.get("kind")
            if not isinstance(kind, str) or kind not in ARTIFACT_KIND_ENUM:
                raise ValidationError(f"artifacts[{i}].kind not in {sorted(ARTIFACT_KIND_ENUM)}")
            if not isinstance(a.get("ref"), str):
                raise ValidationError(f"artifacts[{i}].ref must be str")


def validate_event(e: dict) -> None:
    """Validate an event record, raising ValidationError on the first problem.

    Checks ``id``, ``ts``, ``session_id``, ``actor`` (an agent name or the
    literal ``agentbridge``), ``event`` (a member of ``EVENT_ENUM``) and,
    when present and not None, that ``meta`` is a dict.
    """
    _require_uuid(e, "id")
    _require_iso(e, "ts")
    _require(e, "session_id", str)
    actor = _require(e, "actor", str)
    if not (actor in AGENT_ENUM or actor == "agentbridge"):
        raise ValidationError(f"actor={actor!r} not in claude|codex|agentbridge")
    _require_enum(e, "event", EVENT_ENUM)
    meta = e.get("meta")
    if meta is None:
        return
    if not isinstance(meta, dict):
        raise ValidationError("meta must be object")


# ----- Atomic JSONL append -------------------------------------------------

def atomic_append(path: Path, obj: dict) -> int:
    """Append *obj* as one compact JSON line and return the line's start offset.

    The parent directory is created if needed and the file is created with
    mode 0o600. An exclusive ``fcntl.flock`` is held on the file for the
    duration of the write so concurrent writers serialize, and the data is
    fsynced before the lock is released.

    Args:
        path: JSONL file to append to.
        obj: JSON-serializable record.

    Returns:
        Byte offset (end of file, measured under the lock) at which the
        line was written.
    """
    line = json.dumps(obj, separators=(",", ":"), ensure_ascii=False) + "\n"
    data = line.encode("utf-8")
    path.parent.mkdir(parents=True, exist_ok=True)
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 0o600)
    try:
        fcntl.flock(fd, fcntl.LOCK_EX)
        offset = os.lseek(fd, 0, os.SEEK_END)
        # os.write may write fewer bytes than requested; keep going until the
        # whole line is on disk so readers never see a truncated record that
        # a later append would glue onto.
        sent = 0
        while sent < len(data):
            sent += os.write(fd, data[sent:])
        os.fsync(fd)
    finally:
        try:
            fcntl.flock(fd, fcntl.LOCK_UN)
        finally:
            os.close(fd)
    return offset


# ----- Event helper --------------------------------------------------------

def emit_event(actor: str, event: str, session_id: str, meta: dict | None = None) -> dict:
    """Build an event record, validate it, append it to the event log, and return it.

    A fresh id and the current timestamp are assigned here; a missing or
    empty *meta* is stored as an empty dict.
    """
    record = dict(
        id=new_uuid(),
        ts=now_iso(),
        session_id=session_id,
        actor=actor,
        event=event,
        meta=meta or {},
    )
    validate_event(record)
    atomic_append(evt_path(), record)
    return record


# ----- Commands ------------------------------------------------------------

def cmd_init(args: argparse.Namespace) -> int:
    """Create the bus layout, or refresh permissions on an existing one.

    Directories are created and set to 0o700; the two log files are created
    if missing and set to 0o600. A bus counts as already initialized when
    its message log exists and is non-empty, in which case only a short
    notice is printed. Returns 0.
    """
    root = bus_root()
    messages, events = msg_path(), evt_path()
    directories = (root, locks_dir(), state_dir(), sessions_dir())
    logs = (messages, events)

    # Decide this before creating anything, otherwise a fresh bus would
    # always look pre-existing.
    already = root.exists() and messages.exists() and messages.stat().st_size > 0

    for directory in directories:
        directory.mkdir(parents=True, exist_ok=True)
    for log in logs:
        if not log.exists():
            log.touch()
    for directory in directories:
        os.chmod(directory, 0o700)
    for log in logs:
        os.chmod(log, 0o600)

    if already:
        print(f"bus already initialized at {root} (perms refreshed, data preserved)")
        return 0

    print(f"initialized bus at {root}")
    if not sys.stdout.isatty():
        return 0
    prog = os.path.basename(sys.argv[0])
    hints = (
        "",
        "next steps:",
        "  export AGENT_BRIDGE_IDENTITY=claude   # or codex",
        "  export AGENT_BRIDGE_SESSION=<id>      # any name you pick",
        f"  {prog} agent-prompt --as claude --session <id>",
        f"  {prog} tmux --session <id>   # 4-pane workspace",
    )
    for hint in hints:
        print(hint)
    return 0


def cmd_send(args: argparse.Namespace) -> int:
    """Validate and append one message to the message log.

    The sender comes from ``--from`` or the identity environment variable.
    The body comes from ``--body`` or ``--body-file`` (``-`` reads stdin);
    giving both is an error. With ``--dry-run`` the validated payload is
    printed and nothing is written. Otherwise the message is appended, a
    ``send_completed`` event is emitted, and the message id is printed.

    Raises:
        ValidationError: on a missing sender, a self-addressed message,
            conflicting body options, an unknown ``--reply-to`` id, or a
            payload that fails validation.
    """
    sender = getattr(args, "from") or env_identity()
    if not sender:
        raise ValidationError(
            f"missing sender: pass --from or set {ENV_IDENTITY}"
        )
    if sender == args.to:
        raise ValidationError(f"self-message refused: --from and --to are both {sender!r}")
    session = require_session(args)

    source = args.body_file
    if source and args.body:
        raise ValidationError("--body and --body-file are mutually exclusive")
    if not source:
        body = args.body or ""
    elif source == "-":
        body = sys.stdin.read()
    else:
        body = Path(source).read_text()

    msg = {
        "id": new_uuid(),
        "ts": now_iso(),
        "session_id": session,
        "from": sender,
        "to": args.to,
        "type": args.type,
        "subject": args.subject,
        "body": body,
        "requires_ack": bool(args.ack),
        "priority": args.priority,
        "tags": args.tag or [],
    }
    parent = args.reply_to
    if parent:
        if _find_message(parent) is None:
            raise ValidationError(f"--reply-to {parent} not found in messages.jsonl")
        msg["reply_to"] = parent
    if args.artifact:
        msg["artifacts"] = [_parse_artifact(spec) for spec in args.artifact]
    validate_message(msg)

    if args.dry_run:
        # Preview only: show the validated payload, commit nothing.
        print(json.dumps(msg, indent=2, ensure_ascii=False))
        return 0

    atomic_append(msg_path(), msg)
    summary = {
        "msg_id": msg["id"], "from": msg["from"], "to": msg["to"], "type": msg["type"],
    }
    emit_event("agentbridge", "send_completed", session, summary)
    print(msg["id"])
    if args.ack and sys.stderr.isatty():
        print(f"sent → {args.to} (awaiting ack)", file=sys.stderr)
    return 0


def _parse_artifact(spec: str) -> dict:
    """Parse a ``kind=...,ref=...[,sha256=...]`` spec into an artifact dict.

    The spec is split on commas; pieces without ``=`` are ignored and each
    remaining piece is split on its first ``=``. Later duplicates of a key
    override earlier ones. Unknown keys are dropped from the result.

    Raises:
        ValidationError: if ``kind`` or ``ref`` is missing.
    """
    fields = {}
    for piece in spec.split(","):
        if "=" not in piece:
            continue
        name, value = piece.split("=", 1)
        fields[name] = value
    if not ("kind" in fields and "ref" in fields):
        raise ValidationError(f"artifact spec {spec!r} needs kind=...,ref=...")
    artifact = {"kind": fields["kind"], "ref": fields["ref"]}
    if "sha256" in fields:
        artifact["sha256"] = fields["sha256"]
    return artifact


def _read_cursor(agent: str) -> int:
    """Return *agent*'s saved byte offset into the message log.

    A missing cursor file, an empty one, or one whose content is not an
    integer all yield 0.
    """
    cursor_file = cursor_path(agent)
    if not cursor_file.exists():
        return 0
    text = cursor_file.read_text().strip()
    try:
        return int(text or "0")
    except ValueError:
        return 0


def _write_cursor(agent: str, offset: int) -> None:
    """Persist *offset* as *agent*'s cursor.

    The value is written to a sibling ``.tmp`` file and moved into place
    with ``os.replace`` so the cursor file is swapped in a single rename.
    """
    target = cursor_path(agent)
    target.parent.mkdir(parents=True, exist_ok=True)
    scratch = target.with_suffix(target.suffix + ".tmp")
    scratch.write_text(str(offset))
    os.replace(scratch, target)


def _scan_new(path: Path, start: int) -> Iterable[tuple[int, int, bytes]]:
    """Yield ``(start_offset, end_offset, raw_line)`` for each complete line at or after *start*.

    Nothing is yielded when *path* does not exist. Scanning stops at end of
    file, or at a trailing fragment without a newline; that fragment is not
    yielded, so a later scan from the same offset picks it up once complete.
    """
    if not path.exists():
        return
    with open(path, "rb") as handle:
        handle.seek(start)
        begin = handle.tell()
        for chunk in iter(handle.readline, b""):
            if not chunk.endswith(b"\n"):
                # Partial line at end-of-file; leave it unconsumed.
                return
            finish = handle.tell()
            yield begin, finish, chunk
            begin = finish


def cmd_watch(args: argparse.Namespace) -> int:
    """Tail the message log for one agent, printing messages addressed to it.

    Starting from the agent's saved cursor, every complete new line is
    parsed and validated. Messages addressed to the agent (and, when a
    session is set, belonging to that session) are printed either pretty or
    as JSON. Lines that cannot be decoded, parsed, or validated are recorded
    as ``corrupt_line_skipped`` events and skipped. The cursor is persisted
    after each scan that consumed at least one line.

    Output mode: ``--json`` forces JSON, ``--pretty`` forces pretty output,
    otherwise pretty is used only on a TTY. With ``--once`` a single scan is
    performed. Ctrl-C ends the loop cleanly. Returns 0.
    """
    agent = require_identity(args, "as_agent")
    session = args.session or env_session() or ""
    is_tty = sys.stdout.isatty()
    # --pretty defaults to True on TTY, False when piped. --json forces JSON.
    if args.json:
        pretty = False
    elif args.pretty:
        pretty = True
    else:
        pretty = is_tty
    color = pretty and is_tty

    emit_event(agent, "watch_started", session, {"session": session})
    cursor = _read_cursor(agent)
    interval = args.interval / 1000.0
    try:
        while True:
            new_cursor = cursor
            for start, end, raw in _scan_new(msg_path(), cursor):
                try:
                    obj = json.loads(raw)
                    if not isinstance(obj, dict):
                        raise ValidationError(
                            f"message must be object, got {type(obj).__name__}"
                        )
                    validate_message(obj)
                except (ValueError, TypeError, ValidationError) as ex:
                    # ValueError covers JSONDecodeError and the
                    # UnicodeDecodeError raised for invalid UTF-8 bytes;
                    # TypeError covers fields of an unexpected type. One bad
                    # line must not take the watcher down.
                    emit_event(agent, "corrupt_line_skipped", session, {
                        "byte_offset": start, "error": str(ex),
                    })
                    new_cursor = end
                    continue
                if obj.get("to") == agent and (not session or obj.get("session_id") == session):
                    if pretty:
                        sys.stdout.write(format_message_pretty(obj, color=color) + "\n")
                    else:
                        sys.stdout.write(json.dumps(obj) + "\n")
                    sys.stdout.flush()
                    if args.bell and obj.get("requires_ack") and is_tty:
                        sys.stderr.write("\a")
                        sys.stderr.flush()
                new_cursor = end
            if new_cursor != cursor:
                _write_cursor(agent, new_cursor)
                cursor = new_cursor
            if args.once:
                return 0
            time.sleep(interval)
    except KeyboardInterrupt:
        return 0


def _find_message(msg_id: str) -> dict | None:
    """Return the first message in the log whose ``id`` equals *msg_id*, or None.

    Lines that are not valid UTF-8 JSON, or whose top-level value is not an
    object, are skipped rather than aborting the search. A missing log file
    yields None.
    """
    try:
        with open(msg_path(), "rb") as f:
            for raw in f:
                try:
                    obj = json.loads(raw)
                except ValueError:
                    # JSONDecodeError, or UnicodeDecodeError on invalid bytes.
                    continue
                if isinstance(obj, dict) and obj.get("id") == msg_id:
                    return obj
    except FileNotFoundError:
        return None
    return None


def cmd_ack(args: argparse.Namespace) -> int:
    """Reply to an existing message with an ``answer`` tagged ``ack``.

    Only the recipient of the original message may ack it. The reply is
    addressed to the original sender, reuses the original session id, and
    sets ``reply_to`` to the original id. On success the reply is appended,
    an ``ack_sent`` event is emitted, and the reply id is printed.

    Returns:
        0 on success; 2 if the original message is not found or the caller
        is not its recipient.

    Raises:
        ValidationError: if both ``--body`` and ``--body-file`` are given,
            or the reply fails validation.
    """
    agent = require_identity(args, "as_agent")
    parent_id = args.reply_to
    original = _find_message(parent_id)
    if original is None:
        print(f"error: message {parent_id} not found", file=sys.stderr)
        return 2
    recipient = original.get("to")
    if recipient != agent:
        print(f"error: only recipient ({recipient}) may ack", file=sys.stderr)
        return 2

    source = args.body_file
    if source and args.body:
        raise ValidationError("--body and --body-file are mutually exclusive")
    if not source:
        body = args.body or ""
    elif source == "-":
        body = sys.stdin.read()
    else:
        body = Path(source).read_text()

    ack_msg = {
        "id": new_uuid(),
        "ts": now_iso(),
        "session_id": original["session_id"],
        "from": agent,
        "to": original["from"],
        "type": "answer",
        "subject": f"ack: {original.get('subject', '')}",
        "body": body,
        "requires_ack": False,
        "reply_to": parent_id,
        "priority": "normal",
        "tags": ["ack"],
    }
    validate_message(ack_msg)
    atomic_append(msg_path(), ack_msg)
    details = {"ack_id": ack_msg["id"], "reply_to": parent_id}
    emit_event(agent, "ack_sent", original["session_id"], details)
    print(ack_msg["id"])
    return 0


# ----- Locks ---------------------------------------------------------------

def _read_lock(path: Path) -> dict | None:
    """Return the parsed lock file at *path*, or None if there is no usable lock.

    None is returned when the file does not exist (including when it
    disappears between polls because the holder released it), when its
    content is not valid JSON text, or when the JSON value is not an object.
    Callers treat the result as a dict, so anything else is reported as
    "no lock" rather than passed through.
    """
    try:
        # Read directly instead of checking exists() first: another process
        # may unlink the lock between the check and the read.
        text = path.read_text()
    except FileNotFoundError:
        return None
    except ValueError:
        # UnicodeDecodeError: the file is not decodable text.
        return None
    try:
        data = json.loads(text)
    except ValueError:
        return None
    return data if isinstance(data, dict) else None


def _lock_is_stale(lock: dict) -> bool:
    """Return True when the lock's last heartbeat is older than its TTL.

    A lock whose ``heartbeat_at`` or ``ttl_seconds`` is missing, malformed,
    of the wrong type, or not timezone-aware is treated as stale, so a
    damaged lock file can always be reaped instead of crashing the caller.
    """
    try:
        hb = parse_iso(lock["heartbeat_at"])
        ttl = int(lock["ttl_seconds"])
        # Subtracting a naive datetime from an aware one raises TypeError.
        age = (datetime.now(timezone.utc) - hb).total_seconds()
    except (KeyError, ValueError, TypeError, AttributeError):
        return True
    return age > ttl


def _write_lock_atomic(path: Path, body: dict, exclusive: bool) -> bool:
    """Write *body* as indented JSON to the lock file at *path*.

    Args:
        path: lock file location; its parent directory is created if needed.
        body: JSON-serializable lock record.
        exclusive: when True the file is created with ``O_EXCL`` and the
            call fails if it already exists; when False an existing file is
            truncated and overwritten.

    Returns:
        True if the file was written, False if *exclusive* was requested
        and the file already existed.

    Raises:
        OSError: for any open failure other than "already exists".
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    flags = os.O_WRONLY | os.O_CREAT
    if exclusive:
        flags |= os.O_EXCL
    else:
        # Without O_TRUNC a shorter body written over a longer one would
        # leave trailing bytes behind and produce invalid JSON.
        flags |= os.O_TRUNC
    try:
        fd = os.open(path, flags, 0o600)
    except OSError as e:
        if e.errno == errno.EEXIST:
            return False
        raise
    try:
        data = json.dumps(body, indent=2).encode("utf-8")
        # os.write may write fewer bytes than requested; loop until done.
        sent = 0
        while sent < len(data):
            sent += os.write(fd, data[sent:])
        os.fsync(fd)
    finally:
        os.close(fd)
    return True


def cmd_lock_acquire(args: argparse.Namespace) -> int:
    """Claim the edit lock for ``args.path``, retrying for up to ``args.wait`` seconds.

    Each attempt reads the current lock file. A live (non-stale) lock makes
    the call wait and retry until the deadline; a stale lock is reported
    with a ``stale_lock_reaped`` event and deleted. The new lock is then
    created exclusively; on success a ``lock_acquired`` event is emitted and
    the new lock id is printed.

    Returns:
        0 when the lock was acquired; 1 when the deadline passed while the
        lock was held by someone else or while losing creation races.
    """
    agent = require_identity(args, "as_agent")
    rel = resolve_path_arg(args.path)
    path = lock_path(rel)
    session = args.session or env_session() or ""
    # A negative --wait is treated as "do not wait".
    deadline = time.monotonic() + max(args.wait, 0)

    while True:
        existing = _read_lock(path)
        if existing is not None and not _lock_is_stale(existing):
            # Live lock held by someone: give up at the deadline, else poll again.
            if time.monotonic() >= deadline:
                print(f"error: lock held by {existing.get('owner')} since {existing.get('acquired_at')}",
                      file=sys.stderr)
                return 1
            time.sleep(0.1)
            continue
        if existing is not None and _lock_is_stale(existing):
            # The reap is recorded before the file is removed.
            # NOTE(review): unlink followed by exclusive create is not one
            # atomic step; two reapers that both saw the stale lock could
            # interleave here — confirm whether that matters in practice.
            emit_event(agent, "stale_lock_reaped", session, {
                "path": rel, "prior_lock_id": existing.get("lock_id"),
                "prior_owner": existing.get("owner"),
            })
            path.unlink(missing_ok=True)
        body = {
            "lock_id": new_uuid(),
            "owner": agent,
            "path": rel,
            "acquired_at": now_iso(),
            "heartbeat_at": now_iso(),
            "ttl_seconds": args.ttl,
            "reason": args.reason or "",
        }
        # exclusive=True makes creation fail if the file exists, so only one
        # concurrent creator can succeed.
        if _write_lock_atomic(path, body, exclusive=True):
            emit_event(agent, "lock_acquired", session, {
                "path": rel, "lock_id": body["lock_id"],
            })
            print(body["lock_id"])
            return 0
        # Race: another writer beat us between unlink and create.
        if time.monotonic() >= deadline:
            print("error: lock acquire raced", file=sys.stderr)
            return 1
        time.sleep(0.1)


def cmd_lock_heartbeat(args: argparse.Namespace) -> int:
    """Refresh ``heartbeat_at`` on a lock owned by the calling agent.

    The updated record is written to a sibling ``.tmp`` file and moved over
    the lock file with ``os.replace``; a ``lock_heartbeat`` event follows.

    Returns:
        0 on success; 1 if there is no lock or it belongs to another agent.
    """
    agent = require_identity(args, "as_agent")
    rel = resolve_path_arg(args.path)
    path = lock_path(rel)
    session = args.session or env_session() or ""

    current = _read_lock(path)
    if current is None:
        print("error: no lock to heartbeat", file=sys.stderr)
        return 1
    holder = current.get("owner")
    if holder != agent:
        print(f"error: lock owned by {holder}, not {agent}", file=sys.stderr)
        return 1

    current["heartbeat_at"] = now_iso()
    scratch = path.with_suffix(path.suffix + ".tmp")
    scratch.write_text(json.dumps(current, indent=2))
    os.replace(scratch, path)

    details = {"path": rel, "lock_id": current.get("lock_id")}
    emit_event(agent, "lock_heartbeat", session, details)
    return 0


def cmd_lock_release(args: argparse.Namespace) -> int:
    """Delete the lock for ``args.path`` and emit a ``lock_released`` event.

    A lock owned by another agent is only removed when ``--force`` is given.

    Returns:
        0 on success; 1 if there is no lock, or it belongs to another agent
        and ``--force`` was not given.
    """
    agent = require_identity(args, "as_agent")
    rel = resolve_path_arg(args.path)
    path = lock_path(rel)
    session = args.session or env_session() or ""

    current = _read_lock(path)
    if current is None:
        print("error: no lock to release", file=sys.stderr)
        return 1
    if not (current.get("owner") == agent or args.force):
        print(f"error: lock owned by {current.get('owner')}, not {agent}; use --force to override", file=sys.stderr)
        return 1

    path.unlink()
    details = {"path": rel, "lock_id": current.get("lock_id")}
    emit_event(agent, "lock_released", session, details)
    return 0


def cmd_lock_reap(args: argparse.Namespace) -> int:
    """Force-delete the lock for ``args.path``.

    A stale lock is removed directly. A live lock is removed only as an
    emergency override: ``--force-stale`` must be given and
    ``_find_override_warning`` must return a warning message for the
    caller, the lock owner, the lock key and ``args.window``; the override
    is then recorded with an ``override_recorded`` event. Every successful
    reap emits ``stale_lock_reaped``.

    Returns:
        0 when the lock was removed; 1 if there is no lock or the override
        conditions are not met.
    """
    agent = require_identity(args, "as_agent")
    rel = resolve_path_arg(args.path)
    path = lock_path(rel)
    session = args.session or env_session() or ""

    current = _read_lock(path)
    if current is None:
        print("error: no lock at path", file=sys.stderr)
        return 1

    stale = _lock_is_stale(current)
    if not stale:
        if not args.force_stale:
            print("error: lock not stale; emergency override requires --force-stale "
                  f"AND prior warning msg tagged 'override:{lock_key(rel)}'",
                  file=sys.stderr)
            return 1
        holder = current.get("owner")
        warning = _find_override_warning(agent, holder, lock_key(rel), args.window)
        if warning is None:
            print(f"error: override refused — no recent warning msg from {agent}"
                  f" to {holder} tagged 'override:{lock_key(rel)}' within {args.window}s",
                  file=sys.stderr)
            return 1
        override_details = {
            "path": rel, "lock_id": current.get("lock_id"),
            "prior_owner": holder, "warning_msg_id": warning["id"],
        }
        emit_event(agent, "override_recorded", session, override_details)

    path.unlink()
    reap_details = {
        "path": rel, "lock_id": current.get("lock_id"),
        "prior_owner": current.get("owner"), "was_stale": stale,
    }
    emit_event(agent, "stale_lock_reaped", session, reap_details)
    return 0


# ----- New UX commands: key, with-hold, inbox, agent-prompt, version -------

def cmd_lock_key(args: argparse.Namespace) -> int:
    """Print the lock key for ``args.path`` and return 0."""
    key = lock_key(resolve_path_arg(args.path))
    print(key)
    return 0


def cmd_lock_with_hold(args: argparse.Namespace) -> int:
    """Acquire lock, run user command, auto-heartbeat in background, release on exit.

    The command is everything in ``args.cmd`` after an optional leading
    ``--``. While it runs, a daemon thread refreshes the lock every
    ``max(1, ttl // 3)`` seconds. The lock is released when the command
    exits, when launching it fails, or when SIGINT/SIGTERM arrives (the
    child is then terminated and the process exits with 130/143).

    Returns:
        The child's return code; 2 if no command was given; the acquire
        step's non-zero code if the lock could not be obtained.
    """
    import signal
    import threading

    agent = require_identity(args, "as_agent")
    rel = resolve_path_arg(args.path)
    session = args.session or env_session() or ""

    cmd = list(args.cmd)
    if cmd and cmd[0] == "--":
        cmd.pop(0)
    if not cmd:
        print("error: no command given after `--`", file=sys.stderr)
        return 2

    # Block signals during acquire so a SIGINT/SIGTERM cannot land
    # between acquire success and handler install (would orphan the lock).
    blocked = {signal.SIGINT, signal.SIGTERM}
    prev_mask = signal.pthread_sigmask(signal.SIG_BLOCK, blocked)

    fake_args = argparse.Namespace(
        as_agent=agent, path=rel, ttl=args.ttl, reason=args.reason,
        session=session, wait=args.wait,
    )
    rc = cmd_lock_acquire(fake_args)
    if rc != 0:
        signal.pthread_sigmask(signal.SIG_SETMASK, prev_mask)
        return rc

    stop = threading.Event()
    hb_interval = max(1, args.ttl // 3)

    def heartbeat_loop():
        # Event.wait returns False on timeout (time for a heartbeat) and
        # True once stop is set, which ends the loop.
        while not stop.wait(hb_interval):
            try:
                cmd_lock_heartbeat(argparse.Namespace(
                    as_agent=agent, path=rel, session=session,
                ))
            except Exception as ex:
                # Best effort: a failed heartbeat must not kill the thread,
                # but it should not vanish silently either.
                print(f"warning: heartbeat failed: {ex}", file=sys.stderr)

    hb_thread = threading.Thread(target=heartbeat_loop, daemon=True)
    hb_thread.start()

    proc = None
    cleaned = threading.Event()

    def cleanup(*_):
        if cleaned.is_set():
            return
        cleaned.set()
        stop.set()
        if proc is not None and proc.poll() is None:
            proc.terminate()
        # Let an in-flight heartbeat finish before releasing. Otherwise its
        # os.replace could put the lock file back after the unlink below and
        # leave an orphan lock behind. The timeout bounds the wait if the
        # heartbeat is stuck on I/O.
        hb_thread.join(timeout=5)
        try:
            cmd_lock_release(argparse.Namespace(
                as_agent=agent, path=rel, force=False, session=session,
            ))
        except Exception as ex:
            print(f"warning: release failed: {ex}", file=sys.stderr)

    signal.signal(signal.SIGINT, lambda *a: (cleanup(), sys.exit(130)))
    signal.signal(signal.SIGTERM, lambda *a: (cleanup(), sys.exit(143)))
    # Handlers installed — safe to unblock signals.
    signal.pthread_sigmask(signal.SIG_SETMASK, prev_mask)

    try:
        proc = subprocess.Popen(cmd)
        rc = proc.wait()
    finally:
        cleanup()
    return rc


def cmd_inbox(args: argparse.Namespace) -> int:
    """List messages addressed to the calling agent.

    With ``--unread`` the scan starts at the agent's saved cursor, otherwise
    at the beginning of the log. Optional filters: session, ``--type`` and
    ``--needs-ack``. Lines that are not valid UTF-8 JSON objects are
    skipped. Output is pretty unless ``--json`` is given or stdout is not a
    TTY and ``--pretty`` was not requested. On a TTY in pretty mode a count
    summary is written to stderr. The cursor is not modified. Returns 0.
    """
    agent = require_identity(args, "as_agent")
    session = args.session or env_session() or ""
    cursor_off = _read_cursor(agent) if args.unread else 0
    is_tty = sys.stdout.isatty()
    pretty = (not args.json) and (args.pretty or is_tty)
    color = pretty and is_tty

    count = 0
    for _start, _end, raw in _scan_new(msg_path(), cursor_off):
        try:
            obj = json.loads(raw)
        except ValueError:
            # JSONDecodeError, or UnicodeDecodeError on invalid bytes.
            continue
        if not isinstance(obj, dict):
            # Valid JSON but not a message object (e.g. a bare list).
            continue
        if obj.get("to") != agent:
            continue
        if session and obj.get("session_id") != session:
            continue
        if args.type and obj.get("type") != args.type:
            continue
        if args.needs_ack and not obj.get("requires_ack"):
            continue
        if pretty:
            print(format_message_pretty(obj, color=color))
        else:
            print(json.dumps(obj))
        count += 1
    if pretty and is_tty:
        print(f"\n— {count} message(s) {'unread' if args.unread else 'total'} for {agent}"
              f"{' in session '+session if session else ''}",
              file=sys.stderr)
    return 0


def cmd_agent_prompt(args: argparse.Namespace) -> int:
    """Print operator instructions (Markdown) tailored to the calling agent.

    The text embeds the agent's identity, the session id (or a placeholder
    when none is set), the name of the other agent, and the absolute path of
    this script as the command to invoke. Returns 0.
    """
    agent = require_identity(args, "as_agent")
    session = args.session or env_session() or "<your-session-id>"
    # AGENT_ENUM has two members, so removing the caller leaves the peer.
    other = next(iter(AGENT_ENUM - {agent}))
    self_path = os.path.abspath(sys.argv[0])
    print(f"""# AgentBridge — operator instructions for {agent}

You share a workspace with **{other}**. All inter-agent coordination
goes through the local CLI `agentbridge` (no network).

Your identity is `{agent}` and your session is `{session}`.
Set these so you don't pass them on every call:

```bash
export AGENT_BRIDGE_IDENTITY={agent}
export AGENT_BRIDGE_SESSION={session}
```

## Inbox

- Check pending work: `{self_path} inbox`
- Watch for new messages live: `{self_path} watch`
- For ack-required messages, reply with:
  `{self_path} ack --reply-to <msg-id> --body "..."`

## Sending (prefer these high-level verbs)

The three intent verbs cover ~90% of agent-to-agent traffic:

- **Hand off a task** (with optional wait-for-ack):
  `{self_path} handoff --to {other} --task "subject" --body "details" [--wait-ack]`
  Returns the message id, or (with `--wait-ack`) blocks up to 300s
  for the recipient's answer and prints a JSON summary.

- **Ask a question and wait for an answer** (single call):
  `{self_path} ask --to {other} --subject "..." --body "..."`
  Blocks until the answer arrives, prints the answer body on stdout.
  Add `--json` for the full reply object.

- **Edit a file under a lock** (auto heartbeat + release):
  `{self_path} lock with-hold --path <repo/relative/path> -- <your-editor-cmd>`

Fall through to the lower-level primitives when you need them:
- `{self_path} send --to {other} --type {{task|question|answer|handoff|status|artifact|warning}} --subject "..." --body "..." [--ack]`
- For long bodies pipe stdin: `... --body-file -` and paste, end with Ctrl-D.
- `{self_path} ack --reply-to <msg-id> --body "..."` to reply to an `--ack` message.

## Edit locks (REQUIRED before editing any file {other} might touch)

- Quick path: `{self_path} lock with-hold --path <repo/relative/path> -- <your-editor-cmd>`
  - Acquires lock, auto-heartbeats, releases on command exit.
- Manual:
  - `{self_path} lock acquire --path <p> --ttl 120 [--wait 30] --reason "..."`
  - heartbeat every 20s if you hold longer than ttl/3
  - `{self_path} lock release --path <p>`
- If contended, send a question to {other} and wait for ack rather than overriding.
- Emergency only: send a `warning` msg tagged `override:<key>` (see
  `{self_path} lock key <path>`), then `lock reap --force-stale`.

## Observability

- `{self_path} status` — health snapshot
- `{self_path} transcript` — full session timeline
""")
    return 0


# ----- Composite verbs (Phase 7: agent-facing one-shot intents) -----------

def _wait_for_reply(parent_id: str, agent: str, session: str,
                    timeout_s: int, poll_ms: int = 200) -> dict | None:
    """Block until a reply to *parent_id* addressed to *agent* appears, or time out.

    Polling starts at the size of the message log at call time, so only
    lines appended afterwards are considered. A line matches when its
    ``reply_to`` equals *parent_id*, its ``to`` equals *agent* and, if
    *session* is non-empty, its ``session_id`` equals *session*. Lines that
    are not valid UTF-8 JSON objects are skipped.

    Args:
        parent_id: id of the message being replied to.
        agent: expected recipient of the reply.
        session: session filter; an empty string disables it.
        timeout_s: maximum seconds to wait; negative values count as 0.
        poll_ms: delay between scans in milliseconds.

    Returns:
        The reply message, or None on timeout. At least one scan is always
        performed before the timeout is checked.
    """
    deadline = time.monotonic() + max(timeout_s, 0)
    seen_offset = msg_path().stat().st_size if msg_path().exists() else 0
    while True:
        for _, end, raw in _scan_new(msg_path(), seen_offset):
            seen_offset = end
            try:
                obj = json.loads(raw)
            except ValueError:
                # JSONDecodeError, or UnicodeDecodeError on invalid bytes.
                continue
            if not isinstance(obj, dict):
                # Valid JSON but not a message object; .get would fail.
                continue
            if obj.get("reply_to") != parent_id:
                continue
            if obj.get("to") != agent:
                continue
            if session and obj.get("session_id") != session:
                continue
            return obj
        if time.monotonic() >= deadline:
            return None
        time.sleep(poll_ms / 1000.0)


def cmd_handoff(args: argparse.Namespace) -> int:
    """Send a task to the other agent. Optionally block until they ack.

    Intent verb that bundles `send --type task [--ack]` + (optional)
    poll for the answer reply. Designed so an agent issues one call
    per handoff rather than orchestrating send + watch separately.

    The body comes from ``--body`` or ``--body-file`` (``-`` reads stdin);
    giving both is rejected, matching ``send`` and ``ack``. The message is
    tagged ``handoff`` in addition to any ``--tag`` values.

    Returns:
        0 after printing the message id (no ``--wait-ack``) or a JSON
        summary of the ack; 1 if ``--wait-ack`` timed out.

    Raises:
        ValidationError: on missing identity/session, a self-addressed
            handoff, conflicting body options, or an invalid payload.
    """
    sender = require_identity(args, "from_agent")
    session = require_session(args)
    if sender == args.to:
        raise ValidationError(f"handoff target must differ from sender ({sender})")

    # Previously --body was silently discarded when --body-file was also
    # given; refuse the ambiguous combination instead.
    if args.body_file and args.body:
        raise ValidationError("--body and --body-file are mutually exclusive")
    if args.body_file:
        body = sys.stdin.read() if args.body_file == "-" else Path(args.body_file).read_text()
    else:
        body = args.body or ""

    wait = args.wait_ack
    msg = {
        "id": new_uuid(),
        "ts": now_iso(),
        "session_id": session,
        "from": sender,
        "to": args.to,
        "type": "task",
        "subject": args.task,
        "body": body,
        "requires_ack": wait,
        "priority": args.priority,
        "tags": (args.tag or []) + ["handoff"],
    }
    validate_message(msg)
    atomic_append(msg_path(), msg)
    emit_event("agentbridge", "send_completed", session, {
        "msg_id": msg["id"], "from": sender, "to": args.to, "type": "task",
    })

    if not wait:
        print(msg["id"])
        return 0

    if sys.stderr.isatty():
        print(f"handoff → {args.to} (waiting up to {args.timeout}s for ack)",
              file=sys.stderr)
    reply = _wait_for_reply(msg["id"], sender, session, args.timeout)
    if reply is None:
        print(f"error: no ack from {args.to} within {args.timeout}s "
              f"(msg_id={msg['id']})", file=sys.stderr)
        return 1
    print(json.dumps({
        "handoff_id": msg["id"],
        "ack_id": reply["id"],
        "ack_from": reply["from"],
        "ack_body": reply.get("body", ""),
    }, indent=2))
    return 0


def cmd_ask(args: argparse.Namespace) -> int:
    """Post a question and block until the answer arrives (or timeout).

    One-call request/response: the `question` message is appended with
    requires_ack set, then the bus is polled for the reply. The answer
    body goes to stdout (or the whole reply as JSON with --json) so the
    caller can capture it without running watch.

    Returns 0 when an answer arrived, 1 on timeout. Raises
    ValidationError when the target equals the sender.
    """
    sender = require_identity(args, "from_agent")
    session = require_session(args)
    recipient = args.to
    if recipient == sender:
        raise ValidationError(f"ask target must differ from sender ({sender})")

    # --body-file wins over --body; "-" means read the body from stdin.
    if not args.body_file:
        body = args.body or ""
    elif args.body_file == "-":
        body = sys.stdin.read()
    else:
        body = Path(args.body_file).read_text()

    msg = {
        "id": new_uuid(),
        "ts": now_iso(),
        "session_id": session,
        "from": sender,
        "to": recipient,
        "type": "question",
        "subject": args.subject,
        "body": body,
        "requires_ack": True,
        "priority": args.priority,
        "tags": args.tag or [],
    }
    validate_message(msg)
    atomic_append(msg_path(), msg)
    event_meta = {"msg_id": msg["id"], "from": sender, "to": recipient, "type": "question"}
    emit_event("agentbridge", "send_completed", session, event_meta)

    msg_id = msg["id"]
    timeout = args.timeout
    if sys.stderr.isatty():
        print(f"ask → {recipient} (waiting up to {timeout}s)",
              file=sys.stderr)
    reply = _wait_for_reply(msg_id, sender, session, timeout)
    if reply is None:
        print(f"error: no answer from {recipient} within {timeout}s "
              f"(msg_id={msg_id})", file=sys.stderr)
        return 1

    rendered = json.dumps(reply, indent=2) if args.json else reply.get("body", "")
    print(rendered)
    return 0


# Bash completion script, written verbatim to stdout by cmd_completion when
# the requested shell is "bash". Kept as a raw string so the backslash line
# continuations and `$` expansions reach the shell untouched.
# NOTE(review): the `top` command list does not mention `handoff` or `ask`
# even though cmd_handoff/cmd_ask exist in this file, and `--to` is listed
# twice in `flags` — confirm against the argparse setup before editing.
_BASH_COMPLETION = r"""# agentbridge bash completion
# Install: agentbridge completion bash > ~/.local/share/bash-completion/completions/agentbridge
_agentbridge() {
    local cur prev words cword
    _init_completion || return

    local top="init version status send watch ack inbox lock transcript tmux agent-prompt cursor completion"
    local lock_sub="acquire heartbeat release reap key with-hold"
    local cursor_sub="show reset"
    local agents="claude codex"
    local types="task question answer handoff status artifact warning"
    local priorities="low normal high"

    if (( cword == 1 )); then
        COMPREPLY=( $(compgen -W "$top" -- "$cur") )
        return
    fi

    case "${words[1]}" in
        lock)
            if (( cword == 2 )); then
                COMPREPLY=( $(compgen -W "$lock_sub" -- "$cur") ); return
            fi ;;
        cursor)
            if (( cword == 2 )); then
                COMPREPLY=( $(compgen -W "$cursor_sub" -- "$cur") ); return
            fi ;;
        completion)
            if (( cword == 2 )); then
                COMPREPLY=( $(compgen -W "bash zsh" -- "$cur") ); return
            fi ;;
    esac

    case "$prev" in
        --as|--from|--to)  COMPREPLY=( $(compgen -W "$agents" -- "$cur") ); return ;;
        --type)            COMPREPLY=( $(compgen -W "$types" -- "$cur") ); return ;;
        --priority)        COMPREPLY=( $(compgen -W "$priorities" -- "$cur") ); return ;;
        --format)          COMPREPLY=( $(compgen -W "pretty jsonl" -- "$cur") ); return ;;
        --body-file|--path) _filedir; return ;;
    esac

    if [[ "$cur" == -* ]]; then
        local flags="--as --from --to --type --subject --body --body-file --session \
                     --ack --reply-to --priority --tag --artifact --dry-run \
                     --once --pretty --json --bell --interval \
                     --path --ttl --reason --wait --force --force-stale --window \
                     --name --format --unread --needs-ack --to --help"
        COMPREPLY=( $(compgen -W "$flags" -- "$cur") )
    fi
}
complete -F _agentbridge agentbridge
"""

# Zsh completion script, written verbatim to stdout by cmd_completion for any
# shell value other than "bash". Raw string so the line continuations and
# `$` expansions are passed through unchanged.
# NOTE(review): the `top` command list does not mention `handoff` or `ask`
# even though cmd_handoff/cmd_ask exist in this file — confirm against the
# argparse setup before editing.
_ZSH_COMPLETION = r"""#compdef agentbridge
# agentbridge zsh completion
# Install: agentbridge completion zsh > "${fpath[1]}/_agentbridge" && compinit
_agentbridge() {
    local -a top lock_sub cursor_sub
    top=(init version status send watch ack inbox lock transcript tmux agent-prompt cursor completion)
    lock_sub=(acquire heartbeat release reap key with-hold)
    cursor_sub=(show reset)

    if (( CURRENT == 2 )); then
        _describe 'command' top
        return
    fi
    case "$words[2]" in
        lock)       (( CURRENT == 3 )) && { _describe 'lock subcommand' lock_sub; return } ;;
        cursor)     (( CURRENT == 3 )) && { _describe 'cursor subcommand' cursor_sub; return } ;;
        completion) (( CURRENT == 3 )) && { _values 'shell' bash zsh; return } ;;
    esac
    _arguments \
        '--as=[identity]:agent:(claude codex)' \
        '--from=[sender]:agent:(claude codex)' \
        '--to=[recipient]:agent:(claude codex)' \
        '--type=[message type]:type:(task question answer handoff status artifact warning)' \
        '--priority=[priority]:priority:(low normal high)' \
        '--format=[transcript format]:format:(pretty jsonl)' \
        '--body-file=[body file]:file:_files' \
        '--path=[repo-relative path]:file:_files' \
        '*:: :->args'
}
_agentbridge "$@"
"""


def cmd_completion(args: argparse.Namespace) -> int:
    """Write the completion script for the requested shell to stdout.

    "bash" selects the bash script; any other value falls through to the
    zsh script. Always returns 0.
    """
    script = _BASH_COMPLETION if args.shell == "bash" else _ZSH_COMPLETION
    sys.stdout.write(script)
    return 0


def cmd_cursor_show(args: argparse.Namespace) -> int:
    """Print the watch cursor of the calling agent.

    Reports the cursor file path, the stored offset, whether the file is
    present, and how many bytes of messages.jsonl lie beyond the offset.
    Always returns 0.
    """
    agent = require_identity(args, "as_agent")
    offset = _read_cursor(agent)
    cursor_file = cursor_path(agent)
    present = cursor_file.exists()

    log = msg_path()
    log_size = log.stat().st_size if log.exists() else 0
    # Clamp at zero: the cursor may point past a log that has shrunk.
    pending = max(0, log_size - offset)

    report = (
        f"cursor: {cursor_file}",
        f"agent: {agent}",
        f"offset: {offset}",
        f"file present: {present}",
        f"messages.jsonl size: {log_size}  ({pending} bytes unscanned)",
    )
    print("\n".join(report))
    return 0


def cmd_cursor_reset(args: argparse.Namespace) -> int:
    """Reset or reposition the watch cursor of the calling agent.

    With --to N the cursor is set to byte offset N, whether or not a
    cursor file existed before (previously an explicit --to was silently
    dropped, and not even validated, when the file was absent). Without
    --to the cursor file is removed so the next watch starts at offset 0.

    Returns 0 in every non-error case. Raises ValidationError when --to
    is negative.
    """
    agent = require_identity(args, "as_agent")
    p = cursor_path(agent)

    if args.to is not None:
        if args.to < 0:
            raise ValidationError(f"--to must be >= 0, got {args.to}")
        # The cursor may never have been written, so its directory is not
        # guaranteed to exist yet.
        p.parent.mkdir(parents=True, exist_ok=True)
        _write_cursor(agent, args.to)
        print(f"cursor for {agent} set to offset {args.to}")
        return 0

    # EAFP: a concurrent removal between an exists() check and unlink()
    # would otherwise surface as a traceback.
    try:
        p.unlink()
    except FileNotFoundError:
        print(f"no cursor for {agent} (already absent)")
        return 0
    print(f"cursor for {agent} removed (next watch starts from offset 0)")
    return 0


# ----- Pretty rendering ----------------------------------------------------

def _color(enabled: bool):
    """Return the ANSI escape palette used by the pretty formatters.

    The mapping always has the same keys; when *enabled* is false every
    value is the empty string, so callers can interpolate unconditionally.
    """
    palette = {
        "msg": "\033[36m",
        "evt": "\033[35m",
        "ts": "\033[2m",
        "ack": "\033[33m",
        "task": "\033[1;32m",
        "warn": "\033[1;31m",
        "rst": "\033[0m",
    }
    if not enabled:
        return dict.fromkeys(palette, "")
    return palette


def format_message_pretty(m: dict, color: bool = False) -> str:
    """Render one bus message as a human-readable block.

    The first line carries timestamp, route, type and subject, followed
    by optional reply_to / needs-ack markers. A non-empty body is added
    on an indented second line, cut to 200 characters plus an ellipsis.
    """
    c = _color(color)
    accent_by_type = {
        "task": c["task"],
        "warning": c["warn"],
        "answer": c["ack"],
    }
    accent = accent_by_type.get(m.get("type"), c["msg"])

    pieces = [
        f"{c['ts']}{m['ts']}{c['rst']}  ",
        f"{accent}{m.get('from')}→{m.get('to')} [{m.get('type')}]{c['rst']}  ",
        f"{m.get('subject','')}",
    ]
    if m.get("reply_to"):
        # Only the first 8 characters of the referenced id are shown.
        pieces.append(f"  {c['ts']}(reply_to={m['reply_to'][:8]}){c['rst']}")
    if m.get("requires_ack"):
        pieces.append(f"  {c['ack']}[needs-ack]{c['rst']}")
    if m.get("body"):
        text = m["body"]
        shown = text[:200] + "…" if len(text) > 200 else text
        pieces.append(f"\n    {shown}")
    return "".join(pieces)


def format_event_pretty(e: dict, color: bool = False) -> str:
    """Render one bus event as a single human-readable line.

    Shows timestamp, actor and event name, then a fixed selection of
    `meta` fields as key=value pairs. Values longer than 36 characters
    are cut to their first 8 characters plus an ellipsis.
    """
    c = _color(color)
    meta = e.get("meta") or {}
    shown_keys = ("path", "msg_id", "ack_id", "lock_id", "reply_to",
                  "prior_owner", "byte_offset", "warning_msg_id")

    pairs = []
    for key in shown_keys:
        if key not in meta:
            continue
        text = str(meta[key])
        if len(text) > 36:
            text = text[:8] + "…"
        pairs.append(f"{key}={text}")

    head = (
        f"{c['ts']}{e['ts']}{c['rst']}  "
        f"{c['evt']}EVT {e.get('actor')} {e.get('event')}{c['rst']}  "
    )
    # rstrip removes the trailing separator when no meta field was shown.
    return (head + " ".join(pairs)).rstrip()


# ----- Iteration helpers ---------------------------------------------------

def _iter_jsonl(path: Path) -> Iterable[dict]:
    """Yield every JSON object stored in the JSONL file at *path*.

    Blank lines, lines that fail to decode, and lines whose top-level
    value is not a JSON object are skipped. A missing file yields nothing.
    """
    try:
        f = open(path, "rb")
    except FileNotFoundError:
        return
    with f:
        for raw in f:
            raw = raw.strip()
            if not raw:
                continue
            try:
                obj = json.loads(raw)
            except ValueError:
                # ValueError covers both json.JSONDecodeError and the
                # UnicodeDecodeError raised for bytes that are not valid
                # UTF-8; either way the line is corrupt and is skipped.
                continue
            # Callers treat every item as a mapping, so a bare scalar or
            # array on a line is as unusable as a corrupt one.
            if isinstance(obj, dict):
                yield obj


def _iter_jsonl_with_offset(path: Path) -> Iterable[tuple[int, dict]]:
    """Yield (byte_offset, parsed_obj) for each well-formed JSONL line.

    *byte_offset* is the position in the file at which the line starts.
    Blank lines, lines that fail to decode, and lines whose top-level
    value is not a JSON object are skipped. A missing file yields nothing.
    """
    try:
        f = open(path, "rb")
    except FileNotFoundError:
        return
    with f:
        # readline() instead of file iteration: tell() is disabled while
        # a file object is being iterated with next().
        while True:
            off = f.tell()
            raw = f.readline()
            if not raw:
                break
            stripped = raw.strip()
            if not stripped:
                continue
            try:
                obj = json.loads(stripped)
            except ValueError:
                # Covers json.JSONDecodeError and the UnicodeDecodeError
                # raised for bytes that are not valid UTF-8.
                continue
            if isinstance(obj, dict):
                yield off, obj


def _iter_messages() -> Iterable[dict]:
    """Iterate over the parsed records of the bus message log."""
    log = msg_path()
    return _iter_jsonl(log)


def _iter_events() -> Iterable[dict]:
    """Iterate over the parsed records of the bus event log."""
    log = evt_path()
    return _iter_jsonl(log)


# ----- Override guardrail (Phase 3.4) --------------------------------------

# Tag prefix that marks a `warning` message as an override notice for one
# lock key; _find_override_warning matches the tag "override:<lkey>".
OVERRIDE_TAG_PREFIX = "override:"


def _find_override_warning(reaper: str, owner: str, lkey: str, window_s: int) -> dict | None:
    """Find the newest recent `warning` from reaper→owner tagged for *lkey*.

    Only messages whose timestamp lies within the last *window_s* seconds
    count. Messages with a missing or unparseable timestamp are ignored.
    Returns the matching message with the latest timestamp, or None.
    """
    oldest_allowed = datetime.now(timezone.utc).timestamp() - window_s
    wanted_tag = f"{OVERRIDE_TAG_PREFIX}{lkey}"

    best = None
    best_ts = None
    for msg in _iter_messages():
        is_candidate = (
            msg.get("type") == "warning"
            and msg.get("from") == reaper
            and msg.get("to") == owner
            and wanted_tag in (msg.get("tags") or [])
        )
        if not is_candidate:
            continue
        try:
            stamp = parse_iso(msg["ts"]).timestamp()
        except Exception:
            continue
        if stamp < oldest_allowed:
            continue
        # Strict comparison: on equal timestamps the earlier message wins.
        if best is None or stamp > best_ts:
            best, best_ts = msg, stamp
    return best


# ----- Transcript (Phase 4.2) ---------------------------------------------

def cmd_transcript(args: argparse.Namespace) -> int:
    """Print a merged, time-ordered transcript of messages and events.

    Records are read from messages.jsonl and events.jsonl, optionally
    restricted to `args.session`, and sorted by (ts, kind_rank,
    byte_offset). With `--format jsonl` each record is re-emitted as JSON
    with an added "kind" field; otherwise one summary line per record is
    printed.

    Records without a string `ts` sort first with an empty timestamp
    instead of aborting the whole transcript with a KeyError, matching
    how the TUI transcript modal treats them. Always returns 0.
    """
    session = args.session

    # Sort key: (ts, kind_rank, byte_offset). Messages and events come
    # from separate files, so byte offsets aren't comparable across them;
    # break inter-file ties via kind_rank (msg < evt) for determinism.
    entries: list[tuple[str, int, int, str, dict]] = []
    sources = ((0, "msg", msg_path()), (1, "evt", evt_path()))
    for rank, kind, path in sources:
        for off, obj in _iter_jsonl_with_offset(path):
            if not isinstance(obj, dict):
                continue
            if session and obj.get("session_id") != session:
                continue
            ts = obj.get("ts")
            # A non-string ts would make the sort compare mixed types.
            entries.append((ts if isinstance(ts, str) else "", rank, off, kind, obj))
    entries.sort(key=lambda entry: entry[:3])

    if args.format == "jsonl":
        for _ts, _rank, _off, kind, obj in entries:
            sys.stdout.write(json.dumps({"kind": kind, **obj}) + "\n")
        return 0

    shown_meta = {"path", "lock_id", "msg_id", "reply_to", "ack_id",
                  "prior_owner", "byte_offset"}
    for ts, _rank, _off, kind, obj in entries:
        if kind == "msg":
            line = (
                f"{ts}  MSG  {obj.get('from')}→{obj.get('to')}  "
                f"[{obj.get('type')}] {obj.get('subject', '')}"
            )
            if obj.get("reply_to"):
                # str() so an unexpected non-string id cannot break slicing.
                line += f"  (reply_to={str(obj['reply_to'])[:8]})"
            if obj.get("requires_ack"):
                line += "  [needs-ack]"
        else:
            meta = obj.get("meta") or {}
            extra = " ".join(f"{k}={v}" for k, v in meta.items() if k in shown_meta)
            line = f"{ts}  EVT  {obj.get('actor')}  {obj.get('event')}  {extra}".rstrip()
        print(line)
    return 0


# ----- tmux helper (Phase 4.3) --------------------------------------------

def cmd_tmux(args: argparse.Namespace) -> int:
    """Create a detached 4-pane tmux session for a bridge session.

    Panes: a logged shell for claude, a logged shell for codex, and one
    `watch` process per agent. An existing tmux session with the same
    name is killed first. Shell logs are written under the bus sessions
    directory via `script`.

    Returns 2 when tmux is not on PATH, 0 on success. Raises
    subprocess.CalledProcessError when a tmux layout command fails.
    """
    import shlex
    import shutil

    if not shutil.which("tmux"):
        print("error: tmux not on PATH", file=sys.stderr)
        return 2

    session = args.session
    log_claude = sessions_dir() / f"{session}.claude.log"
    log_codex = sessions_dir() / f"{session}.codex.log"
    sessions_dir().mkdir(parents=True, exist_ok=True)
    self_path = os.path.abspath(sys.argv[0])
    name = args.name or f"agentbridge-{session}"

    # Best effort: the session usually does not exist yet, so a non-zero
    # exit status here is expected and ignored.
    subprocess.run(["tmux", "kill-session", "-t", name],
                   stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
                   check=False)

    # tmux hands a single command string to the shell, so every
    # interpolated value is quoted; otherwise a path or session name
    # containing spaces or metacharacters would split or be interpreted.
    q = shlex.quote
    session_arg = q(str(session))
    # 4 panes: claude shell, codex shell, watch claude, watch codex
    claude_cmd = f"script -q -f {q(str(log_claude))} bash"
    codex_cmd = f"script -q -f {q(str(log_codex))} bash"
    watch_claude = f"{q(self_path)} watch --as claude --session {session_arg}"
    watch_codex = f"{q(self_path)} watch --as codex --session {session_arg}"

    layout_steps = (
        ["tmux", "new-session", "-d", "-s", name, claude_cmd],
        ["tmux", "split-window", "-t", name, "-h", codex_cmd],
        ["tmux", "split-window", "-t", f"{name}.0", "-v", watch_claude],
        ["tmux", "split-window", "-t", f"{name}.2", "-v", watch_codex],
        ["tmux", "select-layout", "-t", name, "tiled"],
    )
    for step in layout_steps:
        subprocess.check_call(step)

    print(f"tmux session: {name}")
    print(f"attach: tmux attach -t {name}")
    print(f"logs: {log_claude}  {log_codex}")
    return 0


# ----- Status --------------------------------------------------------------

def cmd_status(args: argparse.Namespace) -> int:
    """Print a summary of the bus: location, identity, counts, locks, cursors.

    Identity and session are reported even when the bus directory does
    not exist; everything else is skipped in that case. An identity value
    in the environment that is not a known agent is flagged as invalid.
    Always returns 0.
    """
    root = bus_root()
    print(f"bus: {root}  (exists={root.exists()})")

    resolved_ident = env_identity()
    session_label = env_session() or "unset"
    raw_ident = os.environ.get(ENV_IDENTITY)
    if raw_ident and raw_ident not in AGENT_ENUM:
        identity_label = f"{raw_ident!r} (invalid — must be {'|'.join(sorted(AGENT_ENUM))})"
    else:
        identity_label = resolved_ident or raw_ident or "unset"
    print(f"{ENV_IDENTITY}: {identity_label}")
    print(f"{ENV_SESSION}: {session_label}")

    if not root.exists():
        return 0

    message_count = sum(1 for _ in _iter_messages())
    event_count = sum(1 for _ in _iter_events())
    lock_files = list(locks_dir().glob("*.lock")) if locks_dir().exists() else []
    print(f"messages: {message_count}  events: {event_count}  active locks: {len(lock_files)}")

    for lock_file in lock_files:
        body = _read_lock(lock_file)
        if not body:
            # Unreadable or empty lock files are counted above but not listed.
            continue
        stale = _lock_is_stale(body)
        print(f"  - {body.get('path')}  owner={body.get('owner')}  "
              f"stale={stale}  heartbeat={body.get('heartbeat_at')}")

    if state_dir().exists():
        for cursor_file in state_dir().glob("*.cursor"):
            print(f"  cursor {cursor_file.name}: {cursor_file.read_text().strip()}")
    return 0


# ----- Interactive TUI -----------------------------------------------------

def _fmt_msg_row(obj: dict) -> str:
    """Summarize a message on one line for the live stream pane.

    Layout: HH:MM:SS, the from→to route padded to 14 columns, the type
    padded to 8, then the subject (shortened to 80 characters) and an
    " [ack]" marker when the message requires an ack.
    """
    clock = obj.get("ts", "")[11:19]  # time-of-day slice of the ISO stamp
    sender = obj.get("from", "?")
    recipient = obj.get("to", "?")
    route = f"{sender}→{recipient}"
    kind = obj.get("type", "?")

    subject = obj.get("subject", "")
    if len(subject) > 80:
        subject = subject[:77] + "..."

    marker = " [ack]" if obj.get("requires_ack") else ""
    return "{}  {:<14}  {:<8} {}{}".format(clock, route, kind, subject, marker)


def _fmt_lock_row(body: dict) -> tuple[str, str]:
    """Summarize an active lock as two display lines: (path, meta).

    The meta line shows owner, TTL and the age of the last heartbeat;
    the age is left blank when the heartbeat is missing or unparseable.
    "STALE" is appended when the lock is considered stale.
    """
    is_stale = _lock_is_stale(body)

    try:
        last_beat = parse_iso(body["heartbeat_at"])
        elapsed = datetime.now(timezone.utc) - last_beat
        age = f"{int(elapsed.total_seconds())}s"
    except (KeyError, ValueError):
        age = ""

    owner = body.get("owner", "?")
    ttl = body.get("ttl_seconds", "?")
    meta_line = "  by {:<6} ttl={}s age={}".format(owner, ttl, age)
    if is_stale:
        meta_line += "  STALE"
    return body.get("path", "?"), meta_line


def _read_active_locks() -> list[dict]:
    """Return the bodies of all readable lock files, sorted by file name.

    Lock files whose body comes back empty are left out. A missing locks
    directory gives an empty list.
    """
    lock_root = locks_dir()
    if not lock_root.exists():
        return []
    bodies = (_read_lock(lock_file) for lock_file in sorted(lock_root.glob("*.lock")))
    return [body for body in bodies if body]


def _render_empty_state(stdscr, curses, msg_w: int, body_h: int,
                        me: str, session: str) -> None:
    """Draw a 'next-steps' block in the Messages pane when no traffic yet.

    Goal: a user who just landed on this screen sees the literal command
    to paste into a second terminal so the bus has another participant.
    Personalized with their current identity + session.

    Lines are drawn from screen row 2 downward, each padded to *msg_w*
    columns; at most ``body_h - 1`` lines are drawn. curses errors on a
    single line are ignored, and the line's attribute is always switched
    off again so a failed write cannot leak styling onto later drawing.
    """
    other = "codex" if me == "claude" else "claude" if me else "codex"
    sess_show = session if session else "demo"

    BOLD = curses.A_BOLD
    DIM = curses.A_DIM
    YELLOW = curses.color_pair(2) | curses.A_BOLD
    CYAN = curses.color_pair(1)
    NORM = 0

    # (text, attr-mask) pairs, top to bottom.
    lines: list[tuple[str, int]] = [
        (" No messages yet — you're alone on this bus.", BOLD),
        ("", NORM),
    ]
    if not session:
        lines += [
            ("  ⚠ AGENT_BRIDGE_SESSION not set. Quit (q) and run:", YELLOW),
            ("      export AGENT_BRIDGE_SESSION=demo", BOLD),
            ("      agentbridge", BOLD),
            ("", NORM),
        ]
    lines += [
        ("  Step 1: open a SECOND terminal in this directory.", CYAN),
        ("", NORM),
        ("  Step 2: paste these into it:", CYAN),
        ("", NORM),
        (f"      export AGENT_BRIDGE_IDENTITY={other}", BOLD),
        (f"      export AGENT_BRIDGE_SESSION={sess_show}", BOLD),
        ("", NORM),
        ("  Step 3: send a test message FROM that terminal:", CYAN),
        ("", NORM),
        (f"      agentbridge handoff --to {me or 'claude'} "
         f"--task 'hello' --body 'first msg'", BOLD),
        ("", NORM),
        ("  Watch THIS pane — the message lands here within a second.", DIM),
        ("", NORM),
        ("  Shortcuts:", CYAN),
        ("      n  copy the new-pane command (incl. tmux one-liner)", BOLD),
        (f"      o  copy onboarding prompt for {other}", BOLD),
        ("      s / a / h  send / ask / handoff from here", BOLD),
        ("", NORM),
        ("  Press ? for full key list.", DIM),
    ]

    max_rows = max(0, body_h - 1)
    for row, (text, attr) in enumerate(lines[:max_rows], start=2):
        try:
            if attr:
                stdscr.attron(attr)
            try:
                stdscr.addnstr(row, 0, text.ljust(msg_w), msg_w)
            finally:
                # Runs even when addnstr raised, so the attribute never
                # stays switched on for whatever is drawn next.
                if attr:
                    stdscr.attroff(attr)
        except curses.error:
            pass


def cmd_ui(args: argparse.Namespace) -> int:
    """Curses spectator: live message stream + active locks; menu keys for sending.

    A daemon thread tails messages.jsonl into a bounded in-memory buffer
    (the newest 500 messages) while the main thread redraws the screen
    and polls the keyboard roughly every 250 ms.

    Returns 1 when curses is unavailable or the bus is not initialized,
    0 after the user quits.
    """
    try:
        import curses
    except ImportError:
        print("error: curses not available on this platform", file=sys.stderr)
        return 1

    if not bus_root().exists():
        print(f"error: bus not initialized at {bus_root()}. Run: agentbridge init",
              file=sys.stderr)
        return 1

    # Pre-resolve identity/session (lenient — TUI shows them, doesn't enforce).
    me = getattr(args, "as_agent", None) or env_identity() or ""
    session = getattr(args, "session", None) or env_session() or ""

    import collections
    import threading

    msgs: collections.deque[dict] = collections.deque(maxlen=500)
    msg_lock = threading.Lock()
    stop = threading.Event()
    paused = {"v": False}
    status_msg = {"v": "ready — press ? for keys"}

    def decode_line(raw):
        """Parse one raw log line; None unless it holds a JSON object."""
        try:
            obj = json.loads(raw)
        except ValueError:
            # ValueError also covers undecodable bytes, which would
            # otherwise kill the tail thread.
            return None
        # The renderer calls .get() on every item, so only mappings pass.
        return obj if isinstance(obj, dict) else None

    # Seed with existing messages (tail)
    cur_offset = 0
    if msg_path().exists():
        for _start, end, raw in _scan_new(msg_path(), 0):
            obj = decode_line(raw)
            if obj is not None:
                msgs.append(obj)
            cur_offset = end

    def tail_thread():
        """Append newly written messages to the buffer until stopped."""
        nonlocal cur_offset
        while not stop.is_set():
            if not paused["v"] and msg_path().exists():
                for _s, end, raw in _scan_new(msg_path(), cur_offset):
                    obj = decode_line(raw)
                    if obj is not None:
                        with msg_lock:
                            msgs.append(obj)
                    cur_offset = end
            # Event.wait instead of sleep so shutdown is not delayed.
            stop.wait(0.25)

    t = threading.Thread(target=tail_thread, daemon=True)
    t.start()

    type_pairs = {"task": 2, "question": 5, "answer": 3, "warning": 4}

    def paint(stdscr, h: int, w: int, scroll: int) -> None:
        """Draw one full frame: header, messages, locks, status, hints."""
        # Header — yellow-highlight any unset env value so the user
        # can see at a glance that send/ask/handoff will refuse.
        header_cp = curses.color_pair(1) | curses.A_BOLD
        warn_cp = curses.color_pair(2) | curses.A_BOLD
        try:
            stdscr.attron(header_cp)
            stdscr.addnstr(0, 0, " " * (w - 1), w - 1)
            stdscr.attroff(header_cp)
        except curses.error:
            pass

        def hdr_put(col: int, s: str, cp: int) -> int:
            """Write *s* on the header row at *col*; return the next column."""
            if col >= w - 1 or not s:
                return col
            try:
                stdscr.attron(cp)
                stdscr.addnstr(0, col, s, max(0, w - 1 - col))
                stdscr.attroff(cp)
            except curses.error:
                pass
            return col + len(s)

        col = 0
        col = hdr_put(col,
                      f" AgentBridge {VERSION}  bus={bus_root()}  me=",
                      header_cp)
        col = hdr_put(col, me or "(unset)",
                      header_cp if me else warn_cp)
        col = hdr_put(col, "  session=", header_cp)
        col = hdr_put(col,
                      session or "(unset — press S to set)",
                      header_cp if session else warn_cp)

        # Layout: messages | locks
        body_h = h - 3
        msg_w = max(40, (w * 2) // 3)
        lock_w = w - msg_w - 1

        # Messages pane
        stdscr.attron(curses.A_BOLD)
        pause_tag = "  [PAUSED]" if paused["v"] else ""
        stdscr.addnstr(1, 0, f" Messages (live){pause_tag}".ljust(msg_w), msg_w)
        stdscr.attroff(curses.A_BOLD)

        with msg_lock:
            items = list(msgs)
        max_rows = body_h - 1
        # Scroll: 0 = show last `max_rows`. Positive = look back.
        end_idx = max(0, len(items) - scroll)
        start_idx = max(0, end_idx - max_rows)
        visible = items[start_idx:end_idx]

        if not items:
            _render_empty_state(stdscr, curses, msg_w, body_h,
                                me, session)
        else:
            for i, obj in enumerate(visible):
                row = 2 + i
                line = _fmt_msg_row(obj)
                typ = obj.get("type", "")
                pair = 0
                if isinstance(typ, str) and typ in type_pairs:
                    pair = curses.color_pair(type_pairs[typ])
                if pair:
                    stdscr.attron(pair)
                stdscr.addnstr(row, 0, line.ljust(msg_w), msg_w)
                if pair:
                    stdscr.attroff(pair)

        # Vertical separator
        for r in range(1, body_h + 1):
            try:
                stdscr.addch(r, msg_w, curses.ACS_VLINE)
            except curses.error:
                pass

        # Locks pane
        stdscr.attron(curses.A_BOLD)
        stdscr.addnstr(1, msg_w + 1, " Active Locks".ljust(lock_w - 1), lock_w - 1)
        stdscr.attroff(curses.A_BOLD)

        locks = _read_active_locks()
        row = 2
        if not locks:
            stdscr.addnstr(row, msg_w + 1, "  (none)".ljust(lock_w - 1), lock_w - 1)
        else:
            for body in locks:
                if row >= body_h:
                    break
                path, meta = _fmt_lock_row(body)
                stdscr.addnstr(row, msg_w + 1, ("  " + path).ljust(lock_w - 1),
                               lock_w - 1)
                row += 1
                if row < body_h:
                    stdscr.addnstr(row, msg_w + 1, meta.ljust(lock_w - 1),
                                   lock_w - 1)
                    row += 1

        # Status line + key hints
        sb = f" {status_msg['v']}"
        stdscr.attron(curses.color_pair(1))
        stdscr.addnstr(h - 2, 0, sb.ljust(w), w - 1)
        stdscr.attroff(curses.color_pair(1))

        hints = ("  [s]end [a]sk [h]andoff  [n]ew-pane [o]nboard-other  "
                 "[t]ranscript  [j/k]scroll  [p]ause  [?]help  [q]uit")
        stdscr.attron(curses.A_REVERSE)
        stdscr.addnstr(h - 1, 0, hints.ljust(w), w - 1)
        stdscr.attroff(curses.A_REVERSE)

    def draw(stdscr) -> None:
        """Main loop: repaint, then poll the keyboard for up to 250 ms."""
        curses.curs_set(0)
        stdscr.nodelay(True)
        stdscr.keypad(True)
        try:
            curses.start_color()
            curses.use_default_colors()
            curses.init_pair(1, curses.COLOR_CYAN, -1)
            curses.init_pair(2, curses.COLOR_YELLOW, -1)
            curses.init_pair(3, curses.COLOR_GREEN, -1)
            curses.init_pair(4, curses.COLOR_RED, -1)
            curses.init_pair(5, curses.COLOR_MAGENTA, -1)
        except curses.error:
            pass

        scroll = 0  # rows scrolled back from the tail
        send_keys = {ord("s"): "send", ord("a"): "ask", ord("h"): "handoff"}

        while not stop.is_set():
            too_small = False
            try:
                stdscr.erase()
                h, w = stdscr.getmaxyx()
                too_small = h < 10 or w < 60
                if too_small:
                    stdscr.addnstr(0, 0, "terminal too small (need >= 60x10)", w - 1)
                else:
                    paint(stdscr, h, w, scroll)
                stdscr.refresh()
            except curses.error:
                pass

            # Input poll (250ms). This runs even when the terminal is too
            # small, so q/ESC can still end the program in that state.
            stdscr.timeout(250)
            try:
                ch = stdscr.getch()
            except curses.error:
                ch = -1
            if ch == -1:
                continue

            if ch in (ord("q"), 27):  # q or ESC
                stop.set()
                return
            if too_small:
                # Modals and scrolling need a usable screen; only quit works.
                continue

            if ch == ord("p"):
                paused["v"] = not paused["v"]
                status_msg["v"] = "paused" if paused["v"] else "resumed"
            elif ch == ord("j") or ch == curses.KEY_DOWN:
                scroll = max(0, scroll - 1)
            elif ch == ord("k") or ch == curses.KEY_UP:
                scroll = min(len(msgs), scroll + 1)
            elif ch == ord("g"):
                scroll = len(msgs)
            elif ch == ord("G"):
                scroll = 0
            elif ch == ord("r"):
                status_msg["v"] = "refreshed"
            elif ch == ord("?"):
                _ui_help_modal(stdscr)
            elif ch in send_keys:
                _ui_send_modal(stdscr, send_keys[ch], me, session, status_msg)
            elif ch == ord("t"):
                _ui_transcript_modal(stdscr, session)
            elif ch == ord("o"):
                _ui_copy_onboarding(stdscr, me, session, status_msg)
            elif ch == ord("n"):
                _ui_newpane_modal(stdscr, me, session, status_msg)

    try:
        curses.wrapper(draw)
    finally:
        stop.set()
        # Bounded wait; the thread is a daemon, so a slow scan cannot
        # hold up process exit.
        t.join(timeout=1.0)
    return 0


def _ui_prompt(stdscr, prompt: str, default: str = "") -> str | None:
    """Bottom-line text prompt. Returns None on ESC.

    Shows *prompt* in reverse video on the last screen row and an edit
    field to its right, pre-filled with *default*. Enter returns the
    buffer, ESC returns None, Backspace deletes the last character, and
    only printable ASCII is accepted. When the text outgrows the field,
    its tail is shown so the characters being typed stay visible.

    Cursor visibility, echo and the non-blocking mode of *stdscr* are
    restored on exit.
    """
    import curses
    h, w = stdscr.getmaxyx()
    win = curses.newwin(1, w, h - 1, 0)
    win.clear()
    win.addnstr(0, 0, prompt[: w - 2], w - 1, curses.A_REVERSE)
    win.refresh()
    # NOTE(review): echo is enabled although the buffer is rendered by
    # hand below — confirm whether terminal echo is actually wanted.
    curses.echo()
    curses.curs_set(1)
    stdscr.nodelay(False)
    try:
        # Inline edit area
        ed_y = h - 1
        ed_x = min(len(prompt) + 1, w - 2)
        ed_w = w - ed_x - 1
        edwin = curses.newwin(1, ed_w, ed_y, ed_x)
        buf = list(default)
        # One column short of the window so the last cell is never written.
        limit = ed_w - 1

        def redraw() -> None:
            """Repaint the edit field with the visible tail of the buffer."""
            text = "".join(buf)
            edwin.clear()
            if limit > 0:
                edwin.addnstr(0, 0, text[-limit:], limit)
            edwin.refresh()

        # The same bounded path draws the default, so an over-long
        # default cannot raise curses.error.
        redraw()
        while True:
            ch = stdscr.getch()
            if ch in (10, 13):  # Enter
                return "".join(buf)
            elif ch == 27:  # ESC
                return None
            elif ch in (curses.KEY_BACKSPACE, 127, 8):
                if buf:
                    buf.pop()
            elif 32 <= ch < 127:
                buf.append(chr(ch))
            redraw()
    finally:
        curses.noecho()
        curses.curs_set(0)
        stdscr.nodelay(True)


def _ui_send_modal(stdscr, kind: str, me: str, session: str, status_msg: dict) -> None:
    """Open form for send/ask/handoff. kind in {'send','ask','handoff'}.

    Prompts for recipient, subject (labelled "task" for a handoff) and
    body, then appends the message to the bus. The outcome, including
    cancellation and validation or I/O failures, is reported through
    ``status_msg["v"]``; nothing is raised to the caller for those.
    """
    def report(text: str) -> None:
        status_msg["v"] = text

    if not me:
        report("no identity set (export AGENT_BRIDGE_IDENTITY or pass --as)")
        return
    if not session:
        report("no session set (export AGENT_BRIDGE_SESSION or pass --session)")
        return

    default_to = "codex" if me == "claude" else "claude"
    to = _ui_prompt(stdscr, f" to [{default_to}]:", default_to)
    if to is None:
        report(f"{kind} cancelled")
        return
    to = to or default_to
    if to not in AGENT_ENUM:
        report(f"invalid recipient {to!r}")
        return

    subject_label = " task:" if kind == "handoff" else " subject:"
    subject = _ui_prompt(stdscr, subject_label, "")
    if not (subject is not None and subject.strip()):
        report(f"{kind} cancelled")
        return

    # ESC at the body prompt sends the message with an empty body.
    body = _ui_prompt(stdscr, " body:", "") or ""

    msg_type = {"send": "status", "ask": "question", "handoff": "task"}.get(kind)
    if msg_type is None:
        report(f"unknown kind {kind!r}")
        return

    try:
        if me == to:
            raise ValidationError(f"self-message refused: from and to are both {me!r}")
        msg = {
            "id": new_uuid(),
            "ts": now_iso(),
            "session_id": session,
            "from": me,
            "to": to,
            "type": msg_type,
            "subject": subject,
            "body": body,
            "requires_ack": kind == "ask",
            "priority": "normal",
            "tags": ["ui", kind],
        }
        validate_message(msg)
        atomic_append(msg_path(), msg)
        event_meta = {"msg_id": msg["id"], "from": me, "to": to, "type": msg["type"]}
        emit_event("agentbridge", "send_completed", session, event_meta)
        report(f"{kind} sent → {msg['id'][:8]}")
    except (ValidationError, OSError) as e:
        report(f"{kind} failed: {e}")


def _ui_transcript_modal(stdscr, session: str) -> None:
    """Pop up a scrollable, time-ordered transcript of messages and events.

    Every record from the message and event logs whose ``session_id`` equals
    *session* is included (all records when *session* is empty), one summary
    line per record.  The view opens scrolled to the tail.

    Keys: ``j``/Down and ``k``/Up scroll one line, PgDn/PgUp scroll one page,
    ``q``/ESC/``t`` close.  ``stdscr`` is switched to blocking reads while the
    modal is open, so the loop waits for a key instead of redrawing
    continuously, and is set back to non-blocking on exit.
    """
    import curses
    h, w = stdscr.getmaxyx()
    win = curses.newwin(h - 2, w, 1, 0)
    # Rows between the top and bottom border; at least 1 so paging always moves.
    page = max(1, h - 4)

    # (timestamp, rank, file offset, kind, record): rank puts a message before
    # an event with the same timestamp; the offset keeps file order stable.
    entries: list[tuple[str, int, int, str, dict]] = []
    for off, m in _iter_jsonl_with_offset(msg_path()):
        if session and m.get("session_id") != session:
            continue
        # Coerce to str so a null or non-string "ts" cannot break the sort.
        entries.append((str(m.get("ts") or ""), 0, off, "msg", m))
    for off, e in _iter_jsonl_with_offset(evt_path()):
        if session and e.get("session_id") != session:
            continue
        entries.append((str(e.get("ts") or ""), 1, off, "evt", e))
    entries.sort(key=lambda x: (x[0], x[1], x[2]))

    lines: list[str] = []
    for ts, _r, _o, kind, obj in entries:
        # Show only HH:MM:SS when the timestamp is long enough to carry it.
        ts_short = ts[11:19] if len(ts) > 19 else ts
        if kind == "msg":
            lines.append(f"{ts_short}  msg  {obj.get('from','?')}→{obj.get('to','?')}  "
                         f"{obj.get('type','?'):<8} {obj.get('subject','')}")
        else:
            lines.append(f"{ts_short}  evt  {obj.get('actor','?')}  {obj.get('event','?')}")

    last = max(0, len(lines) - 1)
    pos = max(0, len(lines) - page)
    stdscr.nodelay(False)
    try:
        while True:
            win.erase()
            win.box()
            try:
                win.addnstr(0, 2, f" Transcript: session={session or '(any)'} "
                                  f" ({len(lines)} entries) — q to close ", w - 4)
            except curses.error:
                # Terminal too narrow for the title; keep the modal usable.
                pass
            for i, ln in enumerate(lines[pos:pos + page]):
                try:
                    win.addnstr(1 + i, 1, ln, w - 3)
                except curses.error:
                    pass
            win.refresh()
            ch = stdscr.getch()
            if ch in (ord("q"), 27, ord("t")):
                break
            elif ch in (curses.KEY_DOWN, ord("j")):
                pos = min(last, pos + 1)
            elif ch in (curses.KEY_UP, ord("k")):
                pos = max(0, pos - 1)
            elif ch == curses.KEY_NPAGE:
                pos = min(last, pos + page)
            elif ch == curses.KEY_PPAGE:
                pos = max(0, pos - page)
    finally:
        stdscr.nodelay(True)


def _clipboard_copy(text: str) -> str | None:
    """Try xclip / xsel / pbcopy / wl-copy. Returns the tool name on success."""
    import shutil
    candidates = [
        (["xclip", "-selection", "clipboard"], "xclip"),
        (["xsel", "--clipboard", "--input"],   "xsel"),
        (["pbcopy"],                            "pbcopy"),
        (["wl-copy"],                           "wl-copy"),
    ]
    for cmd, name in candidates:
        if shutil.which(cmd[0]) is None:
            continue
        try:
            p = subprocess.run(cmd, input=text.encode("utf-8"),
                               check=True, timeout=2,
                               stdout=subprocess.DEVNULL,
                               stderr=subprocess.DEVNULL)
            return name
        except (subprocess.SubprocessError, OSError):
            continue
    return None


def _ui_text_modal(stdscr, title: str, lines: list[str],
                   footer: str = "Press any key to close.") -> None:
    """Show *lines* in a centred, boxed, scrollable window until dismissed.

    The window is sized to the content (at least 8 rows and 60 columns) and
    capped to the terminal.  *title* is drawn on the top border and *footer*
    in reverse video on the row above the bottom border.  ``j``/Down and
    ``k``/Up scroll by a line, PgDn/PgUp by a page; ``q``, ESC, ``o``, ``n``
    and Enter close the modal.
    """
    import curses

    term_h, term_w = stdscr.getmaxyx()
    widest = max((len(l) for l in lines), default=40)
    hh = min(max(8, len(lines) + 4), term_h - 2)
    ww = min(max(60, widest + 4), term_w - 4)
    win = curses.newwin(hh, ww, (term_h - hh) // 2, (term_w - ww) // 2)

    def put(row, text, *attrs):
        # Drawing is best effort: curses raises when text does not fit.
        try:
            win.addnstr(row, 2, text, ww - 4, *attrs)
        except curses.error:
            pass

    page = hh - 4
    last_top = max(0, len(lines) - page)
    close_keys = (ord("q"), 27, ord("o"), ord("n"), 10, 13)
    # key -> (direction, amount); direction > 0 scrolls towards the end.
    scroll_keys = {
        curses.KEY_DOWN: (1, 1),
        ord("j"): (1, 1),
        curses.KEY_UP: (-1, 1),
        ord("k"): (-1, 1),
        curses.KEY_NPAGE: (1, page),
        curses.KEY_PPAGE: (-1, page),
    }

    pos = 0
    while True:
        win.erase()
        win.box()
        put(0, f" {title} ")
        for offset, text in enumerate(lines[pos:pos + hh - 3]):
            put(1 + offset, text)
        put(hh - 2,
            f" {footer}  [j/k or ↑/↓ scroll · q to close] ",
            curses.A_REVERSE)
        win.refresh()

        # Block for exactly one key, then restore non-blocking input.
        stdscr.nodelay(False)
        ch = stdscr.getch()
        stdscr.nodelay(True)

        if ch in close_keys:
            return
        move = scroll_keys.get(ch)
        if move is None:
            continue
        direction, amount = move
        if direction > 0:
            pos = min(last_top, pos + amount)
        else:
            pos = max(0, pos - amount)


def _ui_copy_onboarding(stdscr, me: str, session: str,
                        status_msg: dict) -> None:
    """Generate the OTHER agent's onboarding prompt and offer it for pasting.

    The prompt text is obtained by running ``cmd_agent_prompt`` for the
    opposite identity (``codex`` when *me* is empty) with stdout captured.
    It is then copied to the clipboard when a clipboard tool works, and
    always shown in a text modal so it can be copied by hand.  The clipboard
    outcome is written to ``status_msg["v"]``.  An empty *session* falls back
    to ``"demo"``.
    """
    import contextlib
    import io

    other = "codex" if me == "claude" else "claude" if me else "codex"
    sess = session or "demo"
    ns = argparse.Namespace(as_agent=other, session=sess)

    # cmd_agent_prompt prints its result, so capture stdout to get the text.
    captured = io.StringIO()
    with contextlib.redirect_stdout(captured):
        cmd_agent_prompt(ns)
    prompt = captured.getvalue()

    tool = _clipboard_copy(prompt)
    if tool:
        status_msg["v"] = (f"onboarding prompt for {other} copied via "
                           f"{tool} ({len(prompt)} chars)")
    else:
        status_msg["v"] = ("no clipboard tool found "
                           "(install xclip / pbcopy / wl-copy)")
    lines = [
        f"# Onboarding prompt for the OTHER agent ({other}).",
        f"# Paste this into {other}'s chat to bootstrap it.",
        "",
    ]
    lines.extend(prompt.splitlines())
    if not tool:
        # Goes right under the two header lines, above the blank separator.
        lines.insert(2, "  (auto-copy failed — select + copy manually)")
    _ui_text_modal(stdscr, f"Onboarding for {other}  [session={sess}]",
                   lines,
                   footer=("copied to clipboard" if tool
                           else "no clipboard — copy manually"))


def _ui_newpane_modal(stdscr, me: str, session: str,
                      status_msg: dict) -> None:
    """Show how to spin up the second agent in a fresh terminal/pane.

    Builds two alternatives for starting the opposite identity (``codex``
    when *me* is empty) on the same session: a ``tmux`` one-liner and a
    manual ``cd`` / ``export`` / run sequence.  Both are copied to the
    clipboard when a tool works and always shown in a text modal; the
    clipboard outcome is written to ``status_msg["v"]``.  An empty *session*
    falls back to ``"demo"``.

    The working directory and session name are shell-quoted so the commands
    stay valid when pasted, even if they contain spaces or metacharacters.
    """
    import shlex

    other = "codex" if me == "claude" else "claude" if me else "codex"
    sess = session or "demo"
    # shlex.quote leaves already-safe values untouched.
    sess_arg = shlex.quote(sess)
    one_liner = f"agentbridge tmux --session {sess_arg}"
    manual = [
        f"cd {shlex.quote(os.getcwd())}",
        f"export AGENT_BRIDGE_IDENTITY={other}",
        f"export AGENT_BRIDGE_SESSION={sess_arg}",
        "agentbridge",
    ]
    payload = one_liner + "\n\n# OR manually in a fresh terminal:\n" + \
        "\n".join(manual)
    tool = _clipboard_copy(payload)
    if tool:
        status_msg["v"] = f"new-pane command copied via {tool}"
    else:
        status_msg["v"] = ("no clipboard tool found "
                           "(install xclip / pbcopy / wl-copy)")
    lines = [
        f"Spin up {other!r} as the other side of session {sess!r}.",
        "",
        "  Option A — tmux one-liner (splits this window):",
        f"      {one_liner}",
        "",
        "  Option B — manually in a fresh terminal:",
    ]
    lines.extend("      " + ln for ln in manual)
    lines.append("")
    if tool:
        lines.append(f"  (Already copied to clipboard via {tool}.)")
    else:
        lines.append("  (No clipboard tool — copy by hand.)")
    _ui_text_modal(stdscr, f"Start {other} in a new pane",
                   lines,
                   footer=("copied to clipboard" if tool
                           else "no clipboard — copy manually"))


def _ui_help_modal(stdscr) -> None:
    """Show the key-binding cheat sheet in a centred box until a key is pressed.

    The box height is derived from the help text (one row per line plus the
    two border rows) and capped to the terminal, so the final "Press any key
    to close." line is drawn whenever the terminal is tall enough.  Lines
    that do not fit are dropped from the bottom.
    """
    import curses
    text = [
        "  AgentBridge TUI — keys",
        "",
        "  s   send a status message (free-form)",
        "  a   ask a question (recipient must reply)",
        "  h   hand off a task",
        "  n   show command to launch other agent in a new pane",
        "  o   copy onboarding prompt for the OTHER agent",
        "  t   open transcript modal",
        "  j   scroll messages down (newer)",
        "  k   scroll messages up (older)",
        "  g   jump to oldest",
        "  G   jump to newest (tail)",
        "  p   pause/resume live tail",
        "  r   force refresh",
        "  ?   this help",
        "  q   quit (ESC also works)",
        "",
        "  All actions write to .agent-bus/ in cwd.",
        "  Press any key to close.",
    ]
    h, w = stdscr.getmaxyx()
    hh, ww = min(len(text) + 2, h - 2), min(70, w - 4)
    win = curses.newwin(hh, ww, (h - hh) // 2, (w - ww) // 2)
    win.box()
    # Rows 0 and hh-1 are the border, leaving hh-2 rows for text.
    for i, line in enumerate(text[:max(0, hh - 2)]):
        try:
            win.addnstr(1 + i, 1, line, ww - 2)
        except curses.error:
            pass
    win.refresh()
    # Block for a single key press, then restore non-blocking input.
    stdscr.nodelay(False)
    try:
        stdscr.getch()
    finally:
        stdscr.nodelay(True)


# ----- argparse ------------------------------------------------------------

def _identity_kwargs(env_var: str):
    """Argparse kwargs for an optional identity flag.

    The flag is never marked required.  Its default is the value of the
    environment variable *env_var* when that value is a known agent name,
    otherwise ``None``.  Choices are the agent names in sorted order.
    """
    candidate = os.environ.get(env_var)
    fallback = candidate if candidate in AGENT_ENUM else None
    return {"required": False, "default": fallback, "choices": sorted(AGENT_ENUM)}


def _session_kwargs():
    """Argparse kwargs for ``--session``: default from the session env var, else ``""``."""
    session_default = os.environ.get(ENV_SESSION, "")
    return {"default": session_default}


def build_parser() -> argparse.ArgumentParser:
    """Build the top-level ``agentbridge`` argument parser.

    A subcommand is mandatory.  Every leaf subcommand registers its handler
    through ``set_defaults(func=...)``, so the caller dispatches with
    ``args.func(args)``.  Identity flags take their default from the identity
    environment variable (see ``_identity_kwargs``) and ``--session`` flags
    from the session environment variable (see ``_session_kwargs``); both are
    read when the parser is built.
    """
    p = argparse.ArgumentParser(
        prog="agentbridge",
        description=f"AgentBridge {VERSION} — local Claude↔Codex bridge",
        epilog=f"Env defaults: {ENV_IDENTITY}=<claude|codex>  {ENV_SESSION}=<session-id>",
    )
    p.add_argument("-v", "--version", action="version",
                   version=f"agentbridge {VERSION}")
    sub = p.add_subparsers(dest="cmd", required=True)

    sub.add_parser("init", help="bootstrap .agent-bus/").set_defaults(func=cmd_init)
    sub.add_parser("status", help="show bus health").set_defaults(func=cmd_status)

    # ---- ui (interactive spectator) ----
    ui_p = sub.add_parser("ui", help="interactive spectator TUI (curses)")
    ui_p.add_argument("--as", dest="as_agent", **_identity_kwargs(ENV_IDENTITY),
                      help="identity to act as when sending from TUI")
    ui_p.add_argument("--session", **_session_kwargs())
    ui_p.set_defaults(func=cmd_ui)

    # ---- send ----
    s = sub.add_parser("send", help="send a message")
    # No explicit dest here, so argparse stores the value under the attribute
    # name "from" — a Python keyword, readable only via getattr()/vars().
    # NOTE(review): handoff/ask use dest="from_agent" instead; confirm that
    # cmd_send reads the attribute accordingly.
    s.add_argument("--from", **_identity_kwargs(ENV_IDENTITY),
                   help="sender (default: $AGENT_BRIDGE_IDENTITY)")
    s.add_argument("--to", required=True, choices=sorted(AGENT_ENUM),
                   help="recipient")
    s.add_argument("--type", required=True, choices=sorted(MSG_TYPE_ENUM))
    s.add_argument("--subject", required=True)
    s.add_argument("--body", default="")
    s.add_argument("--body-file", default=None,
                   help="read body from FILE (or '-' for stdin)")
    s.add_argument("--session", **_session_kwargs(),
                   help="session id (default: $AGENT_BRIDGE_SESSION)")
    s.add_argument("--ack", action="store_true", help="require ack")
    s.add_argument("--reply-to", default=None, help="parent message uuid")
    s.add_argument("--priority", default="normal", choices=sorted(PRIORITY_ENUM))
    s.add_argument("--tag", action="append", default=[])
    s.add_argument("--artifact", action="append", default=[],
                   help="kind=file,ref=path/to/x[,sha256=...]")
    s.add_argument("--dry-run", action="store_true",
                   help="validate + print the message JSON without committing")
    s.set_defaults(func=cmd_send)

    # ---- watch ----
    w = sub.add_parser("watch", help="tail messages for an agent")
    w.add_argument("--as", dest="as_agent", **_identity_kwargs(ENV_IDENTITY),
                   help="identity (default: $AGENT_BRIDGE_IDENTITY)")
    w.add_argument("--session", **_session_kwargs())
    w.add_argument("--interval", type=int, default=200, help="poll interval ms")
    w.add_argument("--once", action="store_true", help="drain once and exit")
    w.add_argument("--pretty", action="store_true",
                   help="force pretty output (default: auto by TTY)")
    w.add_argument("--json", action="store_true",
                   help="force JSON output (overrides --pretty)")
    w.add_argument("--bell", action="store_true",
                   help="ring terminal bell on requires_ack messages")
    w.set_defaults(func=cmd_watch)

    # ---- ack ----
    a = sub.add_parser("ack", help="reply to a requires_ack message")
    a.add_argument("--as", dest="as_agent", **_identity_kwargs(ENV_IDENTITY))
    a.add_argument("--reply-to", required=True)
    a.add_argument("--body", default="")
    a.add_argument("--body-file", default=None,
                   help="read body from FILE (or '-' for stdin)")
    a.set_defaults(func=cmd_ack)

    # ---- inbox ----
    ib = sub.add_parser("inbox", help="snapshot pending messages (non-blocking)")
    ib.add_argument("--as", dest="as_agent", **_identity_kwargs(ENV_IDENTITY))
    ib.add_argument("--session", **_session_kwargs())
    ib.add_argument("--unread", action="store_true",
                    help="only show messages past the cursor")
    ib.add_argument("--type", choices=sorted(MSG_TYPE_ENUM),
                    help="filter by message type")
    ib.add_argument("--needs-ack", action="store_true",
                    help="only show requires_ack messages")
    ib.add_argument("--pretty", action="store_true")
    ib.add_argument("--json", action="store_true")
    ib.set_defaults(func=cmd_inbox)

    # ---- lock {acquire,heartbeat,release,reap,key,with-hold} ----
    # Nested subparsers: the chosen lock verb is stored in args.lock_cmd.
    lk = sub.add_parser("lock", help="lock subcommands")
    lksub = lk.add_subparsers(dest="lock_cmd", required=True)

    la = lksub.add_parser("acquire", help="acquire per-file edit lock")
    la.add_argument("--as", dest="as_agent", **_identity_kwargs(ENV_IDENTITY))
    la.add_argument("--path", required=True, help="repo-relative path")
    la.add_argument("--ttl", type=int, default=120, help="seconds")
    la.add_argument("--reason", default="")
    la.add_argument("--session", **_session_kwargs())
    la.add_argument("--wait", type=int, default=0,
                    help="block up to N seconds waiting for lock (default 0)")
    la.set_defaults(func=cmd_lock_acquire)

    lh = lksub.add_parser("heartbeat", help="refresh lock liveness")
    lh.add_argument("--as", dest="as_agent", **_identity_kwargs(ENV_IDENTITY))
    lh.add_argument("--path", required=True)
    lh.add_argument("--session", **_session_kwargs())
    lh.set_defaults(func=cmd_lock_heartbeat)

    lr = lksub.add_parser("release", help="drop your lock")
    lr.add_argument("--as", dest="as_agent", **_identity_kwargs(ENV_IDENTITY))
    lr.add_argument("--path", required=True)
    lr.add_argument("--force", action="store_true")
    lr.add_argument("--session", **_session_kwargs())
    lr.set_defaults(func=cmd_lock_release)

    lp = lksub.add_parser("reap", help="force-delete stale/overridden lock")
    lp.add_argument("--as", dest="as_agent", **_identity_kwargs(ENV_IDENTITY))
    lp.add_argument("--path", required=True)
    lp.add_argument("--force-stale", action="store_true",
                    help="override non-stale lock (requires prior warning msg)")
    lp.add_argument("--window", type=int, default=60)
    lp.add_argument("--session", **_session_kwargs())
    lp.set_defaults(func=cmd_lock_reap)

    lkey = lksub.add_parser("key", help="print the normalized lock key for a path")
    lkey.add_argument("path")
    lkey.set_defaults(func=cmd_lock_key)

    lwh = lksub.add_parser("with-hold",
                           help="run a command while holding a lock (auto heartbeat + release)")
    lwh.add_argument("--as", dest="as_agent", **_identity_kwargs(ENV_IDENTITY))
    lwh.add_argument("--path", required=True)
    lwh.add_argument("--ttl", type=int, default=120)
    lwh.add_argument("--wait", type=int, default=0)
    lwh.add_argument("--reason", default="")
    lwh.add_argument("--session", **_session_kwargs())
    # NOTE(review): this positional's dest "cmd" is the same attribute the
    # top-level subparsers use (dest="cmd"); for with-hold the parsed
    # namespace's `cmd` ends up holding this remainder list — confirm that
    # nothing relies on args.cmd being the subcommand name here.
    lwh.add_argument("cmd", nargs=argparse.REMAINDER,
                     help="-- <command> [args...]  (the actual command to run)")
    lwh.set_defaults(func=cmd_lock_with_hold)

    # ---- transcript / tmux / agent-prompt ----
    t = sub.add_parser("transcript", help="reconstruct timeline from logs")
    t.add_argument("--session", **_session_kwargs())
    t.add_argument("--format", choices=["pretty", "jsonl"], default="pretty")
    t.set_defaults(func=cmd_transcript)

    tmx = sub.add_parser("tmux", help="spawn 4-pane tmux topology")
    tmx.add_argument("--session", **_session_kwargs())
    tmx.add_argument("--name", default="")
    tmx.set_defaults(func=cmd_tmux)

    ap = sub.add_parser("agent-prompt",
                        help="print a system-prompt snippet teaching agents to use agentbridge")
    ap.add_argument("--as", dest="as_agent", **_identity_kwargs(ENV_IDENTITY))
    ap.add_argument("--session", **_session_kwargs())
    ap.set_defaults(func=cmd_agent_prompt)

    # ---- composite verbs: handoff, ask ----
    ho = sub.add_parser("handoff",
                        help="send a task to the other agent (optionally wait for ack)")
    ho.add_argument("--from", dest="from_agent", **_identity_kwargs(ENV_IDENTITY),
                    help="sender (default: $AGENT_BRIDGE_IDENTITY)")
    ho.add_argument("--to", required=True, choices=sorted(AGENT_ENUM))
    ho.add_argument("--task", required=True, help="short task subject")
    ho.add_argument("--body", default="")
    ho.add_argument("--body-file", default=None,
                    help="read body from FILE (or '-' for stdin)")
    ho.add_argument("--session", **_session_kwargs())
    ho.add_argument("--priority", default="normal", choices=sorted(PRIORITY_ENUM))
    ho.add_argument("--tag", action="append", default=[])
    ho.add_argument("--wait-ack", action="store_true",
                    help="block until recipient acks (else return msg-id and exit)")
    ho.add_argument("--timeout", type=int, default=300,
                    help="max seconds to wait for ack (default 300)")
    ho.set_defaults(func=cmd_handoff)

    aq = sub.add_parser("ask",
                        help="send a question and block until answered")
    aq.add_argument("--from", dest="from_agent", **_identity_kwargs(ENV_IDENTITY))
    aq.add_argument("--to", required=True, choices=sorted(AGENT_ENUM))
    aq.add_argument("--subject", required=True)
    aq.add_argument("--body", default="")
    aq.add_argument("--body-file", default=None)
    aq.add_argument("--session", **_session_kwargs())
    aq.add_argument("--priority", default="normal", choices=sorted(PRIORITY_ENUM))
    aq.add_argument("--tag", action="append", default=[])
    aq.add_argument("--timeout", type=int, default=300,
                    help="max seconds to wait for answer (default 300)")
    aq.add_argument("--json", action="store_true",
                    help="print full answer JSON instead of body only")
    aq.set_defaults(func=cmd_ask)

    # ---- completion ----
    comp = sub.add_parser("completion",
                          help="emit a bash or zsh completion script")
    comp.add_argument("shell", choices=["bash", "zsh"])
    comp.set_defaults(func=cmd_completion)

    # ---- cursor {show, reset} ----
    # Nested subparsers: the chosen cursor verb is stored in args.cursor_cmd.
    cur = sub.add_parser("cursor", help="inspect/reset per-agent watch cursor")
    cursub = cur.add_subparsers(dest="cursor_cmd", required=True)

    cs = cursub.add_parser("show", help="print cursor offset for an agent")
    cs.add_argument("--as", dest="as_agent", **_identity_kwargs(ENV_IDENTITY))
    cs.set_defaults(func=cmd_cursor_show)

    cr = cursub.add_parser("reset",
                           help="reset cursor to a specific offset (default: remove file → 0)")
    cr.add_argument("--as", dest="as_agent", **_identity_kwargs(ENV_IDENTITY))
    cr.add_argument("--to", type=int, default=None,
                    help="set cursor to N bytes (omit to remove the cursor file)")
    cr.set_defaults(func=cmd_cursor_reset)

    return p


def _ansi(color: bool):
    if color:
        return {
            "cyan": "\033[36m", "yellow": "\033[33m", "green": "\033[32m",
            "red": "\033[31m", "bold": "\033[1m", "dim": "\033[2m",
            "reset": "\033[0m",
        }
    return {k: "" for k in ("cyan", "yellow", "green", "red", "bold", "dim", "reset")}


def _print_header(c: dict) -> None:
    """Print the product banner (name, version, tagline) between blank lines.

    *c* is a colour table as returned by ``_ansi``.
    """
    title = f"{c['bold']}{c['cyan']}AgentBridge {VERSION}{c['reset']}"
    tagline = f"{c['dim']}— local Claude↔Codex realtime bridge{c['reset']}"
    print()
    print(f"  {title}  {tagline}")
    print()


def _print_welcome() -> None:
    """Print static getting-started steps when no bus exists and we cannot prompt.

    Used as the non-interactive fallback (stdin is not a TTY).  Colour is
    used only when stdout is a TTY and ``AGENT_BRIDGE_NO_COLOR`` is unset.
    """
    use_color = sys.stdout.isatty() and not os.environ.get("AGENT_BRIDGE_NO_COLOR")
    c = _ansi(use_color)
    bold, dim, green, reset = c['bold'], c['dim'], c['green'], c['reset']

    _print_header(c)
    print(f"  {c['yellow']}No bus found in this directory.{reset}  "
          f"{dim}({bus_root()} does not exist){reset}")
    print()
    print(f"  {bold}Get started:{reset}")
    print()

    # (command, spaces between command and comment, trailing comment)
    steps = (
        ("agentbridge init", 24, "# bootstrap .agent-bus/ here"),
        ("export AGENT_BRIDGE_IDENTITY=claude", 5, "# or codex"),
        ("export AGENT_BRIDGE_SESSION=demo", 8, "# any session id"),
        ("agentbridge", 29, "# opens the spectator TUI"),
    )
    for number, (command, gap, note) in enumerate(steps, start=1):
        print(f"    {green}{number}.{reset} {bold}{command}{reset}"
              f"{' ' * gap}{dim}{note}{reset}")
    print()


def _prompt(label: str, default: str, c: dict) -> str:
    suffix = f" {c['dim']}[{default}]{c['reset']}" if default else ""
    try:
        v = input(f"  {label}{suffix}: ").strip()
    except (EOFError, KeyboardInterrupt):
        print()
        return ""
    return v or default


def _prompt_choice(label: str, options: list[str], default: str, c: dict) -> str:
    """Y/n style prompt — typed answer with prefix matching. Used for confirmations."""
    opts = "/".join(o if o != default else o.upper() for o in options)
    while True:
        try:
            v = input(f"  {label} ({opts}) ").strip().lower()
        except (EOFError, KeyboardInterrupt):
            print()
            return ""
        if not v:
            return default
        if v in options:
            return v
        for o in options:
            if o.startswith(v):
                return o
        print(f"  {c['red']}choose one of: {', '.join(options)}{c['reset']}")


def _arrow_select(label: str, options: list[tuple[str, str]],
                  default_idx: int, c: dict) -> str | None:
    """Arrow-key picker. options = [(value, description)]. Returns value or None.

    *default_idx* is clamped into range and marks the initially highlighted
    row.  Up/Down or ``k``/``j`` move the highlight (wrapping around), Enter
    selects it, and a digit ``1``..``len(options)`` selects that row
    directly.  ESC, Ctrl-C, or end of input cancel and return ``None``, as
    does an empty *options* list.

    Falls back to typed input when ``termios`` is unavailable (e.g. Windows)
    or when stdin/stdout is not a TTY.
    """
    if not options:
        return None
    idx = max(0, min(default_idx, len(options) - 1))

    try:
        import termios
        import tty
    except ImportError:
        # No raw-mode tty available — degrade to typed input below.
        termios = None

    if termios is None or not sys.stdin.isatty() or not sys.stdout.isatty():
        vals = [v for v, _ in options]
        return _prompt_choice(label, vals, vals[idx], c) or None

    print(f"  {label}  {c['dim']}(↑/↓ to move, Enter to select){c['reset']}")

    def render(first: bool) -> None:
        if not first:
            # Move cursor up by len(options) to overwrite previous render.
            sys.stdout.write(f"\033[{len(options)}A")
        for i, (val, desc) in enumerate(options):
            marker = f"{c['green']}▸{c['reset']}" if i == idx else " "
            highlight_on = c['bold'] if i == idx else c['dim']
            highlight_off = c['reset']
            line = f"    {marker} {highlight_on}{val}{highlight_off}"
            if desc:
                line += f"  {c['dim']}— {desc}{c['reset']}"
            # Clear to end-of-line in case previous render was longer.
            sys.stdout.write("\r" + line + "\033[K\n")
        sys.stdout.flush()

    render(first=True)

    fd = sys.stdin.fileno()
    saved = termios.tcgetattr(fd)
    try:
        tty.setcbreak(fd)
        while True:
            ch = sys.stdin.read(1)
            if ch == "":
                # End of input: nothing more will arrive, so cancel rather
                # than spin on empty reads.
                return None
            if ch == "\x1b":
                # Could be ESC alone or the start of an arrow sequence.  The
                # follow-up read blocks, so a bare ESC is only acted on once
                # further input (or end of input) arrives.
                seq = sys.stdin.read(2)
                if seq == "[A":  # up
                    idx = (idx - 1) % len(options)
                    render(first=False)
                elif seq == "[B":  # down
                    idx = (idx + 1) % len(options)
                    render(first=False)
                else:
                    return None  # ESC = cancel
            elif ch in ("\r", "\n"):
                return options[idx][0]
            elif ch == "\x03":  # Ctrl-C
                return None
            elif ch in ("j", "J"):
                idx = (idx + 1) % len(options)
                render(first=False)
            elif ch in ("k", "K"):
                idx = (idx - 1) % len(options)
                render(first=False)
            elif ch.isdecimal():
                # isdecimal (not isdigit) so int() cannot fail on characters
                # such as superscript digits.
                n = int(ch)
                if 1 <= n <= len(options):
                    idx = n - 1
                    render(first=False)
                    return options[idx][0]
    finally:
        # Always restore the terminal mode saved before entering cbreak.
        termios.tcsetattr(fd, termios.TCSADRAIN, saved)


def _run_quick_env() -> int | None:
    """Ask for whichever of identity/session is unset and store it in os.environ.

    Runs when the bus already exists but the environment is incomplete.  The
    answers apply to this process only.  Skipping a question leaves that
    value unset.

    Returns ``None`` when the caller should go on to open the TUI, or exit
    code ``0`` when the user aborts at the final "Press Enter" pause.
    """
    use_color = sys.stdout.isatty() and not os.environ.get("AGENT_BRIDGE_NO_COLOR")
    c = _ansi(use_color)
    bold, dim, green, reset = c['bold'], c['dim'], c['green'], c['reset']
    _print_header(c)

    checks = (("identity", env_identity), ("session", env_session))
    missing = [name for name, current in checks if not current()]
    print(f"  {c['yellow']}Bus exists at {bus_root()}, but "
          f"{' and '.join(missing)} not set.{reset}")
    print(f"  {dim}Pick now (defaults in brackets). "
          f"Ctrl-C / ESC = open TUI without sending ability.{reset}")
    print()

    if not env_identity():
        picked = _arrow_select(
            "Which side are you driving?",
            [
                ("claude", "Anthropic Claude (Claude Code, Sonnet/Opus)"),
                ("codex",  "OpenAI Codex (codex CLI, GPT-class)"),
            ],
            0, c)
        if picked:
            os.environ[ENV_IDENTITY] = picked
        print()

    if not env_session():
        print(f"  {dim}Session name = label scoping this coordination "
              f"context (e.g. feature branch{reset}")
        print(f"  {dim}or ticket id). Both agents must use the same value "
              f"to see each other.{reset}")
        typed = _prompt("Session name", "demo", c)
        if typed:
            os.environ[ENV_SESSION] = typed
        print()

    # Re-read so the summary reflects what was just stored (or left unset).
    me = env_identity() or ""
    sess = env_session() or ""
    shown_me = me or '(unset)'
    shown_sess = sess or '(unset)'
    export_me = me or '<claude|codex>'
    export_sess = sess or '<id>'
    print(f"  {green}✓{reset} identity = {bold}{shown_me}"
          f"{reset}, session = {bold}{shown_sess}{reset}  "
          f"{dim}(this shell only){reset}")
    print(f"  {dim}Persist: export AGENT_BRIDGE_IDENTITY={export_me} "
          f"AGENT_BRIDGE_SESSION={export_sess}{reset}")
    print()
    try:
        input(f"  Press {bold}Enter{reset} to open the TUI… ")
    except (EOFError, KeyboardInterrupt):
        print()
        return 0
    return None


def _run_onboarding() -> int | None:
    """Interactive first-run wizard for a directory that has no bus yet.

    Asks for confirmation, the agent identity and a session name, runs
    ``cmd_init`` with its output suppressed, and stores identity and session
    in ``os.environ`` for this process.

    Returns ``None`` when setup succeeded and the caller should go on to
    launch the TUI.  Otherwise returns an exit code: ``0`` when the user
    declines or cancels at any step, or ``cmd_init``'s own code when that is
    not ``0``.
    """
    import contextlib
    import io

    c = _ansi(sys.stdout.isatty() and not os.environ.get("AGENT_BRIDGE_NO_COLOR"))
    _print_header(c)
    print(f"  {c['yellow']}No bus found in {bus_root()}.{c['reset']}")
    print(f"  Set one up here now? It will create {c['bold']}./.agent-bus/{c['reset']} "
          f"in this directory.")
    print()
    yn = _prompt_choice("Initialize?", ["y", "n"], "y", c)
    if yn != "y":
        print()
        print(f"  {c['dim']}Skipped. Run `agentbridge init` later, or `agentbridge --help` "
              f"for the full command list.{c['reset']}")
        return 0

    print()
    agent_options = [
        ("claude", "Anthropic Claude (Claude Code, Sonnet/Opus)"),
        ("codex",  "OpenAI Codex (codex CLI, GPT-class)"),
    ]
    # Pre-select the identity already present in the environment, if any.
    default_agent = env_identity() or "claude"
    default_idx = next((i for i, (v, _) in enumerate(agent_options)
                        if v == default_agent), 0)
    identity = _arrow_select("Which side are you driving?",
                             agent_options, default_idx, c)
    if not identity:
        return 0
    print()
    print(f"  {c['dim']}Session name = a label scoping this coordination context "
          f"(e.g. a feature{c['reset']}")
    print(f"  {c['dim']}branch or ticket id). Both agents must use the same value "
          f"to see each other.{c['reset']}")
    session = _prompt("Session name", env_session() or "demo", c)
    if not session:
        return 0

    # Run init silently — we print our own confirmation below.
    with contextlib.redirect_stdout(io.StringIO()):
        rc = cmd_init(argparse.Namespace())
    if rc != 0:
        return rc

    # Persist into this process so the TUI inherits the right defaults.
    os.environ[ENV_IDENTITY] = identity
    os.environ[ENV_SESSION] = session

    print()
    print(f"  {c['green']}✓{c['reset']} bus initialized at {c['bold']}{bus_root()}{c['reset']}")
    print(f"  {c['green']}✓{c['reset']} identity = {c['bold']}{identity}{c['reset']}, "
          f"session = {c['bold']}{session}{c['reset']}  "
          f"{c['dim']}(this shell only){c['reset']}")
    print()
    print(f"  {c['dim']}To persist across shells, add to your rc file:{c['reset']}")
    print(f"    {c['dim']}export AGENT_BRIDGE_IDENTITY={identity}{c['reset']}")
    print(f"    {c['dim']}export AGENT_BRIDGE_SESSION={session}{c['reset']}")
    print()
    try:
        input(f"  Press {c['bold']}Enter{c['reset']} to open the TUI… ")
    except (EOFError, KeyboardInterrupt):
        print()
        return 0
    return None  # signal: continue to launch TUI


def main(argv: list[str] | None = None) -> int:
    """CLI entry point: parse *argv* (default ``sys.argv[1:]``) and dispatch.

    With no arguments and stdout on a TTY the command becomes ``ui``, after
    an optional interactive step:

    * bus exists: if stdin is a TTY and identity or session is unset, run the
      quick environment prompt first;
    * no bus, stdin is a TTY: run the onboarding wizard first;
    * no bus, stdin is not a TTY: print the static welcome text and return 0.

    An interactive step that returns an exit code ends the run with that
    code.  A ``ValidationError`` raised by the subcommand handler is printed
    to stderr as ``error: ...`` and turned into exit code 2.
    """
    effective = sys.argv[1:] if argv is None else list(argv)

    # Bare invocation on a TTY: launch TUI if bus exists, else onboarding wizard.
    if not effective and sys.stdout.isatty():
        if not bus_root().exists():
            if not sys.stdin.isatty():
                _print_welcome()
                return 0
            outcome = _run_onboarding()
            if outcome is not None:
                return outcome
        elif sys.stdin.isatty() and not (env_identity() and env_session()):
            outcome = _run_quick_env()
            if outcome is not None:
                return outcome
        effective = ["ui"]

    args = build_parser().parse_args(effective)
    try:
        return args.func(args)
    except ValidationError as err:
        use_color = sys.stderr.isatty() and not os.environ.get("AGENT_BRIDGE_NO_COLOR")
        c = _ansi(use_color)
        print(f"{c['red']}error:{c['reset']} {err}", file=sys.stderr)
        return 2


# Script entry point: run the CLI and use main()'s return value as the
# process exit status.
if __name__ == "__main__":
    sys.exit(main())
