#!/usr/bin/env python3
"""arq-config — substrate-decided runtime configuration bridge.

Implements arq://doc/principle/substrate-is-the-runtime-config-v1 (DRAFT).

Frame D of the merge-mechanism Honest-Ledger arc: substrate is not just where
decisions land — substrate IS the runtime configuration of every external
system the protocol-network operates. Platforms hold projections of substrate
decisions; substrate is the source of truth.

Three verbs implement the protocol contract:

    arq-config decide <surface> <field> <value> [--scope ...] [--reason ...]
        Emit arq://act/runtime_config_decided/<surface>-<field>-<ts>
        with the desired-state for that field. Scope can be `global`,
        `tenant:<id>`, `pr:<num>`, `request:<id>`, `until:<iso8601>`.
        High-stakes fields (e.g. github.branch-protection.enforce_admins)
        require trust-graded different-peer; the verb refuses single-peer
        toggles by emitting a config_blocked act.

    arq-config reconcile <surface> [--field <field>] [--apply]
        Read most-recent runtime_config_decided act(s) per (surface, field),
        compare to platform-actual, emit:
          - config_drift_observed if observed != desired (always)
          - config_reconciled with apply receipt (only if --apply)
        Without --apply: dry-run reporting drift. With --apply: dispatch to
        platform via the appropriate adapter and close the loop.

    arq-config inspect <surface> [--field <field>]
        Read substrate's last-decided + last-reconciled state
        for a (surface, field). Returns the audit-grade answer to:
        "what's the operating state of this control?"

Adapters (Phase 1 — github only; subsequent PRs add cloudflare, k8s, ...)
shipped in this verb:

    github.branch-protection.<field>      — REST PUT/PATCH
    github.branch-protection.enforce_admins — POST/DELETE (special endpoint)

Composes:
    - arq://doc/principle/substrate-is-the-runtime-config-v1 (DRAFT, this principle)
    - arq://doc/principle/merge-decision-on-substrate-v1 (DRAFT) — merge-via-
      substrate's inline enforce_admins toggle is refactored to use this verb
    - arq://doc/principle/migrate-soak-delete-no-half-shipped (DRAFT) —
      Phase 1 dual-mode; Phase 2 default; Phase 3 sole write path
    - arq://doc/principle/trust-graded-merge-v1 (canonised) — high-stakes
      runtime-config toggles get the same different-peer requirement
"""

from __future__ import annotations

import argparse
import json
import os
import re
import shutil
import subprocess
import sys
import urllib.error
import urllib.request
from datetime import datetime, timezone
from pathlib import Path

# Base URLs for vendor REST APIs; _api_call prefixes request paths with one
# of these (GitHub is its default).
GITHUB_API = "https://api.github.com"
CLOUDFLARE_API = "https://api.cloudflare.com/client/v4"
# NOTE(review): presumably the default GitHub owner/repo for the github
# surface — their use is outside the visible part of this file.
DEFAULT_OWNER = "Arqera-IO"
DEFAULT_REPO = "ARQERA"
# Path of the `twin` CLI that _emit_act shells out to. When the file does not
# exist, _emit_act returns None without emitting anything.
TWIN_CLI = Path.home() / ".local" / "bin" / "twin"
# Fully-qualified `<surface>.<field>` names that count as high-stakes.
HIGH_STAKES_FIELDS = {
    # Toggling these requires trust-graded different-peer per
    # trust-graded-merge-v1 — single-peer toggles refused
    "github.branch-protection.enforce_admins",
    "github.branch-protection.required_status_checks.contexts",
    "github.branch-protection.required_approving_review_count",
    "github.branch-protection.required_pull_request_reviews",
    "github.branch-protection.required_pull_request_reviews.required_approving_review_count",
    "github.branch-protection.required_pull_request_reviews.require_code_owner_reviews",
    "k8s.deployment.replicas",  # autoscale floor/ceiling matters for cost + availability
    "cloudflare.worker.routes",  # routing changes affect customer traffic
    "cloudflare.zone.always_use_https",  # security posture
}

# Peer address recorded as `actor_peer` in emitted act payloads. Overridable
# through the ARQ_ACTOR_PEER_ADDRESS environment variable.
ACTOR_PEER_ADDRESS = os.environ.get(
    "ARQ_ACTOR_PEER_ADDRESS",
    "arq://body/peer/578412e7b083b40e56e228779804582a",
)


def _utc_ts() -> str:
    """Return the current UTC time as a compact `YYYYMMDDTHHMMSSZ` stamp.

    Second precision (ISO-8601 basic format); used as the time component of
    act references.
    """
    now = datetime.now(tz=timezone.utc)
    return f"{now:%Y%m%dT%H%M%SZ}"


def _emit_act(class_: str, type_: str, reference: str, payload: dict) -> str | None:
    """Emit a substrate act via twin CLI subprocess.

    Returns the act address on success, None on failure. Errors are non-
    fatal to the caller — Frame D acts are audit-grade evidence, not
    pre-conditions for the dispatch itself.
    """
    if not TWIN_CLI.exists():
        return None
    payload_str = json.dumps(payload, default=str)  # noqa: ARQ-NO-JSON-HOT-PATH twin CLI input boundary — twin accepts --payload <json>
    try:
        result = subprocess.run(
            [str(TWIN_CLI), "--use-keychain", "act", "emit",
             class_, type_, reference, "--source", "arq-config",
             "--payload", payload_str],
            capture_output=True, text=True, timeout=10,
        )
        if result.returncode == 0:
            for line in result.stdout.splitlines():
                if line.startswith("arq://"):
                    return line.strip().split()[0]
    except (subprocess.TimeoutExpired, OSError):
        pass
    return None


# Well-known absolute paths where `gh` may live. PATH-resolution alone is
# fragile in launchd / cron / systemd-user contexts where the daemon's
# inherited PATH may not include the directory `gh` was installed into.
# When this resolver returns None, the daemon emits 401 "Requires
# authentication" on every observe-cycle and truth-infra silently rots —
# see `arq-substrate-sync` cycle 2026-05-14T09:20:24Z for the runtime
# pressure that surfaced this. Per `live-diagnosis-beats-static` +
# `repeated-bypasses-are-undiscovered-primitives-v1`: we resolve via
# shutil.which() first, fall back to a fixed list of absolute paths.
#
# Order matters: _gh_resolve returns the first entry that exists and is
# executable. The first entry is specific to one user's home directory and
# simply will not exist on other hosts.
_GH_CANDIDATE_PATHS = (
    "/Users/ayo/bin/gh",          # primary install location on this host
    "/opt/homebrew/bin/gh",       # Homebrew on Apple Silicon
    "/usr/local/bin/gh",          # Homebrew on Intel + manual installs
    str(Path.home() / ".local" / "bin" / "gh"),
)


def _gh_resolve() -> str | None:
    """Resolve the `gh` executable path. PATH first, then absolute fallbacks.
    Returns None if gh cannot be found anywhere — caller must handle that."""
    via_path = shutil.which("gh")
    if via_path:
        return via_path
    for candidate in _GH_CANDIDATE_PATHS:
        if Path(candidate).exists() and os.access(candidate, os.X_OK):
            return candidate
    return None


def _gh_active_user() -> str:
    """Best-effort lookup of the gh CLI's active github.com user.

    Used for decider-attribution on substrate. Runs `gh auth status` and
    scans stdout + stderr (gh has printed the status on either stream).

    Two output shapes are understood:
      * legacy single line:  `Logged in to github.com as <user> ...`
      * multi-account shape: `Logged in to <host> account <user> ...`
        followed on a *later* line by `Active account: true|false`.

    The multi-account shape spans several lines, so it cannot be matched by a
    single per-line regex; accounts are collected first and the one flagged
    active on github.com is returned. If no account carries an explicit flag,
    the first github.com account seen is returned.

    Returns the login name, or "unknown" when gh is missing, fails to run,
    times out, or prints nothing recognisable.
    """
    gh = _gh_resolve()
    if not gh:
        return "unknown"
    try:
        result = subprocess.run(
            [gh, "auth", "status"], capture_output=True, text=True, timeout=5,
        )
    except (subprocess.TimeoutExpired, OSError):
        return "unknown"

    # Each entry is [host, user, active]; active stays None until an
    # "Active account:" line for that entry is seen.
    accounts: list[list] = []
    for line in (result.stdout + result.stderr).splitlines():
        # Same-line variant kept for backward compatibility.
        m = re.search(
            r"Logged in to github\.com account (\S+).*Active account: true", line,
        )
        if m:
            return m.group(1)
        # Legacy single-account wording.
        m = re.search(r"Logged in to github\.com.* as (\S+)", line)
        if m:
            return m.group(1)
        m = re.search(r"Logged in to (\S+) account (\S+)", line)
        if m:
            accounts.append([m.group(1), m.group(2), None])
            continue
        m = re.search(r"Active account:\s*(true|false)", line)
        # Attach the flag to the most recent account only, and only once, so
        # a flag belonging to an unparsed entry cannot overwrite it.
        if m and accounts and accounts[-1][2] is None:
            accounts[-1][2] = m.group(1) == "true"

    github_accounts = [entry for entry in accounts if entry[0] == "github.com"]
    for _host, user, active in github_accounts:
        if active:
            return user
    for _host, user, active in github_accounts:
        if active is None:
            return user
    return "unknown"


def _gh_auth_token() -> str | None:
    """Get the active gh user's token for API calls.

    Resolution order:
      1. GH_TOKEN env var (matches gh's own precedence — supports plistless overrides)
      2. shutil.which('gh') → `gh auth token`
      3. Well-known absolute gh paths → `gh auth token`
    Returns None if all paths fail; caller is expected to skip the
    Authorization header (which yields 401 on private resources, the
    runtime-evidence signal that this resolver failed)."""
    env_token = os.environ.get("GH_TOKEN") or os.environ.get("GITHUB_TOKEN")
    if env_token:
        return env_token.strip()
    gh = _gh_resolve()
    if not gh:
        return None
    try:
        result = subprocess.run(
            [gh, "auth", "token"], capture_output=True, text=True, timeout=5,
        )
        if result.returncode == 0:
            tok = result.stdout.strip()
            return tok or None
    except (subprocess.TimeoutExpired, OSError):
        pass
    return None


def _api_call(
    method: str,
    path: str,
    token: str | None = None,
    body: dict | None = None,
    base_url: str = GITHUB_API,
    extra_headers: dict | None = None,
) -> tuple[int, dict | list | None, str | None]:
    """Minimal REST helper shared by the adapters for reads and applies.

    Args:
        method: HTTP verb.
        path: request path, appended verbatim to `base_url`.
        token: credential; sent as `token <t>` to GitHub and `Bearer <t>` to
            every other base URL. Omitted when falsy.
        body: JSON-serialisable request body; None sends no body.
        base_url: API root. Defaults to GitHub; pass CLOUDFLARE_API or
            similar for other surfaces.
        extra_headers: merged over the default headers last, so they win.

    Returns:
        `(status, parsed_json, error_text)`:
          * 2xx with a JSON (or empty) body  -> (status, parsed-or-None, None)
          * 2xx with a body that is not JSON -> (status, None, first 500 chars)
          * HTTP error status                -> (code, None, first 500 chars
                                                of the error body)
          * transport failure (DNS, refused, reset, timeout)
                                             -> (0, None, str(exception))

    Network-level failures are reported through the return value rather than
    raised, so callers can treat status 0 uniformly.
    """
    url = f"{base_url}{path}"
    headers = {
        "Accept": "application/json",
        # Cloudflare WAF rule 1010 blocks the default Python-urllib UA on
        # several APIs (notably Groq), so an explicit identifying UA is
        # always sent. Workaround documented in
        # ~/.claude/projects/-Users-ayo-Desktop-Project-ARQERA/memory/
        # feedback_groq_cloudflare_waf_blocks_python_urllib.md
        "User-Agent": "arq-config/0.1 (+https://arqera.io)",
    }
    if base_url == GITHUB_API:
        headers["Accept"] = "application/vnd.github+json"
    if token:
        # GitHub uses `token`, Cloudflare + Sentry + most modern APIs use `Bearer`
        if base_url == GITHUB_API:
            headers["Authorization"] = f"token {token}"
        else:
            headers["Authorization"] = f"Bearer {token}"
    if extra_headers:
        headers.update(extra_headers)
    data = json.dumps(body).encode() if body is not None else None  # noqa: ARQ-NO-JSON-HOT-PATH GitHub/Cloudflare/Sentry vendor wire format
    if data is not None:
        headers["Content-Type"] = "application/json"
    req = urllib.request.Request(url, data=data, headers=headers, method=method)
    try:
        with urllib.request.urlopen(req, timeout=15) as resp:
            raw = resp.read()
            try:
                return resp.status, (json.loads(raw) if raw else None), None
            except ValueError:
                # ValueError covers json.JSONDecodeError and also the
                # UnicodeDecodeError json.loads raises on undecodable bytes.
                return resp.status, None, raw.decode("utf-8", "replace")[:500]
    except urllib.error.HTTPError as e:
        try:
            err_body = e.read().decode("utf-8", "replace")[:500]
        except OSError:
            err_body = str(e)
        return e.code, None, err_body
    except OSError as e:
        # URLError is an OSError subclass, so this still covers it. Read
        # timeouts and connection resets surface as plain OSError subclasses
        # that urllib does not wrap in URLError.
        return 0, None, str(e)


def _kubectl_run(args_list: list[str], timeout: float = 15) -> tuple[int, str, str]:
    """Invoke kubectl against a pinned context and capture its output.

    The context comes from ARQ_CONFIG_KUBE_CONTEXT, falling back to a fixed
    default. Returns `(returncode, stdout, stderr)`; if kubectl cannot be
    started or exceeds `timeout`, returns `(-1, "", <exception text>)`.
    """
    context = os.environ.get(
        "ARQ_CONFIG_KUBE_CONTEXT",
        "gke_gen-lang-client-0790194393_europe-west2_arqera-gke",
    )
    argv = ["kubectl", "--context", context] + list(args_list)
    try:
        finished = subprocess.run(
            argv, capture_output=True, text=True, timeout=timeout,
        )
    except (subprocess.TimeoutExpired, OSError) as exc:
        return -1, "", str(exc)
    return finished.returncode, finished.stdout, finished.stderr


# ─────────────────────────────────────────────────────────────────────
# Surface adapters — read + apply for each platform/field combination
# Phase 1 shipped github only; the k8s, cloudflare, sentry and stripe adapters
# are defined further down this file
# ─────────────────────────────────────────────────────────────────────

def _github_branch_protection_observe(
    branch: str, field: str, owner: str, repo: str, token: str | None,
):
    """Read current platform value for a branch-protection field.

    Returns (status, observed_value, error). The field is dotted —
    e.g. `enforce_admins`, `required_status_checks.contexts`,
    `required_pull_request_reviews.required_approving_review_count`.
    """
    status, body, err = _api_call(
        "GET", f"/repos/{owner}/{repo}/branches/{branch}/protection",
        token=token,
    )
    if status >= 400 or not isinstance(body, dict):
        return status, None, err
    # Walk the dotted field path
    cur = body
    for part in field.split("."):
        if isinstance(cur, dict):
            cur = cur.get(part)
        else:
            return status, None, f"path {field} broke at {part}"
    # GitHub wraps several primitives in {"enabled": true, ...}; unwrap to
    # the primitive when the dict carries an explicit `enabled` key. Other
    # fields (url, contexts_url) are GitHub's introspection metadata and
    # not part of the field's substrate-tracked value.
    if isinstance(cur, dict) and "enabled" in cur:
        cur = cur["enabled"]
    return status, cur, None


def _github_branch_protection_apply(
    branch: str, field: str, value, owner: str, repo: str, token: str | None,
):
    """Apply a desired value to a branch-protection field via the field-
    appropriate REST endpoint.

    Dispatch:
      * `enforce_admins` — dedicated sub-endpoint: POST to enable, DELETE to
        disable.
      * `required_status_checks[...]` / `required_pull_request_reviews[...]`
        — read-modify-write: GET the current protection, rebuild a full PUT
        body from it with the one field changed, then PUT it back.
      * anything else — not implemented.

    Returns the `(status, body, error)` tuple from `_api_call`, or
    `(0, None, <message>)` for an unsupported field.
    """
    if field == "enforce_admins":
        # Only the literal boolean True enables; every other value (False,
        # None, and also truthy non-bools such as "true" or 1) takes the
        # DELETE branch.
        if value is True:
            return _api_call(
                "POST",
                f"/repos/{owner}/{repo}/branches/{branch}/protection/enforce_admins",
                token=token,
            )
        else:
            return _api_call(
                "DELETE",
                f"/repos/{owner}/{repo}/branches/{branch}/protection/enforce_admins",
                token=token,
            )
    if field.startswith("required_status_checks") or field.startswith("required_pull_request_reviews"):
        # Update via main protection PUT, preserving other fields
        st, current, err = _api_call(
            "GET", f"/repos/{owner}/{repo}/branches/{branch}/protection",
            token=token,
        )
        if st >= 400 or not isinstance(current, dict):
            return st, None, err or "could not read current protection"
        # Build a minimal payload: required_status_checks + reviews + admins
        # Copy so the fetched document is not mutated, then drop keys that
        # are removed before re-sending (url, contexts_url, checks).
        rsc = dict(current.get("required_status_checks") or {})
        rsc.pop("url", None)
        rsc.pop("contexts_url", None)
        rsc.pop("checks", None)
        # `sub` is the part of the field after "required_status_checks.",
        # or "" when the field is the whole object or a reviews field.
        if field.startswith("required_status_checks"):
            sub = field[len("required_status_checks"):].lstrip(".")
        else:
            sub = ""
        # Whole-object replacement is honoured only for dict values; any
        # other value type is ignored and the current settings are re-sent.
        # Sub-fields other than `contexts` and `strict` are not applied.
        if field.startswith("required_status_checks") and sub == "":
            rsc = value if isinstance(value, dict) else rsc
        elif field.startswith("required_status_checks") and sub == "contexts":
            rsc["contexts"] = value
        elif field.startswith("required_status_checks") and sub == "strict":
            rsc["strict"] = value
        rprr = current.get("required_pull_request_reviews") or {}
        if field == "required_pull_request_reviews":
            # Whole-object replacement: the caller's value is sent verbatim.
            review_payload = value
        else:
            # NOTE(review): when the branch currently has no review
            # requirement (`rprr` empty), these fallbacks still send a review
            # block (1 approval, dismiss stale, code-owner review) even if
            # only a status-check field was targeted — confirm this is the
            # intended secure default rather than an unintended side effect.
            review_payload = {
                "required_approving_review_count": rprr.get("required_approving_review_count", 1),
                "dismiss_stale_reviews": rprr.get("dismiss_stale_reviews", True),
                "require_code_owner_reviews": rprr.get("require_code_owner_reviews", True),
            }
            if field.startswith("required_pull_request_reviews."):
                review_sub = field[len("required_pull_request_reviews."):].strip()
                # Review sub-fields outside this set are silently ignored;
                # the PUT below is still issued with the unchanged values.
                if review_sub in {
                    "required_approving_review_count",
                    "dismiss_stale_reviews",
                    "require_code_owner_reviews",
                }:
                    review_payload[review_sub] = value
        # The remaining settings are carried over from the fetched document.
        # The {"enabled": ...} wrappers are unwrapped; when a wrapper is
        # absent, enforce_admins falls back to True and the others to False.
        # NOTE(review): `restrictions` is forwarded exactly as returned by
        # the GET — verify the PUT endpoint accepts that shape.
        body = {
            "required_status_checks": rsc,
            "enforce_admins": (current.get("enforce_admins") or {}).get("enabled", True),
            "required_pull_request_reviews": review_payload,
            "restrictions": current.get("restrictions"),
            "allow_force_pushes": (current.get("allow_force_pushes") or {}).get("enabled", False),
            "allow_deletions": (current.get("allow_deletions") or {}).get("enabled", False),
            "required_linear_history": (current.get("required_linear_history") or {}).get("enabled", False),
            "required_conversation_resolution": (current.get("required_conversation_resolution") or {}).get("enabled", False),
        }
        return _api_call(
            "PUT",
            f"/repos/{owner}/{repo}/branches/{branch}/protection",
            token=token, body=body,
        )
    return 0, None, f"github.branch-protection.{field}: apply not implemented"


# ─────────────────────────────────────────────────────────────────────
# k8s adapter — Deployment/StatefulSet/CronJob fields via kubectl
# Substrate decides desired-state; reconciler applies via kubectl patch.
# Body-agnostic: works for any k8s resource that accepts a JSON Patch
# (`kubectl patch --type=json`).
# Field shape: <namespace>/<kind>/<name>.<dotted-path>
#   e.g. arqera-staging/deployment/arqera-backend-blue.spec.replicas
# ─────────────────────────────────────────────────────────────────────

def _k8s_observe(field: str, namespace: str | None = None):
    """Read current platform value for a k8s resource field.

    Field syntax: `<namespace>/<kind>/<name>.<dotted-jq-path>`
    Example: `arqera-staging/deployment/arqera-backend-blue.spec.replicas`

    Returns (status, observed, error). status = http-status-equivalent
    (0 = error, 200 = ok) so this composes with the rest of the
    reconciler.
    """
    parts = field.split(".", 1)
    if len(parts) != 2:
        return 0, None, "field must be 'ns/kind/name.path' shape"
    target, dotted_path = parts
    target_parts = target.split("/")
    if len(target_parts) != 3:
        return 0, None, "target must be 'namespace/kind/name'"
    ns, kind, name = target_parts
    rc, stdout, stderr = _kubectl_run(
        ["-n", ns, "get", kind, name, "-o", "json"], timeout=20,
    )
    if rc != 0:
        return 0, None, stderr.strip()[:500] or "kubectl get failed"
    try:
        body = json.loads(stdout)  # noqa: ARQ-NO-JSON-HOT-PATH kubectl vendor wire format (-o json)
    except json.JSONDecodeError as e:
        return 0, None, f"kubectl json parse: {e}"
    cur = body
    for p in dotted_path.split("."):
        # Allow [N] array index notation
        m = re.match(r"^([^\[]*)\[(\d+)\]$", p)
        if m:
            key, idx = m.group(1), int(m.group(2))
            if key:
                cur = cur.get(key) if isinstance(cur, dict) else None
            cur = cur[idx] if isinstance(cur, list) and idx < len(cur) else None
        else:
            cur = cur.get(p) if isinstance(cur, dict) else None
        if cur is None:
            return 200, None, None
    return 200, cur, None


def _k8s_apply(field: str, value):
    """Apply a desired value to a k8s resource field via a JSON Patch.

    Runs `kubectl patch --type=json` (RFC 6902 JSON Patch, not a JSON merge
    patch) with a single `replace` operation, so the target path must
    already exist on the resource.

    Field syntax matches `_k8s_observe`:
    `<namespace>/<kind>/<name>.<dotted-path>`, where a segment may end in
    `[N]` to address a list element
    (`spec.template.spec.containers[0].image`).

    The dotted path is converted to an RFC 6901 JSON Pointer: `key[N]`
    becomes the two tokens `key` and `N`, and `~` / `/` inside a key are
    escaped as `~0` / `~1` so they are not mistaken for pointer syntax.

    Returns `(status, body, error)`: `(200, {"patched": <kubectl stdout>},
    None)` on success, `(0, None, <message>)` on a malformed field or a
    kubectl failure.
    """
    parts = field.split(".", 1)
    if len(parts) != 2:
        return 0, None, "field must be 'ns/kind/name.path' shape"
    target, dotted_path = parts
    target_parts = target.split("/")
    if len(target_parts) != 3:
        return 0, None, "target must be 'namespace/kind/name'"
    ns, kind, name = target_parts

    # Convert dotted path to JSON-pointer reference tokens.
    tokens: list[str] = []
    for segment in dotted_path.split("."):
        indexed = re.match(r"^([^\[]*)\[(\d+)\]$", segment)
        if indexed:
            key = indexed.group(1)
            if key:
                tokens.append(key)
            # Normalise through int(): JSON Pointer array indices must not
            # carry leading zeros.
            tokens.append(str(int(indexed.group(2))))
        else:
            tokens.append(segment)
    # Escape order matters: `~` first, otherwise the `~1` produced for `/`
    # would itself be re-escaped.
    jsonptr = "".join(
        "/" + token.replace("~", "~0").replace("/", "~1") for token in tokens
    )

    patch = json.dumps([{"op": "replace", "path": jsonptr, "value": value}])
    rc, stdout, stderr = _kubectl_run(
        ["-n", ns, "patch", kind, name, "--type=json", "-p", patch],
        timeout=20,
    )
    if rc != 0:
        return 0, None, stderr.strip()[:500] or "kubectl patch failed"
    return 200, {"patched": stdout.strip()}, None


# ─────────────────────────────────────────────────────────────────────
# cloudflare adapter — Zone settings + Worker config via REST API
# Field shape: <zone-or-account>:<id>.<dotted-path>
#   e.g. zone:<zone_id>.always_use_https
#        account:<account_id>.workers.<worker_name>.<setting>
# ─────────────────────────────────────────────────────────────────────

def _sops_extract(path_expr: str) -> str | None:
    """Extract a value from the operator's SOPS-encrypted secrets file.

    Returns None on any failure (file missing, age key missing, sops not on
    PATH, extraction failed). Mirrors the resolver shape in arq-cli-exec —
    same source chain that backend's credential_reader_service uses
    cluster-side; this is the operator-local mirror.

    `path_expr` is a sops --extract expression: '["sentry"]["sentry_auth_token"]'.
    """
    sops_file = Path.home() / "Desktop" / "Project" / ".secrets.yaml"
    age_key = Path.home() / ".config" / "sops" / "age" / "keys.txt"
    if not sops_file.exists() or not age_key.exists():
        return None
    try:
        r = subprocess.run(
            ["sops", "-d", "--extract", path_expr, str(sops_file)],
            capture_output=True, text=True, timeout=10,
            env={**os.environ, "SOPS_AGE_KEY_FILE": str(age_key)},
        )
        if r.returncode == 0:
            v = r.stdout.strip()
            return v or None
    except (FileNotFoundError, subprocess.TimeoutExpired, OSError):
        return None
    return None


def _mask_token(s: str | None) -> str:
    """Redaction-safe representation of a credential — never emit raw."""
    if not s or len(s) < 8:
        return "***"
    return f"{s[:4]}...{s[-2:]}"


# Secret-source body addresses on substrate (per Frame D extension —
# every secret source is substrate-addressable so credential_resolved
# acts can link back to the canonical body for that source).
#
# Keyed by source label. _resolve_credential looks its `source_label` up
# here with .get(), so a label without an entry (e.g. "unavailable") yields
# None in the emitted payload. Of the labels below, _resolve_credential
# itself only produces "env", "sops" and "keychain".
_SECRET_SOURCE_BODIES = {
    "env": "arq://body/secret_source/env-twin-process-2026-04-26",  # not strictly emitted but referenced
    "sops": "arq://body/secret_source/sops-secrets-yaml-2026-04-26",
    "keychain": "arq://body/secret_source/keychain-macos-twin-2026-04-26",
    "k8s-shared": "arq://body/secret_source/k8s-arqera-shared-env-2026-04-26",
    "k8s-backend": "arq://body/secret_source/k8s-arqera-backend-env-2026-04-26",
    "vault": "arq://body/secret_source/vault-staging-db-2026-04-26",
    "nango": "arq://body/secret_source/nango-cloud-2026-04-26",
    "onepassword": "arq://body/secret_source/onepassword-cli-optional-2026-04-26",
    "bitwarden": "arq://body/secret_source/bitwarden-cli-optional-2026-04-26",
}


def _resolve_credential(surface: str) -> tuple[str | None, str]:
    """Resolve the API token for a surface from operator-local sources.

    Order matches arq-cli-exec / credential_reader_service:
      1. Ambient env var (most-specific wins)
      2. SOPS .secrets.yaml (the operator's encrypted vault)
      3. macOS Keychain generic-password lookup

    The first source that yields a non-empty value wins; later sources are
    not consulted. A surface with no entry in the lookup tables below skips
    every source and resolves as 'unavailable'.

    Emits a `credential_resolved` substrate act on every read so
    SOC 2 CC6.1 / ISO 27001 A.5.10 / EU AI Act Art 12 can answer
    "show me every credential access" with one substrate query class.
    Composes Frame D's secret-substrate extension: every secret
    source is addressable (arq://body/secret_source/...); every
    access leaves a credential_resolved act linking back.
    The act carries only a masked fingerprint of the token (via
    _mask_token), never the raw value.

    Returns (token, source_label). source_label is 'env' / 'sops' /
    'keychain' / 'unavailable'.
    """
    # Per-surface environment variable names, tried in the listed order.
    surface_envs = {
        "cloudflare": ("CLOUDFLARE_API_TOKEN", "CF_API_TOKEN"),
        "sentry": ("SENTRY_AUTH_TOKEN",),
        "posthog": ("POSTHOG_API_KEY", "POSTHOG_PERSONAL_API_KEY"),
        "stripe": ("STRIPE_SECRET_KEY", "STRIPE_API_KEY"),
        "nango": ("NANGO_SECRET_KEY",),
    }
    # Per-surface `sops --extract` expression into the secrets file.
    surface_sops = {
        "cloudflare": '["cloudflare"]["cloudflare_api_token"]',
        "sentry": '["sentry"]["sentry_auth_token"]',
        "posthog": '["posthog"]["posthog_personal_api_key"]',
        "stripe": '["stripe"]["stripe_secret_key"]',
        "nango": '["nango"]["nango_secret_key"]',
    }
    # Per-surface Keychain service names, tried in the listed order.
    keychain_services = {
        "cloudflare": ("cloudflare-api-token", "CLOUDFLARE_API_TOKEN"),
        "sentry": ("sentry-auth-token", "SENTRY_AUTH_TOKEN"),
        "posthog": ("posthog-api-key",),
        "stripe": ("stripe-secret-key",),
        "nango": ("nango-secret-key",),
    }
    token: str | None = None
    source_label = "unavailable"
    # Which env var / sops path / keychain service produced the token.
    source_detail: str | None = None
    # 1. Environment — the value is used as-is (not stripped).
    for env_name in surface_envs.get(surface, ()):
        v = os.environ.get(env_name)
        if v:
            token = v
            source_label = "env"
            source_detail = env_name
            break
    # 2. SOPS — only when the environment produced nothing.
    if token is None:
        sops_path = surface_sops.get(surface)
        if sops_path:
            v = _sops_extract(sops_path)
            if v:
                token = v
                source_label = "sops"
                source_detail = sops_path
    # 3. Keychain via the `security` CLI. A missing binary or a timeout on
    #    one service is skipped and the next service name is tried.
    if token is None:
        for kc_service in keychain_services.get(surface, ()):
            try:
                r = subprocess.run(
                    ["security", "find-generic-password", "-s", kc_service, "-w"],
                    capture_output=True, text=True, timeout=5,
                )
                if r.returncode == 0 and r.stdout.strip():
                    token = r.stdout.strip()
                    source_label = "keychain"
                    source_detail = kc_service
                    break
            except (subprocess.TimeoutExpired, OSError):
                continue

    # Substrate audit: emit credential_resolved act linking back to the
    # secret-source body. Even unavailable resolutions emit (operators
    # auditing "why did this fail" need that signal too).
    # The return value of _emit_act is ignored, so a failed emission does
    # not affect the result.
    # NOTE(review): _utc_ts() is called twice (reference and payload `ts`),
    # so the two stamps can differ by one second across a tick boundary.
    _emit_act(
        "act", "credential_resolved",
        f"{surface}-{source_label}-{_utc_ts()}",
        {
            "surface": surface,
            "source_label": source_label,
            "source_detail": source_detail,
            "secret_source_body": _SECRET_SOURCE_BODIES.get(source_label),
            "token_fingerprint": _mask_token(token),
            "outcome": "resolved" if token else "unavailable",
            "actor_peer": ACTOR_PEER_ADDRESS,
            "principle": "substrate-is-the-runtime-config-v1",
            "frame_d_extension": "secrets-substrate",
            "ts": _utc_ts(),
        },
    )
    return token, source_label


def _cloudflare_token() -> str | None:
    """Cloudflare API token via the unified resolver."""
    token, _ = _resolve_credential("cloudflare")
    return token


def _sentry_token() -> str | None:
    """Sentry auth token via the unified resolver."""
    token, _ = _resolve_credential("sentry")
    return token


def _cloudflare_observe(field: str):
    """Read a Cloudflare runtime config field.

    Field syntax: `<scope>:<id>.<dotted-path>`
      zone:<zone_id>.<setting_name>      → /zones/{id}/settings/{setting}
      account:<account_id>.workers       → /accounts/{id}/workers/scripts

    Returns `(status, observed, error)`. Status 0 means the request was never
    sent (no credential, malformed field, unsupported scope/path); otherwise
    it is the HTTP status from the API. For zone settings the observed value
    is the `value` member of the response's `result` object; for
    `account:<id>.workers` it is the raw `result` payload.
    """
    token = _cloudflare_token()
    if not token:
        # The resolver consults env, sops and keychain; name all three.
        return 0, None, "CLOUDFLARE_API_TOKEN not in env/sops/keychain"
    scope_part, _, dotted = field.partition(".")
    scope, _, ident = scope_part.partition(":")
    if not (scope and ident and dotted):
        return 0, None, "field must be 'zone:<id>.<setting>' or 'account:<id>.<path>'"
    if scope == "zone":
        # Setting endpoint returns {result: {id, value, ...}}
        st, body, err = _api_call(
            "GET", f"/zones/{ident}/settings/{dotted}",
            token=token, base_url=CLOUDFLARE_API,
        )
        if st >= 400 or not isinstance(body, dict):
            return st, None, err
        result = body.get("result")
        if not isinstance(result, dict):
            # Null or unexpectedly-shaped result: nothing observable.
            return st, None, None
        return st, result.get("value"), None
    if scope == "account":
        # Top-level: workers list, etc. dotted path walks the JSON.
        # Phase 1 supports: account:<id>.workers (list scripts)
        if dotted == "workers":
            st, body, err = _api_call(
                "GET", f"/accounts/{ident}/workers/scripts",
                token=token, base_url=CLOUDFLARE_API,
            )
            if st >= 400 or not isinstance(body, dict):
                return st, None, err
            return st, body.get("result"), None
        return 0, None, f"cloudflare account.{dotted}: not implemented in Phase 1"
    return 0, None, f"unknown cloudflare scope: {scope}"


def _cloudflare_apply(field: str, value):
    """Apply a Cloudflare runtime config field.

    Phase 1 supports `zone:<id>.<setting>` via
    PATCH /zones/{id}/settings/{setting} with body `{"value": <value>}`.

    The field is validated the same way as in `_cloudflare_observe`: scope,
    id and setting must all be non-empty, so a malformed field never results
    in a PATCH against a half-built URL.

    Returns `(status, body, error)`. Status 0 means nothing was sent (no
    credential, malformed field, or unsupported scope); otherwise the tuple
    is the `_api_call` result.
    """
    token = _cloudflare_token()
    if not token:
        return 0, None, "CLOUDFLARE_API_TOKEN not in env/sops/keychain"
    scope_part, _, dotted = field.partition(".")
    scope, _, ident = scope_part.partition(":")
    if not (scope and ident and dotted):
        return 0, None, "field must be 'zone:<id>.<setting>' or 'account:<id>.<path>'"
    if scope == "zone":
        return _api_call(
            "PATCH", f"/zones/{ident}/settings/{dotted}",
            token=token, base_url=CLOUDFLARE_API,
            body={"value": value},
        )
    return 0, None, f"cloudflare {scope}.{dotted}: apply not implemented in Phase 1"


# ─────────────────────────────────────────────────────────────────────
# sentry adapter — alert rules + project settings via REST API
# Field shape: <org>:<project>.<dotted-path>
#   Examples:
#     arqera:arqera-backend.alert-rules.<id>.threshold
#     arqera:arqera-frontend.platform                  (read-only metadata)
# ─────────────────────────────────────────────────────────────────────

# Base URLs passed as `base_url` to _api_call by the sentry and stripe
# adapters respectively.
SENTRY_API = "https://sentry.io/api/0"
STRIPE_API = "https://api.stripe.com/v1"


def _find_platforms_manifest() -> Path | None:
    """The manifest lives in the ARQERA repo (not next to the installed
    arq-config binary). Try canonical locations in order."""
    candidates = [
        Path(__file__).resolve().parent.parent.parent / "infra" / "runtime-config-platforms.yaml",
        Path.home() / "Desktop" / "Project" / "ARQERA" / "infra" / "runtime-config-platforms.yaml",
        Path("/Users/ayo/Desktop/Project/ARQERA/infra/runtime-config-platforms.yaml"),
        Path.cwd() / "infra" / "runtime-config-platforms.yaml",
    ]
    for c in candidates:
        if c.exists():
            return c
    return None


# Resolved once at import time; None when no candidate location exists.
PLATFORMS_MANIFEST = _find_platforms_manifest()


def _sentry_observe(field: str):
    """Read a sentry runtime config field.

    Field syntax: `<org>:<project>.<dotted-path>`
      <org>:<project>.alert-rules                  -> number of rules
      <org>:<project>.alert-rules.<id>             -> the whole rule object
      <org>:<project>.alert-rules.<id>.<setting>   -> single setting on a rule
      <org>:<project>.platform / name / slug / hasAccess / isMember
                                                   -> project metadata

    Returns `(status, observed, error)`. Status 0 means nothing was fetched
    (no credential, malformed field, unsupported path); 404 is synthesised
    when the requested rule id is not in the project's rule list.
    """

    def descend(status, root, path):
        # Follow `path` key by key; stop with an error on a non-object.
        node = root
        for key in path.split("."):
            if not isinstance(node, dict):
                return status, None, f"path {path} broke at {key}"
            node = node.get(key)
        return status, node, None

    token = _sentry_token()
    if not token:
        return 0, None, "SENTRY_AUTH_TOKEN not in env/sops/keychain"
    proj_part, _, dotted = field.partition(".")
    org, _, project = proj_part.partition(":")
    if not org or not project or not dotted:
        return 0, None, "field must be '<org>:<project>.<setting>'"

    # Alert-rules path
    if dotted.startswith("alert-rules"):
        status, rules, err = _api_call(
            "GET", f"/projects/{org}/{project}/rules/",
            token=token, base_url=SENTRY_API,
        )
        if status >= 400 or not isinstance(rules, list):
            return status, None, err
        remainder = dotted[len("alert-rules"):].lstrip(".")
        if not remainder:
            # Bare list request: report the count (cheap audit signal).
            return status, len(rules), None
        # Rule-specific access: alert-rules.<id>.<setting>
        rule_id, _, rule_field = remainder.partition(".")
        for entry in rules:
            # Ids are compared as strings so numeric ids match too.
            if str(entry.get("id")) == rule_id:
                matched = entry
                break
        else:
            return 404, None, f"rule {rule_id} not found"
        if not rule_field:
            return status, matched, None
        return descend(status, matched, rule_field)

    # Project-level metadata
    is_platform = dotted == "platform" or dotted.startswith("platform.")
    if is_platform or dotted in ("name", "slug", "hasAccess", "isMember"):
        status, project_doc, err = _api_call(
            "GET", f"/projects/{org}/{project}/",
            token=token, base_url=SENTRY_API,
        )
        if status >= 400 or not isinstance(project_doc, dict):
            return status, None, err
        return descend(status, project_doc, dotted)

    return 0, None, f"sentry {dotted}: not implemented in Phase 1"


def _stripe_token() -> str | None:
    """Return the Stripe secret key from the unified credential resolver.

    The resolver hands back a pair; only its first element (the token) is
    returned and the second element is discarded.
    """
    secret, _second = _resolve_credential("stripe")
    return secret


# ─────────────────────────────────────────────────────────────────────
# stripe adapter — account + portal-configurations + tax settings
# Field shape: <resource>.<dotted-path>
#   account.<setting>                         — top-level account field
#   account.settings.<group>.<field>          — nested settings
#   tax-settings.<setting>                    — tax/auto-collection config (reserved; not yet handled by observe/apply)
#   customer-portal-configurations            — list (count)
#   customer-portal-configurations.<id>.<feat> — per-config feature toggle
# ─────────────────────────────────────────────────────────────────────

def _stripe_observe(field: str):
    """Read a stripe runtime config field.

    Supported field shapes:
        customer-portal-configurations             -> number of entries in the
                                                      first page (limit=10)
        customer-portal-configurations.<id>        -> full configuration object
        customer-portal-configurations.<id>.<path> -> value at dotted <path>
        account                                    -> the account id
        account.<path>                             -> value at dotted <path>

    Returns:
        A ``(status, observed, error)`` tuple. ``status`` is whatever
        ``_api_call`` reported, or 0 when no request was issued (missing
        token, malformed field, unsupported field).
    """
    token = _stripe_token()
    if not token:
        return 0, None, "STRIPE_SECRET_KEY not in env/sops/keychain"

    def fetch(path: str):
        return _api_call("GET", path, token=token, base_url=STRIPE_API)

    def walk(status, body, dotted: str):
        # Descend one key per dotted segment; a missing key yields None,
        # and trying to descend *through* a non-dict is reported as a break.
        cur = body
        for part in dotted.split("."):
            if not isinstance(cur, dict):
                return status, None, f"path {dotted} broke at {part}"
            cur = cur.get(part)
        return status, cur, None

    portal = "customer-portal-configurations"
    if field == portal:
        st, body, err = fetch("/billing_portal/configurations?limit=10")
        if st >= 400 or not isinstance(body, dict):
            return st, None, err
        return st, len(body.get("data") or []), None

    if field.startswith(portal + "."):
        cfg_id, _, sub = field[len(portal) + 1:].partition(".")
        if not cfg_id:
            # An empty id would silently hit the list endpoint instead.
            return 0, None, "must specify customer-portal-configurations.<id>"
        st, body, err = fetch(f"/billing_portal/configurations/{cfg_id}")
        if st >= 400 or not isinstance(body, dict):
            return st, None, err
        if not sub:
            return st, body, None
        return walk(st, body, sub)

    # Match "account" exactly or as a dotted prefix only. A bare
    # startswith("account") would also capture e.g. "accounts.foo" and then
    # slice the wrong substring off the front of the field.
    if field == "account" or field.startswith("account."):
        # Stripe's /v1/account returns the connected/own account
        st, body, err = fetch("/account")
        if st >= 400 or not isinstance(body, dict):
            return st, None, err
        if field == "account":
            return st, body.get("id"), None
        return walk(st, body, field[len("account."):])

    return 0, None, f"stripe {field}: not implemented in Phase 1"


def _stripe_apply(field: str, value):
    """Apply a stripe runtime-config field.

    Phase 1 handles only ``customer-portal-configurations.<id>.<feature_path>``,
    dispatched as a POST to the billing-portal configuration endpoint. The
    body is handed to ``_api_call`` as a dict; Stripe's API is form-encoded,
    so a follow-up should switch to application/x-www-form-urlencoded for
    nested fields.

    A boolean ``value`` is wrapped as ``{"enabled": value}`` under the
    feature key; any other value is sent under the feature key unchanged.

    NOTE(review): ``<feature_path>`` is used verbatim as a single body key,
    dots included, so this does not mirror the dotted walk used on the
    observe side — confirm the intended wire shape.

    Returns:
        The ``_api_call`` result, or ``(0, None, message)`` when the token is
        missing or the field is unsupported/incomplete.
    """
    secret = _stripe_token()
    if not secret:
        return 0, None, "STRIPE_SECRET_KEY not in env/sops/keychain"

    prefix = "customer-portal-configurations."
    if not field.startswith(prefix):
        return 0, None, f"stripe {field}: apply not implemented in Phase 1"

    cfg_id, _, feature = field[len(prefix):].partition(".")
    if not feature:
        return 0, None, "must specify customer-portal-configurations.<id>.<feature>"

    # Booleans follow the `<feature>.enabled` shape of the portal schema;
    # everything else is passed through as the feature's value.
    if isinstance(value, bool):
        payload = {feature: {"enabled": value}}
    else:
        payload = {feature: value}

    endpoint = f"/billing_portal/configurations/{cfg_id}"
    return _api_call(
        "POST", endpoint,
        token=secret, base_url=STRIPE_API, body=payload,
    )


def _sentry_apply(field: str, value):
    """Apply a sentry runtime config field via REST.

    Phase 1: ``<org>:<project>.alert-rules.<id>.<setting>`` via
    PUT /projects/<o>/<p>/rules/<id>/. The PUT replaces the rule body, so
    the current rule is fetched first, ``value`` is spliced in at the dotted
    ``<setting>`` path, and the merged rule is sent back.

    Returns:
        A ``(status, body, error)`` tuple. ``status`` is 0 when no write was
        attempted (missing token, malformed or unsupported field) and 404
        when the rule id is not present in the project's rule list.
    """
    token = _sentry_token()
    if not token:
        return 0, None, "SENTRY_AUTH_TOKEN not in env/sops/keychain"
    proj_part, _, dotted = field.partition(".")
    org, _, project = proj_part.partition(":")
    # Same shape check as the observe side: without it a malformed field
    # would build request paths with empty org/project segments.
    if not (org and project and dotted):
        return 0, None, "field must be '<org>:<project>.<setting>'"
    if not dotted.startswith("alert-rules."):
        return 0, None, f"sentry {dotted}: apply not implemented in Phase 1"
    rest = dotted[len("alert-rules."):]
    rule_id, _, rule_field = rest.partition(".")
    if not rule_field:
        return 0, None, "must specify alert-rules.<id>.<setting>"
    # Fetch current rule
    st, rules, err = _api_call(
        "GET", f"/projects/{org}/{project}/rules/",
        token=token, base_url=SENTRY_API,
    )
    if st >= 400 or not isinstance(rules, list):
        return st, None, err
    rule = next((r for r in rules if str(r.get("id")) == str(rule_id)), None)
    if rule is None:
        return 404, None, f"rule {rule_id} not found"
    # Splice value into the rule at the dotted path. Intermediate segments
    # that are missing or not dicts are replaced by fresh dicts.
    target = dict(rule)
    cursor = target
    *parents, leaf = rule_field.split(".")
    for p in parents:
        if not isinstance(cursor.get(p), dict):
            cursor[p] = {}
        cursor = cursor[p]
    cursor[leaf] = value
    return _api_call(
        "PUT", f"/projects/{org}/{project}/rules/{rule_id}/",
        token=token, base_url=SENTRY_API, body=target,
    )


# ─────────────────────────────────────────────────────────────────────
# Generic REST adapter — config-driven body-agnostic dispatch
# Reads infra/runtime-config-platforms.yaml for platform shapes; one
# loader handles N platforms (grafana, workos, posthog, slack, sendgrid,
# twilio, anthropic, openai, groq, microsoft_graph, nango).
# ─────────────────────────────────────────────────────────────────────

# Process-wide memo for the parsed manifest; None means "not loaded yet".
_PLATFORMS_CACHE: dict | None = None


def _load_platforms_manifest() -> dict:
    """Load + cache the generic-platform manifest.

    Returns ``{}`` when the manifest path is unset or missing, when PyYAML
    is not importable, when the file cannot be read or parsed, or when its
    top level is not a mapping (the per-surface adapters above still work).
    The result, including the empty fallback, is cached for the lifetime of
    the process.
    """
    global _PLATFORMS_CACHE
    if _PLATFORMS_CACHE is not None:
        return _PLATFORMS_CACHE

    manifest: dict = {}
    if PLATFORMS_MANIFEST is not None and PLATFORMS_MANIFEST.exists():
        try:
            import yaml  # type: ignore[import-not-found]
            with PLATFORMS_MANIFEST.open() as f:
                loaded = yaml.safe_load(f)
        except Exception:
            # Deliberately best-effort: a missing PyYAML, an unreadable file
            # and a YAML syntax error all degrade to "no generic platforms".
            loaded = None
        # Callers use `.get(platform)` on the result, so a list/scalar YAML
        # root must not be cached as if it were the manifest.
        if isinstance(loaded, dict):
            manifest = loaded

    _PLATFORMS_CACHE = manifest
    return manifest


def _generic_platform_token(platform_cfg: dict) -> tuple[str | None, str]:
    """Resolve a platform token via the cred_sources block in the manifest.

    For basic-auth platforms, the token is the password component; the
    user component comes from cred_sources.sops_user (separate SOPS path).
    Caller composes them as `<user>:<password>` and sets auth_kind=basic.
    """
    cred_sources = platform_cfg.get("cred_sources") or {}
    env_name = cred_sources.get("env")
    if env_name:
        v = os.environ.get(env_name)
        if v:
            return v, "env"
    sops_path = cred_sources.get("sops")
    if sops_path:
        v = _sops_extract(sops_path)
        if v:
            return v, "sops"
    kc_service = cred_sources.get("keychain")
    if kc_service:
        try:
            r = subprocess.run(
                ["security", "find-generic-password", "-s", kc_service, "-w"],
                capture_output=True, text=True, timeout=5,
            )
            if r.returncode == 0 and r.stdout.strip():
                return r.stdout.strip(), "keychain"
        except (subprocess.TimeoutExpired, OSError):
            pass
    return None, "unavailable"


def _generic_platform_user(platform_cfg: dict) -> str | None:
    """For basic-auth platforms, resolve the user/account-id component.
    Falls back to env, then SOPS at sops_user path."""
    cred_sources = platform_cfg.get("cred_sources") or {}
    env_user = cred_sources.get("env_user")
    if env_user:
        v = os.environ.get(env_user)
        if v:
            return v
    sops_user = cred_sources.get("sops_user")
    if sops_user:
        v = _sops_extract(sops_user)
        if v:
            return v
    return None


def _generic_rest_observe(platform: str, field: str):
    """Generic observe via the manifest. Field shape:
       <resource> | <resource>.<dotted-path> | <resource>.{param=value}.dotted-path

    The {param=value} segments substitute placeholders in the manifest
    URL templates (e.g. {project_id} for posthog, {account_sid} for twilio).
    Because the field is split on ".", neither a param value nor a
    dotted-path key can itself contain a dot.

    Flow: resolve the platform token, emit a credential_resolved act, parse
    the field, build the request from the resource's `observe` template,
    call the API, then walk the dotted path through the response.

    Returns (status, observed, error). status is 0 for failures detected
    before any request is sent (unknown platform/resource, missing token,
    missing observe template); otherwise it is the status reported by
    _api_call. With no dotted path, a list response (or a dict whose
    "data"/"results" is a list) is reduced to its length.
    """
    manifest = _load_platforms_manifest()
    cfg = manifest.get(platform)
    if not cfg:
        return 0, None, f"platform '{platform}' not in manifest"
    token, source_label = _generic_platform_token(cfg)
    if not token:
        return 0, None, f"{platform} token not in env/sops/keychain"

    # Emit credential_resolved act for substrate audit (mirrors
    # _resolve_credential's emission for per-surface adapters).
    # Note this happens before the field/resource is validated, so an act
    # is emitted even when the call below bails out with status 0.
    _emit_act(
        "act", "credential_resolved",
        f"{platform}-{source_label}-{_utc_ts()}",
        {
            "surface": platform, "source_label": source_label,
            "secret_source_body": _SECRET_SOURCE_BODIES.get(source_label),
            "token_fingerprint": _mask_token(token),
            "outcome": "resolved",
            "principle": "substrate-is-the-runtime-config-v1",
            "frame_d_extension": "generic-rest-adapter",
            "ts": _utc_ts(),
        },
    )

    # Parse field into resource + params + dotted-path
    parts = field.split(".")
    resource = parts[0]
    rest_parts = parts[1:]
    params: dict[str, str] = {}
    dotted_walk: list[str] = []
    for p in rest_parts:
        # "{k=v}" segments are URL-template params; everything else is a
        # key/index to walk in the response body.
        if p.startswith("{") and "=" in p and p.endswith("}"):
            k, v = p[1:-1].split("=", 1)
            params[k] = v
        else:
            dotted_walk.append(p)

    resources = cfg.get("resources") or {}
    res_cfg = resources.get(resource)
    if not res_cfg:
        return 0, None, f"resource '{resource}' not in manifest for {platform}"
    observe_template = res_cfg.get("observe")
    if not observe_template:
        return 0, None, f"resource '{resource}' has no observe template"

    # Parse "GET /path/{param}" → method, path
    # A template with no whitespace is treated as a bare path with GET.
    method_path = observe_template.split(None, 1)
    method = method_path[0] if len(method_path) == 2 else "GET"
    path = method_path[-1]
    # Substitute params
    # Caller-supplied params go first, so they win over manifest defaults
    # for the same placeholder.
    for k, v in params.items():
        path = path.replace(f"{{{k}}}", v)

    api_base = cfg.get("api_base", "")
    auth_kind = cfg.get("auth_kind", "bearer")
    extra_headers: dict[str, str] = cfg.get("extra_headers") or {}

    # Substitute manifest-default params (e.g. {project_id} from cfg.defaults)
    # Only defaults (not caller params) are applied to api_base.
    # NOTE(review): placeholders with neither a param nor a default are left
    # in the path verbatim — confirm whether that should be an error.
    defaults = cfg.get("defaults") or {}
    for k, v in defaults.items():
        path = path.replace(f"{{{k}}}", str(v))
        api_base = api_base.replace(f"{{{k}}}", str(v))

    # Build auth header(s)
    headers = dict(extra_headers)
    token_arg: str | None = None
    if auth_kind == "x-api-key":
        headers["x-api-key"] = token
    elif auth_kind == "basic":
        import base64
        # A missing user component falls back to "" rather than failing.
        user = _generic_platform_user(cfg) or ""
        creds = base64.b64encode(f"{user}:{token}".encode()).decode()
        headers["Authorization"] = f"Basic {creds}"
    elif auth_kind == "query-string-key":
        # Some APIs (Google Maps) authenticate via ?key= rather than header
        # NOTE(review): the token is appended without URL-encoding.
        sep = "&" if "?" in path else "?"
        param_name = cfg.get("query_string_param", "key")
        path = f"{path}{sep}{param_name}={token}"
    else:
        token_arg = token  # bearer (default)

    st, body, err = _api_call(
        method, path, token=token_arg, base_url=api_base,
        extra_headers=headers if headers else None,
    )
    if st >= 400 or st == 0:
        return st, None, err
    if not dotted_walk:
        # Return a count for list-shaped responses, full body otherwise
        if isinstance(body, list):
            return st, len(body), None
        if isinstance(body, dict) and isinstance(body.get("data"), list):
            return st, len(body["data"]), None
        if isinstance(body, dict) and isinstance(body.get("results"), list):
            return st, len(body["results"]), None
        return st, body, None

    # Walk the dotted path: dict segments are key lookups (missing key →
    # None), list segments are integer indexes.
    cur = body
    for p in dotted_walk:
        if isinstance(cur, dict):
            cur = cur.get(p)
        elif isinstance(cur, list):
            try:
                cur = cur[int(p)]
            except (ValueError, IndexError):
                return st, None, f"path broke at {p} (not int or out of range)"
        else:
            return st, None, f"path broke at {p}"
    return st, cur, None


# ─────────────────────────────────────────────────────────────────────
# Adapter registry — body-agnostic dispatch table
# Adding a new surface = add an entry to OBSERVERS + APPLIERS.
# ─────────────────────────────────────────────────────────────────────

def _observe_dispatch(surface: str, field: str, **kwargs):
    """Route an observe request to the adapter for ``surface``.

    Resolution order: the github branch-protection adapter (which reads
    ``branch``/``owner``/``repo``/``token`` from ``kwargs``), the built-in
    per-surface adapters, then the manifest-driven generic REST adapter.

    Returns ``(status, observed, error)``; an unrecognised surface yields
    status 0 with an "unknown surface" error.
    """
    gh_prefix = "branch-protection."
    if surface == "github" and field.startswith(gh_prefix):
        branch = kwargs.get("branch", "main")
        sub_field = field[len(gh_prefix):]
        owner = kwargs.get("owner", DEFAULT_OWNER)
        repo = kwargs.get("repo", DEFAULT_REPO)
        # Fall back to the gh CLI token only when the caller passed none.
        token = kwargs.get("token") or _gh_auth_token()
        return _github_branch_protection_observe(
            branch, sub_field, owner, repo, token,
        )

    builtin_observers = {
        "k8s": _k8s_observe,
        "cloudflare": _cloudflare_observe,
        "sentry": _sentry_observe,
        "stripe": _stripe_observe,
    }
    observer = builtin_observers.get(surface)
    if observer is not None:
        return observer(field)

    # Fallback: generic-rest adapter via infra/runtime-config-platforms.yaml
    if surface in _load_platforms_manifest():
        return _generic_rest_observe(surface, field)
    return 0, None, f"unknown surface: {surface}"


def _apply_dispatch(surface: str, field: str, value, **kwargs):
    """Route an apply request to the adapter for ``surface``.

    The github branch-protection adapter reads ``branch``/``owner``/
    ``repo``/``token`` from ``kwargs``; the other built-in adapters take
    only ``(field, value)``. There is no generic-REST fallback here.

    Returns ``(status, body, error)``; an unrecognised surface yields
    status 0 with an "unknown surface" error.
    """
    gh_prefix = "branch-protection."
    if surface == "github" and field.startswith(gh_prefix):
        branch = kwargs.get("branch", "main")
        sub_field = field[len(gh_prefix):]
        owner = kwargs.get("owner", DEFAULT_OWNER)
        repo = kwargs.get("repo", DEFAULT_REPO)
        # Fall back to the gh CLI token only when the caller passed none.
        token = kwargs.get("token") or _gh_auth_token()
        return _github_branch_protection_apply(
            branch, sub_field, value, owner, repo, token,
        )

    builtin_appliers = {
        "k8s": _k8s_apply,
        "cloudflare": _cloudflare_apply,
        "sentry": _sentry_apply,
        "stripe": _stripe_apply,
    }
    applier = builtin_appliers.get(surface)
    if applier is None:
        return 0, None, f"unknown surface: {surface}"
    return applier(field, value)


# ─────────────────────────────────────────────────────────────────────
# Verb implementations
# ─────────────────────────────────────────────────────────────────────

def cmd_decide(args: argparse.Namespace) -> int:
    """Record on substrate that ``<surface>.<field>`` SHOULD hold a value.

    Emits a runtime_config_decided act; nothing is sent to the platform
    from this verb. High-stakes fields without a ``--paired-peer``
    attestation are refused and a config_blocked act is emitted instead.

    Returns 0 when the decision act was emitted, 1 when the decision was
    refused or the act could not be emitted.
    """
    surface, field = args.surface, args.field
    full_field = f"{surface}.{field}"
    actor = _gh_active_user()
    ts = _utc_ts()
    act_ref = f"{surface}-{field}-{ts}"

    # Trust gate: cross-peer approval cannot be verified from here, so the
    # caller must at least attest to it via --paired-peer for high-stakes
    # fields. No attestation, no decision.
    unattested = not args.paired_peer
    if full_field in HIGH_STAKES_FIELDS and unattested:
        blocked = {
            "surface": surface, "field": field,
            "blocked_reason": "high_stakes_field_requires_paired_peer",
            "high_stakes_fields": sorted(HIGH_STAKES_FIELDS),
            "actor": actor, "ts": ts,
            "principle": "trust-graded-merge-v1",
        }
        _emit_act("act", "config_blocked", act_ref, blocked)
        refusal = (
            f"arq-config decide {full_field}: high-stakes field requires "
            f"--paired-peer attestation; refused."
        )
        print(refusal, file=sys.stderr)
        return 1

    # JSON first so true/false/null/lists/objects keep their type; anything
    # that is not valid JSON is kept as the raw string.
    try:
        parsed_value = json.loads(args.value)
    except json.JSONDecodeError:
        parsed_value = args.value

    scope = args.scope or "global"
    payload = {
        "surface": surface,
        "field": field,
        "value": parsed_value,
        "scope": scope,
        "reason": args.reason or "",
        "decider_peer": ACTOR_PEER_ADDRESS,
        "decider_actor": actor,
        "paired_peer": args.paired_peer,
        "valid_from": ts,
        "valid_until": args.valid_until,
        "principle": "substrate-is-the-runtime-config-v1",
        "principle_status": "DRAFT",
        "ts": ts,
    }
    addr = _emit_act("act", "runtime_config_decided", act_ref, payload)
    if addr:
        print(f"runtime_config_decided: {addr}")
        print(f"  {full_field} = {json.dumps(parsed_value)} scope={scope}")
        return 0

    print(
        f"arq-config decide: failed to emit substrate act (twin CLI unavailable?); "
        f"would have decided: {full_field}={parsed_value} scope={scope}",
        file=sys.stderr,
    )
    return 1


def _twin_index(class_: str, type_: str, ref_prefix: str | None = None,
                limit: int = 50, since_days: int = 60) -> list[dict]:
    """List substrate records via the `twin index` CLI.

    Each returned dict carries `address` and `issued_at`, in the order the
    CLI printed them (expected time-DESC, most recent first). Any failure —
    CLI not installed, non-zero exit, timeout, OS error — yields [].

    A `--since` window of <since_days> days is always passed: the
    addressing-service's index endpoint rejects type-filtered queries
    without a time window for some low-cardinality act types.

    The human-readable `<ts>  <address>  parent=...` output is parsed
    rather than `--json`, which is failing on some recent CLI versions.
    Payloads are not part of this output; callers fetch them separately.
    """
    if not TWIN_CLI.exists():
        return []

    from datetime import timedelta
    window_start = datetime.now(timezone.utc) - timedelta(days=since_days)
    since = window_start.strftime("%Y-%m-%dT%H:%M:%SZ")

    cmd = [
        str(TWIN_CLI), "--use-keychain", "index",
        "--class", class_, "--type", type_,
        "--since", since,
        "--limit", str(limit),
    ]
    if ref_prefix:
        cmd += ["--ref-prefix", ref_prefix]

    try:
        proc = subprocess.run(cmd, capture_output=True, text=True, timeout=15)
    except (subprocess.TimeoutExpired, OSError):
        return []
    if proc.returncode != 0:
        return []

    records = []
    for raw_line in proc.stdout.splitlines():
        tokens = raw_line.split()
        # Need at least "<ts> <address>"; blank lines split to nothing.
        if len(tokens) < 2:
            continue
        issued_at, address = tokens[0], tokens[1]
        if address.startswith("arq://"):
            records.append({"address": address, "issued_at": issued_at})
    return records


def _twin_doc_fetch(addr: str) -> dict | None:
    """Fetch a substrate act/doc payload from the addressing-service.

    Uses the HTTP /address/<addr> endpoint (peer-key auth not required for
    public reads). `twin doc fetch` only works for `arq://doc/...`
    addresses; for `arq://act/...` the addressing-service is hit directly.
    The endpoint returns a record whose `payload_preview` is the
    JSON-stringified payload (truncated to ~500 chars by the substrate;
    sufficient for runtime_config_decided, which carries small primitive
    values).

    Returns:
        The payload as a dict, or None on any failure: transport error,
        empty or undecodable body, a response that is not a JSON object,
        or a preview that is missing, truncated, or not a JSON object.
    """
    import http.client

    # The address itself is the URL path component
    slug = addr.replace("arq://", "")
    url = f"https://addressing.arqera.io/address/{slug}"
    try:
        req = urllib.request.Request(
            url, headers={"Accept": "application/json"},
        )
        with urllib.request.urlopen(req, timeout=10) as resp:
            raw = resp.read()
    except (OSError, ValueError, http.client.HTTPException):
        # OSError covers URLError/HTTPError/timeouts; ValueError covers
        # malformed URLs; HTTPException covers truncated reads. One bad
        # record must not abort a sweep over many addresses.
        return None
    if not raw:
        return None

    try:
        rec = json.loads(raw)
    except ValueError:  # JSONDecodeError and UnicodeDecodeError
        return None
    if not isinstance(rec, dict):
        return None

    # The record's payload_preview is the JSON-stringified payload.
    preview = rec.get("payload_preview")
    if isinstance(preview, str):
        try:
            preview = json.loads(preview)
        except ValueError:
            return None
    # Honour the declared return type: anything but an object is "no payload".
    return preview if isinstance(preview, dict) else None


def _recall_decided(surface: str, field: str) -> tuple[dict | None, str | None]:
    """Find the most-recent runtime_config_decided act for (surface, field).

    Returns ``(payload, address)`` for the first indexed act whose payload
    names this surface and field, or ``(None, None)`` when none is found.

    Closes Path 3 — substrate IS the source of desired-state, not the
    --desired CLI flag. Adapter for resolver-of-record-v1.

    The addressing-service normalises slashes/dots in the act reference,
    so (surface, field) cannot be reconstructed from the address alone;
    payloads are fetched one by one in index order instead. At most 30
    payload fetches are attempted per call, whether or not they succeed.
    """
    fetch_limit = 30
    wanted_prefix = "arq://act/runtime_config_decided/"

    indexed = _twin_index("act", "runtime_config_decided", limit=200)
    addresses = (str(rec.get("address", "")) for rec in indexed)
    candidates = [a for a in addresses if a.startswith(wanted_prefix)]

    for addr in candidates[:fetch_limit]:
        payload = _twin_doc_fetch(addr)
        if not isinstance(payload, dict):
            continue
        same_surface = payload.get("surface") == surface
        if same_surface and payload.get("field") == field:
            return payload, addr
    return None, None


def cmd_reconcile(args: argparse.Namespace) -> int:
    """Compare the desired value for (surface, field) with the platform's
    observed value, report drift, and optionally apply the desired value.

    Desired-state resolution: an explicit --desired flag wins (JSON-parsed
    when possible, else the raw string); otherwise the most-recent
    runtime_config_decided act on substrate is used. With neither, the verb
    refuses — there is no operating policy without a decision.

    Acts emitted:
      - config_drift_observed   when observed != desired
      - config_reconciled       when --apply is set and the apply call
                                returned a 2xx status
      - config_reconcile_failed when --apply is set and the apply call
                                did not return a 2xx status
    Nothing is emitted when the field is already in sync.

    Returns 0 for in-sync, dry-run drift, or a successful apply; 2 for a
    missing field/decision, a failed platform read, or a failed apply.
    """
    if not args.field:
        print(
            "arq-config reconcile: --field required",
            file=sys.stderr,
        )
        return 2

    # Resolve desired: explicit flag wins; otherwise substrate-recall.
    desired_source = "flag"
    desired_addr = None
    if args.desired is not None:
        try:
            desired = json.loads(args.desired)
        except json.JSONDecodeError:
            desired = args.desired
    else:
        recalled, recalled_addr = _recall_decided(args.surface, args.field)
        if recalled is None:
            print(
                f"arq-config reconcile: no runtime_config_decided act on substrate "
                f"for {args.surface}.{args.field}; substrate IS the runtime config — "
                f"emit `arq-config decide` first or pass --desired explicitly",
                file=sys.stderr,
            )
            return 2
        desired = recalled.get("value")
        desired_addr = recalled_addr
        desired_source = "substrate"

    # Observe platform via the body-agnostic dispatcher
    st_obs, observed, err_obs = _observe_dispatch(
        args.surface, args.field,
        branch=args.branch, owner=args.owner, repo=args.repo,
    )
    if st_obs >= 400 or st_obs == 0:
        print(
            f"arq-config reconcile: failed to read platform; "
            f"status={st_obs} err={err_obs}",
            file=sys.stderr,
        )
        return 2

    drift_detected = observed != desired
    ts = _utc_ts()
    short_field = args.field

    # Drift is always recorded on substrate, with or without --apply; an
    # in-sync result is only printed.
    if drift_detected:
        _emit_act(
            "act", "config_drift_observed", f"{args.surface}-{short_field}-{ts}",
            {
                "surface": args.surface,
                "field": short_field,
                "branch": args.branch,
                "desired": desired,
                "desired_source": desired_source,
                "decided_addr": desired_addr,
                "observed": observed,
                "drift_kind": "value_mismatch",
                "ts": ts,
                "principle": "substrate-is-the-runtime-config-v1",
            },
        )
        print(f"DRIFT: {args.surface}.{short_field}: desired={json.dumps(desired)} observed={json.dumps(observed)} (source={desired_source})")
    else:
        print(f"in-sync: {args.surface}.{short_field} = {json.dumps(observed)} (source={desired_source})")

    if not args.apply or not drift_detected:
        return 0

    # Apply: drift + --apply → dispatch to platform via body-agnostic dispatcher
    st_app, _body_app, err_app = _apply_dispatch(
        args.surface, args.field, desired,
        branch=args.branch, owner=args.owner, repo=args.repo,
    )
    if not 200 <= st_app < 300:
        _emit_act(
            "act", "config_reconcile_failed", f"{args.surface}-{short_field}-{ts}",
            {
                "surface": args.surface,
                "field": short_field,
                "branch": args.branch,
                "desired": desired,
                "decided_addr": desired_addr,
                "observed_before": observed,
                "http_status": st_app,
                "error": err_app,
                "ts": ts,
                "principle": "substrate-is-the-runtime-config-v1",
            },
        )
        print(
            f"reconcile failed: status={st_app} err={err_app}",
            file=sys.stderr,
        )
        return 2

    # Re-observe to confirm. The receipt records whether the read-back
    # succeeded and matched, so a 2xx apply that did not take effect (or
    # could not be re-read) is distinguishable from a verified one.
    st_after, observed_after, err_after = _observe_dispatch(
        args.surface, args.field,
        branch=args.branch, owner=args.owner, repo=args.repo,
    )
    reobserved = not (st_after >= 400 or st_after == 0)
    verified = reobserved and observed_after == desired
    addr = _emit_act(
        "act", "config_reconciled", f"{args.surface}-{short_field}-{ts}",
        {
            "surface": args.surface,
            "field": short_field,
            "branch": args.branch,
            "desired": desired,
            "decided_addr": desired_addr,
            "observed_before": observed,
            "observed_after": observed_after,
            "observed_after_status": st_after,
            "verified": verified,
            "http_status": st_app,
            "dispatch_outcome": "success",
            "ts": ts,
            "principle": "substrate-is-the-runtime-config-v1",
        },
    )
    print(f"reconciled: {addr}")
    print(f"  {args.surface}.{short_field} = {json.dumps(observed_after)} (was {json.dumps(observed)})")
    if not reobserved:
        print(
            f"warning: apply returned {st_app} but re-observe failed; "
            f"status={st_after} err={err_after}",
            file=sys.stderr,
        )
    elif not verified:
        print(
            f"warning: apply returned {st_app} but platform still reports "
            f"{json.dumps(observed_after)}, not {json.dumps(desired)}",
            file=sys.stderr,
        )
    return 0


def cmd_inspect(args: argparse.Namespace) -> int:
    """Print the platform's current value for a (surface, field) as JSON.

    Reads through the body-agnostic observe dispatcher. Audit-grade read
    companion to `decide` per
    arq://doc/principle/mesh-diagnostic-surface-keeps-pace-with-action-surface-v1.

    Returns 0 after printing the report, 2 when --field is missing or the
    platform read fails.
    """
    if not args.field:
        print("arq-config inspect: --field required", file=sys.stderr)
        return 2

    status, observed, error = _observe_dispatch(
        args.surface, args.field,
        branch=args.branch, owner=args.owner, repo=args.repo,
    )
    read_failed = status == 0 or status >= 400
    if read_failed:
        print(f"inspect failed: status={status} err={error}", file=sys.stderr)
        return 2

    report = {
        "surface": args.surface,
        "field": args.field,
        "observed": observed,
        "ts": _utc_ts(),
    }
    # Branch is only reported for the github surface.
    if args.surface == "github":
        report["branch"] = args.branch
    print(json.dumps(report, indent=2))
    return 0


def cmd_diff(args: argparse.Namespace) -> int:
    """Sweep every substrate-decided (surface, field) and report drift.

    Reads the indexed runtime_config_decided acts, keeps the first act seen
    per (surface, field) — the index is expected time-DESC, so that is the
    newest — observes each field on its platform, and prints one line per
    field plus a totals line.

    Closes Path 3: substrate IS the source of truth, this verb proves it
    by reading from substrate and comparing to N platforms in one pass.

    Returns 1 when at least one field drifted, otherwise 0 (including when
    there is nothing to sweep or some observes failed).
    """
    records = _twin_index("act", "runtime_config_decided", limit=200)
    if not records:
        print(
            "arq-config diff: no runtime_config_decided acts indexed yet "
            "(or twin index unavailable)",
            file=sys.stderr,
        )
        return 0

    # The addressing-service normalises slashes/dots in act references, so
    # the address cannot be parsed back into (surface, field) for fields
    # containing slashes (e.g. k8s namespace/kind/name). The payload is
    # authoritative: fetch it and key on (payload.surface, payload.field).
    decided_prefix = "arq://act/runtime_config_decided/"
    latest: dict[tuple[str, str], tuple[str, dict]] = {}
    for rec in records:
        addr = str(rec.get("address", ""))
        if not addr.startswith(decided_prefix):
            continue
        payload = _twin_doc_fetch(addr)
        if not isinstance(payload, dict):
            continue
        surface, field = payload.get("surface"), payload.get("field")
        if isinstance(surface, str) and isinstance(field, str):
            # setdefault keeps the first (newest) act for each key.
            latest.setdefault((surface, field), (addr, payload))

    if not latest:
        print("no runtime_config_decided acts on substrate yet", file=sys.stderr)
        return 0

    print(f"# substrate-vs-platform sweep — {len(latest)} fields under substrate decision")
    tally = {"insync": 0, "drift": 0, "error": 0}
    for key in sorted(latest):
        surface, field = key
        _addr, payload = latest[key]
        desired = payload.get("value")
        st, observed, err = _observe_dispatch(
            surface, field,
            branch=args.branch, owner=args.owner, repo=args.repo,
        )
        if st >= 400 or st == 0:
            print(f"  !  {surface}.{field}  observe-failed: status={st} {err}")
            tally["error"] += 1
        elif observed == desired:
            print(f"  ✓  {surface}.{field} = {json.dumps(observed)}")
            tally["insync"] += 1
        else:
            print(f"  ✗  {surface}.{field}  desired={json.dumps(desired)}  observed={json.dumps(observed)}")
            tally["drift"] += 1

    print(f"# total: {tally['insync']} in-sync, {tally['drift']} drift, {tally['error']} error")
    return 1 if tally["drift"] else 0


def main(argv: "list[str] | None" = None) -> int:
    """Parse the command line and dispatch to the selected verb handler.

    Registers four sub-commands — ``decide``, ``reconcile``, ``inspect`` and
    ``diff`` — and binds each to its ``cmd_*`` handler through
    ``set_defaults(func=...)``.

    Args:
        argv: Argument list to parse, without the program name. ``None``
            (the default) makes argparse fall back to ``sys.argv[1:]``, so
            existing ``main()`` callers behave exactly as before; passing an
            explicit list allows programmatic invocation.

    Returns:
        The integer returned by the selected handler, used by the script
        entry point as the process exit status.

    Raises:
        SystemExit: raised by argparse itself on invalid usage or ``--help``.
    """
    parser = argparse.ArgumentParser(
        prog="arq-config",
        description="Substrate-decided runtime configuration — Frame D of "
        "merge-decision-on-substrate-v1; implements substrate-is-the-runtime-config-v1.",
    )
    # required=True: a verb must be given, so `args.func` is always set below.
    sub = parser.add_subparsers(dest="verb", required=True)

    def add_repo_args(p: argparse.ArgumentParser) -> None:
        """Attach the --owner/--repo options shared by the repo-facing verbs."""
        p.add_argument("--owner", default=DEFAULT_OWNER)
        p.add_argument("--repo", default=DEFAULT_REPO)

    # --- decide: declare the desired state for one (surface, field) ---------
    p_decide = sub.add_parser(
        "decide",
        help="emit runtime_config_decided act for a (surface, field) — "
             "this is the substrate-side declaration of desired state",
    )
    p_decide.add_argument(
        "surface", help="e.g. 'github', 'cloudflare', 'k8s', 'sentry'",
    )
    p_decide.add_argument(
        "field", help="dotted field path, e.g. 'branch-protection.enforce_admins'",
    )
    p_decide.add_argument(
        "value", help="desired value; JSON-parsed if possible, else string",
    )
    p_decide.add_argument(
        "--scope", default=None,
        help="global | tenant:<id> | pr:<num> | request:<id> | until:<iso8601>",
    )
    p_decide.add_argument("--reason", default=None)
    p_decide.add_argument(
        "--paired-peer", default=None,
        help="for high-stakes fields: name a paired-peer attestation "
             "(e.g. 'gashiru cross-approved PR #3521 head_sha 9853dc983')",
    )
    p_decide.add_argument("--valid-until", default=None, help="ISO-8601")
    p_decide.set_defaults(func=cmd_decide)

    # --- reconcile: compare decided vs. observed, optionally apply ----------
    p_reconcile = sub.add_parser(
        "reconcile",
        help="read substrate's most-recent decided value, compare to "
             "platform-observed, emit drift + apply on request",
    )
    p_reconcile.add_argument("surface")
    p_reconcile.add_argument(
        "--field", required=True,
        help="dotted field path, e.g. 'branch-protection.enforce_admins'",
    )
    p_reconcile.add_argument(
        "--desired", default=None,
        help="Phase-1: provide the desired value explicitly (JSON or string). "
             "Phase-2: dropped once twin recall serves runtime_config_decided.",
    )
    p_reconcile.add_argument(
        "--apply", action="store_true",
        help="dispatch to platform if drift detected; without --apply "
             "this is a dry-run that reports drift only",
    )
    p_reconcile.add_argument("--branch", default="main")
    add_repo_args(p_reconcile)
    p_reconcile.set_defaults(func=cmd_reconcile)

    # --- inspect: read-only view of one (surface, field) --------------------
    p_inspect = sub.add_parser(
        "inspect",
        help="read current platform state — audit-grade read companion to "
             "`decide` per mesh-diagnostic-surface-keeps-pace-with-action-surface-v1",
    )
    p_inspect.add_argument("surface")
    p_inspect.add_argument("--field", required=True)
    p_inspect.add_argument("--branch", default="main")
    add_repo_args(p_inspect)
    p_inspect.set_defaults(func=cmd_inspect)

    # --- diff: sweep every decided (surface, field) for drift ---------------
    p_diff = sub.add_parser(
        "diff",
        help="substrate-vs-platform sweep: list every (surface, field) "
             "with a runtime_config_decided act and report drift across all",
    )
    p_diff.add_argument("--branch", default="main")
    add_repo_args(p_diff)
    p_diff.set_defaults(func=cmd_diff)

    # parse_args(None) reads sys.argv[1:], preserving the original behaviour.
    args = parser.parse_args(argv)
    return args.func(args)


# Script entry point: run the CLI and use main()'s return value as the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
