#!/usr/bin/env python3
"""arq-substrate — substrate-liveness primitive for the mesh.

Routes substrate-introspection calls through a named mesh bridge so every
liveness check leaves an envelope_sent/envelope_ack trail. Satisfies
`arq://doc/principle/every-emission-needs-live-act-producer-v1` (HARD):
without this bridge, the audit-emissions sweep that decides which docs
get deleted has no substrate-resident trail of having run.

Parent principles:
  arq://doc/principle/every-emission-needs-live-act-producer-v1
  arq://doc/principle/substrate-truth-supersedes-upstream-tooling-support-v1
  arq://doc/principle/document-and-enforce-not-just-document-v1

v0.1 verbs:
  arq-substrate liveness <arq-address> [--days N]
      Queries substrate for acts referencing this address in the last N
      days (default 30). Verdict: live (>=3) | dormant (1-2) | dead (0).

  arq-substrate audit-emissions --by-peer <peer-addr> [--threshold dormant|dead]
      Scans every emission this peer has ever made, runs liveness on
      each, prints a table + emits a substrate body
      `arq://body/liveness_audit/<peer-slug>-<date>` with the full
      machine-readable result.

  arq-substrate dead-docs --md-glob 'docs/**/*.md' [--threshold dead]
      File-system MD scan; resolves each MD file to a substrate address
      (extracts `arq://doc/<type>/<ref>` references from content, or
      maps by filename) and flags those without live producers.

  arq-substrate emit-with-producer --address <addr> --content-file <p>
                                   --producer <hook-or-beat-ref>
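      Wraps `twin doc create` but REQUIRES --producer. Unless the content
      already contains `producer:` in its first 30 lines, the producer ref
      and the address are written into `<!-- substrate-producer: ... -->` /
      `<!-- substrate-address: ... -->` comment lines so substrate can later
      verify the producer actually emits acts referencing this doc.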

Every call emits:
  arq://act/substrate_envelope_sent/<verb>-<ts>
  arq://act/substrate_envelope_ack/<verb>-<ts>  (verdict, latency_ms)

Liveness data flow:
  1. Query live substrate via `twin index --reference <addr>`, retrying
     with `twin index --parent <addr>` (one-hop graph walk) if that fails
  2. Fall back to the local act files in ~/.arqera/act-mirror/ and
     ~/.arqera/acts/ (offline) when the substrate query is unavailable
  3. Aggregate referencing-act count, kinds, last_touch
  4. Verdict from threshold:  live >=3   dormant 1..2   dead 0

Bypass mechanism: ARQ_SUBSTRATE_LIVENESS_DISABLED=1 makes liveness_check
skip every query and report verdict "unavailable" (source "disabled") —
useful for debugging the dead-docs path without querying substrate.
Envelope acts are still emitted.
"""

from __future__ import annotations

import argparse
import glob
import json
import os
import re
import subprocess
import sys
import time
from datetime import datetime, timezone, timedelta
from pathlib import Path

# ---------------------------------------------------------------------------
# Shared infrastructure (cribbed from arq-cloudflare / arq-github)
# ---------------------------------------------------------------------------

# Make sibling helper modules (e.g. _arq_primitive) importable regardless of
# the caller's working directory. This must run before the import below.
SCRIPT_DIR = Path(__file__).resolve().parent
if str(SCRIPT_DIR) not in sys.path:
    sys.path.insert(0, str(SCRIPT_DIR))

# Optional daemon transport. If the helper is missing or fails to import,
# _primitive_invoke stays None and _emit_via_daemon reports "unavailable",
# which makes _emit fall back to the twin CLI subprocess.
try:
    from _arq_primitive import primitive_invoke as _primitive_invoke
except Exception:  # noqa: PSWP-scaffold-narrow-in-followup
    _primitive_invoke = None  # type: ignore

# twin CLI binary and the two local act stores scanned on the offline path.
TWIN_CLI = Path.home() / ".local" / "bin" / "twin"
ACT_MIRROR = Path.home() / ".arqera" / "act-mirror"
ACTS_LOCAL = Path.home() / ".arqera" / "acts"

# Peer address stamped on every envelope/ack act; override with
# ARQ_ACTOR_PEER_ADDRESS.
ACTOR_PEER = os.environ.get(
    "ARQ_ACTOR_PEER_ADDRESS",
    "arq://body/peer/578412e7b083b40e56e228779804582a",
)

# Liveness thresholds. Documented in
# arq://doc/principle/every-emission-needs-live-act-producer-v1.
LIVENESS_LIVE_MIN = 3
LIVENESS_DORMANT_MIN = 1
LIVENESS_WINDOW_DAYS_DEFAULT = 30

# Address pattern: arq://<class>/<type>/<ref>
# <ref> may contain dots ("foo.v2") but must not END with one, so an address
# that closes a sentence ("... see arq://doc/x/foo-v1.") does not swallow the
# trailing period.
ARQ_ADDR_RE = re.compile(
    r"arq://([a-z_]+)/([a-zA-Z0-9_\-]+)/([a-zA-Z0-9_\-\.]*[a-zA-Z0-9_\-])"
)


# ---------------------------------------------------------------------------
# Substrate emission helpers — daemon first, twin-subprocess fallback
# ---------------------------------------------------------------------------

def _ts_ms() -> int:
    """Current UTC wall-clock time as integer milliseconds since the epoch."""
    now_utc = datetime.now(timezone.utc)
    millis = now_utc.timestamp() * 1000
    return int(millis)


def _ts_iso() -> str:
    """Current UTC time as a second-resolution ISO-8601 string ending in 'Z'."""
    pattern = "%Y-%m-%dT%H:%M:%SZ"
    return datetime.now(timezone.utc).strftime(pattern)


def _emit_via_daemon(class_: str, type_: str, ref: str, payload: dict) -> tuple[str, str | None]:
    """Try to emit one act through the daemon's `daemon.act.emit` verb.

    Returns a ``(status, address)`` pair:
      * ``("ok", address)`` when the daemon answers "ok" with a dict result
        (address is that dict's "address", possibly None);
      * ``("unknown_verb", None)`` / ``("error", None)`` passed through from
        the daemon;
      * ``("unavailable", None)`` when no daemon transport was imported, the
        call raised, or the daemon answered anything else.
    """
    if _primitive_invoke is None:
        return ("unavailable", None)
    request = {"class": class_, "type": type_, "reference": ref, "payload": payload}
    try:
        outcome, result = _primitive_invoke(
            "daemon.act.emit",
            request,
            ACTOR_PEER,
            timeout_s=10.0,
        )
    except Exception:  # noqa: PSWP-scaffold-narrow-in-followup
        return ("unavailable", None)

    if outcome == "ok" and isinstance(result, dict):
        return ("ok", result.get("address"))
    if outcome in ("unknown_verb", "error"):
        return (outcome, None)
    return ("unavailable", None)


def _emit_via_twin_subprocess(class_: str, type_: str, ref: str, payload: dict) -> str | None:
    """Emit one act by running `twin act emit` and waiting up to 5 seconds.

    Returns the first stdout line (whitespace-stripped) that starts with
    ``arq://`` when the CLI exits 0; otherwise None — including when the
    CLI is missing, times out, or cannot be launched.
    """
    if not TWIN_CLI.exists():
        return None
    try:
        proc = subprocess.run(
            [str(TWIN_CLI), "--use-keychain", "act", "emit", class_, type_, ref,
             "--payload", json.dumps(payload)],
            capture_output=True, text=True, timeout=5, check=False,
        )
        if proc.returncode == 0:
            stripped = (raw_line.strip() for raw_line in proc.stdout.splitlines())
            return next((s for s in stripped if s.startswith("arq://")), None)
    except (subprocess.TimeoutExpired, OSError):
        pass
    return None


def _emit(class_: str, type_: str, ref: str, payload: dict) -> str | None:
    """Emit one act synchronously: daemon first, twin CLI as fallback.

    A daemon "error" is final (returns None without retrying). Only an
    unavailable daemon or an unknown verb falls through to the CLI path.
    """
    status, addr = _emit_via_daemon(class_, type_, ref, payload)
    if status == "ok":
        return addr
    return None if status == "error" else _emit_via_twin_subprocess(class_, type_, ref, payload)


# substrate-bypass: _emit_envelope / _emit_ack now use fire-and-forget Popen
# instead of the blocking _emit() helper. The daemon path
# (_emit_via_daemon → _primitive_invoke) did not honor timeout_s and hung
# at 0% CPU for 31+ minutes (reproduced 2026-05-28 with single-addr
# liveness). Audit verbs are read-mostly — the envelope-sent address is
# not used synchronously, so emitting fire-and-forget is correct.
def _emit_envelope(verb: str, extra: dict) -> tuple[str | None, str]:
    """Fire off the envelope_sent act for `verb` without waiting for it.

    The act reference is ``<verb with '.' replaced by '-'>-<epoch ms>``. The
    payload carries the namespaced verb, the actor peer and the timestamp,
    plus every key in `extra` (which overrides same-named keys). Returns
    ``(None, ref)``: no address is awaited; the ref is handed back so the
    caller can pair its ack with it.
    """
    stamp = _ts_ms()
    envelope_ref = "-".join((verb.replace(".", "-"), str(stamp)))
    body = {"verb": f"substrate.{verb}", "actor_peer": ACTOR_PEER, "ts_ms": stamp}
    body.update(extra)
    _emit_async("act", "substrate_envelope_sent", envelope_ref, body)
    return None, envelope_ref


def _emit_ack(verb: str, ref: str, ack: dict) -> None:
    """Fire off the envelope_ack act paired (via `ref`) with an envelope_sent.

    The payload carries the namespaced verb, the actor peer and a fresh
    timestamp, plus every key in `ack` (which overrides same-named keys).
    """
    body = {"verb": f"substrate.{verb}", "actor_peer": ACTOR_PEER, "ts_ms": _ts_ms()}
    body.update(ack)
    _emit_async("act", "substrate_envelope_ack", ref, body)


def _emit_async(class_: str, type_: str, ref: str, payload: dict) -> None:
    """Fire-and-forget substrate emit via a detached twin CLI subprocess.

    The child is started in its own session with stdin, stdout and stderr
    all pointed at /dev/null, and is never waited on, so the audit never
    blocks on substrate availability. A missing CLI or a spawn failure is
    silently ignored (best-effort by design).
    """
    if not TWIN_CLI.exists():
        return
    try:
        subprocess.Popen(
            [str(TWIN_CLI), "--use-keychain", "act", "emit", class_, type_, ref,
             "--payload", json.dumps(payload)],
            # Without this the detached child inherits the parent's stdin and
            # could consume, or block on, terminal input meant for the user.
            stdin=subprocess.DEVNULL,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            start_new_session=True,
        )
    except (OSError, ValueError):
        # ValueError covers bad Popen args and json.dumps circular references.
        pass


# ---------------------------------------------------------------------------
# Liveness query — local mirror first, twin-index fallback
# ---------------------------------------------------------------------------

def _scan_local_acts_for_address(address: str, since_iso: str) -> dict:
    """Search local act files for references to `address` since `since_iso`.

    Returns {ref_count, last_touch, kinds: [...]}.
    Best-effort offline path; missing or empty mirror returns zeros.
    """
    ref_count = 0
    last_touch = None
    kinds: dict[str, int] = {}
    seen_act_addrs: set[str] = set()

    candidate_dirs = [d for d in (ACT_MIRROR, ACTS_LOCAL) if d.exists()]
    if not candidate_dirs:
        return {"ref_count": 0, "last_touch": None, "kinds": []}

    address_bytes = address.encode("utf-8")
    since_ts = None
    try:
        since_ts = datetime.fromisoformat(since_iso.replace("Z", "+00:00")).timestamp()
    except (ValueError, AttributeError):
        pass

    for d in candidate_dirs:
        try:
            for p in d.iterdir():
                if not p.is_file() or not p.name.endswith(".json"):
                    continue
                if since_ts is not None:
                    try:
                        if p.stat().st_mtime < since_ts:
                            continue
                    except OSError:
                        continue
                try:
                    raw = p.read_bytes()
                except OSError:
                    continue
                if address_bytes not in raw:
                    continue
                try:
                    payload = json.loads(raw)
                except json.JSONDecodeError:
                    continue
                kind = payload.get("kind") or payload.get("type") or "unknown"
                act_ref = payload.get("reference") or p.name
                if act_ref in seen_act_addrs:
                    continue
                seen_act_addrs.add(act_ref)
                ref_count += 1
                kinds[kind] = kinds.get(kind, 0) + 1
                ts = payload.get("payload", {}).get("ts") if isinstance(payload.get("payload"), dict) else None
                if ts and (last_touch is None or ts > last_touch):
                    last_touch = ts
        except OSError:
            continue

    return {
        "ref_count": ref_count,
        "last_touch": last_touch,
        "kinds": sorted(kinds.items(), key=lambda x: -x[1]),
    }


def _twin_index_count(address: str, since_iso: str) -> dict | None:
    """Live substrate query via `twin index --reference <addr>`.

    Falls back to `twin index --parent <addr>` (one-hop graph walk) when the
    reference query exits non-zero or prints nothing. Each query is capped at
    500 rows and 15 seconds. The output may be a JSON list of rows or an
    object with an ``items`` list; any other shape counts as zero rows. Rows
    that are not JSON objects still count toward ``ref_count`` (as kind
    ``"unknown"``) rather than crashing the scan.

    Returns {ref_count, last_touch, kinds, source: "substrate"} on success.
    Returns None when the CLI is missing, both queries fail, a query times
    out or cannot run, or the output is not valid JSON.
    """
    if not TWIN_CLI.exists():
        return None

    def _query(flag: str) -> subprocess.CompletedProcess:
        # `flag` selects the lookup mode: --reference or --parent.
        return subprocess.run(
            [str(TWIN_CLI), "--use-keychain", "index",
             flag, address,
             "--since", since_iso,
             "--limit", "500",
             "--json"],
            capture_output=True, text=True, timeout=15, check=False,
        )

    try:
        r = _query("--reference")
        if r.returncode != 0 or not r.stdout.strip():
            r = _query("--parent")
            if r.returncode != 0 or not r.stdout.strip():
                return None
        data = json.loads(r.stdout)
    except (subprocess.TimeoutExpired, OSError, json.JSONDecodeError):
        return None

    if isinstance(data, dict):
        data = data.get("items")
    rows = data if isinstance(data, list) else []

    kinds: dict[str, int] = {}
    last_touch = None
    for row in rows:
        if isinstance(row, dict):
            kind = row.get("type") or row.get("kind") or "unknown"
            ts = row.get("issued_at") or row.get("created_at")
        else:
            kind, ts = "unknown", None
        kinds[kind] = kinds.get(kind, 0) + 1
        if ts and (last_touch is None or ts > last_touch):
            last_touch = ts
    return {
        "ref_count": len(rows),
        "last_touch": last_touch,
        "kinds": sorted(kinds.items(), key=lambda x: -x[1]),
        "source": "substrate",
    }


def liveness_check(address: str, days: int = LIVENESS_WINDOW_DAYS_DEFAULT) -> dict:
    """Score one address as live / dormant / dead over the last `days` days.

    Queries the live substrate index first and falls back to the local act
    mirror when that query is unavailable. Verdict by referencing-act count:
    ``>= LIVENESS_LIVE_MIN`` is "live", ``>= LIVENESS_DORMANT_MIN`` is
    "dormant", anything less is "dead". With ARQ_SUBSTRATE_LIVENESS_DISABLED=1
    nothing is queried and the verdict is "unavailable" (source "disabled").

    Returns:
        {address, ref_count, last_touch, kinds, verdict, source, window_days}
        — every key is present on every path.
    """
    if os.environ.get("ARQ_SUBSTRATE_LIVENESS_DISABLED") == "1":
        # Keep the full result shape: callers index last_touch / kinds.
        return {
            "address": address,
            "ref_count": 0,
            "last_touch": None,
            "kinds": [],
            "verdict": "unavailable",
            "source": "disabled",
            "window_days": days,
        }

    since_iso = (datetime.now(timezone.utc) - timedelta(days=days)).strftime("%Y-%m-%dT%H:%M:%SZ")
    stats = _twin_index_count(address, since_iso)
    source = "substrate"
    if stats is None:
        stats = _scan_local_acts_for_address(address, since_iso)
        source = "local-mirror"
    ref_count = stats["ref_count"]

    if ref_count >= LIVENESS_LIVE_MIN:
        verdict = "live"
    elif ref_count >= LIVENESS_DORMANT_MIN:
        verdict = "dormant"
    else:
        verdict = "dead"

    return {
        "address": address,
        "ref_count": ref_count,
        "last_touch": stats["last_touch"],
        "kinds": stats["kinds"],
        "verdict": verdict,
        "source": source,
        "window_days": days,
    }


# ---------------------------------------------------------------------------
# Emission discovery — find every emission this peer has made
# ---------------------------------------------------------------------------

def _peer_emissions(peer_addr: str) -> list[str]:
    """Best-effort: list every substrate address this peer has emitted.

    Strategy:
      1. `twin index --class <cls> --source <peer> --json` for each of the
         doc / body / act classes (canonical).
      2. Only if that yields nothing: scan local act files that mention the
         peer address.
      3. De-duplicate and return the addresses sorted.
    """
    addrs = _peer_emissions_from_substrate(peer_addr)
    if not addrs:
        addrs = _peer_emissions_from_local(peer_addr)
    return sorted(addrs)


def _peer_emissions_from_substrate(peer_addr: str) -> set[str]:
    """Addresses that `twin index --source <peer>` reports for doc/body/act."""
    addrs: set[str] = set()
    if not TWIN_CLI.exists():
        return addrs
    for cls in ("doc", "body", "act"):
        try:
            r = subprocess.run(
                [str(TWIN_CLI), "--use-keychain", "index",
                 "--class", cls,
                 "--source", peer_addr,
                 "--limit", "1000",
                 "--json"],
                capture_output=True, text=True, timeout=20, check=False,
            )
            if r.returncode != 0 or not r.stdout.strip():
                continue
            data = json.loads(r.stdout)
        except (subprocess.TimeoutExpired, OSError, json.JSONDecodeError):
            continue
        if isinstance(data, dict):
            data = data.get("items") or []
        # Any other shape (scalar, string, ...) carries no rows.
        if not isinstance(data, list):
            continue
        for row in data:
            if not isinstance(row, dict):
                continue
            addr = row.get("address") or row.get("reference_address")
            if isinstance(addr, str) and addr.startswith("arq://"):
                addrs.add(addr)
    return addrs


def _peer_emissions_from_local(peer_addr: str) -> set[str]:
    """Addresses harvested from local act files that mention `peer_addr`.

    Scans ACTS_LOCAL and ACT_MIRROR. From each matching file it collects the
    act's own ``address`` / ``self_address`` plus every doc/body arq://
    address mentioned anywhere in the file.
    """
    addrs: set[str] = set()
    for d in (ACTS_LOCAL, ACT_MIRROR):
        if not d.exists():
            continue
        try:
            entries = list(d.iterdir())
        except OSError:
            continue
        for p in entries:
            if not p.name.endswith(".json"):
                continue
            try:
                # JSON is UTF-8; don't depend on the locale's default encoding.
                raw = p.read_text(encoding="utf-8")
            except (OSError, UnicodeDecodeError):
                continue
            if peer_addr not in raw:
                continue
            try:
                payload = json.loads(raw)
            except json.JSONDecodeError:
                continue
            if isinstance(payload, dict):
                addr = payload.get("address") or payload.get("self_address")
                if isinstance(addr, str) and addr.startswith("arq://"):
                    addrs.add(addr)
            # Also pull arq:// strings from anywhere in the file.
            for cls, typ, ref in ARQ_ADDR_RE.findall(raw):
                ref = ref.rstrip(".")  # a sentence-ending period isn't part of it
                if ref and cls in ("doc", "body", "principle"):
                    addrs.add(f"arq://{cls}/{typ}/{ref}")
    return addrs


# ---------------------------------------------------------------------------
# Verb handlers
# ---------------------------------------------------------------------------

def cmd_liveness(args: argparse.Namespace) -> int:
    """`liveness` verb: score one address and print the result.

    Emits an envelope_sent act before the check and an envelope_ack act
    (verdict, ref_count, latency_ms) after it. Prints the result as JSON with
    --json, otherwise as a short human-readable summary. Always returns 0.
    """
    addr = args.address
    days = args.days or LIVENESS_WINDOW_DAYS_DEFAULT
    _, ref = _emit_envelope("liveness", {"target": addr, "window_days": days})
    # Monotonic clock: latency must not jump with wall-clock adjustments.
    t0 = time.perf_counter()
    result = liveness_check(addr, days=days)
    latency_ms = int((time.perf_counter() - t0) * 1000)
    _emit_ack("liveness", ref, {
        "target": addr,
        "verdict": result["verdict"],
        "ref_count": result["ref_count"],
        "latency_ms": latency_ms,
    })
    if args.json:
        print(json.dumps(result, indent=2))
        return 0
    print(addr)
    print(f"  verdict:    {result['verdict']}")
    print(f"  ref_count:  {result['ref_count']} (window={days}d, source={result['source']})")
    # .get(): a disabled/short-circuited result may omit these keys.
    print(f"  last_touch: {result.get('last_touch')}")
    print(f"  kinds:      {result.get('kinds', [])}")
    return 0


def _audit_worker_count(default: int = 10) -> int:
    """Thread-pool size for audit-emissions, from ARQ_SUBSTRATE_AUDIT_WORKERS.

    A non-integer value falls back to `default` (with a stderr note). The
    result is clamped to at least 1, because ThreadPoolExecutor rejects
    max_workers <= 0.
    """
    raw = os.environ.get("ARQ_SUBSTRATE_AUDIT_WORKERS", str(default))
    try:
        workers = int(raw)
    except ValueError:
        print(f"[audit] ignoring non-integer ARQ_SUBSTRATE_AUDIT_WORKERS={raw!r}; "
              f"using {default}", file=sys.stderr)
        return default
    return max(1, workers)


def _liveness_parallel(addrs: list[str], days: int, workers: int,
                       verbose: bool) -> list[dict]:
    """Run liveness_check for every address on a thread pool.

    A check that raises is recorded as verdict "unavailable" with source
    ``error:<ExceptionType>`` instead of aborting the audit. Results come back
    sorted by address: the pool finishes in arbitrary order, and sorting keeps
    the output reproducible from run to run.
    """
    from concurrent.futures import ThreadPoolExecutor, as_completed

    total = len(addrs)
    results: list[dict] = []
    with ThreadPoolExecutor(max_workers=workers) as ex:
        future_to_addr = {ex.submit(liveness_check, a, days): a for a in addrs}
        for completed, fut in enumerate(as_completed(future_to_addr), start=1):
            try:
                r = fut.result()
            except Exception as e:  # noqa: PSWP-scaffold-narrow-in-followup
                r = {"address": future_to_addr[fut], "ref_count": 0,
                     "last_touch": None, "kinds": [], "verdict": "unavailable",
                     "source": f"error:{type(e).__name__}", "window_days": days}
            results.append(r)
            if verbose and (completed % 25 == 0 or completed == total):
                print(f"[audit] {completed}/{total}", file=sys.stderr)
    results.sort(key=lambda r: str(r.get("address", "")))
    return results


def cmd_audit_emissions(args: argparse.Namespace) -> int:
    """`audit-emissions` verb: liveness-audit every emission of one peer.

    Steps:
      1. Discover the peer's emissions and score each one in parallel.
      2. Group the results by verdict.
      3. Emit an ``arq://body/liveness_audit/<peer-slug>-<date>`` summary body.
      4. Optionally write the full JSON to --output.
      5. Print a summary filtered by --threshold ("dead" shows dead only;
         "dormant" shows dormant + dead), or the full summary with --json.
    Always returns 0.
    """
    peer = args.by_peer
    threshold = args.threshold  # None | "dormant" | "dead"
    days = args.days or LIVENESS_WINDOW_DAYS_DEFAULT

    # Each liveness_check is I/O-bound (twin index subprocesses, up to ~30s),
    # so checks run on a thread pool: 200 addrs × 30s ≈ 100 min sequentially,
    # ≈ 10 min with 10 workers.
    _, ref = _emit_envelope("audit_emissions", {"peer": peer, "window_days": days})
    t0 = time.perf_counter()

    addrs = _peer_emissions(peer)
    workers = _audit_worker_count()
    if args.verbose:
        print(f"[audit] {len(addrs)} emissions, {workers} workers, days={days}", file=sys.stderr)
    results = _liveness_parallel(addrs, days, workers, args.verbose)

    by_verdict: dict[str, list[dict]] = {"live": [], "dormant": [], "dead": [], "unavailable": []}
    for r in results:
        by_verdict.setdefault(r["verdict"], []).append(r)

    summary = {
        "peer": peer,
        "window_days": days,
        "total_emissions": len(addrs),
        "counts": {k: len(v) for k, v in by_verdict.items()},
        "live": by_verdict["live"],
        "dormant": by_verdict["dormant"],
        "dead": by_verdict["dead"],
        "unavailable": by_verdict["unavailable"],
        "generated_at": _ts_iso(),
    }

    # Emit substrate body. rstrip("/") keeps the slug non-empty for a peer
    # address written with a trailing slash.
    peer_slug = peer.rstrip("/").split("/")[-1][:24]
    today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    body_ref = f"{peer_slug}-{today}"
    body_payload = {
        "audit_kind": "liveness_audit",
        "peer": peer,
        "window_days": days,
        "counts": summary["counts"],
        "total_emissions": summary["total_emissions"],
        "principle": "every-emission-needs-live-act-producer-v1",
        "generated_at": summary["generated_at"],
        # Don't inline 1000s of addresses; reference the local JSON file
        "summary_file": str(args.output) if args.output else None,
    }
    # NOTE(review): _emit tries the daemon transport first; that path has been
    # observed to ignore timeout_s and hang, so this synchronous call could
    # block the audit if the daemon wedges.
    body_addr = _emit("body", "liveness_audit", body_ref, body_payload)

    latency_ms = int((time.perf_counter() - t0) * 1000)
    _emit_ack("audit_emissions", ref, {
        "peer": peer,
        "counts": summary["counts"],
        "body_address": body_addr,
        "latency_ms": latency_ms,
    })

    if args.output:
        Path(args.output).write_text(json.dumps(summary, indent=2), encoding="utf-8")
        print(f"audit written to {args.output}", file=sys.stderr)

    # Filter by threshold for stdout display
    if threshold == "dead":
        display = by_verdict["dead"]
    elif threshold == "dormant":
        display = by_verdict["dormant"] + by_verdict["dead"]
    else:
        display = results

    if args.json:
        print(json.dumps(summary, indent=2))
        return 0
    print(f"peer: {peer}")
    print(f"  total emissions: {summary['total_emissions']}")
    for v, items in by_verdict.items():
        print(f"  {v:<12s}: {len(items)}")
    print(f"\n(showing {'all' if threshold is None else threshold} threshold)")
    for r in display[:50]:
        print(f"  [{r['verdict']:<8s}] refs={r['ref_count']:>3d}  {r['address']}")
    if len(display) > 50:
        print(f"  ... and {len(display) - 50} more (see --output JSON)")
    if body_addr:
        print(f"\nsubstrate body: {body_addr}")
    return 0


def _extract_addresses_from_md(path: Path) -> list[str]:
    """Return candidate substrate addresses for an MD file, best first.

    Resolution order:
      1. An ``address: arq://...`` line (frontmatter style; the value may be
         quoted and followed by spaces). It is returned alone.
      2. Otherwise, every arq:// reference in the text, de-duplicated in
         first-seen order. ``arq://doc/...`` addresses come before other
         classes, so a caller taking ``[0]`` gets a doc address when the file
         mentions one. A trailing sentence period is not kept.
      3. Otherwise, for files under a ``principles`` directory, a guess from
         the filename (docs/principles/foo-v1.md → arq://doc/principle/foo-v1).
    An unreadable file yields an empty list.
    """
    try:
        text = path.read_text(encoding="utf-8", errors="replace")
    except OSError:
        return []

    m = re.search(r"""^address:\s*["']?(arq://[^\s"']+)["']?[ \t]*$""", text, re.MULTILINE)
    if m:
        return [m.group(1)]

    doc_addrs: list[str] = []
    other_addrs: list[str] = []
    seen: set[str] = set()
    for cls, typ, ref in ARQ_ADDR_RE.findall(text):
        ref = ref.rstrip(".")  # "... see arq://doc/x/foo-v1." ends a sentence
        if not ref:
            continue
        addr = f"arq://{cls}/{typ}/{ref}"
        if addr in seen:
            continue
        seen.add(addr)
        (doc_addrs if cls == "doc" else other_addrs).append(addr)
    addrs = doc_addrs + other_addrs

    if not addrs and "principles" in path.parts:
        addrs.append(f"arq://doc/principle/{path.stem}")
    return addrs


def cmd_dead_docs(args: argparse.Namespace) -> int:
    """`dead-docs` verb: flag MD files whose substrate address lacks live acts.

    Each file matching --md-glob (``**`` recursive) is mapped to a canonical
    address, the first candidate from _extract_addresses_from_md. Files with
    no address are reported as "unaddressed". Liveness runs once per distinct
    canonical address; files that share an address reuse the result, which
    saves up to two twin subprocesses per repeat.

    With --threshold dead the display shows dead and unaddressed files; with
    --threshold dormant it also shows dormant ones. Optionally writes the full
    JSON to --output. Always returns 0.
    """
    pattern = args.md_glob
    threshold = args.threshold  # None | "dead" | "dormant"
    days = args.days or LIVENESS_WINDOW_DAYS_DEFAULT

    _, ref = _emit_envelope("dead_docs", {"glob": pattern, "threshold": threshold})
    t0 = time.perf_counter()

    files = sorted(glob.glob(pattern, recursive=True))
    findings: list[dict] = []
    liveness_cache: dict[str, dict] = {}
    for f in files:
        p = Path(f)
        if not p.is_file():
            continue
        addrs = _extract_addresses_from_md(p)
        if not addrs:
            # Same keys as addressed findings so consumers see one schema.
            findings.append({
                "file": str(p),
                "address": None,
                "verdict": "unaddressed",
                "ref_count": 0,
                "last_touch": None,
            })
            continue
        # Use first/canonical address for verdict
        canonical = addrs[0]
        result = liveness_cache.get(canonical)
        if result is None:
            result = liveness_cache[canonical] = liveness_check(canonical, days=days)
        findings.append({
            "file": str(p),
            "address": canonical,
            "verdict": result["verdict"],
            "ref_count": result["ref_count"],
            "last_touch": result.get("last_touch"),
        })

    by_verdict: dict[str, list[dict]] = {}
    for finding in findings:
        by_verdict.setdefault(finding["verdict"], []).append(finding)

    summary = {
        "glob": pattern,
        "window_days": days,
        "total_files": len(findings),
        "counts": {k: len(v) for k, v in by_verdict.items()},
        "findings": findings,
        "generated_at": _ts_iso(),
    }

    latency_ms = int((time.perf_counter() - t0) * 1000)
    _emit_ack("dead_docs", ref, {
        "glob": pattern,
        "counts": summary["counts"],
        "latency_ms": latency_ms,
    })

    if args.output:
        Path(args.output).write_text(json.dumps(summary, indent=2), encoding="utf-8")
        print(f"dead-docs scan written to {args.output}", file=sys.stderr)

    # Threshold-filtered display
    if threshold == "dead":
        display = by_verdict.get("dead", []) + by_verdict.get("unaddressed", [])
    elif threshold == "dormant":
        display = (by_verdict.get("dormant", []) + by_verdict.get("dead", [])
                   + by_verdict.get("unaddressed", []))
    else:
        display = findings

    if args.json:
        print(json.dumps(summary, indent=2))
        return 0
    print(f"glob: {pattern}")
    print(f"  total files: {summary['total_files']}")
    for v, items in by_verdict.items():
        print(f"  {v:<12s}: {len(items)}")
    print(f"\n(showing {'all' if threshold is None else threshold} threshold)")
    for finding in display[:80]:
        print(f"  [{finding['verdict']:<11s}] refs={finding['ref_count']:>3d}  {finding['file']}")
    if len(display) > 80:
        print(f"  ... and {len(display) - 80} more (see --output JSON)")
    return 0


def _inject_producer_header(text: str, producer: str, addr: str) -> str:
    """Return `text` with substrate-producer / substrate-address comment lines.

    The lines go at the very top, unless `text` opens with a YAML frontmatter
    block (``---`` ... ``---``). In that case they go right after the closing
    fence, so the frontmatter still starts on line 1.
    """
    header = (
        f"<!-- substrate-producer: {producer} -->\n"
        f"<!-- substrate-address: {addr} -->\n"
    )
    lines = text.splitlines(keepends=True)
    if lines and lines[0].rstrip("\r\n") == "---":
        for i in range(1, len(lines)):
            if lines[i].rstrip("\r\n") == "---":
                # A closing fence at EOF may lack its newline; add one so the
                # header starts on its own line.
                fence = lines[i] if lines[i].endswith(("\n", "\r")) else lines[i] + "\n"
                return "".join(lines[:i]) + fence + header + "".join(lines[i + 1:])
    return header + text


def cmd_emit_with_producer(args: argparse.Namespace) -> int:
    """`emit-with-producer` verb: `twin doc create` with a mandatory producer.

    Unless ``producer:`` already appears in the first 30 lines of the content
    file, a wrapped copy is written next to it as
    ``<name><suffix>.with-producer``. The copy carries substrate-producer /
    substrate-address comment lines (placed after any leading YAML
    frontmatter). That copy is what twin uploads, and it is deleted afterwards.

    Returns:
        2 on usage errors (blank --producer, missing content file);
        1 when the twin CLI is missing, times out, or cannot be launched;
        otherwise twin's own exit code.
    """
    addr = args.address
    producer = args.producer
    content_file = Path(args.content_file)

    if not producer or not producer.strip():
        print("ERROR: --producer is required (the live act producer for this emission)",
              file=sys.stderr)
        print("       see arq://doc/principle/every-emission-needs-live-act-producer-v1",
              file=sys.stderr)
        return 2
    if not content_file.exists():
        print(f"ERROR: content file not found: {content_file}", file=sys.stderr)
        return 2

    original = content_file.read_text(encoding="utf-8")
    head = "\n".join(original.split("\n", 30)[:30])
    wrapped_file: Path | None = None
    if "producer:" not in head:
        wrapped_file = content_file.with_suffix(content_file.suffix + ".with-producer")
        wrapped_file.write_text(_inject_producer_header(original, producer, addr),
                                encoding="utf-8")
        content_arg = str(wrapped_file)
    else:
        content_arg = str(content_file)

    try:
        _, ref = _emit_envelope("emit_with_producer", {
            "address": addr, "producer": producer, "content_file": str(content_file),
        })

        if not TWIN_CLI.exists():
            print(f"ERROR: twin CLI not found at {TWIN_CLI}", file=sys.stderr)
            _emit_ack("emit_with_producer", ref, {"verdict": "error", "reason": "twin_missing"})
            return 1
        try:
            r = subprocess.run(
                [str(TWIN_CLI), "--use-keychain", "doc", "create", addr,
                 "--content-file", content_arg],
                capture_output=True, text=True, timeout=30, check=False,
            )
        except (subprocess.TimeoutExpired, OSError) as e:
            _emit_ack("emit_with_producer", ref, {"verdict": "error", "reason": str(e)})
            return 1
        print(r.stdout, end="")
        if r.stderr:
            print(r.stderr, file=sys.stderr, end="")
        _emit_ack("emit_with_producer", ref, {
            "verdict": "ok" if r.returncode == 0 else "error",
            "exit_code": r.returncode,
            "producer": producer,
            "address": addr,
        })
        return r.returncode
    finally:
        # twin has finished reading the wrapped copy (subprocess.run is
        # synchronous), so don't leave it behind next to the source file.
        if wrapped_file is not None:
            try:
                wrapped_file.unlink()
            except OSError:
                pass


# ---------------------------------------------------------------------------
# CLI scaffolding
# ---------------------------------------------------------------------------

def main(argv: list[str] | None = None) -> int:
    """CLI entry point: build the verb parsers, parse `argv`, run the verb.

    `argv=None` lets argparse read sys.argv[1:]. Returns the exit code of the
    selected verb handler.
    """
    cli = argparse.ArgumentParser(
        prog="arq-substrate",
        description="Substrate liveness primitive — score emissions, find dead docs.",
    )
    verbs = cli.add_subparsers(dest="cmd", required=True)
    window_default = LIVENESS_WINDOW_DAYS_DEFAULT

    # liveness <address>
    liveness_p = verbs.add_parser("liveness", help="Score one arq:// address as live/dormant/dead.")
    liveness_p.add_argument("address", help="arq:// address to score")
    liveness_p.add_argument("--days", type=int, default=window_default)
    liveness_p.add_argument("--json", action="store_true")
    liveness_p.set_defaults(func=cmd_liveness)

    # audit-emissions --by-peer <peer>
    audit_p = verbs.add_parser("audit-emissions", help="Audit every emission by a peer.")
    audit_p.add_argument("--by-peer", required=True, help="peer arq:// address")
    audit_p.add_argument("--threshold", choices=["dormant", "dead"], default=None)
    audit_p.add_argument("--days", type=int, default=window_default)
    audit_p.add_argument("--output", default=None, help="write full JSON to this path")
    for flag in ("--json", "--verbose"):
        audit_p.add_argument(flag, action="store_true")
    audit_p.set_defaults(func=cmd_audit_emissions)

    # dead-docs [--md-glob ...]
    dead_p = verbs.add_parser("dead-docs", help="Scan MD files for dead substrate addresses.")
    dead_p.add_argument("--md-glob", default="docs/**/*.md")
    dead_p.add_argument("--threshold", choices=["dormant", "dead"], default=None)
    dead_p.add_argument("--days", type=int, default=window_default)
    dead_p.add_argument("--output", default=None)
    dead_p.add_argument("--json", action="store_true")
    dead_p.set_defaults(func=cmd_dead_docs)

    # emit-with-producer --address --content-file --producer
    emit_p = verbs.add_parser("emit-with-producer",
                              help="Create a substrate doc with a required --producer ref.")
    for flag in ("--address", "--content-file"):
        emit_p.add_argument(flag, required=True)
    emit_p.add_argument("--producer", required=True,
                        help="hook/beat/verb reference that emits acts about this doc")
    emit_p.set_defaults(func=cmd_emit_with_producer)

    parsed = cli.parse_args(argv)
    handler = parsed.func
    return handler(parsed)


if __name__ == "__main__":
    # sys.exit(code) just raises SystemExit(code); do it directly.
    raise SystemExit(main())
