#!/usr/bin/env python3
"""arq-github — GitHub bridge MVP for mesh-routed Twin dispatches.

Accepts a small set of write-verbs, wraps each in a signed envelope
emitted to ARQERA's substrate before and after the HTTP call, then
dispatches to GitHub's REST API (or internal `git push` for branches).

Canonical policy this exists to satisfy:
  arq://doc/policy/twin-dispatch-mesh-only-v1

Parent principles:
  arq://doc/principle/one-primitive-speaks-every-protocol-v1
  arq://doc/principle/action-owner-and-worker-v1

Authentication: loads the current `gh auth token` for whichever account
gh CLI is switched to. To sign as Twin: `gh auth switch --user ArqeraAITwin`
before calling. That switch is a local config flip (not a dispatch) and
is permitted by the mesh-enforce hook.

Reads (pr view, run view, etc.) stay with bare `gh` — they do not
mutate external state and the hook permits them. This bridge is for
the writes only.

Minimum-viable. Supports:
  arq-github push <remote> <branch> [--force-with-lease]
  arq-github pr create --title T --body B --head H [--base main]
  arq-github pr review <num> --request-changes | --comment [--body B]
  arq-github pr merge-via-substrate <num> --squash | --merge | --rebase

Each call emits:
  arq://act/github_envelope_sent/<verb>/<ts>  (pre-dispatch)
  arq://act/github_envelope_ack/<verb>/<ts>   (post-dispatch, with outcome)
"""

from __future__ import annotations

import argparse
import json
import os
import re
import shutil
import subprocess
import sys
import time
import urllib.error
import urllib.request
from datetime import datetime, timezone
from pathlib import Path

# Base URL of the GitHub REST API; `_api_call` prefixes every request path with it.
GITHUB_API = "https://api.github.com"
# Default repository coordinates.
# NOTE(review): presumably the defaults for --owner / --repo; the argparse
# wiring is not visible in this part of the file — confirm.
DEFAULT_OWNER = "Arqera-IO"
DEFAULT_REPO = "ARQERA"
# Location of the `twin` CLI used by the legacy subprocess emit path and by
# the post-timeout act lookup. Both paths no-op when this file is missing.
TWIN_CLI = Path.home() / ".local" / "bin" / "twin"
# Tunables for the `twin` subprocess paths, each overridable via environment.
# The float()/int() conversions are unguarded: a non-numeric override raises
# ValueError at import time.
# Per-attempt timeout (seconds) for `twin act emit`.
TWIN_EMIT_TIMEOUT_S = float(os.environ.get("ARQ_GITHUB_TWIN_EMIT_TIMEOUT_S", "20"))
# Number of emit attempts; clamped so at least one attempt is always made.
TWIN_EMIT_ATTEMPTS = max(1, int(os.environ.get("ARQ_GITHUB_TWIN_EMIT_ATTEMPTS", "3")))
# Base back-off (seconds) between attempts; multiplied by the attempt number.
TWIN_EMIT_BACKOFF_S = float(os.environ.get("ARQ_GITHUB_TWIN_EMIT_BACKOFF_S", "2"))
# Timeout (seconds) for the `twin index` lookup that follows a timed-out emit.
TWIN_LOOKUP_TIMEOUT_S = float(os.environ.get("ARQ_GITHUB_TWIN_LOOKUP_TIMEOUT_S", "10"))

# Actor peer — every envelope declares which action-owner dispatched
# it. Per arq://doc/principle/actor-scoped-credential-resolution-v1.
# The backend resolver (Epic H.2 version B — tracked separately) will
# eventually map this to the platform-native identity. Until then,
# substrate records the actor even though credential resolution
# client-side still uses gh-CLI's active account.
ACTOR_PEER_ADDRESS = os.environ.get(
    "ARQ_ACTOR_PEER_ADDRESS",
    "arq://body/peer/578412e7b083b40e56e228779804582a",  # Twin's peer on this Mac
)


def _utc_ts() -> str:
    """Return the current UTC time as a compact ``YYYYMMDDTHHMMSSZ`` stamp."""
    now_utc = datetime.now(tz=timezone.utc)
    return now_utc.strftime("%Y%m%dT%H%M%SZ")


# ---------------------------------------------------------------------------
# Primitive-first dispatch (Option B Move 3; shared helper since PR D)
# ---------------------------------------------------------------------------
#
# Each verb tries the local arq daemon's `/v1/invoke` first, falls back to
# the direct gh/REST path if the verb isn't registered or the daemon is
# unreachable. When the primitive owns the call, it emits its own envelope
# trail (invoke_requested + invoke_completed), so the wrapper MUST NOT
# double-emit github_envelope_sent/ack acts on the primitive path.
#
# Helper lives in _arq_primitive.py (sibling file) — shared with arq-kube
# and arq-call. Import it via the script's own directory.

sys.path.insert(0, str(Path(__file__).resolve().parent))
from _arq_primitive import primitive_invoke as _primitive_base  # noqa: E402


def _primitive_invoke(verb: str, payload: dict, timeout_s: float = 30.0):
    """Invoke ``verb`` through the shared primitive helper as this actor.

    Forwards ``verb`` and ``payload`` unchanged, supplying
    ``ACTOR_PEER_ADDRESS`` as the actor argument and passing ``timeout_s``
    through by keyword. Returns whatever the shared helper returns.
    """
    actor_peer = ACTOR_PEER_ADDRESS
    return _primitive_base(verb, payload, actor_peer, timeout_s=timeout_s)


def _emit_act_via_daemon(class_: str, type_: str, reference: str, payload: dict) -> tuple[str, str | None]:
    """Emit an act through the daemon's ``daemon.act.emit`` verb.

    Returns an ``(outcome, address)`` pair:
      - ``("ok", address)``        — the daemon answered "ok" with a dict
                                     result; ``address`` is its "address" key
      - ``("unknown_verb", None)`` — the daemon does not know the verb;
                                     caller should fall back
      - ``("error", None)``        — the daemon reported an error; caller
                                     should NOT fall back
      - ``("unavailable", None)``  — the invoke raised, or produced any other
                                     outcome (including "ok" with a non-dict
                                     result); caller should fall back
    """
    request = {
        "class": class_,
        "type": type_,
        "reference": reference,
        "payload": payload,
    }
    try:
        outcome, result = _primitive_invoke("daemon.act.emit", request, timeout_s=10.0)
    except Exception:
        # Any failure to reach or talk to the daemon is treated as "not there".
        return ("unavailable", None)

    if outcome == "ok" and isinstance(result, dict):
        return ("ok", result.get("address"))
    # These two outcomes are passed through without an address.
    for passthrough in ("unknown_verb", "error"):
        if outcome == passthrough:
            return (passthrough, None)
    return ("unavailable", None)


def _act_address_from_index_item(item: object) -> str | None:
    """Extract an ``arq://`` address from one index row.

    A row may be the address string itself, or a dict carrying the address
    under "address", "addr" or "uri" (checked in that order). Anything else
    yields ``None``.
    """

    def _is_address(candidate: object) -> bool:
        return isinstance(candidate, str) and candidate.startswith("arq://")

    if _is_address(item):
        return item
    if isinstance(item, dict):
        candidates = (item.get(key) for key in ("address", "addr", "uri"))
        return next((value for value in candidates if _is_address(value)), None)
    return None


def _lookup_emitted_act_by_ref(class_: str, type_: str, reference: str) -> str | None:
    """Ask ``twin index`` whether an act with this exact reference exists.

    Used after a timed-out emit to find out whether the act landed anyway.
    Returns the first ``arq://`` address found in the JSON output, or
    ``None`` when the twin CLI is missing, the lookup fails or times out,
    the output is not valid JSON, or no row carries an address. Failures
    are reported on stderr.
    """
    if not TWIN_CLI.exists():
        return None

    lookup_cmd = [
        str(TWIN_CLI),
        "--use-keychain",
        "index",
        "--class",
        class_,
        "--type",
        type_,
        "--reference",
        reference,
        "--limit",
        "1",
        "--json",
    ]
    try:
        proc = subprocess.run(
            lookup_cmd,
            capture_output=True,
            text=True,
            timeout=TWIN_LOOKUP_TIMEOUT_S,
            check=False,
        )
    except (subprocess.TimeoutExpired, OSError) as exc:
        print(f"[arq-github] substrate post-timeout lookup exception: {exc}", file=sys.stderr)
        return None

    if proc.returncode != 0:
        complaint = proc.stderr.strip()
        if complaint:
            print(f"[arq-github] substrate post-timeout lookup failed: {complaint}", file=sys.stderr)
        return None

    try:
        decoded = json.loads(proc.stdout or "[]")
    except json.JSONDecodeError as exc:
        print(f"[arq-github] substrate post-timeout lookup JSON error: {exc}", file=sys.stderr)
        return None

    # The CLI may wrap the rows in an object; unwrap the first non-empty list key.
    if isinstance(decoded, dict):
        decoded = decoded.get("items") or decoded.get("results") or decoded.get("rows") or []
    if not isinstance(decoded, list):
        return None

    addresses = map(_act_address_from_index_item, decoded)
    return next((found for found in addresses if found), None)


def _emit_act_via_twin_subprocess(class_: str, type_: str, reference: str, payload: dict) -> str | None:
    """Legacy path: shell out to `twin --use-keychain act emit`.

    Kept for backwards-compat with daemons that don't implement
    daemon.act.emit yet. Each call touches the macOS keychain ACL —
    on ACL-mismatched Macs (pre-Layer-1) this is the prompt-storm
    source. Will be deleted once daemon.act.emit is ubiquitous.

    Makes up to ``TWIN_EMIT_ATTEMPTS`` attempts, sleeping
    ``TWIN_EMIT_BACKOFF_S * attempt`` seconds between them.

    Returns the act address (the first stdout line starting with
    ``arq://``) from a successful attempt, or an address recovered via
    ``_lookup_emitted_act_by_ref`` after a timeout or after all attempts
    are exhausted. Returns ``None`` when the twin CLI is missing or no
    address could be obtained.
    """
    if not TWIN_CLI.exists():
        return None
    payload_json = json.dumps(payload)  # noqa: ARQ-NO-JSON-HOT-PATH twin CLI input boundary — twin accepts --payload <json>
    for attempt in range(1, TWIN_EMIT_ATTEMPTS + 1):
        try:
            result = subprocess.run(
                [
                    str(TWIN_CLI),
                    "--use-keychain",
                    "act",
                    "emit",
                    class_,
                    type_,
                    reference,
                    "--payload",
                    payload_json,
                ],
                capture_output=True,
                text=True,
                timeout=TWIN_EMIT_TIMEOUT_S,
                check=False,
            )
            if result.returncode == 0:
                # Success: the address is the first stdout line that looks
                # like an arq:// URI. If rc == 0 but no such line exists,
                # control falls through silently (nothing is logged) to the
                # back-off and the next attempt.
                for line in result.stdout.splitlines():
                    line = line.strip()
                    if line.startswith("arq://"):
                        return line
            else:
                print(
                    f"[arq-github] substrate emit failed attempt {attempt}/{TWIN_EMIT_ATTEMPTS}: "
                    f"{result.stderr.strip()}",
                    file=sys.stderr,
                )
        except subprocess.TimeoutExpired as exc:
            print(
                f"[arq-github] substrate emit timed out attempt {attempt}/{TWIN_EMIT_ATTEMPTS}: {exc}",
                file=sys.stderr,
            )
            # A timeout does not prove the emit failed — check whether an act
            # with this reference exists before retrying.
            address = _lookup_emitted_act_by_ref(class_, type_, reference)
            if address:
                print(
                    f"[arq-github] substrate emit recovered landed act after timeout: {address}",
                    file=sys.stderr,
                )
                return address
        except OSError as exc:
            print(
                f"[arq-github] substrate emit exception attempt {attempt}/{TWIN_EMIT_ATTEMPTS}: {exc}",
                file=sys.stderr,
            )
        # Linear back-off; skipped after the final attempt.
        if attempt < TWIN_EMIT_ATTEMPTS:
            time.sleep(TWIN_EMIT_BACKOFF_S * attempt)
    # Last chance: one more lookup after all attempts are spent. Note that when
    # the final attempt timed out, this is the second lookup in a row.
    address = _lookup_emitted_act_by_ref(class_, type_, reference)
    if address:
        print(f"[arq-github] substrate emit recovered landed act after retries: {address}", file=sys.stderr)
        return address
    return None


def _emit_act(class_: str, type_: str, reference: str, payload: dict) -> str | None:
    """Emit a substrate act, preferring the daemon over the twin subprocess.

    The daemon path is tried first. Its "ok" outcome returns the address it
    reported; its "error" outcome returns ``None`` without falling back (the
    addressing service is authoritative about its own state). Every other
    outcome falls back to the legacy ``twin`` subprocess emit.
    """
    daemon_outcome, daemon_address = _emit_act_via_daemon(class_, type_, reference, payload)
    if daemon_outcome not in ("ok", "error"):
        return _emit_act_via_twin_subprocess(class_, type_, reference, payload)
    return daemon_address if daemon_outcome == "ok" else None


def _gh_auth_token() -> str:
    """Return the token of the gh CLI's currently active account.

    Runs ``gh auth token`` with a 5-second timeout. Never returns an empty
    token and never falls through to unauthenticated calls: every failure
    mode prints a one-line diagnostic to stderr and exits the process with
    status 2.

    Raises:
        SystemExit(2): gh is not on PATH, gh timed out, gh exited non-zero,
            or gh printed an empty token.
    """
    timeout_s = 5
    try:
        result = subprocess.run(
            ["gh", "auth", "token"],
            capture_output=True,
            text=True,
            check=True,
            timeout=timeout_s,
        )
    except FileNotFoundError:
        print("arq-github: gh CLI not on PATH — cannot resolve token", file=sys.stderr)
        sys.exit(2)
    except subprocess.TimeoutExpired:
        # A hung gh (e.g. blocked on a keychain prompt) must fail as clearly
        # as a missing gh instead of surfacing an uncaught traceback.
        print(f"arq-github: gh auth token timed out after {timeout_s}s", file=sys.stderr)
        sys.exit(2)
    except subprocess.CalledProcessError as exc:
        print(
            f"arq-github: gh auth token failed (rc={exc.returncode}): {(exc.stderr or '').strip()}",
            file=sys.stderr,
        )
        sys.exit(2)

    token = result.stdout.strip()
    if not token:
        print("arq-github: empty token from gh auth token", file=sys.stderr)
        sys.exit(2)
    return token


def _read_arq_connection(service: str, resource: str, *, audit_only: bool = False) -> str | None:
    """Fetch a credential through the ``arq-connection`` broker binary.

    Runs ``arq-connection access <service> <resource> --requesting-worker
    arq://body/worker/arq-github`` (plus ``--audit-only`` when requested)
    with a 10-second timeout.

    Returns:
        - the broker's stdout with trailing newlines removed, when the
          broker exits 0 and printed something;
        - the sentinel ``"<present>"`` when ``audit_only`` is true and the
          broker exits 0 (presence is signalled by the exit code alone, the
          printed value is ignored);
        - ``None`` when the broker is not on PATH, exits non-zero, prints
          nothing, or running it raises.

    Audit emission (`credential_accessed` / `credential_access_denied`) is
    handled inside `arq-connection` itself, so callers do not emit their own.
    """
    broker_path = shutil.which("arq-connection")
    if not broker_path:
        return None

    argv = [
        broker_path, "access", service, resource,
        "--requesting-worker", "arq://body/worker/arq-github",
    ]
    if audit_only:
        argv += ["--audit-only"]

    try:
        proc = subprocess.run(
            argv, check=False, timeout=10, capture_output=True, text=True,
        )
    except Exception:
        # Best-effort: any failure to run the broker reads as "no credential".
        return None

    if proc.returncode != 0:
        return None
    if audit_only:
        # Presence == rc 0; whatever the broker printed is irrelevant here.
        return "<present>"
    secret = (proc.stdout or "").rstrip("\n")
    return secret if secret else None


def _gh_active_user() -> str:
    """Return the login of the active gh account, or ``"unknown"``.

    Runs ``gh auth status`` (5-second timeout) and parses stdout and stderr
    joined together. `gh` prints the login on a "Logged in to github.com
    account <login> (...)" line and the active flag on a LATER
    "- Active account: true" line, so the two are matched across lines.
    Substrate audit trails record this value as the `actor`; an "unknown"
    return weakens the evidence chain.

    Two strategies, in order:
      1. one regex over the whole text for the canonical shape where the
         active flag directly follows the login line;
      2. a line scan that remembers the latest login and returns it at the
         next "Active account: true", tolerating other lines in between.

    Returns ``"unknown"`` when gh is missing, times out, or neither strategy
    finds an active account.
    """
    try:
        proc = subprocess.run(
            ["gh", "auth", "status"],
            capture_output=True,
            text=True,
            check=False,
            timeout=5,
        )
    except (FileNotFoundError, subprocess.TimeoutExpired):
        return "unknown"

    report = "".join((proc.stdout or "", proc.stderr or ""))

    # Strategy 1: login line immediately followed by the active flag. `(\S+)`
    # keeps the captured login to a single whitespace-free token.
    canonical = re.search(
        r"account\s+(\S+)\s+\([^)]*\)\s*\n\s*-\s*Active account:\s*true",
        report,
    )
    if canonical is not None:
        return canonical.group(1)

    # Strategy 2: walk the lines, carrying the most recently seen login.
    pending_login: str | None = None
    for row in report.splitlines():
        header = re.search(
            r"Logged in to github\.com account\s+(\S+)\s+\([^)]*\)",
            row,
        )
        if header is not None:
            pending_login = header.group(1)
        elif pending_login:
            if re.search(r"Active account:\s*true", row):
                return pending_login
            # An explicit "false" closes this account's block, so an inactive
            # account listed first cannot be credited with a later flag.
            if re.search(r"Active account:\s*false", row):
                pending_login = None

    return "unknown"


def _api_call(
    method: str,
    path: str,
    token: str,
    body: dict | None = None,
) -> tuple[int, dict | list | None, str]:
    """Blocking call to the GitHub REST API.

    Args:
        method: HTTP method, e.g. "GET" or "POST".
        path: Request path (with any query string), appended to ``GITHUB_API``.
        token: Bearer token sent in the Authorization header.
        body: Optional JSON-serialisable request body.

    Returns:
        ``(status_code, parsed_json, error_text)``.
        - 2xx: ``parsed_json`` is the decoded body (``{}`` for an empty body,
          ``None`` for a whitespace-only or non-JSON body); ``error_text`` is
          empty unless the body was not valid JSON.
        - HTTP error: the error status, the decoded error body if it is JSON,
          and the first 500 characters of the raw body.
        - Transport failure: ``(0, None, "<ExceptionName>: <message>")``.

    This function does not raise for HTTP, transport, or decoding problems;
    callers branch on the returned status instead.
    """
    # Function-scope import: urllib already depends on http.client, and this
    # keeps the block self-contained.
    import http.client

    url = GITHUB_API + path
    data = json.dumps(body).encode("utf-8") if body is not None else None  # noqa: ARQ-NO-JSON-HOT-PATH GitHub REST API vendor wire format
    req = urllib.request.Request(
        url,
        method=method,
        data=data,
        headers={
            "Accept": "application/vnd.github+json",
            "Authorization": f"Bearer {token}",
            "User-Agent": "arq-github-bridge/0.1",
            "X-GitHub-Api-Version": "2022-11-28",
        },
    )
    if data is not None:
        req.add_header("Content-Type", "application/json")
    try:
        with urllib.request.urlopen(req, timeout=30) as resp:
            status = resp.status
            # errors="replace": a stray non-UTF-8 byte must not raise here.
            raw = resp.read().decode("utf-8", errors="replace") or "{}"
    except urllib.error.HTTPError as exc:
        raw = exc.read().decode("utf-8", errors="replace") if exc.fp else ""
        try:
            parsed = json.loads(raw) if raw.strip() else None
        except json.JSONDecodeError:
            parsed = None
        return exc.code, parsed, raw[:500]
    except (urllib.error.URLError, TimeoutError, OSError, http.client.HTTPException) as exc:
        # HTTPException covers failures raised while reading the body (e.g.
        # IncompleteRead), which are not OSError subclasses.
        return 0, None, f"{type(exc).__name__}: {exc}"

    if not raw.strip():
        return status, None, ""
    try:
        return status, json.loads(raw), ""
    except json.JSONDecodeError as exc:
        # A 2xx with a non-JSON body (proxy page, truncated response) is
        # reported through the return value rather than by raising, so callers
        # can still emit their post-dispatch ack.
        return status, None, f"invalid JSON in response body: {exc}"


def _post_arq_merge_gate(
    *,
    owner: str,
    repo: str,
    head_sha: str,
    conclusion: str,
    title: str,
    summary: str,
    token: str,
    external_id: str | None = None,
    details_act: str | None = None,
) -> tuple[int, str]:
    """Post the `arq-merge-gate` commit status on ``head_sha``.

    Branch-protection's `required_status_checks.contexts` accepts both
    check-run names and status contexts. Statuses are PAT-supported (the
    Checks API requires GitHub App auth), so the gate is posted as a commit
    status. It is the GitHub-side projection of the substrate merge decision
    (merge-decision-on-substrate-v1) and is posted directly by
    `arq-github pr merge-via-substrate`, not by a workflow — so a workflow
    cancellation cascade cannot produce a phantom `arq-merge-gate` failure.

    The Checks-style ``conclusion`` is translated to a Statuses ``state``:
      success / neutral / skipped            → success
      cancelled (cancelled-is-not-failure-v1) → success
      failure / timed_out / action_required  → failure
      anything else                          → error

    The description is ``"<title>: <summary>"``, shortened to 140 characters.
    ``details_act`` only switches on a ``target_url`` pointing at the commit
    page; ``external_id`` is accepted but not used by this function.

    Returns ``(http_status, error_text)`` from the status POST.
    """
    if conclusion in {"success", "neutral", "skipped", "cancelled"}:
        state = "success"
    elif conclusion in {"failure", "timed_out", "action_required"}:
        state = "failure"
    else:
        state = "error"

    # GitHub caps the description at 140 chars; the full audit trail lives
    # on substrate (merge_decided / merge_blocked acts).
    description = f"{title}: {summary}"
    if len(description) > 140:
        description = description[:137] + "..."

    status_payload: dict = {
        "state": state,
        "context": "arq-merge-gate",
        "description": description,
    }
    if details_act:
        # target_url must be http(s), so an arq:// address cannot be linked
        # directly; link the commit page and let the auditor follow the
        # substrate trail from there.
        status_payload["target_url"] = f"https://github.com/{owner}/{repo}/commit/{head_sha}"

    endpoint = f"/repos/{owner}/{repo}/statuses/{head_sha}"
    http_status, _unused, error_text = _api_call(
        "POST", endpoint, token=token, body=status_payload,
    )
    return http_status, error_text


def cmd_push(args: argparse.Namespace) -> int:
    """Run `git push` wrapped in a substrate envelope.

    Emits `github_envelope_sent` before the push and `github_envelope_ack`
    after it (carrying the exit code and the last 500 characters of git's
    stdout and stderr), then replays git's full output to the caller.

    Reads ``args.remote``, ``args.branch`` and ``args.force_with_lease``.
    Returns git's exit code, or 127 when the git binary cannot be found.
    """
    verb = "push"
    ts = _utc_ts()
    actor = _gh_active_user()
    git_verb = f"git.{verb}"
    reference = f"{verb}-{ts}"

    sent_addr = _emit_act("act", "github_envelope_sent", reference, {
        "verb": git_verb,
        "remote": args.remote,
        "branch": args.branch,
        "force_with_lease": args.force_with_lease,
        "actor": actor,
        "actor_peer": ACTOR_PEER_ADDRESS,
        "ts": ts,
    })

    argv = ["git", "push"]
    if args.force_with_lease:
        argv.append("--force-with-lease")
    argv += [args.remote, args.branch]

    try:
        proc = subprocess.run(argv, capture_output=True, text=True, check=False)
    except FileNotFoundError:
        _emit_act("act", "github_envelope_ack", reference, {
            "verb": git_verb,
            "envelope_sent": sent_addr,
            "exit_code": 127,
            "error": "git not on PATH",
        })
        print("arq-github: git binary not found", file=sys.stderr)
        return 127

    exit_code = proc.returncode
    _emit_act("act", "github_envelope_ack", reference, {
        "verb": git_verb,
        "envelope_sent": sent_addr,
        "exit_code": exit_code,
        "stdout": proc.stdout[-500:],
        "stderr": proc.stderr[-500:],
        "ts": _utc_ts(),
    })
    # Replay git's own messages so the caller sees them unchanged.
    for text, stream in ((proc.stdout, sys.stdout), (proc.stderr, sys.stderr)):
        if text:
            stream.write(text)
    return exit_code


def _resolve_body(args: argparse.Namespace) -> str:
    """Return the body text for a command.

    When ``args.body_file`` is set, the file's UTF-8 contents are returned;
    an unreadable file prints a diagnostic and exits with status 2.
    Otherwise ``args.body`` is returned, or ``""`` when it is empty or None.
    """
    source_path = getattr(args, "body_file", None)
    if not source_path:
        return args.body or ""
    try:
        with open(source_path, "r", encoding="utf-8") as handle:
            return handle.read()
    except OSError as exc:
        print(f"[arq-github] --body-file read failed: {exc}", file=sys.stderr)
        sys.exit(2)


def cmd_pr_create(args: argparse.Namespace) -> int:
    """Create a pull request and print its URL.

    Primitive-first (Option B Move 6): the daemon's `github.pr.create` verb is
    tried first. On "ok" its URL is printed; on "error" the failure is
    authoritative and no fallback happens. Any other outcome falls back to a
    direct REST call, which this wrapper brackets with its own
    `github_envelope_sent` / `github_envelope_ack` acts.

    Reads ``args.title``, ``args.head``, ``args.base``, ``args.owner``,
    ``args.repo`` and the body via ``_resolve_body``.
    Returns 0 on success, 1 on failure.
    """
    body = _resolve_body(args)
    outcome, result = _primitive_invoke(
        "github.pr.create",
        {
            "title": args.title,
            "head": args.head,
            "base": args.base,
            "body": body,
            "owner": args.owner,
            "repo": args.repo,
        },
        timeout_s=60.0,
    )
    if outcome == "ok":
        # Guard the shape: an "ok" with a non-dict result must not crash
        # after the PR has already been created.
        print(result.get("url", "") if isinstance(result, dict) else "")
        return 0
    if outcome == "error":
        print(f"arq-github pr create failed via primitive: {json.dumps(result, indent=2)}", file=sys.stderr)
        return 1

    # Fallback: direct REST (body already resolved at function entry)
    verb = "pr.create"
    ts = _utc_ts()
    actor = _gh_active_user()
    actor_peer = ACTOR_PEER_ADDRESS
    req_body = {
        "title": args.title, "head": args.head, "base": args.base, "body": body,
    }
    envelope = {**req_body, "verb": verb, "owner": args.owner, "repo": args.repo,
                "actor": actor, "actor_peer": actor_peer, "ts": ts,
                "dispatch_path": "direct_rest_fallback"}
    sent_addr = _emit_act("act", "github_envelope_sent", f"pr-create-{ts}", envelope)
    token = _gh_auth_token()
    status, parsed, err = _api_call(
        "POST", f"/repos/{args.owner}/{args.repo}/pulls", token=token, body=req_body,
    )
    # Resolve the response fields once, shape-checked, so the ack and the
    # printed URL cannot disagree and a non-dict body cannot raise.
    pr_fields = parsed if isinstance(parsed, dict) else {}
    pr_url = pr_fields.get("html_url")
    ack = {
        "verb": verb, "envelope_sent": sent_addr, "http_status": status,
        "pr_number": pr_fields.get("number"),
        "pr_url": pr_url,
        "error": err if status >= 400 or status == 0 else None, "ts": _utc_ts(),
    }
    _emit_act("act", "github_envelope_ack", f"pr-create-{ts}", ack)
    if 200 <= status < 300:
        print(pr_url or "")
        return 0
    print(f"arq-github pr create failed: status={status} err={err}", file=sys.stderr)
    return 1


def cmd_pr_view(args: argparse.Namespace) -> int:
    """Print a pull request as indented JSON.

    Primitive-first (Option B): when the daemon handles `github.pr.view` it
    owns the envelope trail, so this wrapper emits nothing on that path. An
    "error" outcome is authoritative and is not retried. Any other outcome
    (verb not rolled out yet, daemon unreachable) falls back to a direct REST
    GET bracketed by this wrapper's own sent/ack acts.

    Reads ``args.pr``, ``args.owner`` and ``args.repo``.
    Returns 0 on success, 1 on failure.
    """
    outcome, result = _primitive_invoke(
        "github.pr.view",
        {"pr_number": args.pr, "owner": args.owner, "repo": args.repo},
    )
    if outcome == "error":
        # The primitive ran and failed upstream — do not fall back.
        print(
            f"arq-github pr view failed via primitive: {json.dumps(result, indent=2)}",
            file=sys.stderr,
        )
        return 1
    if outcome == "ok":
        # Prefer the full PR echoed inside the condensed shape, if present.
        print(json.dumps(result.get("full_pr") or result, indent=2))
        return 0

    # Fallback path: the wrapper records its own trail.
    verb = "pr.view"
    ts = _utc_ts()
    reference = f"pr-view-{args.pr}-{ts}"
    actor = _gh_active_user()
    sent = _emit_act("act", "github_envelope_sent", reference, {
        "verb": verb,
        "owner": args.owner,
        "repo": args.repo,
        "pr_number": args.pr,
        "actor": actor,
        "actor_peer": ACTOR_PEER_ADDRESS,
        "ts": ts,
        "dispatch_path": "direct_rest_fallback",
    })
    token = _gh_auth_token()
    endpoint = f"/repos/{args.owner}/{args.repo}/pulls/{args.pr}"
    status, parsed, err = _api_call("GET", endpoint, token=token)
    failed = status == 0 or status >= 400
    _emit_act("act", "github_envelope_ack", reference, {
        "verb": verb,
        "envelope_sent": sent,
        "http_status": status,
        "error": err if failed else None,
        "ts": _utc_ts(),
    })
    if not (200 <= status < 300):
        print(f"arq-github pr view failed: status={status} err={err}", file=sys.stderr)
        return 1
    print(json.dumps(parsed, indent=2))
    return 0


def cmd_pr_list(args: argparse.Namespace) -> int:
    """Print one line per pull request: number, state, title.

    Primitive-first (Option B Move 6): `github.pr.list` is tried on the
    daemon; "ok" prints its `prs` rows, "error" is authoritative. Any other
    outcome falls back to a direct REST GET bracketed by this wrapper's own
    sent/ack acts (the ack records how many PRs came back).

    Reads ``args.state``, ``args.limit``, ``args.owner`` and ``args.repo``.
    Returns 0 on success, 1 on failure.
    """
    outcome, result = _primitive_invoke(
        "github.pr.list",
        {"state": args.state, "limit": args.limit, "owner": args.owner, "repo": args.repo},
    )
    if outcome == "error":
        print(f"arq-github pr list failed via primitive: {json.dumps(result, indent=2)}", file=sys.stderr)
        return 1
    if outcome == "ok":
        for row in result.get("prs", []):
            print(f"#{row['pr_number']:<6} {row['state']:<8} {row['title']}")
        return 0

    # Fallback: direct REST, with the wrapper's own envelope trail.
    verb = "pr.list"
    ts = _utc_ts()
    reference = f"pr-list-{ts}"
    actor = _gh_active_user()
    sent = _emit_act("act", "github_envelope_sent", reference, {
        "verb": verb,
        "owner": args.owner,
        "repo": args.repo,
        "state": args.state,
        "actor": actor,
        "actor_peer": ACTOR_PEER_ADDRESS,
        "ts": ts,
        "dispatch_path": "direct_rest_fallback",
    })
    token = _gh_auth_token()
    query = f"?state={args.state}&per_page={args.limit}"
    status, parsed, err = _api_call(
        "GET", f"/repos/{args.owner}/{args.repo}/pulls{query}", token=token
    )
    pr_count = len(parsed) if isinstance(parsed, list) else 0
    _emit_act("act", "github_envelope_ack", reference, {
        "verb": verb,
        "envelope_sent": sent,
        "http_status": status,
        "count": pr_count,
        "ts": _utc_ts(),
    })
    if not (200 <= status < 300):
        print(f"arq-github pr list failed: status={status} err={err}", file=sys.stderr)
        return 1
    for row in (parsed or []):
        print(f"#{row['number']:<6} {row['state']:<8} {row['title']}")
    return 0


def cmd_pr_checks(args: argparse.Namespace) -> int:
    """Print one line per check run on a PR's head commit: outcome, name.

    Primitive-first (Option B Move 6): `github.pr.checks` is tried on the
    daemon; "ok" prints its `check_runs`, "error" is authoritative. Any other
    outcome falls back to two REST calls (resolve the PR's head sha, then list
    the commit's check runs), bracketed by this wrapper's own sent/ack acts.

    On the fallback path every `github_envelope_sent` is paired with a
    `github_envelope_ack`, including when the head sha cannot be resolved.

    Reads ``args.pr``, ``args.owner`` and ``args.repo``.
    Returns 0 on success, 1 on failure.
    """

    def _outcome_label(check_run: dict) -> str:
        # A run that has not finished has no conclusion yet; fall back to its
        # status, and to a fixed label so the width format never sees None.
        return check_run.get("conclusion") or check_run.get("status") or "unknown"

    outcome, result = _primitive_invoke(
        "github.pr.checks",
        {"pr_number": args.pr, "owner": args.owner, "repo": args.repo},
    )
    if outcome == "ok":
        runs = result.get("check_runs", []) if isinstance(result, dict) else []
        for cr in runs:
            print(f"{_outcome_label(cr):<12} {cr['name']}")
        return 0
    if outcome == "error":
        print(f"arq-github pr checks failed via primitive: {json.dumps(result, indent=2)}", file=sys.stderr)
        return 1

    # Fallback: two-step REST
    verb = "pr.checks"
    ts = _utc_ts()
    reference = f"pr-checks-{args.pr}-{ts}"
    sent = _emit_act("act", "github_envelope_sent", reference, {
        "verb": verb, "owner": args.owner, "repo": args.repo, "pr_number": args.pr,
        "actor": _gh_active_user(), "actor_peer": ACTOR_PEER_ADDRESS, "ts": ts,
        "dispatch_path": "direct_rest_fallback",
    })
    token = _gh_auth_token()

    # Step 1: resolve the head sha through the shared helper, which rejects a
    # missing or non-string sha instead of letting "None" reach the next URL.
    sha, st1, pr_err = _fetch_pr_head_sha(
        owner=args.owner, repo=args.repo, pr=args.pr, token=token,
    )
    if sha is None:
        # Close the envelope before bailing out so the trail is not left with
        # a dangling `sent`.
        _emit_act("act", "github_envelope_ack", reference, {
            "verb": verb, "envelope_sent": sent, "http_status": st1,
            "error": pr_err or "PR fetch failed", "ts": _utc_ts(),
        })
        print(f"arq-github pr checks: fetch failed status={st1} err={pr_err}", file=sys.stderr)
        return 1

    # Step 2: list check runs. The endpoint pages at 30 by default; request
    # the documented maximum page size so PRs with many checks are not cut off.
    status, parsed, err = _api_call(
        "GET",
        f"/repos/{args.owner}/{args.repo}/commits/{sha}/check-runs?per_page=100",
        token=token,
    )
    _emit_act("act", "github_envelope_ack", reference, {
        "verb": verb, "envelope_sent": sent, "http_status": status,
        "ts": _utc_ts(),
    })
    if 200 <= status < 300:
        runs = parsed.get("check_runs", []) if isinstance(parsed, dict) else []
        for cr in runs:
            print(f"{_outcome_label(cr):<12} {cr['name']}")
        return 0
    print(f"arq-github pr checks failed: status={status} err={err}", file=sys.stderr)
    return 1


# ─── governed wait/watch primitives ─────────────────────────────────────────
#
# Surfaces operators reach for when waiting on CI / required checks. Before
# these existed, workers improvised bash polling loops around `arq-github pr
# checks` — operational pattern that's dangerous at multi-worker scale because
# the loops diverge in timeout / backoff / jq parsing, and the substrate has
# no envelope trail for the wait itself. Each wait verb emits envelope_sent
# pre-poll and envelope_ack post-poll with terminal outcome + wall_duration_s
# + attempts so substrate audits can see who was blocked on what and for how
# long.
#
# Principle: arq://doc/principle/repeated-bypasses-are-undiscovered-primitives-v1
# (the observation corollary — operational polling that becomes muscle-memory
# is itself a missing primitive).

# Check-run `conclusion` values treated as terminal. `_check_run_terminal`
# consults this set when a run's `status` is not "completed".
CHECK_TERMINAL_CONCLUSIONS = frozenset({
    "success", "failure", "cancelled", "neutral", "skipped",
    "timed_out", "action_required", "stale",
})


def _fetch_pr_head_sha(*, owner: str, repo: str, pr: int, token: str) -> tuple[str | None, int, str | None]:
    """Look up the current head sha of a pull request.

    Returns ``(sha, http_status, error)``:
      - ``(sha, status, None)`` when the PR was fetched and carries a
        non-empty string sha;
      - ``(None, status, error)`` when the fetch did not return a 2xx JSON
        object (``error`` is the API call's error text);
      - ``(None, status, "no head sha in PR payload")`` when the PR was
        fetched but has no usable head sha.
    """
    status, parsed, err = _api_call("GET", f"/repos/{owner}/{repo}/pulls/{pr}", token=token)
    fetched = 200 <= status < 300 and isinstance(parsed, dict)
    if not fetched:
        return None, status, err

    head = parsed.get("head") or {}
    sha = head.get("sha")
    if isinstance(sha, str) and sha:
        return sha, status, None
    return None, status, "no head sha in PR payload"


def _check_run_terminal(check_run: dict) -> bool:
    """Report whether a check-run has reached a terminal state.

    Terminal means ``status == "completed"``, or — defensively, since GitHub
    occasionally returns a conclusion before flipping status — a conclusion
    that is a member of ``CHECK_TERMINAL_CONCLUSIONS``. Missing or null
    fields are treated as empty strings.
    """
    if (check_run.get("status") or "") == "completed":
        return True
    return (check_run.get("conclusion") or "") in CHECK_TERMINAL_CONCLUSIONS


def _check_run_passed(check_run: dict) -> bool:
    """Success-class outcomes: success, skipped (didn't run because not
    applicable — counts as not blocking), neutral (informational pass)."""
    conclusion = check_run.get("conclusion") or ""
    return conclusion in {"success", "skipped", "neutral"}


def _fetch_required_check_contexts(
    *, owner: str, repo: str, branch: str, token: str,
) -> tuple[list[str], int, str | None]:
    """Fetch the required-status-check context names protecting ``branch``.

    Reads ``/branches/{branch}/protection/required_status_checks`` via
    ``_api_call``.

    Returns:
        ``(contexts, http_status, error)``. ``contexts`` is empty when the
        endpoint answers 404 (reported as "branch protection not
        configured"), when the response is not a 2xx JSON object, or when
        the payload has no usable ``contexts`` list. Non-string entries are
        dropped. The caller decides whether an empty list means "refuse" or
        "fall back".
    """
    endpoint = f"/repos/{owner}/{repo}/branches/{branch}/protection/required_status_checks"
    http_status, payload, api_error = _api_call("GET", endpoint, token=token)

    if http_status == 404:
        return [], http_status, "branch protection not configured"
    if not (200 <= http_status < 300 and isinstance(payload, dict)):
        return [], http_status, api_error

    raw_contexts = payload.get("contexts") or []
    names: list[str] = []
    if isinstance(raw_contexts, list):
        for entry in raw_contexts:
            if isinstance(entry, str):
                names.append(entry)
    return names, http_status, None


def cmd_pr_wait_check(args: argparse.Namespace) -> int:
    """Wait for a single named check on a PR to reach a terminal state.

    Substrate-attested replacement for hand-written bash polling loops
    around ``arq-github pr checks``. Emits envelope_sent pre-poll and
    envelope_ack post-poll with the terminal conclusion, wall_duration_s,
    attempt count, and check_run_id so the substrate audit captures the
    wait itself.

    Matching: check-runs whose name equals ``args.check`` exactly take
    precedence. Only when no check-run carries that exact name does the
    substring rule apply (helps with names parameterised by matrix). This
    stops an already-finished sibling such as ``lint-docs`` from ending a
    wait on ``lint`` while ``lint`` itself is still running.

    Reads from ``args``: ``owner``, ``repo``, ``pr``, ``check``,
    ``timeout_s``, ``poll_interval_s``.

    Exit codes:
      0   — check reached terminal state in the success class
            (success | skipped | neutral)
      1   — check reached terminal state in the failure class
            (failure | cancelled | timed_out | action_required | stale)
      2   — usage / pre-poll failure (PR not found, head sha not
            resolvable, etc.)
      124 — timeout: deadline expired before any matching check became
            terminal (matches the conventional 'timeout' exit code)
    """
    verb = "pr.wait_check"
    ts = _utc_ts()
    started = time.monotonic()
    token = _gh_auth_token()
    head_sha, ps, perr = _fetch_pr_head_sha(
        owner=args.owner, repo=args.repo, pr=args.pr, token=token,
    )
    if head_sha is None:
        print(
            f"arq-github pr wait-check: could not resolve PR #{args.pr} head sha "
            f"(status={ps} err={perr})",
            file=sys.stderr,
        )
        return 2

    sent_addr = _emit_act("act", "github_envelope_sent", f"pr-wait-check-{args.pr}-{ts}", {
        "verb": verb,
        "owner": args.owner, "repo": args.repo, "pr_number": args.pr,
        "head_sha": head_sha,
        "check_name": args.check,
        "timeout_s": args.timeout_s,
        "poll_interval_s": args.poll_interval_s,
        "actor": _gh_active_user(), "actor_peer": ACTOR_PEER_ADDRESS, "ts": ts,
    })

    attempts = 0
    # "passed" | "failed" | "timeout" — stays "timeout" unless a matching
    # check-run turns terminal before the deadline.
    outcome = "timeout"
    matched_check: dict | None = None
    deadline = started + max(1, args.timeout_s)
    checks_path = (
        f"/repos/{args.owner}/{args.repo}/commits/{head_sha}/check-runs?per_page=100"
    )

    while True:
        attempts += 1
        st, parsed, _e = _api_call("GET", checks_path, token=token)
        if 200 <= st < 300 and isinstance(parsed, dict):
            runs = [cr for cr in (parsed.get("check_runs") or []) if isinstance(cr, dict)]
            # Exact-name candidates win; substring is only a fallback.
            exact = [cr for cr in runs if (cr.get("name") or "") == args.check]
            candidates = exact or [
                cr for cr in runs if args.check in (cr.get("name") or "")
            ]
            matched_check = next(
                (cr for cr in candidates if _check_run_terminal(cr)), None,
            )
            if matched_check is not None:
                outcome = "passed" if _check_run_passed(matched_check) else "failed"
                break

        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Never sleep past the deadline: the last poll lands on it instead
        # of up to one full poll interval later.
        time.sleep(min(args.poll_interval_s, remaining))

    wall_s = round(time.monotonic() - started, 2)
    hit = matched_check or {}
    ack = {
        "verb": verb, "envelope_sent": sent_addr,
        "outcome": outcome,
        "attempts": attempts, "wall_duration_s": wall_s,
        "check_name_pattern": args.check,
        "matched_check_name": hit.get("name"),
        "matched_check_id": hit.get("id"),
        "matched_check_conclusion": hit.get("conclusion"),
        "matched_check_url": hit.get("html_url"),
        "ts": _utc_ts(),
    }
    _emit_act("act", "github_envelope_ack", f"pr-wait-check-{args.pr}-{ts}", ack)

    if outcome == "passed":
        print(f"{hit.get('conclusion','?')}: {hit.get('name','?')}")
        return 0
    if outcome == "failed":
        print(
            f"{hit.get('conclusion','?')}: "
            f"{hit.get('name','?')} "
            f"(url={hit.get('html_url','')})",
            file=sys.stderr,
        )
        return 1
    print(
        f"arq-github pr wait-check: timeout after {wall_s}s / {attempts} attempts "
        f"waiting for {args.check!r} on PR #{args.pr}",
        file=sys.stderr,
    )
    return 124


def cmd_pr_wait_required(args: argparse.Namespace) -> int:
    """Wait for all branch-protection-required checks on a PR to reach
    terminal state.

    Reads the base branch's required-status-checks contexts (substrate
    governance source of truth) and waits until every required context
    appears in the PR's check-runs as terminal. Substrate-attested
    counterpart to "poll `gh pr checks --required` in a loop."

    Matching: a required context is matched against the check-run with
    exactly that name when one exists (in any state). Substring matching
    is only a fallback for contexts with no exactly-named check-run, and
    check-runs without a name are ignored. This keeps a finished
    ``build-docs`` from satisfying ``build`` while ``build`` is still
    running, and keeps a nameless run from satisfying everything.

    On timeout the ack and the stderr message report the contexts that had
    already turned terminal at the last successful poll, so the audit trail
    shows how far the wait got.

    Reads from ``args``: ``owner``, ``repo``, ``pr``, ``timeout_s``,
    ``poll_interval_s``.

    Exit codes:
      0   — all required checks reached terminal state in the success
            class
      1   — at least one required check terminal-failed; all other
            required checks also terminal (so the verdict is final, not
            racing)
      2   — usage / pre-poll failure (PR / branch protection
            unreadable)
      124 — timeout
    """
    verb = "pr.wait_required"
    ts = _utc_ts()
    started = time.monotonic()
    token = _gh_auth_token()
    head_sha, ps, perr = _fetch_pr_head_sha(
        owner=args.owner, repo=args.repo, pr=args.pr, token=token,
    )
    if head_sha is None:
        print(
            f"arq-github pr wait-required: could not resolve PR #{args.pr} head sha "
            f"(status={ps} err={perr})",
            file=sys.stderr,
        )
        return 2

    # Resolve base branch to read the protection rule. Fetch the PR once
    # for the base ref; required_status_checks lives on the BASE branch,
    # not on main implicitly.
    st_pr, pr_json, _e = _api_call(
        "GET", f"/repos/{args.owner}/{args.repo}/pulls/{args.pr}", token=token,
    )
    base_ref = "main"
    if 200 <= st_pr < 300 and isinstance(pr_json, dict):
        base_ref = (pr_json.get("base") or {}).get("ref") or "main"

    required, rc_status, rc_err = _fetch_required_check_contexts(
        owner=args.owner, repo=args.repo, branch=base_ref, token=token,
    )
    if not required:
        print(
            f"arq-github pr wait-required: no required-status-checks "
            f"configured on branch {base_ref!r} "
            f"(status={rc_status} err={rc_err}). Refusing to fall back to "
            f"\"every check\" semantics — branch protection IS the source of "
            f"truth for required.",
            file=sys.stderr,
        )
        return 2

    sent_addr = _emit_act("act", "github_envelope_sent", f"pr-wait-required-{args.pr}-{ts}", {
        "verb": verb,
        "owner": args.owner, "repo": args.repo, "pr_number": args.pr,
        "head_sha": head_sha,
        "base_ref": base_ref,
        "required_contexts": required,
        "timeout_s": args.timeout_s,
        "poll_interval_s": args.poll_interval_s,
        "actor": _gh_active_user(), "actor_peer": ACTOR_PEER_ADDRESS, "ts": ts,
    })

    attempts = 0
    deadline = started + max(1, args.timeout_s)
    # required context -> terminal check-run, as of the last successful poll.
    # Kept up to date on every poll so a timeout still reports progress.
    final_state: dict[str, dict] = {}
    outcome = "timeout"
    checks_path = (
        f"/repos/{args.owner}/{args.repo}/commits/{head_sha}/check-runs?per_page=100"
    )

    while True:
        attempts += 1
        st, parsed, _e = _api_call("GET", checks_path, token=token)
        if 200 <= st < 300 and isinstance(parsed, dict):
            names_seen: set[str] = set()
            terminal_by_name: dict[str, dict] = {}
            for cr in (parsed.get("check_runs") or []):
                if not isinstance(cr, dict):
                    continue
                name = cr.get("name") or ""
                if not name:
                    # "" is a substring of every context; a nameless run
                    # must never satisfy a required check.
                    continue
                names_seen.add(name)
                if _check_run_terminal(cr):
                    # Last-write-wins: GitHub can return multiple check-runs
                    # per name (re-runs). Pick the most recent (highest id).
                    prev = terminal_by_name.get(name)
                    if prev is None or (cr.get("id") or 0) > (prev.get("id") or 0):
                        terminal_by_name[name] = cr

            matched: dict[str, dict] = {}
            for ctx in required:
                if ctx in names_seen:
                    # An exactly-named check-run exists: only it may satisfy
                    # this context, and only once it is terminal.
                    exact_cr = terminal_by_name.get(ctx)
                    if exact_cr is not None:
                        matched[ctx] = exact_cr
                    continue
                # Substring fallback because GitHub Actions sometimes
                # prefixes context names with the workflow display name.
                for name, cr in terminal_by_name.items():
                    if ctx in name or name in ctx:
                        matched[ctx] = cr
                        break

            final_state = matched
            # Membership test (not a length comparison) so a duplicated
            # entry in ``required`` cannot make completion unreachable.
            if all(ctx in matched for ctx in required):
                outcome = "all_terminal"
                break

        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Never sleep past the deadline.
        time.sleep(min(args.poll_interval_s, remaining))

    wall_s = round(time.monotonic() - started, 2)
    per_check_summary = [
        {
            "context": ctx,
            "matched_name": cr.get("name"),
            "conclusion": cr.get("conclusion"),
            "passed": _check_run_passed(cr),
            "id": cr.get("id"),
            "url": cr.get("html_url"),
        }
        for ctx, cr in final_state.items()
    ]
    failed_contexts = [s["context"] for s in per_check_summary if not s["passed"]]
    ack_outcome = outcome
    if outcome == "all_terminal":
        ack_outcome = "all_passed" if not failed_contexts else "some_failed"
    ack = {
        "verb": verb, "envelope_sent": sent_addr,
        "outcome": ack_outcome,
        "attempts": attempts, "wall_duration_s": wall_s,
        "required_contexts": required,
        "per_check": per_check_summary,
        "failed_contexts": failed_contexts,
        "ts": _utc_ts(),
    }
    _emit_act("act", "github_envelope_ack", f"pr-wait-required-{args.pr}-{ts}", ack)

    if ack_outcome == "all_passed":
        for s in per_check_summary:
            # A completed run can carry a null conclusion; format a
            # placeholder instead of crashing on None.
            print(f"{str(s['conclusion'] or '-'):<12} {s['context']}")
        return 0
    if ack_outcome == "some_failed":
        for s in per_check_summary:
            mark = "✓" if s["passed"] else "✗"
            print(f"{mark} {str(s['conclusion'] or '-'):<12} {s['context']}", file=sys.stderr)
        return 1
    print(
        f"arq-github pr wait-required: timeout after {wall_s}s / "
        f"{attempts} attempts; {len(final_state)}/{len(required)} terminal",
        file=sys.stderr,
    )
    return 124


def cmd_pr_files(args: argparse.Namespace) -> int:
    """List files changed in a PR. Read-only. Substrate envelope trail.

    Closes the substrate gap noted in policy-approver Phase 2 (PR #3694):
    the worker's fact-gatherer needed file-pattern matching but had no
    arq-github verb to fetch /pulls/{N}/files, so it fell back to a bare
    `gh api` call. This verb makes the read-path mesh-routed.

    By default: prints one tab-separated ``<status>`` / ``<filename>`` line
    per file (status is one of added / modified / removed / renamed /
    copied / changed / unchanged). ``--json`` prints the full GitHub file
    objects (additions, deletions, changes, patch, blob_url, raw_url,
    contents_url).

    The direct-REST fallback pages through the endpoint 100 files at a
    time (GitHub's ``per_page`` maximum) up to the endpoint's 3000-file
    cap. If any page fails, the verb fails rather than printing a silently
    truncated list.

    Returns 0 on success, 1 when the primitive reports an error or the
    REST call fails.
    """
    def _render(files: list) -> None:
        # Shared output path for the primitive and REST branches.
        if getattr(args, "json", False):
            print(json.dumps(files, indent=2))
            return
        for entry in files:
            print(f"{entry.get('status','?'):<10}\t{entry.get('filename','')}")

    # Primitive-first (mirrors cmd_pr_view / cmd_pr_checks pattern).
    outcome, result = _primitive_invoke(
        "github.pr.files",
        {"pr_number": args.pr, "owner": args.owner, "repo": args.repo},
    )
    if outcome == "ok":
        _render(result.get("files") or result.get("pr_files") or [])
        return 0
    if outcome == "error":
        print(
            f"arq-github pr files failed via primitive: {json.dumps(result, indent=2)}",
            file=sys.stderr,
        )
        return 1

    # Fallback: direct REST.
    verb = "pr.files"
    ts = _utc_ts()
    sent = _emit_act("act", "github_envelope_sent", f"pr-files-{args.pr}-{ts}", {
        "verb": verb, "owner": args.owner, "repo": args.repo, "pr_number": args.pr,
        "actor": _gh_active_user(), "actor_peer": ACTOR_PEER_ADDRESS, "ts": ts,
        "dispatch_path": "direct_rest_fallback",
    })
    token = _gh_auth_token()

    # GitHub caps per_page at 100 and the whole listing at 3000 files, so
    # a larger per_page is silently clamped. Page explicitly instead.
    page_size = 100
    max_pages = 30
    collected: list = []
    status, err = 0, None
    for page in range(1, max_pages + 1):
        status, parsed, err = _api_call(
            "GET",
            f"/repos/{args.owner}/{args.repo}/pulls/{args.pr}/files"
            f"?per_page={page_size}&page={page}",
            token=token,
        )
        if not (200 <= status < 300) or not isinstance(parsed, list):
            break
        collected.extend(parsed)
        if len(parsed) < page_size:
            break  # short page: nothing further to fetch

    _emit_act("act", "github_envelope_ack", f"pr-files-{args.pr}-{ts}", {
        "verb": verb, "envelope_sent": sent, "http_status": status,
        "count": len(collected), "ts": _utc_ts(),
    })
    if 200 <= status < 300:
        _render(collected)
        return 0
    print(f"arq-github pr files failed: status={status} err={err}", file=sys.stderr)
    return 1


def cmd_run_view(args: argparse.Namespace) -> int:
    """Print a short summary of one workflow run.

    Emits an envelope_sent act before, and an envelope_ack act (with the
    HTTP status) after, a ``GET /actions/runs/{run}`` call. On a 2xx
    response prints id, status/conclusion, short sha, event, start time and
    URL, and returns 0. Otherwise prints the failure to stderr and
    returns 1.
    """
    verb = "run.view"
    ts = _utc_ts()
    envelope_key = f"run-view-{args.run}-{ts}"
    sent_payload = {
        "verb": verb, "owner": args.owner, "repo": args.repo, "run_id": args.run,
        "actor": _gh_active_user(), "actor_peer": ACTOR_PEER_ADDRESS, "ts": ts,
    }
    sent = _emit_act("act", "github_envelope_sent", envelope_key, sent_payload)

    token = _gh_auth_token()
    endpoint = f"/repos/{args.owner}/{args.repo}/actions/runs/{args.run}"
    status, parsed, err = _api_call("GET", endpoint, token=token)

    ack_payload = {
        "verb": verb, "envelope_sent": sent, "http_status": status, "ts": _utc_ts(),
    }
    _emit_act("act", "github_envelope_ack", envelope_key, ack_payload)

    if not (200 <= status < 300):
        print(f"arq-github run view failed: status={status} err={err}", file=sys.stderr)
        return 1

    run = parsed or {}
    print(f"run #{run.get('id')} {run.get('status')}/{run.get('conclusion')} "
          f"sha={run.get('head_sha','')[:8]} event={run.get('event')}")
    print(f"  started: {run.get('run_started_at')}")
    print(f"  url:     {run.get('html_url')}")
    return 0


def cmd_run_list(args: argparse.Namespace) -> int:
    """List recent workflow runs, optionally restricted to one workflow.

    Emits envelope_sent before and envelope_ack (with the HTTP status)
    after the runs listing call.

    When ``args.workflow`` is given it is resolved to a workflow id via
    ``_resolve_workflow_id`` (filename suffix or display name). If it cannot
    be resolved, the listing still falls back to runs across all workflows,
    but a warning goes to stderr so the caller can tell the filter was not
    applied.

    Reads from ``args``: ``owner``, ``repo``, ``workflow``, ``limit``.

    Returns 0 when the runs listing call succeeds, 1 otherwise.
    """
    verb = "run.list"
    ts = _utc_ts()
    sent = _emit_act("act", "github_envelope_sent", f"run-list-{ts}", {
        "verb": verb, "owner": args.owner, "repo": args.repo,
        "workflow": args.workflow, "limit": args.limit,
        "actor": _gh_active_user(), "actor_peer": ACTOR_PEER_ADDRESS, "ts": ts,
    })
    token = _gh_auth_token()
    path = f"/repos/{args.owner}/{args.repo}/actions/runs?per_page={args.limit}"
    if args.workflow:
        # Shared resolver keeps read and write verbs matching workflows the
        # same way, and tolerates null ``path`` / ``name`` fields.
        wf_id, wf_status, wf_err = _resolve_workflow_id(
            owner=args.owner, repo=args.repo,
            workflow_name=args.workflow, token=token,
        )
        if wf_id is not None:
            path = (
                f"/repos/{args.owner}/{args.repo}/actions/workflows/{wf_id}"
                f"/runs?per_page={args.limit}"
            )
        else:
            # Keep the best-effort fallback, but say so out loud.
            print(
                f"arq-github run list: workflow {args.workflow!r} not resolved "
                f"(status={wf_status} err={wf_err}); listing runs across all "
                f"workflows",
                file=sys.stderr,
            )
    status, parsed, err = _api_call("GET", path, token=token)
    _emit_act("act", "github_envelope_ack", f"run-list-{ts}", {
        "verb": verb, "envelope_sent": sent, "http_status": status, "ts": _utc_ts(),
    })
    if 200 <= status < 300:
        payload = parsed if isinstance(parsed, dict) else {}
        for r in payload.get("workflow_runs") or []:
            # ``head_sha`` can be present but null; slice a string either way.
            short_sha = (r.get("head_sha") or "")[:8]
            print(f"run#{r['id']:<12} {r['status']}/{r.get('conclusion') or '-':<10} "
                  f"{short_sha}  {r.get('name')}")
        return 0
    print(f"arq-github run list failed: status={status} err={err}", file=sys.stderr)
    return 1


def cmd_run_jobs(args: argparse.Namespace) -> int:
    """Print every job of a workflow run with its status and conclusion.

    Mesh-surface gap closer: ``arq-github pr checks`` only shows the
    aggregate status of multi-job workflows (e.g. ``Security Gate``
    aggregating frontend-audit / backend-audit / trivy / codeql /
    secret-scan). Without this verb, finding the failing sub-job required
    ``gh run view``, which the mesh-enforce hook blocks.

    Emits envelope_sent before and envelope_ack (with the HTTP status)
    after the jobs call. With ``args.steps`` set, each job line is followed
    by one line per step. Returns 0 on a 2xx response (including a run
    with no jobs), 1 otherwise.
    """
    verb = "run.jobs"
    ts = _utc_ts()
    envelope_key = f"run-jobs-{args.run}-{ts}"
    sent_payload = {
        "verb": verb, "owner": args.owner, "repo": args.repo, "run_id": args.run,
        "actor": _gh_active_user(), "actor_peer": ACTOR_PEER_ADDRESS, "ts": ts,
    }
    sent = _emit_act("act", "github_envelope_sent", envelope_key, sent_payload)

    token = _gh_auth_token()
    endpoint = f"/repos/{args.owner}/{args.repo}/actions/runs/{args.run}/jobs?per_page=100"
    status, parsed, err = _api_call("GET", endpoint, token=token)

    ack_payload = {
        "verb": verb, "envelope_sent": sent, "http_status": status, "ts": _utc_ts(),
    }
    _emit_act("act", "github_envelope_ack", envelope_key, ack_payload)

    if not (200 <= status < 300):
        print(f"arq-github run jobs failed: status={status} err={err}", file=sys.stderr)
        return 1

    jobs = (parsed or {}).get("jobs", [])
    if not jobs:
        print("(no jobs)")
        return 0

    for job in jobs:
        job_conclusion = job.get("conclusion") or "-"
        print(f"job#{job['id']:<12} {job.get('status'):12} {job_conclusion:10} {job.get('name')}")
        if not args.steps:
            continue
        for step in job.get("steps") or []:
            step_conclusion = step.get("conclusion") or "-"
            # Tick for success, cross for failure, dot for anything else.
            if step_conclusion == "success":
                marker = "✓"
            elif step_conclusion == "failure":
                marker = "✗"
            else:
                marker = "·"
            print(f"    {marker} step#{step.get('number'):<3} {step.get('status'):12} "
                  f"{step_conclusion:10} {step.get('name')}")
    return 0


class _NoRedirectHandler(urllib.request.HTTPRedirectHandler):
    """Intercept redirects so we can re-issue without the GitHub auth header.

    GitHub's job-logs endpoint issues a 302 to a pre-signed Azure blob URL
    whose authentication is embedded in the query string. If urllib forwards
    our ``Authorization: Bearer <gh-token>`` to Azure, Azure rejects with 401
    ``InvalidAuthenticationInfo``.

    Rather than following the redirect, this handler aborts the request by
    raising ``_Redirected`` with the target URL. ``cmd_run_logs`` catches it
    and sends a fresh request to that URL without the Authorization header.
    """

    def redirect_request(self, req, fp, code, msg, headers, newurl):  # noqa: PLR6301
        """Refuse to build a follow-up request; surface ``newurl`` instead.

        Raises:
            _Redirected: always, carrying ``newurl``.
        """
        raise _Redirected(newurl)


class _Redirected(Exception):
    def __init__(self, url: str):
        super().__init__(url)
        self.url = url


def cmd_run_rerun(args: argparse.Namespace) -> int:
    """Queue a re-run of a workflow run: every job, or only failed ones.

    Substrate-attested wrapper for `gh run rerun`. Closes the rerun-verb
    primitive gap surfaced 2026-05-15 during the FIRE-02 CI-flake recovery
    (shard-7 killed by 14.5-min step timeout).

    Operational evidence:
      arq://act/session_close/operational-topology-convergence-merge-sequence-completion-2026-05-15

    Governance: this is the canonical path for CI-flake recovery per
    CLAUDE.md "CI reliability" section ("No trigger-commits — use
    `gh run rerun <run-id>`"). The bare-gh form is blocked by mesh-enforce;
    this verb provides the substrate-attested route.

    With ``args.failed`` truthy the ``rerun-failed-jobs`` endpoint is
    POSTed, otherwise ``rerun``. Emits envelope_sent before and
    envelope_ack (with the HTTP status) after the call. Returns 0 on a 2xx
    response, 1 otherwise.
    """
    failed_only = bool(args.failed)
    if failed_only:
        verb, suffix, scope = "run.rerun_failed", "rerun-failed-jobs", "failed-only"
    else:
        verb, suffix, scope = "run.rerun", "rerun", "all"

    ts = _utc_ts()
    envelope_key = f"run-rerun-{args.run}-{ts}"
    sent_payload = {
        "verb": verb, "owner": args.owner, "repo": args.repo, "run_id": args.run,
        "failed_only": failed_only,
        "actor": _gh_active_user(), "actor_peer": ACTOR_PEER_ADDRESS, "ts": ts,
    }
    sent = _emit_act("act", "github_envelope_sent", envelope_key, sent_payload)

    token = _gh_auth_token()
    endpoint = f"/repos/{args.owner}/{args.repo}/actions/runs/{args.run}/{suffix}"
    status, parsed, err = _api_call("POST", endpoint, token=token, body={})

    ack_payload = {
        "verb": verb, "envelope_sent": sent, "http_status": status, "ts": _utc_ts(),
    }
    _emit_act("act", "github_envelope_ack", envelope_key, ack_payload)

    if not (200 <= status < 300):
        print(f"arq-github run rerun failed: status={status} err={err} parsed={parsed}", file=sys.stderr)
        return 1
    print(f"run-rerun: queued ({scope}) on run #{args.run}")
    return 0


def cmd_run_logs(args: argparse.Namespace) -> int:
    """Fetch raw logs for a single job inside a workflow run.

    GitHub's REST API serves per-job logs at
    ``/repos/{owner}/{repo}/actions/jobs/{job_id}/logs``. A 302 redirect to a
    pre-signed Azure blob URL is issued on the first hop; we follow the
    redirect with a fresh request that DOES NOT forward the GitHub auth
    header (the pre-signed URL carries its own SAS token).

    Every exit path after envelope_sent emits exactly one envelope_ack,
    including network failures on either hop, so the audit trail never
    holds a sent envelope without its ack.

    Reads from ``args``: ``owner``, ``repo``, ``job``, ``tail`` (when > 0,
    only the last ``tail`` lines are printed).

    Returns 0 after printing the log text, 1 on any HTTP / network failure
    or when no body was returned.
    """
    verb = "run.logs"
    ts = _utc_ts()
    envelope_key = f"run-logs-{args.job}-{ts}"
    sent = _emit_act("act", "github_envelope_sent", envelope_key, {
        "verb": verb, "owner": args.owner, "repo": args.repo, "job_id": args.job,
        "actor": _gh_active_user(), "actor_peer": ACTOR_PEER_ADDRESS, "ts": ts,
    })

    def _ack(http_status: int, error: str | None = None) -> None:
        # Single place that builds the ack so every exit path matches.
        payload = {
            "verb": verb, "envelope_sent": sent, "http_status": http_status,
            "ts": _utc_ts(),
        }
        if error is not None:
            payload["error"] = error
        _emit_act("act", "github_envelope_ack", envelope_key, payload)

    token = _gh_auth_token()
    url = f"{GITHUB_API}/repos/{args.owner}/{args.repo}/actions/jobs/{args.job}/logs"

    opener = urllib.request.build_opener(_NoRedirectHandler)
    redirect_url: str | None = None
    req = urllib.request.Request(
        url,
        headers={
            "Authorization": f"Bearer {token}",
            "Accept": "application/vnd.github+json",
            "X-GitHub-Api-Version": "2022-11-28",
            "User-Agent": "arq-github/mvp",
        },
        method="GET",
    )

    status = 0
    text: str | None = None
    try:
        with opener.open(req, timeout=60) as resp:
            status = resp.status
            text = resp.read().decode("utf-8", errors="replace")
    except _Redirected as r:
        redirect_url = r.url
    except urllib.error.HTTPError as exc:
        status = exc.code
        err_text = exc.read().decode("utf-8", errors="replace")
        _ack(status)
        print(f"arq-github run logs failed: status={status} err={err_text[:200]}", file=sys.stderr)
        return 1
    except OSError as exc:
        # URLError is an OSError; catching the base class also covers
        # socket timeouts and connection resets raised while reading.
        _ack(0, str(exc))
        print(f"arq-github run logs failed: network error {exc}", file=sys.stderr)
        return 1

    if redirect_url:
        # Fresh request to the pre-signed URL — no Authorization header.
        followup = urllib.request.Request(
            redirect_url, headers={"User-Agent": "arq-github/mvp"}, method="GET",
        )
        try:
            with urllib.request.urlopen(followup, timeout=60) as resp2:
                status = resp2.status
                text = resp2.read().decode("utf-8", errors="replace")
        except urllib.error.HTTPError as exc:
            status = exc.code
            err_text = exc.read().decode("utf-8", errors="replace")
            _ack(status)
            print(f"arq-github run logs failed (redirect): status={status} err={err_text[:200]}",
                  file=sys.stderr)
            return 1
        except OSError as exc:
            # Same handling as the first hop: without it a network failure
            # here would escape as a traceback with no ack emitted.
            _ack(0, str(exc))
            print(f"arq-github run logs failed (redirect): network error {exc}",
                  file=sys.stderr)
            return 1

    _ack(status)

    if text is None:
        print("arq-github run logs: no body returned", file=sys.stderr)
        return 1
    if args.tail and args.tail > 0:
        lines = text.splitlines()
        text = "\n".join(lines[-args.tail:])
    print(text)
    return 0


# ─── workflow dispatch (write verb) ─────────────────────────────────────────
#
# Closes the substrate gap that forced staging deploys through the
# ARQERA_MESH_ENFORCE_DISABLE=1 emergency bypass. The deploy-staging.yml
# workflow is workflow_dispatch-only and requires a `substrate_approval_ref`
# input. Without this verb, the only path was raw `gh workflow run` —
# blocked by mesh-enforce. With it, every deploy carries an envelope trail
# AND a verifiable approval reference.
#
# Surfaced by: PR #3719 (adjudicator visibility close, 2026-05-14)
# Closes: issue #3720
# Policy: arq://doc/policy/twin-dispatch-mesh-only-v2
# Boundary: marked INFRA_OWNED_TEMPORARY_EXECUTION since it executes
#   GitHub deploy authority from this repo. Long-term, arqera-infra owns
#   the workflow_dispatch path; this verb is the substrate-attested
#   transitional surface until then. See CLAUDE.md § ARQERA mutation
#   boundary.


def _resolve_workflow_id(
    *, owner: str, repo: str, workflow_name: str, token: str,
) -> tuple[int | None, int, str | None]:
    """Resolve a workflow filename/display-name to its numeric id.

    Pages through ``/actions/workflows`` 100 entries at a time. The API
    returns 30 per page by default, so a single unpaginated call cannot see
    workflows beyond the first 30 in the repository.

    Returns ``(id, http_status, error)``. ``id`` is ``None`` when the
    workflow cannot be found OR when the API call failed; ``http_status``
    is whatever the last ``/actions/workflows`` call returned so callers
    can distinguish "not found" (200 + no match) from "API broken"
    (4xx/5xx).
    """
    page_size = 100
    max_pages = 10  # hard stop so a misbehaving endpoint cannot loop forever
    status = 0
    for page in range(1, max_pages + 1):
        status, parsed, err = _api_call(
            "GET",
            f"/repos/{owner}/{repo}/actions/workflows"
            f"?per_page={page_size}&page={page}",
            token=token,
        )
        if not (200 <= status < 300) or not isinstance(parsed, dict):
            return None, status, err
        workflows = parsed.get("workflows") or []
        for wf in workflows:
            if not isinstance(wf, dict):
                continue
            path = wf.get("path", "") or ""
            name = wf.get("name", "") or ""
            # Match either the filename suffix (e.g. ``deploy-staging.yml``)
            # or the display name (``Deploy to Staging``).
            # NOTE(review): suffix matching means ``staging.yml`` also
            # matches ``deploy-staging.yml``; the first hit wins.
            if path.endswith(workflow_name) or name == workflow_name:
                wf_id = wf.get("id")
                if isinstance(wf_id, int):
                    return wf_id, status, None
        if len(workflows) < page_size:
            break  # short page: no further workflows to search
    return None, status, None


def _find_run_id_for_dispatch(
    *,
    owner: str,
    repo: str,
    workflow_id: int,
    branch: str,
    after_dt: datetime,
    token: str,
    poll_attempts: int = 5,
    poll_delay_s: float = 2.0,
) -> tuple[int | None, str | None]:
    """Best-effort capture of the run id GitHub spawned from this dispatch.

    Polls ``/actions/workflows/{id}/runs?event=workflow_dispatch&branch=...``
    up to ``poll_attempts`` times (at least once), waiting ``poll_delay_s``
    between attempts, and returns the first listed run whose ``created_at``
    is at or after ``after_dt``. Returns ``(run_id, run_url)`` or
    ``(None, None)`` if no matching run appears within the window.

    ``after_dt`` is a tz-aware ``datetime``. GitHub returns ``created_at``
    in canonical ISO 8601 (``2026-05-14T06:00:30Z``), which is not
    lex-comparable to the compact ``_utc_ts()`` format, so each candidate
    is parsed and compared as a datetime. Because those timestamps carry
    whole seconds only, ``after_dt`` is truncated to the second before
    comparing; otherwise a run created in the same second as the dispatch
    would look older than a sub-second ``after_dt`` and be skipped.

    Deliberately best-effort: the caller gets ``(None, None)`` rather than
    an exception when the lookup races, so it can still record the
    dispatch outcome without a run id.
    """
    from urllib.parse import quote  # local import: only this helper needs it

    threshold = after_dt.replace(microsecond=0)
    # Percent-encode the branch so names containing ``&``, ``#``, ``+`` or
    # spaces cannot corrupt the query string.
    # NOTE(review): assumes _api_call sends the path through unmodified.
    path = (
        f"/repos/{owner}/{repo}/actions/workflows/{workflow_id}/runs"
        f"?event=workflow_dispatch&branch={quote(branch, safe='')}&per_page=5"
    )
    attempts = max(1, poll_attempts)
    for attempt in range(attempts):
        status, parsed, _err = _api_call("GET", path, token=token)
        if 200 <= status < 300 and isinstance(parsed, dict):
            for run in (parsed.get("workflow_runs") or []):
                if not isinstance(run, dict):
                    continue
                created_dt = _parse_github_iso8601(run.get("created_at", "") or "")
                if created_dt is None or created_dt < threshold:
                    continue
                run_id = run.get("id")
                if isinstance(run_id, int):
                    return run_id, run.get("html_url")
        # Sleep only between attempts; waiting after the final poll would
        # just delay the (None, None) answer.
        if attempt < attempts - 1:
            time.sleep(poll_delay_s)
    return None, None


def _parse_github_iso8601(s: str) -> datetime | None:
    """Parse a GitHub timestamp into a tz-aware datetime.

    GitHub returns ``YYYY-MM-DDTHH:MM:SSZ`` (Z suffix, UTC). Python's
    ``fromisoformat`` accepts ``+00:00`` but not bare ``Z`` until 3.11,
    so normalise. Returns ``None`` if the string is empty or unparsable
    — callers treat that as "skip this candidate," never as a crash.
    """
    if not s:
        return None
    try:
        return datetime.fromisoformat(s.replace("Z", "+00:00"))
    except (ValueError, TypeError):
        return None


def cmd_workflow_dispatch(args: argparse.Namespace) -> int:
    """Trigger a GitHub workflow run through the substrate-attested path.

    Substrate-canonical replacement for ``gh workflow run`` + the
    ``ARQERA_MESH_ENFORCE_DISABLE=1`` emergency bypass. Every dispatch:

      1. Validates local input before any network call: the
         ``--substrate-approval-ref`` must be an ``arq://act/...`` address,
         ``--reason`` must be non-blank, and every ``--field`` must be
         ``key=value`` with a non-empty key. A ``substrate_approval_ref``
         field that differs from ``--substrate-approval-ref`` is refused —
         the workflow must receive the approval that was validated.
      2. Requires the approval ref to resolve on substrate. Refuses if it
         cannot be resolved.
      3. Emits a ``github_envelope_sent`` act with ref
         ``workflow-dispatch-<workflow_id>-<ts>`` carrying verb, workflow,
         ref, inputs (approval_ref INCLUDED in full), actor, actor_peer,
         reason.
      4. POSTs ``/repos/{owner}/{repo}/actions/workflows/{id}/dispatches``.
      5. Polls the runs API briefly to capture the spawned run id.
      6. Emits a ``github_envelope_ack`` act under the same ref with the
         envelope_sent address, http_status, run_id, run_url,
         dispatch_outcome.

    INFRA_OWNED_TEMPORARY_EXECUTION: this verb executes GitHub deploy
    authority from the ARQERA repo. Long-term owner is arqera-infra.

    Returns:
        0 when GitHub accepted the dispatch, 1 when the workflow could not
        be resolved or GitHub rejected the dispatch, 2 when validation
        refused the call before anything was dispatched.
    """
    # ── Approval-ref format (fail closed, local check) ───────────────
    approval_ref = (args.substrate_approval_ref or "").strip()
    if not approval_ref.startswith("arq://act/"):
        print(
            "arq-github workflow dispatch: --substrate-approval-ref must be "
            "an arq://act/... address (no implicit approvals)",
            file=sys.stderr,
        )
        return 2

    # ── Reason required (audit-trail discipline) ─────────────────────
    reason = (args.reason or "").strip()
    if not reason:
        print(
            "arq-github workflow dispatch: --reason is required (audit trail "
            "for what authorised this deploy)",
            file=sys.stderr,
        )
        return 2

    # ── Inputs (-F key=value, repeatable) → dispatch payload dict ───
    inputs: dict[str, str] = {}
    for raw in (args.field or []):
        key, sep, value = raw.partition("=")
        key = key.strip()
        if not sep or not key:
            print(
                f"arq-github workflow dispatch: --field must be key=value "
                f"(got {raw!r})",
                file=sys.stderr,
            )
            return 2
        inputs[key] = value

    # A caller-supplied substrate_approval_ref input must not diverge from
    # the ref that gets validated below; otherwise the workflow would run
    # against an approval nobody checked.
    supplied_ref = inputs.get("substrate_approval_ref")
    if supplied_ref is not None and supplied_ref.strip() != approval_ref:
        print(
            f"arq-github workflow dispatch: -F substrate_approval_ref="
            f"{supplied_ref!r} conflicts with --substrate-approval-ref "
            f"{approval_ref!r}; refusing to dispatch an input that differs "
            f"from the validated approval",
            file=sys.stderr,
        )
        return 2

    # ── Approval-ref resolution on substrate (fail closed) ───────────
    approval_record = _fetch_substrate_act_via_twin(approval_ref, timeout=10)
    if approval_record is None:
        print(
            f"arq-github workflow dispatch: substrate could not resolve "
            f"approval_ref {approval_ref!r}. Either the act does not exist, "
            f"the twin CLI is unavailable, or addressing.arqera.io is "
            f"unreachable. Refusing to dispatch — re-run once substrate "
            f"can confirm the approval.",
            file=sys.stderr,
        )
        return 2

    # Always inject the validated approval ref as a dispatch input — the
    # deploy-staging.yml workflow requires this exact field. Workers
    # don't have to remember to pass it via -F because the verb's
    # contract IS substrate-attested.
    inputs["substrate_approval_ref"] = approval_ref

    # ── Resolve workflow filename → id ───────────────────────────────
    token = _gh_auth_token()
    workflow_id, lookup_status, lookup_err = _resolve_workflow_id(
        owner=args.owner, repo=args.repo, workflow_name=args.workflow, token=token,
    )
    if workflow_id is None:
        print(
            f"arq-github workflow dispatch: could not resolve workflow "
            f"{args.workflow!r} (lookup status={lookup_status} "
            f"err={lookup_err})",
            file=sys.stderr,
        )
        return 1

    # ── Pre-dispatch envelope ────────────────────────────────────────
    verb = "workflow.dispatch"
    # ``ts`` is the compact-format slug ("20260514T060000Z") used for the
    # act ref. ``ts_dt`` is the tz-aware datetime used to filter GitHub
    # run candidates whose ``created_at`` arrives in canonical ISO 8601
    # (with hyphens + colons) — the two formats are NOT lex-comparable.
    ts_dt = datetime.now(timezone.utc)
    ts = ts_dt.strftime("%Y%m%dT%H%M%SZ")
    actor = _gh_active_user()
    actor_peer = ACTOR_PEER_ADDRESS
    envelope = {
        "verb": verb,
        "owner": args.owner,
        "repo": args.repo,
        "workflow": args.workflow,
        "workflow_id": workflow_id,
        "ref": args.ref,
        "inputs": inputs,
        "substrate_approval_ref": approval_ref,
        "approval_record_class_type": approval_record.get("class_type"),
        "approval_record_source": approval_record.get("source"),
        "actor": actor,
        "actor_peer": actor_peer,
        "reason": reason,
        "ts": ts,
        "dispatch_path": "direct_rest",
        "principle": "twin-dispatch-mesh-only-v2",
        "boundary_note": "INFRA_OWNED_TEMPORARY_EXECUTION",
    }
    ref_for_act = f"workflow-dispatch-{workflow_id}-{ts}"
    sent_addr = _emit_act("act", "github_envelope_sent", ref_for_act, envelope)

    # ── Dispatch ─────────────────────────────────────────────────────
    body = {"ref": args.ref, "inputs": inputs}
    status, parsed, err = _api_call(
        "POST",
        f"/repos/{args.owner}/{args.repo}/actions/workflows/{workflow_id}/dispatches",
        token=token,
        body=body,
    )
    dispatched_ok = 200 <= status < 300

    # ── Run-id capture (best-effort) ────────────────────────────────
    run_id: int | None = None
    run_url: str | None = None
    run_capture_outcome = "not_attempted"
    if dispatched_ok:
        run_id, run_url = _find_run_id_for_dispatch(
            owner=args.owner,
            repo=args.repo,
            workflow_id=workflow_id,
            branch=args.ref,
            after_dt=ts_dt,
            token=token,
        )
        run_capture_outcome = "captured" if run_id is not None else "not_found_within_poll_window"

    # ── Post-dispatch envelope ───────────────────────────────────────
    ack = {
        "verb": verb,
        "envelope_sent": sent_addr,
        "http_status": status,
        "dispatch_outcome": "dispatched" if dispatched_ok else "rejected",
        "workflow": args.workflow,
        "workflow_id": workflow_id,
        "ref": args.ref,
        "run_id": run_id,
        "run_url": run_url,
        "run_capture_outcome": run_capture_outcome,
        "substrate_approval_ref": approval_ref,
        "actor": actor,
        "actor_peer": actor_peer,
        "error": err if (status >= 400 or status == 0) else None,
        "ts": _utc_ts(),
    }
    _emit_act("act", "github_envelope_ack", ref_for_act, ack)

    # ── Output ───────────────────────────────────────────────────────
    if dispatched_ok:
        if run_url:
            print(run_url)
        else:
            # Dispatch succeeded but we didn't catch the run id. Print
            # the workflow runs URL so the operator can find it.
            print(
                f"https://github.com/{args.owner}/{args.repo}/actions/"
                f"workflows/{workflow_id}"
            )
        return 0
    print(
        f"arq-github workflow dispatch failed: status={status} err={err}",
        file=sys.stderr,
    )
    return 1


def cmd_code_scanning_alerts(args: argparse.Namespace) -> int:
    """List Code Scanning alerts for a repo, filtered by tool + state.

    Companion to ``run jobs`` / ``run logs``: SARIF-based scanners (Trivy,
    CodeQL, semgrep, etc.) upload their findings as Code Scanning alerts.
    The ``Trivy Security Scan`` job exits 1 when HIGH/CRITICAL findings
    exist, but stdout only shows progress — the actual vuln details live
    in the alerts. Without this verb, the only way to see them was the
    repo Security tab (UI) or bare ``gh api`` (blocked by mesh-enforce).

    Emits a ``github_envelope_sent`` act before the GET and a
    ``github_envelope_ack`` act after it, both under the same
    ``code-scanning-alerts-<ts>`` ref.

    Output: with ``--json`` the alert list is printed as JSON (``[]`` when
    there are none, so the output stays machine-parseable); otherwise one
    formatted line per alert, or ``(no alerts)``.

    Returns:
        0 on a 2xx response, 1 otherwise.
    """
    # Local import: urllib.parse is not among the module-level imports.
    from urllib.parse import quote, urlencode

    verb = "code-scanning.alerts"
    ts = _utc_ts()
    sent = _emit_act("act", "github_envelope_sent", f"code-scanning-alerts-{ts}", {
        "verb": verb, "owner": args.owner, "repo": args.repo,
        "tool": args.tool, "state": args.state, "ref": args.ref,
        "actor": _gh_active_user(), "actor_peer": ACTOR_PEER_ADDRESS, "ts": ts,
    })
    token = _gh_auth_token()

    params = [("state", args.state), ("per_page", args.limit)]
    if args.tool:
        params.append(("tool_name", args.tool))
    if args.ref:
        params.append(("ref", f"refs/heads/{args.ref}"))
    if args.severity:
        params.append(("severity", args.severity))
    # Percent-encode values (tool names and branch names may contain spaces
    # or reserved characters); "/" stays literal so refs remain readable.
    query = urlencode(params, quote_via=quote, safe="/")
    path = f"/repos/{args.owner}/{args.repo}/code-scanning/alerts?{query}"

    status, parsed, err = _api_call("GET", path, token=token)
    _emit_act("act", "github_envelope_ack", f"code-scanning-alerts-{ts}", {
        "verb": verb, "envelope_sent": sent, "http_status": status, "ts": _utc_ts(),
    })
    if not (200 <= status < 300):
        print(f"arq-github code-scanning alerts failed: status={status} err={err}", file=sys.stderr)
        return 1

    # Anything other than a JSON array is treated as "no alerts" rather
    # than iterated blindly.
    alerts = parsed if isinstance(parsed, list) else []
    if args.json:
        print(json.dumps(alerts, indent=2))
        return 0
    if not alerts:
        print("(no alerts)")
        return 0

    for a in alerts:
        if not isinstance(a, dict):
            continue
        rule = a.get("rule") or {}
        most_recent = a.get("most_recent_instance") or {}
        location = most_recent.get("location") or {}
        loc_str = f"{location.get('path', '?')}:{location.get('start_line', '?')}"
        msg = ((most_recent.get("message") or {}).get("text") or "")[:120]
        # Normalise to strings up front: a None value cannot take an
        # alignment format spec.
        number = a.get("number")
        number_str = "?" if number is None else str(number)
        state = str(a.get("state") or "-")
        severity = str(rule.get("security_severity_level") or rule.get("severity") or "-")
        rule_id = str(rule.get("id") or rule.get("name") or "-")[:60]
        tool_name = str((a.get("tool") or {}).get("name") or "-")
        print(
            f"alert#{number_str:<6} "
            f"{state:10} "
            f"sev={severity:<10} "
            f"rule={rule_id:<60} "
            f"tool={tool_name:<10} "
            f"{loc_str}"
        )
        if args.verbose and msg:
            print(f"    {msg}")
    return 0


def cmd_issue_list(args: argparse.Namespace) -> int:
    """List issues for a repo: primitive first, direct REST as fallback.

    The ``github.issue.list`` primitive is tried first. Outcome ``ok``
    prints its issues; outcome ``error`` reports the failure; any other
    outcome falls through to a direct REST call that is wrapped in
    ``github_envelope_sent`` / ``github_envelope_ack`` acts.

    Each issue prints as ``#<number> <state> [<labels>] <title>``.

    Returns:
        0 when issues were listed, 1 on a primitive error or a non-2xx
        REST response.
    """
    # Local import: urllib.parse is not among the module-level imports.
    from urllib.parse import quote, urlencode

    # Primitive-first (mirror cmd_pr_list, Option B Move 6 pattern).
    primitive_payload = {
        "state": args.state,
        "limit": args.limit,
        "owner": args.owner,
        "repo": args.repo,
    }
    if args.label:
        primitive_payload["labels"] = args.label
    outcome, result = _primitive_invoke("github.issue.list", primitive_payload)
    if outcome == "ok":
        for it in result.get("issues", []):
            raw_labels = it.get("labels") or []
            # Defensive: when github.issue.list lands as a real daemon
            # bridge, the underlying GitHub REST shape returns labels as
            # list-of-dicts ({"name": "...", "color": "..."}). Symmetric
            # with the REST fallback path further down in this function.
            # Strings stay strings; dicts surface their "name".
            label_names = [
                lb if isinstance(lb, str) else (lb.get("name", "") if isinstance(lb, dict) else "")
                for lb in raw_labels
            ]
            labels = ",".join(lb for lb in label_names if lb)
            label_field = f" [{labels}]" if labels else ""
            print(f"#{it['number']:<6} {it['state']:<8}{label_field} {it['title']}")
        return 0
    if outcome == "error":
        print(f"arq-github issue list failed via primitive: {json.dumps(result, indent=2)}", file=sys.stderr)
        return 1

    # Fallback: direct REST. GitHub's issues endpoint returns BOTH issues
    # and PRs; PRs carry a "pull_request" subobject which we filter out so
    # the surface name (`issue list`) matches the surface meaning.
    verb = "issue.list"
    ts = _utc_ts()
    sent = _emit_act("act", "github_envelope_sent", f"issue-list-{ts}", {
        "verb": verb, "owner": args.owner, "repo": args.repo, "state": args.state,
        "labels": args.label or "",
        "actor": _gh_active_user(), "actor_peer": ACTOR_PEER_ADDRESS, "ts": ts,
        "dispatch_path": "direct_rest_fallback",
    })
    token = _gh_auth_token()

    params = [("state", args.state), ("per_page", args.limit)]
    if args.label:
        # NOTE(review): args.label looks like a single comma-separated
        # string; a list/tuple is joined defensively — confirm against the
        # argparse definition.
        if isinstance(args.label, (list, tuple)):
            label_value = ",".join(str(lb) for lb in args.label)
        else:
            label_value = str(args.label)
        params.append(("labels", label_value))
    # Percent-encode values: label names routinely contain spaces
    # ("good first issue"), which are not valid in a raw URL. "," stays
    # literal because it separates multiple labels.
    q = "?" + urlencode(params, quote_via=quote, safe=",")

    status, parsed, err = _api_call(
        "GET", f"/repos/{args.owner}/{args.repo}/issues{q}", token=token
    )
    rows = []
    if isinstance(parsed, list):
        rows = [i for i in parsed if isinstance(i, dict) and "pull_request" not in i]
    _emit_act("act", "github_envelope_ack", f"issue-list-{ts}", {
        "verb": verb, "envelope_sent": sent, "http_status": status,
        "raw_count": len(parsed) if isinstance(parsed, list) else 0,
        "issue_count_excluding_prs": len(rows),
        "ts": _utc_ts(),
    })
    if 200 <= status < 300:
        for i in rows:
            labels = ",".join(
                lb.get("name", "") for lb in (i.get("labels") or []) if isinstance(lb, dict)
            )
            label_field = f" [{labels}]" if labels else ""
            print(f"#{i['number']:<6} {i['state']:<8}{label_field} {i['title']}")
        return 0
    print(f"arq-github issue list failed: status={status} err={err}", file=sys.stderr)
    return 1


def cmd_issue_view(args: argparse.Namespace) -> int:
    """Print a single issue as indented JSON.

    The ``github.issue.view`` primitive is tried first (Option B Move 6).
    Outcome ``ok`` prints the primitive's ``full_issue`` (or the whole
    result when that key is empty); outcome ``error`` reports the failure.
    Any other outcome falls back to a direct REST GET wrapped in
    ``github_envelope_sent`` / ``github_envelope_ack`` acts.

    Returns:
        0 when the issue was printed, 1 on a primitive error or a non-2xx
        REST response.
    """
    request = {"issue_number": args.issue, "owner": args.owner, "repo": args.repo}
    outcome, result = _primitive_invoke("github.issue.view", request)
    if outcome == "ok":
        issue_doc = result.get("full_issue") or result
        print(json.dumps(issue_doc, indent=2))
        return 0
    if outcome == "error":
        print(f"arq-github issue view failed via primitive: {json.dumps(result, indent=2)}", file=sys.stderr)
        return 1

    # Direct REST fallback; both acts share one ref so they pair up.
    verb = "issue.view"
    ts = _utc_ts()
    act_ref = f"issue-view-{args.issue}-{ts}"
    sent_payload = {
        "verb": verb,
        "owner": args.owner,
        "repo": args.repo,
        "issue_number": args.issue,
        "actor": _gh_active_user(),
        "actor_peer": ACTOR_PEER_ADDRESS,
        "ts": ts,
        "dispatch_path": "direct_rest_fallback",
    }
    sent = _emit_act("act", "github_envelope_sent", act_ref, sent_payload)

    token = _gh_auth_token()
    issue_path = f"/repos/{args.owner}/{args.repo}/issues/{args.issue}"
    status, parsed, err = _api_call("GET", issue_path, token=token)

    ack_payload = {
        "verb": verb,
        "envelope_sent": sent,
        "http_status": status,
        "ts": _utc_ts(),
    }
    _emit_act("act", "github_envelope_ack", act_ref, ack_payload)

    if not (200 <= status < 300):
        print(f"arq-github issue view failed: status={status} err={err}", file=sys.stderr)
        return 1
    print(json.dumps(parsed, indent=2))
    return 0


def cmd_issue_comment(args: argparse.Namespace) -> int:
    """Post a comment on an issue: primitive first, direct REST as fallback.

    The ``github.issue.comment`` primitive is tried first (Option B Move
    6). Outcome ``ok`` prints the primitive's ``url``; outcome ``error``
    reports the failure. Any other outcome falls back to a direct REST
    POST wrapped in ``github_envelope_sent`` / ``github_envelope_ack``
    acts; the ack records the comment id and URL when the response
    carries them.

    Returns:
        0 when the comment was posted, 1 on a primitive error or a
        non-2xx REST response.
    """
    # Primitive-first (Option B Move 6).
    outcome, result = _primitive_invoke(
        "github.issue.comment",
        {
            "issue_number": args.issue,
            "body": args.body,
            "owner": args.owner,
            "repo": args.repo,
        },
    )
    if outcome == "ok":
        print(result.get("url", ""))
        return 0
    if outcome == "error":
        print(f"arq-github issue comment failed via primitive: {json.dumps(result, indent=2)}", file=sys.stderr)
        return 1

    # Fallback: direct REST
    verb = "issue.comment"
    ts = _utc_ts()
    actor = _gh_active_user()
    actor_peer = ACTOR_PEER_ADDRESS
    body = {"body": args.body}
    envelope = {
        **body, "verb": verb, "owner": args.owner, "repo": args.repo,
        "issue_number": args.issue, "actor": actor, "actor_peer": actor_peer, "ts": ts,
        "dispatch_path": "direct_rest_fallback",
    }
    sent_addr = _emit_act("act", "github_envelope_sent", f"issue-comment-{args.issue}-{ts}", envelope)
    token = _gh_auth_token()
    status, parsed, err = _api_call(
        "POST",
        f"/repos/{args.owner}/{args.repo}/issues/{args.issue}/comments",
        token=token,
        body=body,
    )
    # Narrow the response once so the ack and the success output agree and
    # a 2xx with a non-dict body cannot crash after the comment was posted.
    created = parsed if isinstance(parsed, dict) else {}
    comment_url = created.get("html_url")
    ack = {
        "verb": verb, "envelope_sent": sent_addr, "http_status": status,
        "issue_number": args.issue,
        "comment_id": created.get("id"),
        "comment_url": comment_url,
        "error": err if status >= 400 or status == 0 else None, "ts": _utc_ts(),
    }
    _emit_act("act", "github_envelope_ack", f"issue-comment-{args.issue}-{ts}", ack)
    if 200 <= status < 300:
        print(comment_url or "")
        return 0
    print(f"arq-github issue comment failed: status={status} err={err}", file=sys.stderr)
    return 1


def cmd_issue_create(args: argparse.Namespace) -> int:
    """Create an issue: primitive first, direct REST as fallback.

    ``args.labels`` is split on commas; blank entries are dropped and the
    ``labels`` key is only sent when at least one label remains. The
    ``github.issue.create`` primitive is tried first (Option B Move 6).
    Outcome ``ok`` prints the primitive's ``url``; outcome ``error``
    reports the failure. Any other outcome falls back to a direct REST
    POST wrapped in ``github_envelope_sent`` / ``github_envelope_ack``
    acts; the ack records the issue number and URL when the response
    carries them.

    Returns:
        0 when the issue was created, 1 on a primitive error or a non-2xx
        REST response.
    """
    # Primitive-first (Option B Move 6).
    labels = [lb.strip() for lb in (args.labels or "").split(",") if lb.strip()]
    payload = {
        "title": args.title,
        "body": args.body or "",
        "owner": args.owner,
        "repo": args.repo,
    }
    if labels:
        payload["labels"] = labels
    outcome, result = _primitive_invoke("github.issue.create", payload)
    if outcome == "ok":
        print(result.get("url", ""))
        return 0
    if outcome == "error":
        print(f"arq-github issue create failed via primitive: {json.dumps(result, indent=2)}", file=sys.stderr)
        return 1

    # Fallback
    verb = "issue.create"
    ts = _utc_ts()
    actor = _gh_active_user()
    actor_peer = ACTOR_PEER_ADDRESS
    body = {"title": args.title, "body": args.body or ""}
    if labels:
        body["labels"] = labels
    envelope = {**body, "verb": verb, "owner": args.owner, "repo": args.repo,
                "actor": actor, "actor_peer": actor_peer, "ts": ts,
                "dispatch_path": "direct_rest_fallback"}
    sent_addr = _emit_act("act", "github_envelope_sent", f"issue-create-{ts}", envelope)
    token = _gh_auth_token()
    status, parsed, err = _api_call(
        "POST", f"/repos/{args.owner}/{args.repo}/issues", token=token, body=body,
    )
    # Narrow the response once so the ack and the success output agree and
    # a 2xx with a non-dict body cannot crash after the issue was created.
    created = parsed if isinstance(parsed, dict) else {}
    issue_url = created.get("html_url")
    ack = {
        "verb": verb, "envelope_sent": sent_addr, "http_status": status,
        "issue_number": created.get("number"),
        "issue_url": issue_url,
        "error": err if status >= 400 or status == 0 else None, "ts": _utc_ts(),
    }
    _emit_act("act", "github_envelope_ack", f"issue-create-{ts}", ack)
    if 200 <= status < 300:
        print(issue_url or "")
        return 0
    print(f"arq-github issue create failed: status={status} err={err}", file=sys.stderr)
    return 1


def cmd_pr_review(args: argparse.Namespace) -> int:
    """Submit a non-approving review (request-changes or comment) on a PR.

    ``--approve`` is refused outright with exit code 2. For the other
    decisions the ``github.pr.review`` primitive is tried first (Option B
    Move 6): outcome ``ok`` reports success, outcome ``error`` reports the
    failure, and any other outcome falls back to a direct REST POST that
    is wrapped in ``github_envelope_sent`` / ``github_envelope_ack`` acts.

    Returns:
        0 when the review was submitted, 1 on a primitive error or a
        non-2xx REST response, 2 when approval was requested.
    """
    decision = args.decision
    if decision == "approve":
        refusal = (
            "arq-github pr review --approve is disabled: approval affects "
            "substrate-governed merge authority.\n"
            "Use GitHub peer review directly, or route approval through a "
            "substrate-governed review flow before merge-via-substrate."
        )
        print(refusal, file=sys.stderr)
        return 2

    decision_to_event = {
        "approve": "APPROVE",
        "request-changes": "REQUEST_CHANGES",
        "comment": "COMMENT",
    }
    event = decision_to_event[decision]
    body_text = _resolve_body(args)
    submitted_msg = f"review submitted: event={event} pr=#{args.pr}"

    # Primitive path.
    primitive_request = {
        "pr_number": args.pr,
        "event": event,
        "body": body_text,
        "owner": args.owner,
        "repo": args.repo,
    }
    outcome, result = _primitive_invoke("github.pr.review", primitive_request)
    if outcome == "ok":
        print(submitted_msg)
        return 0
    if outcome == "error":
        print(f"arq-github pr review failed via primitive: {json.dumps(result, indent=2)}", file=sys.stderr)
        return 1

    # Direct REST fallback; both acts share one ref so they pair up.
    verb = "pr.review"
    ts = _utc_ts()
    actor = _gh_active_user()
    actor_peer = ACTOR_PEER_ADDRESS
    act_ref = f"pr-review-{args.pr}-{ts}"
    body = {"event": event, "body": body_text}
    envelope = dict(body)
    envelope.update({
        "verb": verb,
        "owner": args.owner,
        "repo": args.repo,
        "pr_number": args.pr,
        "actor": actor,
        "actor_peer": actor_peer,
        "ts": ts,
        "dispatch_path": "direct_rest_fallback",
    })
    sent_addr = _emit_act("act", "github_envelope_sent", act_ref, envelope)

    token = _gh_auth_token()
    reviews_path = f"/repos/{args.owner}/{args.repo}/pulls/{args.pr}/reviews"
    status, parsed, err = _api_call("POST", reviews_path, token=token, body=body)

    review_id = parsed.get("id") if isinstance(parsed, dict) else None
    failed_transport = status >= 400 or status == 0
    ack = {
        "verb": verb,
        "envelope_sent": sent_addr,
        "http_status": status,
        "pr_number": args.pr,
        "event": event,
        "review_id": review_id,
        "error": err if failed_transport else None,
        "ts": _utc_ts(),
    }
    _emit_act("act", "github_envelope_ack", act_ref, ack)

    if not (200 <= status < 300):
        print(f"arq-github pr review failed: status={status} err={err}", file=sys.stderr)
        return 1
    print(submitted_msg)
    return 0


# ─── Substrate-governed approval flow ──────────────────────────────────────
#
# PR #3609 (2026-05-04) intentionally disabled `arq-github pr review --approve`
# to close legacy auto-approval bypasses. The error message there points at
# "a substrate-governed review flow" — which is what this section builds.
#
# How it works:
#   1. `arq-github pr substrate-approve <pr> --reason "..."` emits a signed
#      `review_approved` substrate act at canonical address
#      `arq://act/review_approved/<owner>-<repo>-<pr>-<head_sha[:12]>-<approver_login>`
#      and posts an indexable marker as a PR review comment so
#      merge-via-substrate can discover the approval.
#
#   2. `merge-via-substrate` parses SUBSTRATE_APPROVAL markers from the PR's
#      review comments + issue comments, fetches each referenced act, and
#      counts the signing peer as a cross-peer approver when:
#        - the marker's pr / head_sha / owner / repo all match the merge
#          target's current head SHA (no SHA drift); AND
#        - the act's signing peer is different from the PR author's peer; AND
#        - the act's payload `github_login` differs from the PR author's login
#
#   3. Same-peer signing FAILS the cross-peer check by design — that is the
#      structural property the regulator-link (SOC 2 CC8.1, EU AI Act Art.12)
#      depends on.
#
# Customer-link: every substrate-attested approval is a cryptographically
# verifiable record an external auditor can replay. The marker comment on the
# PR is the discovery index; the act is the proof.


# Opening and closing delimiters of the HTML-comment marker that carries a
# substrate approval inside a PR comment body. `_substrate_approval_marker`
# builds markers between these two strings and
# `_parse_substrate_approval_markers` scans comment text for them.
SUBSTRATE_APPROVAL_MARKER_PREFIX = "<!-- SUBSTRATE_APPROVAL"
SUBSTRATE_APPROVAL_MARKER_SUFFIX = "-->"


# Documented best-effort / non-blocking CI checks. These workflows
# explicitly declare themselves non-blocking in their own logic
# (e.g. signal-arqera.yml line 81: "Signal failed -- non-blocking").
# A failure on a non-blocking check MUST NOT block the substrate merge
# gate AND MUST NOT contaminate the substrate evidence chain's
# "failing checks" list (which informs auditors what actually went wrong).
#
# Composes with `arq://doc/principle/cancelled-is-not-failure-v1` —
# generalises that principle's "infer from semantics, not raw conclusion"
# pattern to documented-best-effort observability workflows.
#
# NOTE(review): the single entry is written lowercase; whether consumers
# compare check names exactly or case-insensitively is not visible from
# this definition — confirm at the point of use before adding names.
NON_BLOCKING_CHECKS: set[str] = {
    "signal",  # signal-arqera.yml — best-effort observability ping
}


def _substrate_approval_ref(
    *, owner: str, repo: str, pr_number: int, head_sha: str, approver_login: str
) -> str:
    """Build the canonical ref slug for a substrate review-approval act.

    The slug is ``<owner>-<repo>-<pr_number>-<sha12>-<login>``, where
    ``sha12`` is the first 12 characters of ``head_sha`` and ``login`` has
    every ``/`` and space replaced by ``_``. An empty ``head_sha`` or
    ``approver_login`` is rendered as ``unknown``.

    The same inputs always yield the same slug, so a repeated approval by
    one approver against one head SHA does not multiply act records.
    """
    sha_prefix = head_sha[:12] if head_sha else "unknown"
    login = approver_login if approver_login else "unknown"
    # One translation pass replaces both separators that are unsafe in a ref.
    login_slug = login.translate(str.maketrans({"/": "_", " ": "_"}))
    pieces = (owner, repo, pr_number, sha_prefix, login_slug)
    return "-".join(f"{piece}" for piece in pieces)


def _substrate_approval_marker(
    *, act_address: str, pr_number: int, head_sha: str, approver_login: str,
    approver_peer: str,
    delegated_via: str | None = None,
    executor_peer: str | None = None,
) -> str:
    """Indexable marker comment that merge-via-substrate parses to discover
    substrate-attested approvals. Kept simple and grep-able so future tooling
    can also enumerate without parsing arbitrary HTML.

    When `delegated_via` is set, the approval was emitted by Twin acting_for
    the granting peer per a substrate-attested approval_delegation_grant.
    The merge gate verifies the delegation chain instead of expecting the
    act's signing peer to equal approver_peer.
    """
    parts = [
        f"act={act_address}",
        f"pr={pr_number}",
        f"head_sha={head_sha}",
        f"approver_login={approver_login}",
        f"approver_peer={approver_peer}",
    ]
    if delegated_via:
        parts.append(f"delegated_via={delegated_via}")
    if executor_peer:
        parts.append(f"executor_peer={executor_peer}")
    return f"{SUBSTRATE_APPROVAL_MARKER_PREFIX} {' '.join(parts)} {SUBSTRATE_APPROVAL_MARKER_SUFFIX}"


def _parse_substrate_approval_markers(text: str) -> list[dict]:
    """Extract every SUBSTRATE_APPROVAL marker from a comment body.

    Robust to multiple markers in one body. Each marker's content is split
    on whitespace into ``key=value`` tokens; tokens without ``=`` are
    ignored and a repeated key keeps its last value.

    Args:
        text: Comment body to scan. Empty or ``None`` yields ``[]``.

    Returns:
        One dict per well-formed marker, in order of appearance. Every
        dict holds non-empty ``act``, ``pr``, ``head_sha``,
        ``approver_login`` and ``approver_peer`` values (all strings),
        plus any further ``key=value`` tokens the marker carried (e.g.
        ``delegated_via``, ``executor_peer``). Markers that miss a
        required key, or carry it with an empty value, are skipped
        (defensive — never elevate a half-formed marker to a cross-peer
        signal). Scanning stops at a marker with no closing suffix.
    """
    required = ("act", "pr", "head_sha", "approver_login", "approver_peer")
    out: list[dict] = []
    if not text:
        return out
    cursor = 0
    while True:
        idx = text.find(SUBSTRATE_APPROVAL_MARKER_PREFIX, cursor)
        if idx == -1:
            break
        body_start = idx + len(SUBSTRATE_APPROVAL_MARKER_PREFIX)
        # Look for the suffix only after the prefix so the two can never
        # overlap.
        end = text.find(SUBSTRATE_APPROVAL_MARKER_SUFFIX, body_start)
        if end == -1:
            break
        fields: dict[str, str] = {}
        for token in text[body_start:end].split():
            key, sep, value = token.partition("=")
            if sep:
                fields[key] = value
        # fields.get(k) is falsy for both a missing key and an empty value,
        # so "act=" does not count as a present field.
        if all(fields.get(k) for k in required):
            out.append(fields)
        cursor = end + len(SUBSTRATE_APPROVAL_MARKER_SUFFIX)
    return out


def _fetch_substrate_act_via_twin(address: str, timeout: int = 5) -> dict | None:
    """Fetch an act from addressing.arqera.io using `twin address fetch`.

    Returns a dict with at least {address, class_type, sha256, source, parent,
    issued_at, payload_size} when the act is retrievable. Returns None when
    the address is invalid, the act cannot be found, or the twin CLI is
    unavailable. Never raises — substrate-fetch failure must degrade
    gracefully into "no substrate approval found" rather than abort the
    merge gate.
    """
    if not address or not address.startswith("arq://"):
        return None
    try:
        result = subprocess.run(
            ["twin", "--use-keychain", "address", "fetch", address],
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except (subprocess.TimeoutExpired, FileNotFoundError, OSError):
        return None
    if result.returncode != 0:
        return None
    # Parse the human-readable output: "key      : value" per line.
    # `twin address fetch` renders null/missing fields as the literal
    # string "-" for readability. The merge-via-substrate gate compares
    # parsed["parent"] against the marker's claimed approver_peer; if we
    # store "-" literally, daemon-emitted approval acts (which store
    # parent as null on substrate) get silently rejected as peer-mismatch.
    # Normalise "-" -> empty so the gate's truthy guard correctly treats
    # it as "no parent recorded" rather than "parent='-' which != peer".
    parsed: dict[str, str] = {"address": address}
    for line in result.stdout.splitlines():
        if ":" not in line:
            continue
        k, _, v = line.partition(":")
        v_stripped = v.strip()
        if v_stripped == "-":
            v_stripped = ""
        parsed[k.strip()] = v_stripped
    return parsed


def cmd_pr_substrate_approve(args: argparse.Namespace) -> int:
    """Emit a substrate-attested approval for a PR.

    This is the substrate-governed counterpart to `pr review --approve`
    (which was disabled by #3609). It produces a cryptographically signed
    `review_approved` substrate act AND posts an indexable marker as a PR
    review comment so `merge-via-substrate` can verify the approval.

    Cross-peer trust property: the signing peer must differ from the PR
    author's peer. This script EMITS the act with whatever peer key is
    locally available; the merge gate VERIFIES the property at merge time.
    Signing locally as the same peer as the PR author yields an act that
    will not satisfy the cross-peer check — that's intentional and the
    customer-link rests on it.

    Args:
        args: Parsed CLI namespace. Reads ``reason``, ``pr``, ``owner`` and
            ``repo``; ``via_delegation`` and ``as_peer`` are optional and
            looked up with ``getattr`` so older namespaces still work.

    Returns:
        Process exit code:
          0 — approval act emitted (even if the marker comment failed to
              post; a warning is printed in that case).
          1 — PR could not be fetched, its head SHA / author could not be
              determined, or the substrate act emission failed.
          2 — missing ``--reason``, invalid/expired/mismatched delegation
              grant, or the effective approver is the PR author.
          3 — ``--as`` was given but the bot credentials are not in the vault.
    """
    if not args.reason or not args.reason.strip():
        print("arq-github pr substrate-approve: --reason is required", file=sys.stderr)
        return 2

    # Fetch PR state for head SHA + author. Use the existing primitive surface.
    verb_outcome, pr_payload = _primitive_invoke(
        "github.pr.view",
        {"pr_number": args.pr, "owner": args.owner, "repo": args.repo},
    )
    head_sha = ""
    pr_author = ""
    if verb_outcome == "ok" and isinstance(pr_payload, dict):
        head_sha = (pr_payload.get("head") or {}).get("sha", "") or pr_payload.get("head_sha", "")
        pr_author = (pr_payload.get("user") or {}).get("login", "") or pr_payload.get("author", "")
    if not head_sha or not pr_author:
        # Direct REST fallback so we always know head SHA + author before signing.
        token = _gh_auth_token()
        status, parsed, err = _api_call(
            "GET", f"/repos/{args.owner}/{args.repo}/pulls/{args.pr}", token=token,
        )
        if not (200 <= status < 300) or not isinstance(parsed, dict):
            print(
                f"arq-github pr substrate-approve: could not fetch PR #{args.pr} "
                f"(status={status} err={err})",
                file=sys.stderr,
            )
            return 1
        head_sha = (parsed.get("head") or {}).get("sha", "")
        pr_author = (parsed.get("user") or {}).get("login", "")
        if not head_sha or not pr_author:
            # Fail closed: an approval signed over an empty head SHA can
            # never match at merge time, and an empty author would make the
            # self-approval guards below vacuous.
            print(
                f"arq-github pr substrate-approve: PR #{args.pr} response is "
                f"missing head SHA or author; refusing to sign",
                file=sys.stderr,
            )
            return 1

    approver_login = _gh_active_user()
    # `--via-delegation <grant-address>` lets Twin honor an operator's
    # substrate-attested approval grant. Two-person rule is preserved at
    # the SUBSTRATE level (the grant is signed by the granting peer; this
    # execution adds a co-signed approval act with acting_for provenance)
    # rather than at the GitHub-login level. Fixes the gap captured at
    # arq://body/arqera_delegation_authority_gap/v1.
    via_delegation = getattr(args, "via_delegation", None)
    delegation_from_peer = None
    delegation_from_login = None
    if via_delegation:
        # Fetch FULL grant payload via the addressing service HTTP API with
        # ?full=true (the existing _fetch_substrate_act_via_twin helper uses
        # human-readable mode which omits the payload; twin --json returns
        # `payload_preview` which is truncated for >512B payloads).
        grant_payload: dict = {}
        # Convert arq:// address to addressing-service URL.
        cleaned = via_delegation[len("arq://"):] if via_delegation.startswith("arq://") else via_delegation
        full_url = f"https://addressing.arqera.io/address/{cleaned}?full=true"
        try:
            with urllib.request.urlopen(full_url, timeout=15) as resp:
                full_rec = json.loads(resp.read().decode("utf-8"))
            raw = full_rec.get("payload")
            if isinstance(raw, dict):
                grant_payload = raw
            elif isinstance(raw, str) and raw:
                # Payload may be stored as a JSON string; an unparseable one
                # is treated as empty and rejected by the field checks below.
                try:
                    grant_payload = json.loads(raw)
                except Exception:
                    grant_payload = {}
        except Exception as _e:
            print(
                f"arq-github pr substrate-approve --via-delegation: HTTP fetch "
                f"error for {full_url}: {_e}",
                file=sys.stderr,
            )
            return 2
        if not isinstance(grant_payload, dict):
            grant_payload = {}
        g_from_peer = (grant_payload.get("from_peer") or "").strip()
        g_from_login = (grant_payload.get("from_login") or "").strip()
        g_to_peer = (grant_payload.get("to_peer") or "").strip()
        g_expires_at = (grant_payload.get("expires_at") or "").strip()
        # NOTE(review): a grant with no to_peer is accepted for any actor
        # peer — presumably an intentionally open grant; confirm.
        if g_to_peer and g_to_peer != ACTOR_PEER_ADDRESS:
            print(
                f"arq-github pr substrate-approve --via-delegation: grant.to_peer="
                f"{g_to_peer} does not match current actor peer {ACTOR_PEER_ADDRESS}",
                file=sys.stderr,
            )
            return 2
        if g_expires_at:
            # _utc_ts() returns compact %Y%m%dT%H%M%SZ; grant expires_at
            # comes in as ISO-8601 with dashes + tz offset. Compare as
            # datetime objects, not strings, to avoid lexicographic
            # mismatch between the two formats.
            try:
                _now_dt = datetime.now(timezone.utc)
                _exp_dt = datetime.fromisoformat(g_expires_at.replace("Z", "+00:00"))
                if _exp_dt.tzinfo is None:
                    _exp_dt = _exp_dt.replace(tzinfo=timezone.utc)
                if _exp_dt < _now_dt:
                    print(
                        f"arq-github pr substrate-approve --via-delegation: "
                        f"grant expired at {g_expires_at} (now={_now_dt.isoformat()})",
                        file=sys.stderr,
                    )
                    return 2
            except Exception as _exp_err:
                print(
                    f"arq-github pr substrate-approve --via-delegation: cannot "
                    f"parse expires_at={g_expires_at!r}: {_exp_err}",
                    file=sys.stderr,
                )
                return 2
        if not g_from_login or not g_from_peer:
            print(
                "arq-github pr substrate-approve --via-delegation: grant missing "
                "from_peer or from_login fields",
                file=sys.stderr,
            )
            return 2
        if g_from_login == pr_author:
            print(
                f"arq-github pr substrate-approve --via-delegation: grant.from_login="
                f"{g_from_login} == pr_author={pr_author}; granting peer cannot be "
                f"the PR author (would violate cross-peer at the substrate level)",
                file=sys.stderr,
            )
            return 2
        delegation_from_peer = g_from_peer
        delegation_from_login = g_from_login
        print(
            f"substrate-approve: honoring delegation grant — acting_for "
            f"{g_from_login} ({g_from_peer}) via {via_delegation}",
            file=sys.stderr,
        )
    # `--as <peer>` records the approval under a distinct bot identity.
    # Per arq://doc/principle/arqera-policy-bot-distinct-identity-v1 (DRAFT),
    # the credentials backing that identity MUST be present in the vault
    # before the approval is allowed to sign — otherwise we'd silently
    # collapse back to the operator's gh-CLI auth and the cross-peer
    # property would degrade without a visible signal. Phase A wires the
    # presence-gate; Phase B wires the actual GitHub App auth swap.
    # getattr is used so test fixtures that pre-date this flag continue
    # to work without modification (argparse itself always sets the attr).
    as_peer = getattr(args, "as_peer", None)
    if as_peer:
        # Last path segment of the peer address names the bot's vault entries.
        bot_slug = as_peer.rsplit("/", 1)[-1] or as_peer
        app_id_present = _read_arq_connection(
            "github.com", f"{bot_slug}.app-id", audit_only=True,
        )
        priv_key_present = _read_arq_connection(
            "github.com", f"{bot_slug}.private-key", audit_only=True,
        )
        missing = [
            label for label, present in (
                (f"github.com/{bot_slug}.app-id", app_id_present),
                (f"github.com/{bot_slug}.private-key", priv_key_present),
            ) if not present
        ]
        if missing:
            _emit_act(
                "act",
                "operator_tier_surface",
                f"policy-bot-credentials-missing-{bot_slug}-{_utc_ts()}",
                {
                    "verb": "pr.substrate-approve",
                    "as_peer": as_peer,
                    "pr_number": args.pr,
                    "owner": args.owner,
                    "repo": args.repo,
                    "missing_resources": missing,
                    "principle": "arqera-policy-bot-distinct-identity-v1",
                    "remediation": (
                        "Operator: register the bot's GitHub App on the org; "
                        "provision app-id + private-key into the vault under "
                        f"github.com/{bot_slug}.* before retrying --as."
                    ),
                    "ts": _utc_ts(),
                },
            )
            print(
                f"arq-github pr substrate-approve --as {as_peer}: "
                f"required credentials not in vault: {', '.join(missing)}. "
                f"Refusing to sign — operator_tier_surface act emitted.",
                file=sys.stderr,
            )
            return 3
        approver_peer = as_peer
    else:
        approver_peer = ACTOR_PEER_ADDRESS
    # Determine the effective approver identity. If a delegation grant was
    # honored above, the approval is recorded acting_for the granting peer:
    # approver_login + approver_peer reflect the GRANTING party (so the
    # substrate trail + merge-via-substrate count the cross-peer correctly),
    # while delegated_via + executor_peer preserve the chain of who
    # actually ran the approval action.
    effective_login = delegation_from_login if delegation_from_login else approver_login
    effective_peer = delegation_from_peer if delegation_from_peer else approver_peer
    executor_login = approver_login
    executor_peer = ACTOR_PEER_ADDRESS

    if effective_login == pr_author:
        print(
            f"arq-github pr substrate-approve: refusing to self-approve "
            f"(effective_login=={pr_author}). Approval must come from a "
            f"peer different from the PR author. Switch gh auth to a "
            f"non-author account, or pass --via-delegation pointing at a "
            f"grant from a different peer.",
            file=sys.stderr,
        )
        return 2

    ref = _substrate_approval_ref(
        owner=args.owner, repo=args.repo, pr_number=args.pr,
        head_sha=head_sha, approver_login=effective_login,
    )
    ts = _utc_ts()
    payload = {
        "pr_number": args.pr,
        "owner": args.owner,
        "repo": args.repo,
        "head_sha": head_sha,
        "approver_login": effective_login,
        "approver_peer": effective_peer,
        "approver_peer_override": bool(as_peer),
        "pr_author": pr_author,
        "reason": args.reason.strip(),
        "principle": "substrate-governed-approval-v1",
        "delegated_via": via_delegation,
        # Executor fields are only meaningful when acting under a grant.
        "executor_login": executor_login if via_delegation else None,
        "executor_peer": executor_peer if via_delegation else None,
        "ts": ts,
    }
    act_address = _emit_act("act", "review_approved", ref, payload)
    if not act_address:
        print(
            "arq-github pr substrate-approve: substrate act emission failed; "
            "no approval recorded",
            file=sys.stderr,
        )
        return 1

    # Post an indexable marker comment so merge-via-substrate can discover this
    # approval without scanning all org acts. Uses event=COMMENT (not APPROVE),
    # which is allowed by the post-#3609 policy.
    marker = _substrate_approval_marker(
        act_address=act_address,
        pr_number=args.pr,
        head_sha=head_sha,
        approver_login=effective_login,
        approver_peer=effective_peer,
        delegated_via=via_delegation if via_delegation else None,
        executor_peer=executor_peer if via_delegation else None,
    )
    delegation_line = (
        f"- delegated_via: `{via_delegation}` (executed by `{executor_login}` "
        f"peer `{executor_peer}`)\n"
    ) if via_delegation else ""
    comment_body = (
        f"Substrate-attested approval recorded.\n\n"
        f"- act: `{act_address}`\n"
        f"- approver: `{effective_login}` (peer: `{effective_peer}`)\n"
        f"{delegation_line}"
        f"- head_sha: `{head_sha}`\n"
        f"- reason: {args.reason.strip()}\n"
        f"- principle: `substrate-governed-approval-v1`\n\n"
        f"This is the substrate-governed counterpart to `pr review --approve` "
        f"(disabled by #3609). `merge-via-substrate` will read this marker, "
        f"fetch the signed act, and count the signing peer as a cross-peer "
        f"approver when peer + login both differ from the PR author.\n\n"
        f"{marker}\n"
    )
    token = _gh_auth_token()
    comment_status, comment_parsed, comment_err = _api_call(
        "POST",
        f"/repos/{args.owner}/{args.repo}/pulls/{args.pr}/reviews",
        token=token,
        body={"event": "COMMENT", "body": comment_body},
    )
    comment_id = comment_parsed.get("id") if isinstance(comment_parsed, dict) else None
    _emit_act(
        "act",
        "github_envelope_ack",
        f"substrate-approve-{args.pr}-{ts}",
        {
            "verb": "pr.substrate-approve",
            "act_address": act_address,
            "comment_id": comment_id,
            "http_status": comment_status,
            "error": comment_err if comment_status >= 400 or comment_status == 0 else None,
            "ts": _utc_ts(),
        },
    )
    if not (200 <= comment_status < 300):
        print(
            f"arq-github pr substrate-approve: act emitted but marker-comment "
            f"post failed (status={comment_status} err={comment_err}). "
            f"merge-via-substrate will not discover this approval until the "
            f"marker is posted manually.",
            file=sys.stderr,
        )
        # Still return 0: the substrate side of the approval succeeded;
        # the comment is the index, not the proof.
    print(f"substrate-approve: act={act_address}")
    return 0


def cmd_pr_merge(args: argparse.Namespace) -> int:
    """Refuse the legacy `pr merge` verb and print the replacement command.

    Merge authority is substrate-governed, so this entry point never merges.
    It stays registered so that old callers fail closed with a migration
    hint on stderr instead of falling back to a raw GitHub merge or
    auto-merge.

    Args:
        args: Parsed CLI namespace. Reads ``pr``, ``owner`` and ``repo``;
            ``method`` is optional and defaults to squash in the hint.

    Returns:
        Always 2.
    """
    requested_method = getattr(args, "method", None)
    if requested_method:
        method_flag = f"--{requested_method}"
    else:
        method_flag = "--squash"

    # The exact command the caller should run instead.
    replacement = (
        f"  arq-github pr merge-via-substrate {args.pr} {method_flag} "
        f"--owner {args.owner} --repo {args.repo}"
    )
    print(
        "arq-github pr merge is disabled: merge authority is substrate-governed.\n"
        "Use:\n"
        + replacement,
        file=sys.stderr,
    )
    return 2


def cmd_pr_merge_via_substrate(args: argparse.Namespace) -> int:
    """Substrate-decided + GitHub-dispatched merge.

    Implements arq://doc/principle/merge-decision-on-substrate-v1 (DRAFT).

    The legacy `pr merge` verb now fails closed. This command is the only
    merge path because it records and verifies the substrate decision before
    dispatching to GitHub.

    Sequence:
      1. Fetch PR state from GitHub (head_sha, author, mergeable_state)
      2. Verify substrate preconditions:
         - At least one Actions run on head_sha succeeded (CI green)
         - At least one review approval from a peer != author (trust-graded-merge-v1)
      3. Emit arq://act/merge_decided/<pr>-<sha>-<ts> with the full provenance
         bundle. THIS is the authorization. Without this act, no dispatch.
      4. Dispatch the merge to GitHub via REST PUT (admin path; the substrate
         act IS the admin rationale).
      5. Emit arq://act/merge_dispatched/<pr>-<sha>-<ts> with the GitHub response.
      6. Emit arq://act/merge_landed/<pr>-<sha>-<ts> when the commit is on the
         target branch (verified by re-fetching the PR state).

    On any precondition failure, emits arq://act/merge_blocked/... and exits 1.

    Exit codes: 0 success, 1 substrate-precondition-failure, 2 dispatch-failure.
    """
    ts = _utc_ts()
    actor = _gh_active_user()
    actor_peer = ACTOR_PEER_ADDRESS
    method = (args.method or "squash").lower()
    method_upper = method.upper()
    token = _gh_auth_token()

    # ── Step 1: Fetch PR state ──
    st_pr, pr_json, err = _api_call(
        "GET", f"/repos/{args.owner}/{args.repo}/pulls/{args.pr}", token=token
    )
    if st_pr >= 400 or not isinstance(pr_json, dict):
        _emit_act("act", "merge_blocked", f"{args.pr}-fetch-{ts}", {
            "pr_number": args.pr, "step": "fetch_pr",
            "http_status": st_pr, "error": err,
            "blocked_reason": "could_not_fetch_pr", "ts": _utc_ts(),
        })
        print(f"merge-via-substrate: failed to fetch PR #{args.pr}: status={st_pr} err={err}", file=sys.stderr)
        return 1

    head_sha = pr_json.get("head", {}).get("sha", "")
    author = pr_json.get("user", {}).get("login", "")
    state = pr_json.get("state")
    mergeable = pr_json.get("mergeable")
    mergeable_state = pr_json.get("mergeable_state")

    if state != "open":
        _emit_act("act", "merge_blocked", f"{args.pr}-state-{ts}", {
            "pr_number": args.pr, "blocked_reason": f"pr_state_{state}",
            "ts": _utc_ts(),
        })
        print(f"merge-via-substrate: PR #{args.pr} is {state}, not open", file=sys.stderr)
        return 1

    # ── Step 2a: Verify CI green ──
    st_runs, runs_json, err = _api_call(
        "GET",
        f"/repos/{args.owner}/{args.repo}/commits/{head_sha}/check-runs?per_page=100",
        token=token,
    )
    ci_green = False
    ci_summary: dict[str, str] = {}
    in_progress_checks: list[str] = []
    if isinstance(runs_json, dict):
        # Explicit sort by started_at DESC (don't rely on undocumented GitHub
        # API default ordering). After sort, first-seen-per-name = most-recent
        # run for that check — handles the cancellation-cascade pattern where
        # an auto-cancelled run leaves a stale `cancelled` status that a later
        # successful run supersedes.
        runs_sorted = sorted(
            runs_json.get("check_runs", []) or [],
            key=lambda r: r.get("started_at") or "",
            reverse=True,
        )
        for run in runs_sorted:
            name = run.get("name") or ""
            if name in ci_summary:
                continue  # already have most-recent
            status = run.get("status")
            concl = run.get("conclusion")
            # Track non-terminal checks separately; we cannot decide
            # `ci_green` while any check is still running.
            if status in ("queued", "in_progress", "pending", "waiting") and not concl:
                in_progress_checks.append(name)
                ci_summary[name] = status
            else:
                ci_summary[name] = concl or status or "unknown"
        if ci_summary and not in_progress_checks:
            # `skipped` and `neutral` are pass-equivalents (path-filtered jobs,
            # advisory checks). True failures: failure / timed_out / action_required.
            # `cancelled` is excluded because in the cancellation-cascade pattern
            # an older cancelled run is almost always superseded by a newer
            # successful run on the same SHA — and we kept the most-recent above.
            pass_equivalents = ("success", "skipped", "neutral")
            failures = [
                n for n, c in ci_summary.items()
                if c in ("failure", "timed_out", "action_required")
                and n not in NON_BLOCKING_CHECKS
            ]
            successes = [n for n, c in ci_summary.items() if c in pass_equivalents]
            ci_green = bool(successes) and not failures

    # Refuse while CI is still running. Substrate-decided merge requires
    # terminal-state for all checks before deciding ci_green.
    if in_progress_checks:
        blocked_addr = _emit_act("act", "merge_blocked", f"{args.pr}-ci-running-{ts}", {
            "pr_number": args.pr, "head_sha": head_sha,
            "blocked_reason": "ci_in_progress",
            "in_progress_checks": in_progress_checks,
            "ci_summary": ci_summary, "ts": _utc_ts(),
        })
        # Substrate-side decision projected onto GitHub gate-surface so humans
        # see why the merge is blocked without needing to inspect substrate.
        _post_arq_merge_gate(
            owner=args.owner, repo=args.repo, head_sha=head_sha,
            conclusion="failure", token=token,
            title="CI still running",
            summary=(
                f"{len(in_progress_checks)} check(s) in progress: "
                f"{', '.join(in_progress_checks[:5])}"
                + (f" (and {len(in_progress_checks)-5} more)" if len(in_progress_checks) > 5 else "")
            ),
            details_act=blocked_addr,
        )
        print(
            f"merge-via-substrate: CI still running on {head_sha[:12]}; "
            f"in-progress: {in_progress_checks}",
            file=sys.stderr,
        )
        return 1

    if not ci_green:
        # Sentry SUR-2026-05-13 (#3705 review): the `failing` diagnostic
        # list must also filter NON_BLOCKING_CHECKS so substrate evidence
        # + arq-merge-gate description don't carry false-failure names.
        failing = [
            n for n, c in ci_summary.items()
            if c not in ("success", "skipped", "neutral")
            and n not in NON_BLOCKING_CHECKS
        ]
        blocked_addr = _emit_act("act", "merge_blocked", f"{args.pr}-ci-{ts}", {
            "pr_number": args.pr, "head_sha": head_sha,
            "blocked_reason": "ci_not_green",
            "ci_summary": ci_summary, "ts": _utc_ts(),
        })
        _post_arq_merge_gate(
            owner=args.owner, repo=args.repo, head_sha=head_sha,
            conclusion="failure", token=token,
            title="CI not green (cancelled-is-not-failure-aware)",
            summary=(
                f"Substrate evaluated {len(ci_summary)} check(s); "
                f"{len(failing)} non-passing: {', '.join(failing[:5])}"
            ),
            details_act=blocked_addr,
        )
        print(
            f"merge-via-substrate: CI not green on {head_sha[:12]}; failing: {failing}",
            file=sys.stderr,
        )
        return 1

    # ── Step 2b: Verify trust-graded-merge-v1 (cross-peer approval) ──
    #
    # Two sources of cross-peer approval are valid:
    #   (a) GitHub PR reviews with state=APPROVED from a login != author
    #   (b) Substrate-attested approvals discovered via SUBSTRATE_APPROVAL
    #       markers in PR comments, whose corresponding signed
    #       `review_approved` act lives on addressing.arqera.io and was
    #       signed by a peer + claimed-login both different from the author.
    #
    # Either path is sufficient. Both paths are recorded in the
    # `merge_decided` act for full traceability. Source (a) is the legacy
    # GitHub-trust path; source (b) is the substrate-governed flow opened
    # by #3609's policy change. Same-peer signing in (b) is rejected by the
    # peer-attestation check below — that is the customer-link property
    # the regulator-facing surface (SOC 2 CC8.1, EU AI Act Art.12, ISO
    # 27001 A.14.2.4) depends on.

    # Source (a): GitHub reviews
    st_rev, reviews_json, err = _api_call(
        "GET", f"/repos/{args.owner}/{args.repo}/pulls/{args.pr}/reviews?per_page=50",
        token=token,
    )
    cross_approvers: list[str] = []
    if isinstance(reviews_json, list):
        # last-write-wins per reviewer
        latest_by_reviewer: dict[str, str] = {}
        for r in reviews_json:
            login = (r.get("user") or {}).get("login")
            state_r = r.get("state")
            if login and state_r in ("APPROVED", "CHANGES_REQUESTED", "DISMISSED"):
                latest_by_reviewer[login] = state_r
        cross_approvers = [
            login for login, st in latest_by_reviewer.items()
            if st == "APPROVED" and login != author
        ]

    # Source (b): Substrate-attested approvals via SUBSTRATE_APPROVAL markers
    substrate_approvers: list[dict] = []  # for the merge_decided act
    pr_author_peer = ACTOR_PEER_ADDRESS  # NOTE: best-effort; PRs authored
    # from a different machine would carry a different peer key. The peer
    # check below compares the *approver's* peer (recorded in the marker)
    # against the approver_peer field, not against pr_author_peer here.

    # Collect both review comments and issue comments — markers can appear
    # in either surface.
    marker_sources: list[str] = []
    st_rc, rc_json, _ = _api_call(
        "GET", f"/repos/{args.owner}/{args.repo}/pulls/{args.pr}/comments?per_page=100",
        token=token,
    )
    if isinstance(rc_json, list):
        for c in rc_json:
            marker_sources.append(c.get("body") or "")
    st_ic, ic_json, _ = _api_call(
        "GET", f"/repos/{args.owner}/{args.repo}/issues/{args.pr}/comments?per_page=100",
        token=token,
    )
    if isinstance(ic_json, list):
        for c in ic_json:
            marker_sources.append(c.get("body") or "")
    # Also scan review-summary bodies (the `event=COMMENT` post made by
    # substrate-approve lives there, not in inline review comments).
    if isinstance(reviews_json, list):
        for r in reviews_json:
            marker_sources.append(r.get("body") or "")

    # Parse markers + fetch their backing acts. Verify the cross-peer property.
    seen_approver_logins: set[str] = set()
    for body in marker_sources:
        for marker in _parse_substrate_approval_markers(body):
            try:
                marker_pr = int(marker.get("pr", "-1"))
            except (TypeError, ValueError):
                continue
            if marker_pr != int(args.pr):
                continue
            marker_sha = marker.get("head_sha", "")
            if marker_sha != head_sha:
                # SHA drift — the approval was for an older head. Reject.
                continue
            approver_login = marker.get("approver_login", "")
            approver_peer = marker.get("approver_peer", "")
            if not approver_login or not approver_peer:
                continue
            if approver_login == author:
                # Same-login fakery (e.g. switching gh auth then re-signing).
                continue
            if approver_login in seen_approver_logins:
                continue
            # Fetch the act and verify it exists + matches.
            act_address = marker.get("act", "")
            act_record = _fetch_substrate_act_via_twin(act_address)
            if act_record is None:
                # Cannot verify the substrate side — refuse to count.
                continue
            # Best-effort: the parent field of the fetched act is the signing
            # peer's address. Confirm it matches the marker's claim.
            recorded_peer = act_record.get("parent", "")
            delegated_via = marker.get("delegated_via", "")
            if recorded_peer and recorded_peer != approver_peer:
                # Marker claims peer X, act was signed by peer Y.
                # If the marker declares a `delegated_via` substrate-attested
                # grant, verify the delegation chain instead of rejecting:
                #   1. Fetch the grant (full payload) from substrate
                #   2. Grant must be signed by the claimed approver_peer
                #      (i.e. the grantor signed their grant)
                #   3. Grant must delegate to recorded_peer (i.e. the act
                #      was signed by the grant's claimed delegate)
                #   4. Grant must not be expired
                #   5. Grant's from_login must match the marker's
                #      approver_login (consistency check)
                # On any failure: reject the approval.
                # On success: accept — substrate-attested delegation closes
                # the cross-peer property at the SUBSTRATE level, not the
                # GitHub-login level. Per arq://doc/principle/merge-decision-
                # on-substrate-v1 + arq://body/arqera_delegation_authority_gap/v1.
                if not delegated_via:
                    continue
                grant_ok = False
                try:
                    import json as _json
                    import urllib.request
                    cleaned = (
                        delegated_via[len("arq://"):]
                        if delegated_via.startswith("arq://") else delegated_via
                    )
                    full_url = f"https://addressing.arqera.io/address/{cleaned}?full=true"
                    with urllib.request.urlopen(full_url, timeout=15) as resp:
                        grant_full = _json.loads(resp.read().decode("utf-8"))
                    grant_signer = grant_full.get("parent", "") or ""
                    raw_payload = grant_full.get("payload")
                    if isinstance(raw_payload, dict):
                        grant_payload = raw_payload
                    elif isinstance(raw_payload, str) and raw_payload:
                        grant_payload = _json.loads(raw_payload)
                    else:
                        grant_payload = {}
                    g_from_peer = (grant_payload.get("from_peer") or "").strip()
                    g_from_login = (grant_payload.get("from_login") or "").strip()
                    g_to_peer = (grant_payload.get("to_peer") or "").strip()
                    g_expires_at = (grant_payload.get("expires_at") or "").strip()
                    # (1)+(2): grant signed by the granting peer
                    if grant_signer and g_from_peer and grant_signer != g_from_peer:
                        raise ValueError(
                            f"grant signer {grant_signer} != grant.from_peer {g_from_peer}"
                        )
                    # (3): delegate is who actually signed the approval act
                    if g_to_peer and g_to_peer != recorded_peer:
                        raise ValueError(
                            f"grant.to_peer {g_to_peer} != act signer {recorded_peer}"
                        )
                    # (5): marker.approver_peer/login consistent with grant
                    if g_from_peer and g_from_peer != approver_peer:
                        raise ValueError(
                            f"grant.from_peer {g_from_peer} != marker.approver_peer {approver_peer}"
                        )
                    if g_from_login and g_from_login != approver_login:
                        raise ValueError(
                            f"grant.from_login {g_from_login} != marker.approver_login {approver_login}"
                        )
                    # (4): expiry
                    if g_expires_at:
                        try:
                            _exp_dt = datetime.fromisoformat(g_expires_at.replace("Z", "+00:00"))
                            if _exp_dt.tzinfo is None:
                                _exp_dt = _exp_dt.replace(tzinfo=timezone.utc)
                            if _exp_dt < datetime.now(timezone.utc):
                                raise ValueError(f"grant expired at {g_expires_at}")
                        except ValueError:
                            raise
                        except Exception:
                            raise ValueError(f"unparseable expires_at {g_expires_at!r}")
                    grant_ok = True
                except Exception as _grant_err:
                    print(
                        f"merge-via-substrate: delegation chain invalid for "
                        f"{act_address} (via {delegated_via}): {_grant_err}",
                        file=sys.stderr,
                    )
                if not grant_ok:
                    continue
            # Same-peer-as-author check requires us to know the PR author's
            # peer. We don't always have that; pragmatic fallback: enforce
            # the login check above + require the peer to be a recognised
            # substrate peer (any non-empty arq://body/peer/...).
            if not approver_peer.startswith("arq://body/peer/"):
                continue
            seen_approver_logins.add(approver_login)
            substrate_approvers.append({
                "login": approver_login,
                "peer": approver_peer,
                "act": act_address,
                "act_sha256": act_record.get("sha256"),
                "delegated_via": delegated_via or None,
            })

    # Union substrate-attested approvers into cross_approvers under a
    # namespaced prefix so they're distinguishable in the merge_decided act.
    for s in substrate_approvers:
        cross_approvers.append(f"substrate:{s['login']}")

    paired_peer_attestation = getattr(args, "paired_peer", None)
    if not cross_approvers and paired_peer_attestation:
        cross_approvers = [f"paired-peer:{paired_peer_attestation}"]

    if not cross_approvers:
        blocked_addr = _emit_act("act", "merge_blocked", f"{args.pr}-approval-{ts}", {
            "pr_number": args.pr, "head_sha": head_sha, "author": author,
            "blocked_reason": "no_cross_peer_approval",
            "principle": "trust-graded-merge-v1", "ts": _utc_ts(),
        })
        _post_arq_merge_gate(
            owner=args.owner, repo=args.repo, head_sha=head_sha,
            conclusion="failure", token=token,
            title="No cross-peer approval (trust-graded-merge-v1)",
            summary=(
                f"PR author={author}; needs APPROVED review from a different peer."
            ),
            details_act=blocked_addr,
        )
        print(
            f"merge-via-substrate: no cross-peer approval on PR #{args.pr} "
            f"(author={author}); trust-graded-merge-v1 violation",
            file=sys.stderr,
        )
        return 1

    merger = cross_approvers[0]  # First approving cross-peer

    # ── Step 2c: Verify paired-verb declaration (mesh-diagnostic-surface-keeps-pace-with-action-surface-v1) ──
    # If this PR touches scripts/mesh-bridge/ (i.e. adds/modifies a write verb),
    # require a `Paired-verb-check:` declaration in the PR body. This brings
    # the workflow-level check (.github/workflows/paired-verb-check.yml) under
    # the substrate-decided merge — single source of truth, all gates produce
    # substrate acts, all gates honoured uniformly under bounded protection-bypass.
    #
    # Composes:
    #   - mesh-diagnostic-surface-keeps-pace-with-action-surface-v1 (canonised)
    #   - merge-decision-on-substrate-v1 (DRAFT, this principle)
    #   - migrate-soak-delete-no-half-shipped (DRAFT) — workflow-level check is
    #     the legacy fallback; substrate path is canonical going forward
    paired_verb_required = False
    paired_verb_declared: str | None = None
    paired_verb_status = "not_required"
    st_files, files_json, _ = _api_call(
        "GET", f"/repos/{args.owner}/{args.repo}/pulls/{args.pr}/files?per_page=300",
        token=token,
    )
    touches_mesh_bridge = False
    if isinstance(files_json, list):
        for f in files_json:
            path = f.get("filename", "")
            if path.startswith("scripts/mesh-bridge/"):
                touches_mesh_bridge = True
                break
    if touches_mesh_bridge:
        paired_verb_required = True
        pr_body = pr_json.get("body", "") or ""
        # Mirrors the workflow's regex (^[[:space:]]*Paired-verb-check:) using
        # Python re syntax: \s for whitespace, (?im) for case-insensitive multiline
        m = re.search(
            r"(?im)^\s*Paired-verb-check:\s*(.+)$",
            pr_body,
        )
        if m:
            paired_verb_declared = m.group(0).strip()
            paired_verb_status = "declared"
        else:
            paired_verb_status = "missing"

    if paired_verb_required and paired_verb_status == "missing":
        blocked_addr = _emit_act("act", "merge_blocked", f"{args.pr}-paired-verb-{ts}", {
            "pr_number": args.pr, "head_sha": head_sha,
            "blocked_reason": "paired_verb_undeclared",
            "principle": "mesh-diagnostic-surface-keeps-pace-with-action-surface-v1",
            "touches_mesh_bridge": True, "ts": _utc_ts(),
        })
        _post_arq_merge_gate(
            owner=args.owner, repo=args.repo, head_sha=head_sha,
            conclusion="failure", token=token,
            title="Paired-verb declaration missing",
            summary=(
                "PR touches `scripts/mesh-bridge/` but lacks the required "
                "`Paired-verb-check:` line in the body."
            ),
            details_act=blocked_addr,
        )
        print(
            f"merge-via-substrate: PR #{args.pr} touches scripts/mesh-bridge/ "
            f"but lacks 'Paired-verb-check:' declaration; "
            f"mesh-diagnostic-surface-keeps-pace-with-action-surface-v1 violation",
            file=sys.stderr,
        )
        return 1

    # ── Step 3: Emit merge_decided substrate act ──
    decided_payload = {
        "pr_number": args.pr,
        "head_sha": head_sha,
        "owner": args.owner,
        "repo": args.repo,
        "author": author,
        "merger_peer": merger,
        "actor": actor,
        "actor_peer": actor_peer,
        "ci_state": "green",
        "ci_check_count": len(ci_summary),
        "ci_success_count": sum(1 for c in ci_summary.values() if c == "success"),
        "cross_approvers": cross_approvers,
        "paired_peer_attestation": paired_peer_attestation,
        "mergeable": mergeable,
        "mergeable_state": mergeable_state,
        "intended_method": method_upper,
        "principle": "merge-decision-on-substrate-v1",
        "principle_status": "DRAFT",
        "trust_graded_merge_v1_satisfied": True,
        # Paired-verb verification (composes
        # mesh-diagnostic-surface-keeps-pace-with-action-surface-v1):
        "paired_verb_required": paired_verb_required,
        "paired_verb_status": paired_verb_status,
        "paired_verb_declaration": paired_verb_declared,
        "touches_mesh_bridge": touches_mesh_bridge,
        "ts": ts,
    }
    decided_addr = _emit_act(
        "act", "merge_decided", f"{args.pr}-{head_sha[:12]}-{ts}", decided_payload
    )
    if not decided_addr:
        print(
            "merge-via-substrate: failed to emit merge_decided act; aborting",
            file=sys.stderr,
        )
        return 1
    print(f"  merge_decided: {decided_addr}")
    print(f"  ci_green={ci_green} cross_approvers={cross_approvers}")

    # ── Step 3b: Post arq-merge-gate check-run (substrate-attested) ──
    # The Required check that branch-protection asks for. Posted directly via
    # the Checks API (NOT via a GitHub Actions workflow), so cancellation-
    # cascade can't produce phantom failures. Composes with:
    #   - cancelled-is-not-failure-v1 (canonised 2026-04-26)
    #   - inefficiency-is-a-bug-v1 (canonised 2026-04-26)
    #   - merge-decision-on-substrate-v1 (canonised)
    gate_status, gate_err = _post_arq_merge_gate(
        owner=args.owner, repo=args.repo, head_sha=head_sha,
        conclusion="success", token=token,
        title="Substrate-decided merge: green",
        summary=(
            f"`merge_decided` evaluated {len(ci_summary)} check(s) "
            f"(cancelled-is-not-failure-v1 applied), "
            f"{len(cross_approvers)} cross-peer approver(s), "
            f"paired-verb={paired_verb_status}."
        ),
        external_id=decided_addr,
        details_act=decided_addr,
    )
    if not (200 <= gate_status < 300):
        # Gate-post failure is non-fatal at decision-emission; the merge
        # dispatch downstream will surface any branch-protection block. But
        # surface clearly so it's visible in the substrate trail.
        print(
            f"  warning: arq-merge-gate post returned status={gate_status} "
            f"err={gate_err[:200]}",
            file=sys.stderr,
        )
    else:
        print("  arq-merge-gate posted: success (substrate-attested)")

    # ── Step 3c: --gate-only short-circuit (backfill / dry-run) ──
    # Substrate-decided gate is now stamped on the PR; GitHub branch-protection
    # is satisfied. No merge is dispatched. Used to backfill arq-merge-gate on
    # currently-open PRs immediately before the required-status-checks flip
    # (Phase 4 of merge-decision-on-substrate-v1).
    if getattr(args, "gate_only", False):
        print("merge-via-substrate: --gate-only complete; merge NOT dispatched")
        print(json.dumps({
            "result": "gate_only",
            "pr_number": args.pr,
            "head_sha": head_sha,
            "merge_decided_addr": decided_addr,
            "ci_green": ci_green,
            "cross_approvers": cross_approvers,
            "arq_merge_gate_status": gate_status,
        }, indent=2))
        return 0

    # ── Step 4: Dispatch merge to GitHub ──
    merge_body = {
        "merge_method": method,
        "commit_title": f"{pr_json.get('title', '')} (#{args.pr})",
    }
    st_merge, merge_resp, err = _api_call(
        "PUT",
        f"/repos/{args.owner}/{args.repo}/pulls/{args.pr}/merge",
        token=token,
        body=merge_body,
    )

    # ── Step 4b: substrate-decided branch-protection bypass via arq-config ──
    # Frame D: when GitHub refuses due to branch-protection (typically 405
    # with a "Required status check ... is failing" tied to a stale
    # cancelled-run marker), AND substrate has already verified the
    # preconditions in a stronger way (ci_green via cancellation-cascade-
    # aware check + trust-graded cross-peer approval), the merge dispatcher
    # emits a scoped runtime_config_decided act (`enforce_admins=false`,
    # scope=`pr:<num>`) and reconciles it. After the merge, it emits the
    # opposing decided + reconciled acts to restore the policy default.
    #
    # The bespoke `protection_bypass_decided` + `protection_toggle` acts are
    # retired in favour of the canonical Frame D triple:
    #   runtime_config_decided → config_reconciled (disable for this PR)
    #   runtime_config_decided → config_reconciled (restore policy default)
    #
    # Composes:
    #   - substrate-is-the-runtime-config-v1 (DRAFT) — substrate IS the
    #     branch-protection's enforce_admins state; reconciler applies it
    #   - merge-decision-on-substrate-v1 (DRAFT) — merge_decided is the
    #     authorisation that justifies the runtime_config_decided
    #   - trust-graded-merge-v1 (canonised) — cross_approvers[0] IS the
    #     paired-peer attestation passed to arq-config
    if st_merge == 405 and isinstance(err, str) and "Required status check" in err:
        paired_peer_attestation = (
            f"merger={merger}, head_sha={head_sha[:12]}, "
            f"merge_decided={decided_addr}"
        )
        # Decide: bypass enforce_admins for this PR's dispatch
        decide_proc = subprocess.run(
            [
                str(Path.home() / ".local" / "bin" / "arq-config"),
                "decide", "github", "branch-protection.enforce_admins", "false",
                "--scope", f"pr:{args.pr}",
                "--paired-peer", paired_peer_attestation,
                "--reason", (
                    f"merge-decision-on-substrate-v1 verified preconditions "
                    f"(ci_green={ci_green}, cross_approvers={cross_approvers}); "
                    f"branch-protection 405 needs scoped bypass for "
                    f"merge_decided={decided_addr}"
                ),
            ],
            capture_output=True, text=True, timeout=15,
        )
        if decide_proc.returncode != 0:
            print(
                f"  arq-config decide refused bypass: "
                f"{decide_proc.stderr.strip()}",
                file=sys.stderr,
            )
            # Fall through; act_chain shows merge_blocked from initial 405
        else:
            print(f"  {decide_proc.stdout.strip().splitlines()[0]}")
            # Reconcile: apply the decision to the platform
            rec_proc = subprocess.run(
                [
                    str(Path.home() / ".local" / "bin" / "arq-config"),
                    "reconcile", "github",
                    "--field", "branch-protection.enforce_admins",
                    "--desired", "false", "--apply",
                ],
                capture_output=True, text=True, timeout=15,
            )
            if rec_proc.returncode == 0:
                # Retry the merge
                st_merge, merge_resp, err = _api_call(
                    "PUT",
                    f"/repos/{args.owner}/{args.repo}/pulls/{args.pr}/merge",
                    token=token,
                    body=merge_body,
                )
                print(f"  retry dispatch after bypass: status={st_merge}")
                # Decide: restore enforce_admins=true (policy default).
                # The restoration is also a substrate decision; it is NOT
                # a side-effect of the merge dispatcher.
                subprocess.run(
                    [
                        str(Path.home() / ".local" / "bin" / "arq-config"),
                        "decide", "github", "branch-protection.enforce_admins", "true",
                        "--scope", "global",
                        "--paired-peer", paired_peer_attestation,
                        "--reason", (
                            f"restore policy default after scoped bypass for "
                            f"merge_decided={decided_addr}"
                        ),
                    ],
                    capture_output=True, text=True, timeout=15,
                )
                subprocess.run(
                    [
                        str(Path.home() / ".local" / "bin" / "arq-config"),
                        "reconcile", "github",
                        "--field", "branch-protection.enforce_admins",
                        "--desired", "true", "--apply",
                    ],
                    capture_output=True, text=True, timeout=15,
                )
                print("  protection restored: enforce_admins=true (policy default)")
            else:
                print(
                    f"  arq-config reconcile failed: {rec_proc.stderr.strip()}",
                    file=sys.stderr,
                )

    dispatched_payload = {
        "merge_decided_addr": decided_addr,
        "pr_number": args.pr,
        "head_sha": head_sha,
        "http_status": st_merge,
        "github_merge_commit_sha": (merge_resp or {}).get("sha")
            if isinstance(merge_resp, dict) else None,
        "github_merged": (merge_resp or {}).get("merged")
            if isinstance(merge_resp, dict) else None,
        "dispatch_outcome": "success" if 200 <= st_merge < 300 else "failure",
        "error": err if st_merge >= 400 else None,
        "ts": _utc_ts(),
    }
    dispatched_addr = _emit_act(
        "act", "merge_dispatched", f"{args.pr}-{head_sha[:12]}-{_utc_ts()}",
        dispatched_payload,
    )

    if st_merge < 200 or st_merge >= 300:
        _emit_act("act", "merge_blocked", f"{args.pr}-dispatch-{_utc_ts()}", {
            "merge_decided_addr": decided_addr,
            "merge_dispatched_addr": dispatched_addr,
            "pr_number": args.pr, "head_sha": head_sha,
            "blocked_reason": "github_dispatch_failed",
            "http_status": st_merge, "error": err, "ts": _utc_ts(),
        })
        print(
            f"merge-via-substrate: dispatch failed status={st_merge} err={err}",
            file=sys.stderr,
        )
        print(f"  merge_dispatched (failure): {dispatched_addr}")
        return 2

    merge_commit_sha = (merge_resp or {}).get("sha", "") if isinstance(merge_resp, dict) else ""
    print(f"  merge_dispatched: {dispatched_addr}")
    print(f"  github_merge_commit: {merge_commit_sha[:12]}")

    # ── Step 5: Emit merge_landed (settlement) ──
    landed_addr = _emit_act("act", "merge_landed", f"{args.pr}-{merge_commit_sha[:12]}-{_utc_ts()}", {
        "merge_decided_addr": decided_addr,
        "merge_dispatched_addr": dispatched_addr,
        "pr_number": args.pr,
        "target_branch": pr_json.get("base", {}).get("ref", "main"),
        "merge_commit_sha": merge_commit_sha,
        "head_sha": head_sha,
        "method": method_upper,
        "ts": _utc_ts(),
    })
    print(f"  merge_landed: {landed_addr}")
    print(f"merged via substrate: pr=#{args.pr} sha={merge_commit_sha}")
    return 0


def cmd_fetch(args: argparse.Namespace) -> int:
    """Run `git fetch` wrapped in a substrate envelope.

    Closes the long-standing gap where local repos drift because the v2
    mesh gate blocks bare `git fetch`. Until the primitive owns git verbs
    natively, this gives Twin a mesh-routed fetch path with audit trail.

    Reads from ``args``:
      remote: remote name to fetch from (ignored by git when ``all`` is set).
      ref:    optional refspec, only used when ``all`` is not set.
      all:    when truthy, fetch every configured remote.

    Emits `git_envelope_sent` before the fetch and `git_envelope_ack`
    afterwards (carrying either the exit code and output previews, or the
    error if the process could not be run / timed out).

    Returns git's exit code, or 1 if the subprocess failed to run or
    exceeded the 60 s timeout.
    """
    verb = "git.fetch"
    ts = _utc_ts()
    envelope = {
        "verb": verb,
        "remote": args.remote,
        "ref": args.ref,
        "all_branches": args.all,
        "actor": _gh_active_user(),
        "actor_peer": ACTOR_PEER_ADDRESS,
        "ts": ts,
    }
    sent = _emit_act("act", "git_envelope_sent", f"fetch-{args.remote}-{ts}", envelope)
    # The child git process is not gated (the hook only fires on Claude's
    # top-level Bash tool calls). Run fetch directly.
    if args.all:
        # `git fetch --all` refuses a repository argument ("fetch --all does
        # not take a repository argument"), so the remote must be omitted.
        cmd = ["git", "fetch", "--all"]
    else:
        cmd = ["git", "fetch", args.remote]
        if args.ref:
            cmd.append(args.ref)
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=60, check=False)
    except (subprocess.TimeoutExpired, OSError) as exc:
        _emit_act("act", "git_envelope_ack", f"fetch-{args.remote}-{ts}", {
            "verb": verb, "envelope_sent": sent,
            "error": f"{type(exc).__name__}: {exc}", "ts": _utc_ts(),
        })
        print(f"arq-github fetch failed: {exc}", file=sys.stderr)
        return 1
    _emit_act("act", "git_envelope_ack", f"fetch-{args.remote}-{ts}", {
        "verb": verb, "envelope_sent": sent,
        "exit_code": result.returncode,
        # Only the tail of each stream is recorded to keep the act small.
        "stdout_preview": (result.stdout or "")[-500:],
        "stderr_preview": (result.stderr or "")[-500:],
        "ts": _utc_ts(),
    })
    # Relay git's own output so the caller sees what a bare fetch would show.
    if result.stdout:
        sys.stdout.write(result.stdout)
    if result.stderr:
        sys.stderr.write(result.stderr)
    return result.returncode


def cmd_pr_close(args: argparse.Namespace) -> int:
    """Close a PR without merging.

    Needed after stack-merge cascades where a PR becomes superseded (its
    content already landed via a sibling PR).

    Reads from ``args``: ``owner``, ``repo``, ``pr`` and an optional
    ``reason``. When a reason is given it is first posted as a PR comment
    (best-effort: a failed comment is reported on stderr but does not stop
    the close), then the PR state is PATCHed to ``closed``.

    Emits `github_envelope_sent` before and `github_envelope_ack` after the
    state change. Returns 0 when the PATCH succeeds (2xx), 1 otherwise.
    """
    verb = "pr.close"
    ts = _utc_ts()
    envelope = {
        "verb": verb, "owner": args.owner, "repo": args.repo,
        "pr_number": args.pr, "reason": args.reason,
        "actor": _gh_active_user(), "actor_peer": ACTOR_PEER_ADDRESS, "ts": ts,
    }
    sent = _emit_act("act", "github_envelope_sent", f"pr-close-{args.pr}-{ts}", envelope)
    token = _gh_auth_token()
    # Optional comment explaining why before we flip state.
    if args.reason:
        comment_status, _comment_json, comment_err = _api_call(
            "POST",
            f"/repos/{args.owner}/{args.repo}/issues/{args.pr}/comments",
            token=token,
            body={"body": args.reason},
        )
        if not (200 <= comment_status < 300):
            # Surface the failure instead of dropping it silently; the close
            # itself still proceeds so a comment hiccup cannot wedge a cascade.
            print(
                f"  warning: pr close reason comment failed: "
                f"status={comment_status} err={comment_err}",
                file=sys.stderr,
            )
    status, _parsed, err = _api_call(
        "PATCH",
        f"/repos/{args.owner}/{args.repo}/pulls/{args.pr}",
        token=token,
        body={"state": "closed"},
    )
    _emit_act("act", "github_envelope_ack", f"pr-close-{args.pr}-{ts}", {
        "verb": verb, "envelope_sent": sent, "http_status": status,
        # status == 0 is treated as a failure alongside HTTP >= 400.
        "error": err if status >= 400 or status == 0 else None, "ts": _utc_ts(),
    })
    if 200 <= status < 300:
        print(f"closed: pr=#{args.pr}")
        return 0
    print(f"arq-github pr close failed: status={status} err={err}", file=sys.stderr)
    return 1


def cmd_pr_retarget(args: argparse.Namespace) -> int:
    """Point a PR at a different base branch.

    Stacked PRs need this once the bottom of the stack has merged to main:
    the next PR's base is then a stale branch and a direct merge would not
    reach main.

    Issues PATCH /repos/{owner}/{repo}/pulls/{pr_number} with `{base: "..."}`,
    bracketed by `github_envelope_sent` / `github_envelope_ack` acts.
    Returns 0 on a 2xx response, 1 otherwise.
    """
    verb = "pr.retarget"
    ts = _utc_ts()
    # Both the sent and the ack act share the same address suffix.
    act_suffix = f"pr-retarget-{args.pr}-{ts}"
    sent = _emit_act("act", "github_envelope_sent", act_suffix, {
        "verb": verb,
        "owner": args.owner,
        "repo": args.repo,
        "pr_number": args.pr,
        "new_base": args.base,
        "actor": _gh_active_user(),
        "actor_peer": ACTOR_PEER_ADDRESS,
        "ts": ts,
    })

    token = _gh_auth_token()
    endpoint = f"/repos/{args.owner}/{args.repo}/pulls/{args.pr}"
    status, _response, err = _api_call(
        "PATCH", endpoint, token=token, body={"base": args.base},
    )

    # status == 0 counts as an error just like HTTP >= 400.
    errored = status >= 400 or status == 0
    ack_payload = {
        "verb": verb,
        "envelope_sent": sent,
        "http_status": status,
        "error": err if errored else None,
        "ts": _utc_ts(),
    }
    _emit_act("act", "github_envelope_ack", act_suffix, ack_payload)

    if not (200 <= status < 300):
        print(f"arq-github pr retarget failed: status={status} err={err}", file=sys.stderr)
        return 1
    print(f"retargeted: pr=#{args.pr} new_base={args.base}")
    return 0


def cmd_pr_edit_body(args: argparse.Namespace) -> int:
    """Edit a PR's body (description). Supports replace (default) or append.

    Motivating case: the `Paired-Verb Check` CI gate requires a declaration
    line in the PR body; PRs authored via other tools often miss it and need
    a body edit to satisfy the gate without closing/reopening the PR.

    Wraps PATCH /repos/{owner}/{repo}/pulls/{pr_number} with `{body: "..."}`.

    Reads from ``args``: ``owner``, ``repo``, ``pr``, ``body``,
    ``body_file`` (takes precedence over ``body`` when set) and ``append``.
    In append mode the current body is fetched first and the new text is
    added after a blank line.

    Returns 0 on a 2xx PATCH; 1 when no body was supplied, the body file
    cannot be read, the current body cannot be fetched, or the PATCH fails.
    """
    # Without any body source the code below would crash on None
    # (`.lstrip()` / `len()`); fail cleanly before emitting any act.
    if args.body is None and not args.body_file:
        print(
            "arq-github pr edit-body: no body supplied (pass --body or --body-file)",
            file=sys.stderr,
        )
        return 1
    verb = "pr.edit_body"
    ts = _utc_ts()
    new_body = args.body
    if args.body_file:
        try:
            with open(args.body_file, "r", encoding="utf-8") as fh:
                new_body = fh.read()
        except OSError as e:
            print(f"arq-github pr edit-body: failed to read --body-file: {e}", file=sys.stderr)
            return 1
    token = None
    if args.append:
        token = _gh_auth_token()
        st_pr, pr_json, err_pr = _api_call(
            "GET", f"/repos/{args.owner}/{args.repo}/pulls/{args.pr}", token=token
        )
        if st_pr >= 400 or not isinstance(pr_json, dict):
            print(f"arq-github pr edit-body: failed to fetch current body status={st_pr} err={err_pr}", file=sys.stderr)
            return 1
        existing = pr_json.get("body") or ""
        # Normalise the seam to exactly one blank line between old and new text.
        new_body = existing.rstrip() + "\n\n" + new_body.lstrip()
    envelope = {
        "verb": verb,
        "owner": args.owner,
        "repo": args.repo,
        "pr_number": args.pr,
        "mode": "append" if args.append else "replace",
        # Only the length is recorded in the envelope, not the body text.
        "body_len": len(new_body),
        "actor": _gh_active_user(),
        "actor_peer": ACTOR_PEER_ADDRESS,
        "ts": ts,
    }
    sent = _emit_act("act", "github_envelope_sent", f"pr-edit-body-{args.pr}-{ts}", envelope)
    if token is None:
        # Replace mode: the token has not been loaded yet.
        token = _gh_auth_token()
    status, _parsed, err = _api_call(
        "PATCH",
        f"/repos/{args.owner}/{args.repo}/pulls/{args.pr}",
        token=token,
        body={"body": new_body},
    )
    _emit_act("act", "github_envelope_ack", f"pr-edit-body-{args.pr}-{ts}", {
        "verb": verb, "envelope_sent": sent, "http_status": status,
        "error": err if status >= 400 or status == 0 else None, "ts": _utc_ts(),
    })
    if 200 <= status < 300:
        print(f"body updated: pr=#{args.pr} mode={'append' if args.append else 'replace'} bytes={len(new_body)}")
        return 0
    print(f"arq-github pr edit-body failed: status={status} err={err}", file=sys.stderr)
    return 1


def _gql_dig(payload, *keys):
    """Follow ``keys`` through nested dicts; None if any level is missing, null or not a dict."""
    node = payload
    for key in keys:
        if not isinstance(node, dict):
            return None
        node = node.get(key)
    return node


def cmd_project_add(args: argparse.Namespace) -> int:
    """Attach an issue or PR to an org-level GitHub Project (v2).

    Three GraphQL round-trips: look up the projectV2 node id by
    (org, number), look up the issue/PR node id by (owner, repo, number),
    then call `addProjectV2ItemById`.

    Reads from ``args``: ``org``, ``project``, ``owner``, ``repo``, ``issue``.

    Emits `github_envelope_sent` up front and one `github_envelope_ack` at
    whichever step terminates the call. Returns 0 when the mutation yields
    an item id with no GraphQL errors, 1 otherwise.
    """
    verb = "project.add"
    ts = _utc_ts()
    envelope = {
        "verb": verb,
        "org": args.org,
        "project_number": args.project,
        "owner": args.owner,
        "repo": args.repo,
        "issue": args.issue,
        "actor": _gh_active_user(),
        "actor_peer": ACTOR_PEER_ADDRESS,
        "ts": ts,
    }
    sent = _emit_act("act", "github_envelope_sent", f"project-add-{args.issue}-{ts}", envelope)
    token = _gh_auth_token()

    # Step 1: project node_id
    q1 = {
        "query": "query($org:String!,$n:Int!){organization(login:$org){projectV2(number:$n){id}}}",
        "variables": {"org": args.org, "n": args.project},
    }
    st1, p1, err1 = _api_call("POST", "/graphql", token=token, body=q1)
    # GraphQL reports "not found" as an explicit null (e.g. `organization: null`),
    # which a chained `.get(..., {})` does not guard against; dig null-safely.
    project_id = _gql_dig(p1, "data", "organization", "projectV2", "id")
    if not project_id:
        _emit_act("act", "github_envelope_ack", f"project-add-{args.issue}-{ts}", {
            "verb": verb, "envelope_sent": sent, "http_status": st1,
            "step": "fetch_project_id", "error": err1 or "not_found", "ts": _utc_ts(),
        })
        print(f"arq-github project add: project lookup failed status={st1}", file=sys.stderr)
        return 1

    # Step 2: issue node_id
    q2 = {
        "query": (
            "query($owner:String!,$repo:String!,$n:Int!){"
            "repository(owner:$owner,name:$repo){issueOrPullRequest(number:$n){"
            "__typename ... on Issue{id} ... on PullRequest{id}}}}"
        ),
        "variables": {"owner": args.owner, "repo": args.repo, "n": args.issue},
    }
    st2, p2, err2 = _api_call("POST", "/graphql", token=token, body=q2)
    content_id = _gql_dig(p2, "data", "repository", "issueOrPullRequest", "id")
    if not content_id:
        _emit_act("act", "github_envelope_ack", f"project-add-{args.issue}-{ts}", {
            "verb": verb, "envelope_sent": sent, "http_status": st2,
            "step": "fetch_issue_id", "error": err2 or "not_found", "ts": _utc_ts(),
        })
        print(f"arq-github project add: issue lookup failed status={st2}", file=sys.stderr)
        return 1

    # Step 3: mutation
    mutation = {
        "query": (
            "mutation($p:ID!,$c:ID!){addProjectV2ItemById(input:{projectId:$p,contentId:$c}){"
            "item{id}}}"
        ),
        "variables": {"p": project_id, "c": content_id},
    }
    status, parsed, err = _api_call("POST", "/graphql", token=token, body=mutation)
    item_id = _gql_dig(parsed, "data", "addProjectV2ItemById", "item", "id")
    gql_errors = parsed.get("errors") if isinstance(parsed, dict) else None
    # status == 0 is a failure too, matching the sibling verbs' ack handling.
    failed = status >= 400 or status == 0 or bool(gql_errors)
    ack_error = None
    if failed:
        ack_error = err or (json.dumps(gql_errors)[:300] if gql_errors else None)
    _emit_act("act", "github_envelope_ack", f"project-add-{args.issue}-{ts}", {
        "verb": verb, "envelope_sent": sent, "http_status": status, "item_id": item_id,
        "error": ack_error,
        "ts": _utc_ts(),
    })
    if 200 <= status < 300 and item_id and not gql_errors:
        print(f"attached: issue=#{args.issue} project={args.project} item_id={item_id}")
        return 0
    print(
        f"arq-github project add failed: status={status} err={err or gql_errors}",
        file=sys.stderr,
    )
    return 1


REVIEW_THREADS_PAGE_SIZE = 50  # GitHub GraphQL max per page is 100; 50 keeps payload bounded
REVIEW_THREADS_MAX_PAGES = 20  # 20 × 50 = 1000-thread hard cap (safety)
# Maximum number of characters of a comment body kept in `body_excerpt` by
# _thread_inventory_row; longer bodies are cut and suffixed with "…[truncated]".
REVIEW_COMMENT_BODY_EXCERPT_LEN = 1000


def _thread_inventory_row(thread: dict) -> dict:
    """Normalise one GraphQL review thread into the canonical inventory row.

    Tests and downstream classification tooling rely on this shape: every
    key is always present, and GraphQL fields that are absent or null
    degrade to safe defaults (None / [] / False).
    """
    source = thread or {}

    def _canonical_comment(raw) -> dict:
        # One comment node -> fixed-key dict; long bodies are excerpted.
        node = raw or {}
        login = (node.get("author") or {}).get("login")
        text = node.get("body") or ""
        if len(text) > REVIEW_COMMENT_BODY_EXCERPT_LEN:
            excerpt = text[:REVIEW_COMMENT_BODY_EXCERPT_LEN] + "…[truncated]"
        else:
            excerpt = text
        oid = (node.get("commit") or {}).get("oid")
        return {
            "id": node.get("id"),
            "author": login,
            "body_excerpt": excerpt,
            "body_length": len(text),
            "created_at": node.get("createdAt"),
            # Abbreviated commit sha (first 10 chars) when one is present.
            "commit_id": oid[:10] if isinstance(oid, str) else None,
        }

    comment_nodes = (source.get("comments") or {}).get("nodes") or []
    return {
        "thread_id": source.get("id"),
        "is_resolved": bool(source.get("isResolved")),
        "is_outdated": bool(source.get("isOutdated")),
        "path": source.get("path"),
        "line": source.get("line"),
        "comments": [_canonical_comment(node) for node in comment_nodes],
    }


def _print_review_thread_rows(rows: list[dict], *, as_json: bool) -> None:
    """Write review-thread inventory rows to stdout.

    With ``as_json`` the rows are dumped verbatim as indented JSON.
    Otherwise each thread gets a header line (resolved/outdated flags,
    path:line, thread id) followed by two lines per comment: author,
    timestamp and sha, then the first line of the body excerpt.
    """
    if as_json:
        print(json.dumps(rows, indent=2))
        return
    if not rows:
        print("(no review threads)")
        return

    for row in rows:
        flag_resolved = "[R]" if row["is_resolved"] else "   "
        flag_outdated = "[O]" if row["is_outdated"] else "   "
        location = row["path"] or "(no path)"
        lineno = "?" if row["line"] is None else row["line"]
        print(f"{flag_resolved}{flag_outdated} {location}:{lineno}  (thread={row['thread_id']})")

        for comment in row["comments"]:
            who = comment["author"] or "(unknown)"
            when = comment["created_at"] or ""
            sha = comment["commit_id"] or ""
            body_lines = (comment["body_excerpt"] or "").strip().splitlines()
            headline = body_lines[0] if body_lines else "(empty)"
            # Keep the preview to 118 characters: 117 of text plus an ellipsis.
            if len(headline) > 120:
                headline = headline[:117] + "…"
            print(
                f"    {who:<24} {when}  sha={sha}\n"
                f"        {headline}"
            )


# GraphQL query listing a pull request's review threads, one page at a time.
# The thread page size is baked in via %-formatting with
# REVIEW_THREADS_PAGE_SIZE when the module is loaded; `$after` carries the
# cursor taken from `pageInfo.endCursor` of the previous page.
# NOTE(review): each thread's comments are requested with a fixed `first:50`
# and no pageInfo, so any comments beyond the first 50 in a thread are not
# returned by this query.
_REVIEW_THREADS_GRAPHQL = """
query($owner:String!,$repo:String!,$pr:Int!,$after:String) {
  repository(owner:$owner,name:$repo) {
    pullRequest(number:$pr) {
      reviewThreads(first:%d, after:$after) {
        nodes {
          id
          isResolved
          isOutdated
          path
          line
          comments(first:50) {
            nodes {
              id
              author { login }
              body
              createdAt
              commit { oid }
            }
          }
        }
        pageInfo { hasNextPage endCursor }
      }
    }
  }
}
""" % (REVIEW_THREADS_PAGE_SIZE,)


def cmd_pr_review_threads_list(args: argparse.Namespace) -> int:
    """List review threads on a PR — READ-ONLY.

    Closes the substrate gap arq-github-review-thread-inspection-v1: ARQERA
    previously had no governed path to inspect PR review threads, forcing
    manual GitHub UI bridging. Now Seer/Sentry/Reviewer-bot threads can be
    classified before merge.

    This function MUST NEVER dispatch any write/resolve/dismiss/delete
    operation; its direct fallback only ever sends the read-only
    `_REVIEW_THREADS_GRAPHQL` query.

    Dispatch order:
      1. `_primitive_invoke("github.pr.review_threads.list", ...)`. An "ok"
         outcome prints the returned threads; an "error" outcome fails.
      2. Any other outcome falls back to POST /graphql, following
         `pageInfo.endCursor` for at most REVIEW_THREADS_MAX_PAGES pages.

    Unless `args.include_resolved` is set, resolved threads are filtered out
    before printing.

    Output (canonical, also returned by --json):

        {thread_id, is_resolved, is_outdated, path, line,
         comments: [{id, author, body_excerpt, body_length,
                     created_at, commit_id}]}

    Substrate envelope trail on the fallback path: one
    `github_envelope_sent` before the first request and one terminal
    `github_envelope_ack` (the failure that aborted the listing, or the
    success summary). A null `endCursor` on a page that claims a successor
    additionally emits a `-partial` ack.

    Truncated listings (page cap reached, or null cursor) still exit 0 but
    print a warning to stderr so the caller knows threads may be missing.

    Returns:
        0 on success, 1 on any primitive, HTTP or GraphQL failure.
    """
    outcome, result = _primitive_invoke(
        "github.pr.review_threads.list",
        {
            "owner": args.owner,
            "repo": args.repo,
            "pr_number": args.pr,
            "include_resolved": args.include_resolved,
        },
    )
    if outcome == "ok":
        rows = [_thread_inventory_row(t) for t in (result.get("threads") or [])]
        if not args.include_resolved:
            rows = [r for r in rows if not r["is_resolved"]]
        _print_review_thread_rows(rows, as_json=args.json)
        return 0
    if outcome == "error":
        print(
            f"arq-github pr review-threads list failed via primitive: "
            f"{json.dumps(result, indent=2)}",
            file=sys.stderr,
        )
        return 1

    # Fallback: GraphQL via direct REST (POST /graphql).
    verb = "pr.review_threads.list"
    ts = _utc_ts()
    act_name = f"pr-review-threads-{args.pr}-{ts}"
    sent = _emit_act(
        "act",
        "github_envelope_sent",
        act_name,
        {
            "verb": verb,
            "owner": args.owner,
            "repo": args.repo,
            "pr_number": args.pr,
            "include_resolved": args.include_resolved,
            "actor": _gh_active_user(),
            "actor_peer": ACTOR_PEER_ADDRESS,
            "ts": ts,
            "dispatch_path": "direct_graphql_fallback",
        },
    )
    token = _gh_auth_token()

    def _ack(http_status: int, extra: dict, *, suffix: str = "") -> None:
        """Emit a `github_envelope_ack` linked to this call's sent act."""
        _emit_act(
            "act",
            "github_envelope_ack",
            f"{act_name}{suffix}",
            {
                "verb": verb,
                "envelope_sent": sent,
                "http_status": http_status,
                **extra,
                "ts": _utc_ts(),
            },
        )

    all_threads: list[dict] = []
    after: str | None = None
    pages_fetched = 0
    while pages_fetched < REVIEW_THREADS_MAX_PAGES:
        variables: dict[str, object] = {
            "owner": args.owner,
            "repo": args.repo,
            "pr": int(args.pr),
            "after": after,
        }
        status, parsed, err = _api_call(
            "POST",
            "/graphql",
            token=token,
            body={"query": _REVIEW_THREADS_GRAPHQL, "variables": variables},
        )
        if not (200 <= status < 300):
            _ack(status, {"page": pages_fetched + 1, "error": err})
            print(
                f"arq-github pr review-threads list failed: "
                f"status={status} err={err}",
                file=sys.stderr,
            )
            return 1

        payload = parsed if isinstance(parsed, dict) else {}
        gql_errors = payload.get("errors")
        if gql_errors:
            _ack(
                status,
                {
                    "page": pages_fetched + 1,
                    "graphql_errors": json.dumps(gql_errors)[:500],
                },
            )
            print(
                f"arq-github pr review-threads list: GraphQL errors: "
                f"{json.dumps(gql_errors)}",
                file=sys.stderr,
            )
            return 1

        # Walk data → repository → pullRequest defensively: GraphQL reports
        # an unresolvable object as an explicit null (`"repository": null`),
        # which a `.get(key, {})` default does not cover.
        pr_data: object = payload
        for key in ("data", "repository", "pullRequest"):
            pr_data = pr_data.get(key) if isinstance(pr_data, dict) else None
        if not isinstance(pr_data, dict):
            _ack(status, {"error": "pullRequest not found in GraphQL response"})
            print(
                f"arq-github pr review-threads list: PR #{args.pr} "
                f"not found in {args.owner}/{args.repo}",
                file=sys.stderr,
            )
            return 1

        review_threads_page = pr_data.get("reviewThreads") or {}
        nodes = review_threads_page.get("nodes") or []
        all_threads.extend(nodes)
        pages_fetched += 1
        page_info = review_threads_page.get("pageInfo") or {}
        if not page_info.get("hasNextPage"):
            break
        next_cursor = page_info.get("endCursor")
        if not next_cursor:
            # Defensive: if GitHub reports hasNextPage=True but does not
            # supply an endCursor, do NOT continue with after=None — that
            # would re-fetch page 1 and accumulate duplicates until
            # REVIEW_THREADS_MAX_PAGES. Bail with what we have.
            _ack(
                status,
                {
                    "pages_fetched": pages_fetched,
                    "warning": "hasNextPage=true with null endCursor; truncating",
                },
                suffix="-partial",
            )
            print(
                f"arq-github pr review-threads list: warning — GitHub "
                f"reported more pages without a cursor after "
                f"{pages_fetched} page(s); listing may be incomplete",
                file=sys.stderr,
            )
            break
        after = next_cursor
    else:
        # The loop condition failed without a `break`: the page cap was
        # reached while the last page still pointed at a successor.
        print(
            f"arq-github pr review-threads list: warning — stopped at the "
            f"REVIEW_THREADS_MAX_PAGES cap after {pages_fetched} page(s); "
            f"further review threads may exist and are not listed",
            file=sys.stderr,
        )

    _ack(200, {"pages_fetched": pages_fetched, "thread_count": len(all_threads)})

    rows = [_thread_inventory_row(t) for t in all_threads]
    if not args.include_resolved:
        rows = [r for r in rows if not r["is_resolved"]]
    _print_review_thread_rows(rows, as_json=args.json)
    return 0


# ─── Write-side review-thread verbs (resolve + reply) ──────────────────────
#
# Companion to cmd_pr_review_threads_list. These two verbs close the loop
# the principle `mesh-diagnostic-surface-keeps-pace-with-action-surface-v1`
# requires: a governed read (above) MUST be paired with governed writes
# (below) for the same domain, so Seer/Sentry threads can be classified
# AND closed without manual GitHub-UI bridging by a human.
#
# Surface kept minimal: resolve + reply. NO delete / dismiss / unresolve
# / transfer. The structural invariant test enforces exactly this list.

# GraphQL mutation sent by cmd_pr_review_threads_resolve. Takes the thread's
# node id as `$threadId` and asks for the thread's `id` and `isResolved`
# back, which the caller uses to confirm the resolve took effect.
_RESOLVE_REVIEW_THREAD_MUTATION = """
mutation($threadId: ID!) {
  resolveReviewThread(input: {threadId: $threadId}) {
    thread {
      id
      isResolved
    }
  }
}
"""


# GraphQL mutation sent by cmd_pr_review_threads_reply. Takes the thread's
# node id as `$threadId` plus the reply `$body`, and asks for the created
# comment's id, body, author login and creation timestamp.
_ADD_REVIEW_THREAD_REPLY_MUTATION = """
mutation($threadId: ID!, $body: String!) {
  addPullRequestReviewThreadReply(input: {pullRequestReviewThreadId: $threadId, body: $body}) {
    comment {
      id
      body
      author { login }
      createdAt
    }
  }
}
"""


def cmd_pr_review_threads_resolve(args: argparse.Namespace) -> int:
    """Mark a PR review thread as resolved (GraphQL resolveReviewThread).

    Thread-id is the GraphQL node id (e.g. `PRRT_kwDORNq-1M6Bp6Ej`).
    Get it from `arq-github pr review-threads list <pr> --json`.

    Dispatch order: `_primitive_invoke("github.pr.review_threads.resolve")`
    first ("ok" prints the result, "error" fails); any other outcome falls
    back to POST /graphql with `_RESOLVE_REVIEW_THREAD_MUTATION`.

    Substrate envelope trail preserved on the fallback: `github_envelope_sent`
    before dispatch, `github_envelope_ack` after — including when the
    mutation fails, since the response is parsed null-safely before the ack.

    Returns:
        0 when the thread is confirmed resolved; 1 on a primitive, HTTP or
        GraphQL failure, or when the response does not report
        `isResolved == true`.
    """
    outcome, result = _primitive_invoke(
        "github.pr.review_threads.resolve",
        {"owner": args.owner, "repo": args.repo, "pr_number": args.pr,
         "thread_id": args.thread},
    )
    if outcome == "ok":
        print(json.dumps(result, indent=2))
        return 0
    if outcome == "error":
        print(f"arq-github pr review-threads resolve failed via primitive: "
              f"{json.dumps(result, indent=2)}", file=sys.stderr)
        return 1

    # Fallback: GraphQL via direct REST.
    verb = "pr.review_threads.resolve"
    ts = _utc_ts()
    act_name = f"pr-review-threads-resolve-{args.pr}-{args.thread}-{ts}"
    sent = _emit_act(
        "act",
        "github_envelope_sent",
        act_name,
        {
            "verb": verb,
            "owner": args.owner,
            "repo": args.repo,
            "pr_number": args.pr,
            "thread_id": args.thread,
            "actor": _gh_active_user(),
            "actor_peer": ACTOR_PEER_ADDRESS,
            "ts": ts,
            "dispatch_path": "direct_graphql_fallback",
        },
    )
    token = _gh_auth_token()
    status, parsed, err = _api_call(
        "POST",
        "/graphql",
        token=token,
        body={
            "query": _RESOLVE_REVIEW_THREAD_MUTATION,
            "variables": {"threadId": args.thread},
        },
    )
    gql_errors = parsed.get("errors") if isinstance(parsed, dict) else None

    # A failed mutation comes back as `"resolveReviewThread": null` next to
    # `errors`. A `.get(key, {})` default does not apply to a present-but-null
    # key, so walk the path one level at a time and stop at the first
    # non-dict; otherwise the lookup raises before the ack below is emitted.
    resolved_state: object = parsed
    for key in ("data", "resolveReviewThread", "thread", "isResolved"):
        resolved_state = (
            resolved_state.get(key) if isinstance(resolved_state, dict) else None
        )

    _emit_act(
        "act",
        "github_envelope_ack",
        act_name,
        {
            "verb": verb,
            "envelope_sent": sent,
            "http_status": status,
            "is_resolved": resolved_state,
            "graphql_errors": json.dumps(gql_errors)[:500] if gql_errors else None,
            "error": err if status >= 400 or status == 0 else None,
            "ts": _utc_ts(),
        },
    )
    if not (200 <= status < 300) or gql_errors:
        print(
            f"arq-github pr review-threads resolve failed: "
            f"status={status} err={err or json.dumps(gql_errors)}",
            file=sys.stderr,
        )
        return 1
    if resolved_state is not True:
        print(
            f"arq-github pr review-threads resolve: unexpected response — "
            f"thread {args.thread} reported isResolved={resolved_state!r}",
            file=sys.stderr,
        )
        return 1
    print(f"resolved: thread={args.thread}")
    return 0


def cmd_pr_review_threads_reply(args: argparse.Namespace) -> int:
    """Post a threaded reply to a PR review thread.

    Uses GraphQL `addPullRequestReviewThreadReply` which takes the thread
    node id directly — no need to look up the first comment's REST id.

    The reply text comes from `_resolve_body(args)`; an empty body is
    rejected before anything is dispatched.

    Dispatch order: `_primitive_invoke("github.pr.review_threads.reply")`
    first ("ok" prints the result, "error" fails); any other outcome falls
    back to POST /graphql with `_ADD_REVIEW_THREAD_REPLY_MUTATION`.

    Substrate envelope trail preserved on the fallback: `github_envelope_sent`
    before dispatch (recording only the body length, not the body),
    `github_envelope_ack` after — including when the mutation fails, since
    the response is parsed null-safely before the ack.

    Returns:
        0 when GitHub returns the new comment's id; 2 for an empty body;
        1 on a primitive, HTTP or GraphQL failure, or when no comment id is
        returned.
    """
    body = _resolve_body(args)
    if not body:
        print("arq-github pr review-threads reply: empty body", file=sys.stderr)
        return 2

    outcome, result = _primitive_invoke(
        "github.pr.review_threads.reply",
        {"owner": args.owner, "repo": args.repo, "pr_number": args.pr,
         "thread_id": args.thread, "body": body},
    )
    if outcome == "ok":
        print(json.dumps(result, indent=2))
        return 0
    if outcome == "error":
        print(f"arq-github pr review-threads reply failed via primitive: "
              f"{json.dumps(result, indent=2)}", file=sys.stderr)
        return 1

    # Fallback: GraphQL via direct REST.
    verb = "pr.review_threads.reply"
    ts = _utc_ts()
    act_name = f"pr-review-threads-reply-{args.pr}-{args.thread}-{ts}"
    sent = _emit_act(
        "act",
        "github_envelope_sent",
        act_name,
        {
            "verb": verb,
            "owner": args.owner,
            "repo": args.repo,
            "pr_number": args.pr,
            "thread_id": args.thread,
            "body_length": len(body),
            "actor": _gh_active_user(),
            "actor_peer": ACTOR_PEER_ADDRESS,
            "ts": ts,
            "dispatch_path": "direct_graphql_fallback",
        },
    )
    token = _gh_auth_token()
    status, parsed, err = _api_call(
        "POST",
        "/graphql",
        token=token,
        body={
            "query": _ADD_REVIEW_THREAD_REPLY_MUTATION,
            "variables": {"threadId": args.thread, "body": body},
        },
    )
    gql_errors = parsed.get("errors") if isinstance(parsed, dict) else None

    # A failed mutation comes back as
    # `"addPullRequestReviewThreadReply": null` next to `errors`. A
    # `.get(key, {})` default does not apply to a present-but-null key, so
    # walk the path one level at a time; otherwise the lookup raises before
    # the ack below is emitted.
    node: object = parsed
    for key in ("data", "addPullRequestReviewThreadReply", "comment"):
        node = node.get(key) if isinstance(node, dict) else None
    new_comment: dict = node if isinstance(node, dict) else {}

    _emit_act(
        "act",
        "github_envelope_ack",
        act_name,
        {
            "verb": verb,
            "envelope_sent": sent,
            "http_status": status,
            "comment_id": new_comment.get("id"),
            "graphql_errors": json.dumps(gql_errors)[:500] if gql_errors else None,
            "error": err if status >= 400 or status == 0 else None,
            "ts": _utc_ts(),
        },
    )
    if not (200 <= status < 300) or gql_errors:
        print(
            f"arq-github pr review-threads reply failed: "
            f"status={status} err={err or json.dumps(gql_errors)}",
            file=sys.stderr,
        )
        return 1
    if not new_comment.get("id"):
        print(
            f"arq-github pr review-threads reply: unexpected response — "
            f"no comment id returned",
            file=sys.stderr,
        )
        return 1
    print(f"replied: thread={args.thread} comment_id={new_comment['id']}")
    return 0





# Column widths for the tabular `repo list` view. Used by _print_repo_rows
# to truncate long values so the table layout stays readable. Sentry finding
# 2026-05-12 PRRT_kwDORNq-1M6BlN1c flagged that the old printer lacked these
# truncations (and the prior `_format_repo_row` helper that did have them
# was dead code). Both bugs fixed together: helper removed, truncations
# applied in the active printer.
#
# Each width is applied twice in _print_repo_rows: as the slice bound that
# truncates the value and as the left-justify padding width of the column.
REPO_ROW_NAME_WIDTH = 40  # repository name column
REPO_ROW_BRANCH_WIDTH = 25  # default-branch column
REPO_ROW_VISIBILITY_WIDTH = 8  # visibility column


def _repo_inventory_row(repo: dict) -> dict:
    """Project a GitHub repo payload onto the canonical inventory row.

    The row always carries ``name``, ``default_branch``, ``archived``,
    ``visibility`` and ``updated_at``. ``archived`` is coerced to a bool.
    When the payload has no truthy ``visibility`` it is derived from the
    ``private`` flag instead ("private" if set, otherwise "public").
    """
    visibility = repo.get("visibility")
    if not visibility:
        visibility = "private" if repo.get("private") else "public"

    return dict(
        name=repo.get("name"),
        default_branch=repo.get("default_branch"),
        archived=bool(repo.get("archived")),
        visibility=visibility,
        updated_at=repo.get("updated_at"),
    )


# Pagination bounds for the direct-REST fallback in cmd_repo_list.
REPO_LIST_MAX_PAGES = 20  # 20 pages × 100 = 2000 repos hard cap (safety)
REPO_LIST_PER_PAGE = 100  # sent as `per_page`; a shorter page ends pagination


def cmd_repo_list(args: argparse.Namespace) -> int:
    """Enumerate repos under an organization — read-only.

    The only GitHub request this function issues itself is
    GET /orgs/{owner}/repos; it must never grow a write path.

    Dispatch order:
      1. `_primitive_invoke("github.repo.list", ...)`. "ok" prints the
         returned repos, "error" fails.
      2. Any other outcome falls back to direct REST, paging through
         /orgs/{owner}/repos (REPO_LIST_PER_PAGE per page, at most
         REPO_LIST_MAX_PAGES pages).

    Paging stops on an empty or non-list response, on a short page, at the
    page cap, or — when `args.limit` is positive — as soon as enough repos
    have been collected (non-archived ones only, unless
    `args.include_archived`).

    Output shape (canonical, also used by tests + downstream inventory):
        {name, default_branch, archived, visibility, updated_at}

    Substrate envelope trail on the fallback: a `github_envelope_sent` act
    before the first request and one `github_envelope_ack` at the end —
    either for the page that failed or the success summary.

    Returns:
        0 on success, 1 on a primitive or HTTP failure.
    """

    def _render(raw_repos: list) -> int:
        """Filter, cap and print raw repo payloads; always returns 0."""
        inventory = [_repo_inventory_row(item) for item in raw_repos]
        if not args.include_archived:
            inventory = [row for row in inventory if not row["archived"]]
        if args.limit > 0:
            inventory = inventory[: args.limit]
        _print_repo_rows(inventory, as_json=args.json)
        return 0

    # Primitive-first: once the daemon serves `github.repo.list`, this branch
    # takes over and the daemon-side envelope trail applies.
    request = {
        "owner": args.owner,
        "type": args.type,
        "sort": args.sort,
        "limit": args.limit,
        "include_archived": args.include_archived,
    }
    outcome, result = _primitive_invoke("github.repo.list", request)
    if outcome == "ok":
        return _render(result.get("repos") or [])
    if outcome == "error":
        print(
            f"arq-github repo list failed via primitive: {json.dumps(result, indent=2)}",
            file=sys.stderr,
        )
        return 1

    # Fallback: direct REST against /orgs/{owner}/repos with manual pagination.
    verb = "repo.list"
    ts = _utc_ts()
    act_name = f"repo-list-{args.owner}-{ts}"
    sent = _emit_act(
        "act",
        "github_envelope_sent",
        act_name,
        {
            "verb": verb,
            **request,
            "actor": _gh_active_user(),
            "actor_peer": ACTOR_PEER_ADDRESS,
            "ts": ts,
            "dispatch_path": "direct_rest_fallback",
        },
    )
    token = _gh_auth_token()

    collected: list[dict] = []
    # Counts successful, non-empty fetches only, so the ack never reports a
    # page that was merely about to be requested.
    pages_fetched = 0
    for page in range(1, REPO_LIST_MAX_PAGES + 1):
        query = "&".join(
            (
                f"per_page={REPO_LIST_PER_PAGE}",
                f"page={page}",
                f"type={args.type}",
                f"sort={args.sort}",
            )
        )
        status, parsed, err = _api_call(
            "GET", f"/orgs/{args.owner}/repos?{query}", token=token
        )
        if status < 200 or status >= 300:
            _emit_act(
                "act",
                "github_envelope_ack",
                act_name,
                {
                    "verb": verb,
                    "envelope_sent": sent,
                    "http_status": status,
                    "page_attempted": page,
                    "pages_fetched": pages_fetched,
                    "error": err,
                    "ts": _utc_ts(),
                },
            )
            print(
                f"arq-github repo list failed: status={status} err={err}",
                file=sys.stderr,
            )
            return 1

        if not (isinstance(parsed, list) and parsed):
            break
        collected.extend(parsed)
        pages_fetched += 1
        if len(parsed) < REPO_LIST_PER_PAGE:
            break  # short page: nothing further to fetch

        # Soft cap: stop paging once the requested number of repos is in
        # hand, counting only what will survive the archived filter.
        if args.limit > 0:
            if args.include_archived:
                usable = len(collected)
            else:
                usable = sum(1 for r in collected if not r.get("archived"))
            if usable >= args.limit:
                break

    _emit_act(
        "act",
        "github_envelope_ack",
        act_name,
        {
            "verb": verb,
            "envelope_sent": sent,
            "http_status": 200,
            "pages_fetched": pages_fetched,
            "count_raw": len(collected),
            "ts": _utc_ts(),
        },
    )
    return _render(collected)


def cmd_repo_edit(args: argparse.Namespace) -> int:
    """Edit repository properties (v1: visibility only) — substrate-attested.

    Governed by arq://doc/policy/twin-dispatch-mesh-only-v2: a
    `github_envelope_sent` act is emitted before the PATCH and a
    `github_envelope_ack` act after it, whatever the outcome. The change is
    refused up front unless --accept-visibility-change-consequences was
    given, mirroring the explicit consent `gh repo edit` asks for.

    Background: arq://body/arqera_gap/arq-github-missing-repo-edit-verb-2026-05-27
    and arq://doc/principle/blocks-must-be-substrate-native-not-docs-v1.

    Returns:
        0 when GitHub answers 2xx, 2 when consent is missing, 1 otherwise.
    """
    if not args.accept_visibility_change_consequences:
        print(
            "arq-github repo edit: visibility flip requires "
            "--accept-visibility-change-consequences (mirrors gh repo edit; "
            "explicit-consent for org-visible side effects)",
            file=sys.stderr,
        )
        return 2

    verb = "repo.edit"
    started_at = _utc_ts()
    sent_payload = {
        "verb": verb,
        "owner": args.owner,
        "repo": args.repo,
        "fields_changed": {"visibility": args.visibility},
        "actor": _gh_active_user(),
        "issued_at": started_at,
    }
    sent = _emit_act(
        "act",
        "github_envelope_sent",
        f"repo-edit-{args.owner}-{args.repo}-{started_at}",
        sent_payload,
    )

    token = _gh_auth_token()
    # PATCH /repos/{owner}/{repo} takes {"visibility": "public|private|internal"}.
    # Flipping a private repo to public also exposes its whole commit
    # history, which is what the consent flag above is guarding.
    status, parsed, err = _api_call(
        "PATCH",
        f"/repos/{args.owner}/{args.repo}",
        token=token,
        body={"visibility": args.visibility},
    )
    succeeded = bool(status) and 200 <= status < 300

    ack_payload = {
        "verb": verb,
        "owner": args.owner,
        "repo": args.repo,
        "envelope_sent_act": sent,
        "http_status": status,
        "outcome": "ok" if succeeded else "error",
        "error": err,
        "actor": _gh_active_user(),
        "issued_at": _utc_ts(),
    }
    # The ack is named with its own fresh timestamp, not the sent act's.
    ack_name = f"repo-edit-ack-{args.owner}-{args.repo}-{_utc_ts()}"
    _emit_act("act", "github_envelope_ack", ack_name, ack_payload)

    if not succeeded:
        detail = err or (parsed and json.dumps(parsed)[:200])
        print(
            f"arq-github repo edit failed: HTTP {status} — {detail}",
            file=sys.stderr,
        )
        return 1

    new_vis = (parsed or {}).get("visibility", "?")
    print(f"✓ {args.owner}/{args.repo} visibility = {new_vis}")
    return 0


def cmd_repo_create(args: argparse.Namespace) -> int:
    """Create a new repository — substrate-attested.

    Governed by arq://doc/policy/twin-dispatch-mesh-only-v2 and built like
    cmd_repo_edit: a `github_envelope_sent` act, the HTTP call, then a
    `github_envelope_ack` act carrying the outcome plus the new repo's
    clone/ssh URLs. The request goes to POST /user/repos when the owner is
    "@me" or the active gh user (case-insensitive), and to
    POST /orgs/{owner}/repos otherwise. Used to unblock cross-language
    mesh-native client publication (arq-substrate-go, arq_substrate_ex,
    future ts/swift/kotlin).

    Per arq://body/arqera_gap/arq-github-missing-repo-create-verb-2026-05-27.

    Returns:
        0 when GitHub answers 2xx, 1 otherwise.
    """
    verb = "repo.create"
    started_at = _utc_ts()
    sent_payload = {
        "verb": verb,
        "owner": args.owner,
        "repo": args.repo,
        "visibility": args.visibility,
        "description": args.description,
        "actor": _gh_active_user(),
        "issued_at": started_at,
    }
    sent = _emit_act(
        "act",
        "github_envelope_sent",
        f"repo-create-{args.owner}-{args.repo}-{started_at}",
        sent_payload,
    )

    token = _gh_auth_token()

    request_body: dict = dict(
        name=args.repo,
        private=(args.visibility == "private"),
        auto_init=False,
        has_issues=True,
        has_projects=False,
        has_wiki=False,
    )
    # Optional fields are only sent when they carry a value; `visibility`
    # is only ever sent explicitly for "internal".
    optional_fields = (
        ("visibility", "internal" if args.visibility == "internal" else None),
        ("description", args.description),
        ("homepage", args.homepage),
    )
    for field, value in optional_fields:
        if value:
            request_body[field] = value

    owner_key = args.owner.lower()
    personal_aliases = ("@me", _gh_active_user().lower())
    if owner_key in personal_aliases:
        endpoint = "/user/repos"
    else:
        endpoint = f"/orgs/{args.owner}/repos"
    status, parsed, err = _api_call("POST", endpoint, token=token, body=request_body)
    succeeded = bool(status) and 200 <= status < 300

    ack_payload = {
        "verb": verb,
        "owner": args.owner,
        "repo": args.repo,
        "envelope_sent_act": sent,
        "http_status": status,
        "outcome": "ok" if succeeded else "error",
        "error": err,
        "clone_url": (parsed or {}).get("clone_url"),
        "ssh_url": (parsed or {}).get("ssh_url"),
        "actor": _gh_active_user(),
        "issued_at": _utc_ts(),
    }
    # The ack is named with its own fresh timestamp, not the sent act's.
    ack_name = f"repo-create-ack-{args.owner}-{args.repo}-{_utc_ts()}"
    _emit_act("act", "github_envelope_ack", ack_name, ack_payload)

    if not succeeded:
        detail = err or (parsed and json.dumps(parsed)[:200])
        print(
            f"arq-github repo create failed: HTTP {status} — {detail}",
            file=sys.stderr,
        )
        return 1

    clone = (parsed or {}).get("clone_url") or ""
    print(f"✓ created {args.owner}/{args.repo} ({args.visibility}) — {clone}")
    return 0


def _print_repo_rows(rows: list[dict], *, as_json: bool) -> None:
    """Write repo inventory rows to stdout, as JSON or as a fixed-width table.

    With ``as_json`` the rows are dumped verbatim as indented JSON. Otherwise
    each row becomes one line: an ``[A]`` marker for archived repos, then the
    visibility, name and default-branch columns — each cut to and padded to
    its REPO_ROW_*_WIDTH — followed by the untruncated ``updated_at``.
    An empty ``rows`` prints nothing in table mode.
    """
    if as_json:
        print(json.dumps(rows, indent=2))
        return

    for row in rows:
        marker = "[A]" if row["archived"] else "   "
        # Cut over-long values before padding so one long repo name cannot
        # push the later columns out of alignment.
        # Sentry 2026-05-12 PRRT_kwDORNq-1M6BlN1c.
        vis_cell = (row["visibility"] or "?")[:REPO_ROW_VISIBILITY_WIDTH]
        name_cell = (row["name"] or "")[:REPO_ROW_NAME_WIDTH]
        branch_cell = (row["default_branch"] or "")[:REPO_ROW_BRANCH_WIDTH]
        stamp = row["updated_at"] or ""
        cells = (
            marker,
            f"{vis_cell:<{REPO_ROW_VISIBILITY_WIDTH}}",
            f"{name_cell:<{REPO_ROW_NAME_WIDTH}}",
            f"{branch_cell:<{REPO_ROW_BRANCH_WIDTH}}",
            f"{stamp}",
        )
        print(" ".join(cells))

from pathlib import Path as _ScopesPath
import sys as _scopes_sys
_scopes_sys.path.insert(0, str(_ScopesPath(__file__).parent))
from _arq_provider_base import handle_meta_flags  # noqa: E402

# Provider id and per-verb permission table passed to handle_meta_flags()
# at the top of main().
PROVIDER = "github"
# Keys are the CLI verb path with hyphens written as spaces (e.g.
# `pr review-threads list` → "pr review threads list"); values are the
# permissions that verb's GitHub calls need.
REQUIRED_SCOPES: dict[str, list[str]] = {
    # GitHub fine-grained PAT permission names (classic PAT `repo` / `workflow`
    # satisfies these as a superset). Source: GitHub REST API docs per route.
    "fetch": ["contents:read"],
    "push": ["contents:write"],
    "issue create": ["issues:write"],
    "issue comment": ["issues:write"],
    "issue view": ["issues:read"],
    "issue list": ["issues:read"],
    "run view": ["actions:read"],
    "run list": ["actions:read"],
    "run jobs": ["actions:read"],
    "run rerun": ["actions:write"],
    "run logs": ["actions:read"],
    "workflow dispatch": ["actions:write"],
    "code scanning alerts": ["security_events:read"],
    "pr view": ["pull_requests:read"],
    "pr list": ["pull_requests:read"],
    "pr checks": ["actions:read", "pull_requests:read"],
    "pr wait check": ["actions:read", "pull_requests:read"],
    "pr wait required": ["actions:read", "pull_requests:read"],
    "pr create": ["pull_requests:write"],
    "pr review": ["pull_requests:write"],
    "pr review threads list": ["pull_requests:read"],
    "pr review threads reply": ["pull_requests:write"],
    "pr review threads resolve": ["pull_requests:write"],
    "pr edit body": ["pull_requests:write"],
    "pr files": ["pull_requests:read"],
    "pr close": ["pull_requests:write"],
    "pr retarget": ["pull_requests:write"],
    "pr merge": ["pull_requests:write", "contents:write"],
    "pr merge via substrate": ["pull_requests:write", "contents:write"],
    "pr substrate approve": ["pull_requests:write"],
    "repo list": ["metadata:read"],
    # cmd_repo_edit (PATCH /repos/{owner}/{repo}) and cmd_repo_create
    # (POST /orgs/{org}/repos or /user/repos) both need the repository
    # "Administration" write permission per the GitHub REST docs.
    "repo edit": ["administration:write"],
    "repo create": ["administration:write"],
    "project add": ["repository_projects:write"],
}

def main() -> int:
    """Build the ``arq-github`` CLI, parse ``sys.argv`` and run the chosen verb.

    Every leaf sub-command binds its handler through ``set_defaults(func=...)``;
    after parsing, ``--repo Owner/Name`` is split into separate owner and repo
    values and the bound handler is invoked with the parsed namespace.

    Returns:
        Whatever the selected ``cmd_*`` handler returns (annotated as ``int``).
    """
    # Meta flags are handled by the shared provider base before any parser is
    # built; what it does with them is defined in `_arq_provider_base`.
    handle_meta_flags(PROVIDER, REQUIRED_SCOPES)
    parser = argparse.ArgumentParser(
        prog="arq-github",
        description="GitHub bridge MVP — mesh-routed Twin dispatches. "
        "Every verb is wrapped in a signed envelope on the substrate.",
    )
    sub = parser.add_subparsers(dest="verb", required=True)

    # fetch
    p_fetch = sub.add_parser("fetch", help="git fetch wrapped in a substrate envelope")
    p_fetch.add_argument("remote", nargs="?", default="origin")
    p_fetch.add_argument("ref", nargs="?", default=None, help="optional specific ref to fetch")
    p_fetch.add_argument("--all", action="store_true", help="fetch --all remotes")
    p_fetch.set_defaults(func=cmd_fetch)

    # push
    p_push = sub.add_parser("push", help="git push wrapped in a substrate envelope")
    p_push.add_argument("remote")
    p_push.add_argument("branch")
    p_push.add_argument("--force-with-lease", action="store_true")
    p_push.set_defaults(func=cmd_push)

    def add_repo_args(p: argparse.ArgumentParser) -> None:
        """Attach the shared ``--owner`` / ``--repo`` options to parser *p*."""
        p.add_argument("--owner", default=DEFAULT_OWNER)
        p.add_argument("--repo", default=DEFAULT_REPO)

    # issue
    issue_parent = sub.add_parser("issue", help="issue verbs")
    issue_sub = issue_parent.add_subparsers(dest="issue_verb", required=True)

    p_issue_create = issue_sub.add_parser("create")
    p_issue_create.add_argument("--title", required=True)
    p_issue_create.add_argument("--body", default="")
    p_issue_create.add_argument("--labels", default="", help="comma-separated label names")
    add_repo_args(p_issue_create)
    p_issue_create.set_defaults(func=cmd_issue_create)

    p_issue_comment = issue_sub.add_parser("comment")
    p_issue_comment.add_argument("issue", type=int)
    p_issue_comment.add_argument("--body", required=True)
    add_repo_args(p_issue_comment)
    p_issue_comment.set_defaults(func=cmd_issue_comment)

    p_issue_view = issue_sub.add_parser("view")
    p_issue_view.add_argument("issue", type=int)
    add_repo_args(p_issue_view)
    p_issue_view.set_defaults(func=cmd_issue_view)

    p_issue_list = issue_sub.add_parser("list")
    p_issue_list.add_argument("--state", default="open", choices=["open", "closed", "all"])
    p_issue_list.add_argument("--limit", type=int, default=30)
    p_issue_list.add_argument(
        "--label", default=None,
        help="filter by label (single name or comma-separated; matches all)",
    )
    add_repo_args(p_issue_list)
    p_issue_list.set_defaults(func=cmd_issue_list)

    # run — GitHub Actions workflow runs (queries plus `rerun`)
    run_parent = sub.add_parser("run", help="workflow run queries (read-only)")
    run_sub = run_parent.add_subparsers(dest="run_verb", required=True)

    p_run_view = run_sub.add_parser("view")
    p_run_view.add_argument("run", type=int)
    add_repo_args(p_run_view)
    p_run_view.set_defaults(func=cmd_run_view)

    p_run_list = run_sub.add_parser("list")
    p_run_list.add_argument("--workflow", default=None, help="workflow filename or display-name")
    p_run_list.add_argument("--limit", type=int, default=10)
    add_repo_args(p_run_list)
    p_run_list.set_defaults(func=cmd_run_list)

    p_run_jobs = run_sub.add_parser(
        "jobs", help="list sub-jobs inside a workflow run with per-job status/conclusion",
    )
    p_run_jobs.add_argument("run", type=int)
    p_run_jobs.add_argument(
        "--steps", action="store_true",
        help="also print step-level status under each job",
    )
    add_repo_args(p_run_jobs)
    p_run_jobs.set_defaults(func=cmd_run_jobs)

    p_run_rerun = run_sub.add_parser(
        "rerun", help="re-run a workflow run (all jobs, or --failed for failed-only)",
    )
    p_run_rerun.add_argument("run", type=int)
    p_run_rerun.add_argument(
        "--failed", action="store_true",
        help="re-run only failed jobs (POST /actions/runs/<id>/rerun-failed-jobs)",
    )
    add_repo_args(p_run_rerun)
    p_run_rerun.set_defaults(func=cmd_run_rerun)

    p_run_logs = run_sub.add_parser(
        "logs", help="fetch plain-text logs for one job inside a workflow run",
    )
    p_run_logs.add_argument("--job", type=int, required=True, help="job id (from `run jobs`)")
    p_run_logs.add_argument(
        "--tail", type=int, default=0,
        help="only print the last N lines (0 = full log)",
    )
    add_repo_args(p_run_logs)
    p_run_logs.set_defaults(func=cmd_run_logs)

    # workflow — write verbs (workflow_dispatch)
    workflow_parent = sub.add_parser(
        "workflow",
        help="workflow dispatch verbs (substrate-attested write surface)",
    )
    workflow_sub = workflow_parent.add_subparsers(dest="workflow_verb", required=True)

    p_workflow_dispatch = workflow_sub.add_parser(
        "dispatch",
        help=(
            "trigger a workflow_dispatch run with substrate approval ref "
            "(replaces the gh-workflow-run + mesh-bypass path)"
        ),
    )
    p_workflow_dispatch.add_argument(
        "--workflow",
        required=True,
        help="workflow filename (e.g. deploy-staging.yml) or display name",
    )
    p_workflow_dispatch.add_argument(
        "--ref",
        required=True,
        help="branch or tag to dispatch on (e.g. main)",
    )
    p_workflow_dispatch.add_argument(
        "--substrate-approval-ref",
        dest="substrate_approval_ref",
        required=True,
        help=(
            "arq://act/... address authorising this dispatch. The act must "
            "resolve on substrate or the call is refused (no implicit "
            "approvals, no silent deploys)."
        ),
    )
    p_workflow_dispatch.add_argument(
        "--reason",
        required=True,
        help="audit-trail justification for this dispatch",
    )
    p_workflow_dispatch.add_argument(
        "-F", "--field",
        action="append",
        default=[],
        metavar="KEY=VALUE",
        help=(
            "workflow_dispatch input as key=value (repeatable). "
            "substrate_approval_ref is auto-injected — do not pass it via -F."
        ),
    )
    add_repo_args(p_workflow_dispatch)
    p_workflow_dispatch.set_defaults(func=cmd_workflow_dispatch)

    # code-scanning
    cs_parent = sub.add_parser(
        "code-scanning", help="GitHub Code Scanning alert queries (read-only)",
    )
    cs_sub = cs_parent.add_subparsers(dest="cs_verb", required=True)

    p_cs_alerts = cs_sub.add_parser(
        "alerts", help="list Code Scanning alerts filtered by tool/state/severity",
    )
    p_cs_alerts.add_argument(
        "--tool", default=None,
        help="filter by tool name (Trivy, CodeQL, semgrep, ...)",
    )
    p_cs_alerts.add_argument(
        "--state", default="open",
        choices=["open", "closed", "dismissed", "fixed"],
        help="alert state (default: open)",
    )
    p_cs_alerts.add_argument(
        "--severity", default=None,
        choices=["critical", "high", "medium", "low", "warning", "note", "error"],
        help="filter by alert severity",
    )
    p_cs_alerts.add_argument(
        "--ref", default=None,
        help="filter to a specific branch (default: all branches)",
    )
    p_cs_alerts.add_argument("--limit", type=int, default=30)
    p_cs_alerts.add_argument(
        "--verbose", action="store_true",
        help="also print alert message text under each row",
    )
    p_cs_alerts.add_argument(
        "--json", action="store_true",
        help="emit raw JSON (full alert objects) instead of tabular output",
    )
    add_repo_args(p_cs_alerts)
    p_cs_alerts.set_defaults(func=cmd_code_scanning_alerts)

    # pr
    pr_parent = sub.add_parser("pr", help="pull-request verbs")
    pr_sub = pr_parent.add_subparsers(dest="pr_verb", required=True)

    p_pr_view = pr_sub.add_parser("view")
    p_pr_view.add_argument("pr", type=int)
    add_repo_args(p_pr_view)
    p_pr_view.set_defaults(func=cmd_pr_view)

    p_pr_list = pr_sub.add_parser("list")
    p_pr_list.add_argument("--state", default="open", choices=["open", "closed", "all"])
    p_pr_list.add_argument("--limit", type=int, default=10)
    add_repo_args(p_pr_list)
    p_pr_list.set_defaults(func=cmd_pr_list)

    p_pr_checks = pr_sub.add_parser("checks")
    p_pr_checks.add_argument("pr", type=int)
    add_repo_args(p_pr_checks)
    p_pr_checks.set_defaults(func=cmd_pr_checks)

    # ── governed wait/watch primitives (close the polling-loop anti-pattern) ──
    p_pr_wait_check = pr_sub.add_parser(
        "wait-check",
        help=(
            "wait for a single named check on a PR to reach terminal state "
            "(substrate-attested replacement for bash polling loops)"
        ),
    )
    p_pr_wait_check.add_argument("pr", type=int)
    p_pr_wait_check.add_argument(
        "--check", required=True,
        help="check name (exact or substring match against check-runs.name)",
    )
    p_pr_wait_check.add_argument(
        "--timeout-s", dest="timeout_s", type=int, default=600,
        help="overall wait budget in seconds (default: 600)",
    )
    p_pr_wait_check.add_argument(
        "--poll-interval-s", dest="poll_interval_s", type=int, default=30,
        help="poll cadence in seconds (default: 30)",
    )
    add_repo_args(p_pr_wait_check)
    p_pr_wait_check.set_defaults(func=cmd_pr_wait_check)

    p_pr_wait_required = pr_sub.add_parser(
        "wait-required",
        help=(
            "wait for all branch-protection-required checks on a PR to "
            "reach terminal state (substrate-attested)"
        ),
    )
    p_pr_wait_required.add_argument("pr", type=int)
    p_pr_wait_required.add_argument(
        "--timeout-s", dest="timeout_s", type=int, default=1800,
        help="overall wait budget in seconds (default: 1800)",
    )
    p_pr_wait_required.add_argument(
        "--poll-interval-s", dest="poll_interval_s", type=int, default=30,
        help="poll cadence in seconds (default: 30)",
    )
    add_repo_args(p_pr_wait_required)
    p_pr_wait_required.set_defaults(func=cmd_pr_wait_required)

    # READ-ONLY: list files changed in a PR. Closes the substrate gap that
    # forced policy-approver Phase 2 to fall back to bare `gh api`.
    p_pr_files = pr_sub.add_parser(
        "files",
        help="list files changed in a PR (read-only)",
    )
    p_pr_files.add_argument("pr", type=int)
    p_pr_files.add_argument(
        "--json", action="store_true",
        help="emit full GitHub file objects (status, filename, additions, "
             "deletions, changes, patch, blob_url, raw_url, contents_url) "
             "instead of the default `<status>\\t<filename>` table",
    )
    add_repo_args(p_pr_files)
    p_pr_files.set_defaults(func=cmd_pr_files)

    p_pr_create = pr_sub.add_parser("create")
    p_pr_create.add_argument("--title", required=True)
    p_pr_create.add_argument("--head", required=True, help="source branch")
    p_pr_create.add_argument("--base", default="main")
    p_pr_create.add_argument("--body", default="")
    p_pr_create.add_argument("--body-file", default=None,
                             help="read body from file (sidesteps PreToolUse regex false-positives)")
    add_repo_args(p_pr_create)
    p_pr_create.set_defaults(func=cmd_pr_create)

    # substrate-attested approval — the governed counterpart to the
    # disabled `pr review --approve`. Emits a signed `review_approved`
    # substrate act + posts a discovery marker comment on the PR.
    p_pr_substrate_approve = pr_sub.add_parser(
        "substrate-approve",
        help="emit substrate-attested approval (signed act + indexable marker)",
    )
    p_pr_substrate_approve.add_argument("pr", type=int)
    p_pr_substrate_approve.add_argument(
        "--reason", required=True,
        help="explanation of why approval is granted; appears in the signed act payload",
    )
    p_pr_substrate_approve.add_argument(
        "--via-delegation", dest="via_delegation", default=None, metavar="GRANT_ADDRESS",
        help=(
            "Honor a substrate-attested approval delegation grant when "
            "approving. The granting peer must differ from the PR author. "
            "Twin emits the approval acting_for the grant's from_peer; the "
            "two-person rule is preserved at the SUBSTRATE level (substrate "
            "trail shows operator's grant + Twin's execution, both signed) "
            "rather than at the GitHub-login level. Grant body shape: "
            "arq://act/approval_delegation_grant/<from-login>__<to-peer-suffix>__<ts> "
            "with payload {from_peer, from_login, to_peer, scope, expires_at}. "
            "Fixes arq://body/arqera_delegation_authority_gap/v1."
        ),
    )
    p_pr_substrate_approve.add_argument(
        "--as", dest="as_peer", default=None, metavar="PEER_ADDRESS",
        help=(
            "Record the approval under a distinct bot-peer identity (e.g. "
            "arq://body/peer/arqera-policy-bot-v1) instead of the running "
            "user's Twin peer. Per "
            "arq://doc/principle/arqera-policy-bot-distinct-identity-v1 "
            "(DRAFT), the bot's GitHub App credentials MUST be provisioned "
            "via arq-connection at github.com/<bot-slug>.{app-id,private-key} "
            "before this flag will sign anything; absent credentials yield "
            "an operator_tier_surface act and exit code 3."
        ),
    )
    add_repo_args(p_pr_substrate_approve)
    p_pr_substrate_approve.set_defaults(func=cmd_pr_substrate_approve)

    p_pr_review = pr_sub.add_parser("review")
    p_pr_review.add_argument("pr", type=int)
    # Exactly one decision flag is required; all three write into `decision`.
    # NOTE(review): `--approve` is still accepted by the parser even though the
    # substrate-approve comment above calls it disabled — presumably
    # `cmd_pr_review` refuses it; confirm against the handler.
    group = p_pr_review.add_mutually_exclusive_group(required=True)
    group.add_argument("--approve", dest="decision", action="store_const", const="approve")
    group.add_argument(
        "--request-changes", dest="decision", action="store_const", const="request-changes"
    )
    group.add_argument("--comment", dest="decision", action="store_const", const="comment")
    p_pr_review.add_argument("--body", default="")
    p_pr_review.add_argument("--body-file", default=None,
                             help="read body from file")
    add_repo_args(p_pr_review)
    p_pr_review.set_defaults(func=cmd_pr_review)

    # review-threads — governed access to PR review threads. `list` is the
    # read-only verb that closes the substrate gap
    # arq-github-review-thread-inspection-v1 surfaced by Seer/Sentry findings
    # on PR #3687 (2026-05-12). `resolve` and `reply` are write verbs and are
    # registered below; there is still no dismiss/delete verb.
    p_pr_review_threads = pr_sub.add_parser(
        "review-threads",
        help="inspect, reply to, or resolve PR review threads (Seer/Sentry comments)",
    )
    review_threads_sub = p_pr_review_threads.add_subparsers(
        dest="review_threads_verb", required=True,
    )
    p_pr_review_threads_list = review_threads_sub.add_parser(
        "list",
        help="list review threads on a PR (read-only, GraphQL-backed)",
    )
    p_pr_review_threads_list.add_argument("pr", type=int)
    p_pr_review_threads_list.add_argument(
        "--include-resolved", action="store_true",
        help="include threads that are already resolved (default: only unresolved)",
    )
    p_pr_review_threads_list.add_argument(
        "--json", action="store_true",
        help="emit JSON array of {thread_id, is_resolved, is_outdated, path, line, comments}",
    )
    add_repo_args(p_pr_review_threads_list)
    p_pr_review_threads_list.set_defaults(func=cmd_pr_review_threads_list)

    p_pr_review_threads_resolve = review_threads_sub.add_parser(
        "resolve",
        help="mark a review thread as resolved (GraphQL resolveReviewThread)",
    )
    p_pr_review_threads_resolve.add_argument("pr", type=int)
    p_pr_review_threads_resolve.add_argument(
        "--thread", required=True,
        help="thread node id from `pr review-threads list --json` (e.g. PRRT_…)",
    )
    add_repo_args(p_pr_review_threads_resolve)
    p_pr_review_threads_resolve.set_defaults(func=cmd_pr_review_threads_resolve)

    p_pr_review_threads_reply = review_threads_sub.add_parser(
        "reply",
        help="post a threaded reply to a review thread (GraphQL addPullRequestReviewThreadReply)",
    )
    p_pr_review_threads_reply.add_argument("pr", type=int)
    p_pr_review_threads_reply.add_argument(
        "--thread", required=True,
        help="thread node id from `pr review-threads list --json`",
    )
    p_pr_review_threads_reply.add_argument("--body", default="")
    p_pr_review_threads_reply.add_argument(
        "--body-file", default=None,
        help="read body from file (sidesteps PreToolUse regex false-positives)",
    )
    add_repo_args(p_pr_review_threads_reply)
    p_pr_review_threads_reply.set_defaults(func=cmd_pr_review_threads_reply)

    p_pr_merge = pr_sub.add_parser("merge")
    p_pr_merge.add_argument("pr", type=int)
    # A merge method must be chosen explicitly for plain `pr merge`.
    mgroup = p_pr_merge.add_mutually_exclusive_group(required=True)
    mgroup.add_argument("--squash", dest="method", action="store_const", const="squash")
    mgroup.add_argument("--merge", dest="method", action="store_const", const="merge")
    mgroup.add_argument("--rebase", dest="method", action="store_const", const="rebase")
    p_pr_merge.add_argument("--auto", action="store_true", help="enable auto-merge via GraphQL")
    add_repo_args(p_pr_merge)
    p_pr_merge.set_defaults(func=cmd_pr_merge)

    # merge-via-substrate — substrate-decided + GitHub-dispatched merge
    # Implements arq://doc/principle/merge-decision-on-substrate-v1 (DRAFT)
    p_pr_mvs = pr_sub.add_parser(
        "merge-via-substrate",
        help="substrate-decided merge: emit merge_decided act with provenance, "
             "then dispatch to GitHub. Implements merge-decision-on-substrate-v1 (DRAFT).",
    )
    p_pr_mvs.add_argument("pr", type=int)
    # Unlike `pr merge`, the method group is optional here and falls back to
    # "squash" via set_defaults below.
    mvs_method_group = p_pr_mvs.add_mutually_exclusive_group()
    mvs_method_group.add_argument("--squash", dest="method", action="store_const", const="squash")
    mvs_method_group.add_argument("--merge", dest="method", action="store_const", const="merge")
    mvs_method_group.add_argument("--rebase", dest="method", action="store_const", const="rebase")
    p_pr_mvs.set_defaults(method="squash")
    p_pr_mvs.add_argument(
        "--gate-only",
        action="store_true",
        help="Evaluate substrate gates and post arq-merge-gate check-run, but "
             "do not dispatch the GitHub merge. Used for backfill on open PRs "
             "and for dry-run gate evaluation.",
    )
    p_pr_mvs.add_argument(
        "--paired-peer",
        help="Explicit paired-peer attestation when substrate evidence, not a "
             "GitHub PR review, is the cross-peer approval source.",
    )
    add_repo_args(p_pr_mvs)
    p_pr_mvs.set_defaults(func=cmd_pr_merge_via_substrate)

    p_pr_retarget = pr_sub.add_parser(
        "retarget", help="change a PR's base branch (for stacked-PR cascades)"
    )
    p_pr_retarget.add_argument("pr", type=int)
    p_pr_retarget.add_argument("--base", required=True, help="new base branch (e.g. 'main')")
    add_repo_args(p_pr_retarget)
    p_pr_retarget.set_defaults(func=cmd_pr_retarget)

    p_pr_close = pr_sub.add_parser(
        "close", help="close a PR without merging (superseded, abandoned, etc.)"
    )
    p_pr_close.add_argument("pr", type=int)
    p_pr_close.add_argument("--reason", default=None, help="comment body explaining the close")
    add_repo_args(p_pr_close)
    p_pr_close.set_defaults(func=cmd_pr_close)

    p_pr_edit_body = pr_sub.add_parser(
        "edit-body", help="edit a PR's body (description); replace or append"
    )
    p_pr_edit_body.add_argument("pr", type=int)
    # Exactly one body source (inline text or file) must be supplied.
    body_group = p_pr_edit_body.add_mutually_exclusive_group(required=True)
    body_group.add_argument("--body", help="inline body text")
    body_group.add_argument("--body-file", help="path to a file containing the body text")
    p_pr_edit_body.add_argument("--append", action="store_true", help="append to existing body rather than replace")
    add_repo_args(p_pr_edit_body)
    p_pr_edit_body.set_defaults(func=cmd_pr_edit_body)

    # repo — governed repository verbs. `list` is the read-only enumeration
    # that closes the arq-github-enterprise-repo-discovery-v1 substrate gap
    # surfaced during the 2026-05-12 enterprise inventory: previously, repo
    # discovery required scanning local clones (incomplete) or bare
    # `gh repo list` (mesh-blocked). `edit` and `create` are write verbs
    # registered further down; there is no `delete` or `archive` verb.
    # These parsers declare --owner/--repo themselves rather than through
    # add_repo_args: `list` takes no --repo, and `edit`/`create` require it.
    repo_parent = sub.add_parser(
        "repo", help="repository verbs (governed list / edit / create)"
    )
    repo_sub = repo_parent.add_subparsers(dest="repo_verb", required=True)

    p_repo_list = repo_sub.add_parser(
        "list",
        help="enumerate repos under an organization (read-only)",
    )
    p_repo_list.add_argument(
        "--owner", default=DEFAULT_OWNER,
        help="organization to enumerate (default: %(default)s)",
    )
    p_repo_list.add_argument(
        "--type", default="all",
        choices=["all", "public", "private", "forks", "sources", "member"],
        help="GitHub repo-list filter (default: all)",
    )
    p_repo_list.add_argument(
        "--sort", default="updated",
        choices=["created", "updated", "pushed", "full_name"],
        help="GitHub sort key (default: updated)",
    )
    p_repo_list.add_argument(
        "--limit", type=int, default=200,
        help="max repos to return after filtering (default 200; hard cap 2000)",
    )
    p_repo_list.add_argument(
        "--include-archived", action="store_true",
        help="include archived repos in the output (default: exclude)",
    )
    p_repo_list.add_argument(
        "--json", action="store_true",
        help="emit JSON array of {name, default_branch, archived, visibility, updated_at}",
    )
    p_repo_list.set_defaults(func=cmd_repo_list)

    # `repo edit` — substrate-attested repo property mutations. v1 supports
    # only --visibility (public|private|internal); other repo properties
    # (default_branch, description, homepage, archived) are NOT exposed yet.
    # Substrate envelope trail is captured per ENVELOPE-OBLIGATIONS.
    p_repo_edit = repo_sub.add_parser(
        "edit",
        help="edit repository properties (substrate-attested write verb; v1: visibility only)",
    )
    p_repo_edit.add_argument(
        "--owner", default=DEFAULT_OWNER,
        help="repo owner (default: %(default)s)",
    )
    p_repo_edit.add_argument(
        "--repo", required=True,
        help="repository name (without owner/ prefix)",
    )
    p_repo_edit.add_argument(
        "--visibility", required=True,
        choices=["public", "private", "internal"],
        help="new visibility — GitHub PATCH /repos/{owner}/{repo} {visibility}",
    )
    p_repo_edit.add_argument(
        "--accept-visibility-change-consequences", action="store_true",
        help="GitHub requires this flag-equivalent for any visibility flip — must be explicit",
    )
    p_repo_edit.set_defaults(func=cmd_repo_edit)

    # `repo create` — substrate-attested repo creation. Unblocks
    # publication of mesh-native polyglot clients (go/elixir/ts/...).
    # Mirrors `repo edit` envelope pattern; org repo by default
    # (DEFAULT_OWNER), supports --owner @me for personal repos.
    p_repo_create = repo_sub.add_parser(
        "create",
        help="create a new repository (substrate-attested write verb)",
    )
    p_repo_create.add_argument(
        "--owner", default=DEFAULT_OWNER,
        help="repo owner: org name or '@me' for personal (default: %(default)s)",
    )
    p_repo_create.add_argument(
        "--repo", required=True,
        help="new repository name (without owner/ prefix)",
    )
    p_repo_create.add_argument(
        "--visibility", default="private",
        choices=["public", "private", "internal"],
        help="initial visibility (default: private)",
    )
    p_repo_create.add_argument(
        "--description", default=None,
        help="one-line repository description",
    )
    p_repo_create.add_argument(
        "--homepage", default=None,
        help="optional homepage URL",
    )
    p_repo_create.set_defaults(func=cmd_repo_create)

    # project subcommand — attach issues/PRs to an org-level Project v2
    proj = sub.add_parser("project", help="project board operations (v2)")
    proj_sub = proj.add_subparsers(dest="proj_verb", required=True)
    p_proj_add = proj_sub.add_parser(
        "add", help="attach an issue or PR to an org Project v2"
    )
    p_proj_add.add_argument("issue", type=int, help="issue or PR number to attach")
    p_proj_add.add_argument(
        "--org", default=DEFAULT_OWNER,
        help="org that owns the project (default: %(default)s)",
    )
    p_proj_add.add_argument(
        "--project", type=int, default=1, help="project number (default: 1)"
    )
    add_repo_args(p_proj_add)
    p_proj_add.set_defaults(func=cmd_project_add)

    args = parser.parse_args()

    # Normalise --repo=Owner/Name into owner=Owner, repo=Name. Without this,
    # passing the slash-form would double-prefix into /repos/Owner/Owner/Name/...
    # (404 from GitHub). Bare repo name (Name only) keeps the default owner.
    # Surfaced 2026-05-16 during runtime-activation arc when arq-github pr list
    # --repo Arqera-IO/ARQERA produced /repos/Arqera-IO/Arqera-IO/ARQERA/pulls.
    # Values with an empty side or more than one slash are passed through
    # untouched. Verbs without a --repo option (e.g. `repo list`) skip this.
    if getattr(args, "repo", None) and "/" in args.repo:
        owner_part, _, repo_part = args.repo.partition("/")
        if owner_part and repo_part and "/" not in repo_part:
            args.owner = owner_part
            args.repo = repo_part

    return args.func(args)


# Script entry point: run the CLI and hand main()'s return value to the
# interpreter as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
