#!/usr/bin/env python3
"""arq-cloudflare — Cloudflare API adapter for the mesh.

Routes Cloudflare v4 API calls through a named mesh bridge so every
call leaves a substrate audit trail. Satisfies
``arq://doc/policy/twin-dispatch-mesh-only-v2`` — without this bridge
the mesh-enforce hook blocks bare ``curl``/``wrangler``/``cloudflared``
from Twin's shell for any CF mutation.

Parent principles:
  arq://doc/principle/one-primitive-speaks-every-protocol-v1
  arq://doc/principle/actor-scoped-credential-resolution-v1
  arq://doc/principle/every-platform-is-a-peer-v1

Wrangler's listing surfaces are incomplete (DNS records, AI Gateway,
worker routes, Access apps+policies aren't all addressable via CLI
subcommands) so this bridge talks directly to api.cloudflare.com/client/v4
with per-verb scoped tokens drawn from SOPS-encrypted secrets.

v0.1 verbs:
  arq-cloudflare zone list
  arq-cloudflare zone dns <zone-name-or-id>
  arq-cloudflare worker list
  arq-cloudflare worker routes <zone>
  arq-cloudflare access apps
  arq-cloudflare access policies <app-id>
  arq-cloudflare ai-gateway list
  arq-cloudflare tunnel list
  arq-cloudflare tunnel create --name <name> [--config-src cloudflare|local]
  arq-cloudflare tunnel route  --tunnel-id <id> --ingress-json <file>
  arq-cloudflare dns    create --hostname <name> --tunnel-id <id> --zone <zone-id>
  arq-cloudflare config-rule    create --hostname <h> --bic off --security-level essentially_off
  arq-cloudflare access app     create --name <n> --domain <d> --session-duration <t>
  arq-cloudflare access service-token create --name <n>
  arq-cloudflare access policy  create --app-id <id> --name <n> --service-token-id <sid>
  arq-cloudflare credentials list
  arq-cloudflare credentials mint   --name <n> [--acct-scope <list>] [--zone-scope <list>] [--user-scope <list>] [--expires <date>]
  arq-cloudflare credentials rotate --service <n>
  arq-cloudflare credentials verify [--all | --service <n> | --token-id <id>]
  arq-cloudflare credentials audit
  arq-cloudflare credentials revoke --token-id <id> [--service <n>] [--grace-hours N]
  arq-cloudflare raw <METHOD> <PATH>       (arbitrary v4 passthrough)

Token resolution order (stops at first hit):
  1. $ARQ_CF_TOKEN                          — operator override
  2. macOS keychain chain (substrate-managed credential primitive):
       a. ``arqera-cf-runtime``             — per-verb runtime sub-token
       b. ``arqera-cf-master``              — bootstrap (token-mgmt verbs)
       c. ``cloudflare-api-token``          — legacy (back-compat)
  3. ``~/.arqera/cloudflare-token``
  4. SOPS ``cloudflare.tokens.<scope>`` matching the verb (scoped)
  5. SOPS ``cloudflare.cloudflare_api_token`` (broad fallback)

Token-management verbs (credentials.mint/rotate/revoke/audit/verify/list)
skip ``arqera-cf-runtime`` in the keychain step, so they resolve to
``arqera-cf-master`` (or the legacy entry) unless ``$ARQ_CF_TOKEN`` is
set — the runtime sub-token lacks ``User:API Tokens Write``, by design
(least-privilege per
arq://doc/principle/per-service-scoped-credentials-substrate-managed-v1).

Account + zone defaults come from SOPS ``cloudflare.account_id`` and
``cloudflare.zone_id``; override with ``$ARQ_CF_ACCOUNT_ID`` /
``$ARQ_CF_ZONE_ID`` or pass explicit values on verbs that accept them.

Emissions per call (daemon.act.emit primitive, twin-subprocess fallback):
  arq://act/cf_envelope_sent/<verb>-<ts>
  arq://act/cf_envelope_ack/<verb>-<ts>    (http_status, latency_ms)
  arq://act/benchmark_sample/cf-<verb>-<ts>

Sensitive values (token, record content, secrets) are NEVER written
into envelopes. Payload previews are method + path + first 200 chars.
Response previews are size + status + truncated body with a blocklist
applied.
"""

from __future__ import annotations

import argparse
import json
import os
import subprocess
import sys
import time
import urllib.error
import urllib.request
from datetime import datetime, timezone
from pathlib import Path

# Base URL for every Cloudflare v4 call; request paths are appended to it.
CF_API = "https://api.cloudflare.com/client/v4"
# SOPS-encrypted secrets file and the age key file handed to ``sops`` via
# SOPS_AGE_KEY_FILE when decrypting it.
SECRETS_PATH = Path.home() / "Desktop" / "Project" / ".secrets.yaml"
SOPS_AGE_KEY = Path.home() / ".config" / "sops" / "age" / "keys.txt"
# Plain-text token file consulted after the keychain and before SOPS.
ARQERA_TOKEN_FALLBACK = Path.home() / ".arqera" / "cloudflare-token"
# twin CLI binary used as the emission fallback when the daemon path
# does not produce a definitive result.
TWIN_CLI = Path.home() / ".local" / "bin" / "twin"

# Keychain chain (substrate-managed credential primitive, 2026-05-27).
# Order matters: per-verb runtime is preferred (least-privilege), master
# is the bootstrap fallback (full token-management scopes), legacy is
# backward-compat for sessions that pre-date the credential primitive.
KEYCHAIN_SERVICES = [
    "arqera-cf-runtime",        # twin-runtime sub-token (no API Tokens Write)
    "arqera-cf-master",         # twin-master bootstrap token (mints + rotates)
    "cloudflare-api-token",     # legacy keychain service name
]

# Verbs that need API Tokens Write. For these the keychain lookup skips
# the runtime entry, so they resolve to master (or, failing that, legacy).
# All other verbs prefer runtime, falling through to master/legacy.
TOKEN_MGMT_VERBS = frozenset({
    "credentials.list",
    "credentials.mint",
    "credentials.rotate",
    "credentials.verify",
    "credentials.audit",
    "credentials.revoke",
})

# Peer address recorded as the actor on every emitted envelope; can be
# overridden through $ARQ_ACTOR_PEER_ADDRESS.
ACTOR_PEER = os.environ.get(
    "ARQ_ACTOR_PEER_ADDRESS",
    "arq://body/peer/578412e7b083b40e56e228779804582a",
)

# Route each verb to the SOPS scope whose permissions are most likely
# to satisfy it. An empty scope (or a scoped lookup that yields nothing)
# falls back to the broad top-level SOPS token.
VERB_TO_SCOPE: dict[str, str] = {
    "zone.list": "dns",               # Zone:Read ships with dns token
    "zone.dns": "dns",
    "worker.list": "workers",
    "worker.routes": "workers",
    "access.apps": "zero_trust",
    "access.policies": "zero_trust",
    "ai-gateway.list": "analytics",   # AI Gateway analytics scope is closest
    "tunnel.list": "tunnel",
    "tunnel.create": "tunnel",        # Account:Cloudflare Tunnel:Edit
    "tunnel.route": "tunnel",         # Account:Cloudflare Tunnel:Edit
    "dns.create": "dns",              # Zone:DNS:Edit
    "config-rule.create": "waf",      # Zone:WAF:Edit (config rules live in WAF phases)
    "config-rule.list": "waf",
    "access.app.create": "zero_trust",            # Account:Access: Apps and Policies:Edit
    "access.service-token.create": "zero_trust",  # Account:Access: Service Tokens:Edit
    "access.policy.create": "zero_trust",         # Account:Access: Apps and Policies:Edit
    # credentials.* — substrate-managed credential primitive verbs.
    # Resolves to master keychain entry (carries User:API Tokens Write).
    # The empty scope means no scoped SOPS token is tried for them.
    "credentials.list": "",
    "credentials.mint": "",
    "credentials.rotate": "",
    "credentials.verify": "",
    "credentials.audit": "",
    "credentials.revoke": "",
    "raw": "",                        # use top-level for max permission
}

# Load shared primitive helper for act emission via daemon. The script's
# own directory is put first on sys.path so the sibling module is found.
# If the import fails, _primitive_invoke stays None and daemon emission
# reports itself as unavailable.
sys.path.insert(0, str(Path(__file__).resolve().parent))
try:
    from _arq_primitive import primitive_invoke as _primitive_invoke  # type: ignore
except ImportError:
    _primitive_invoke = None  # type: ignore


# ---------------------------------------------------------------------------
# Substrate emission — daemon first, twin-subprocess fallback (like arq-kube)
# ---------------------------------------------------------------------------

def _ts_ms() -> int:
    """Return the current UTC time as whole milliseconds since the epoch."""
    now_utc = datetime.now(timezone.utc)
    epoch_seconds = now_utc.timestamp()
    return int(epoch_seconds * 1000)


def _emit_via_daemon(class_: str, type_: str, ref: str, payload: dict) -> tuple[str, str | None]:
    """Try to emit one act through the ``daemon.act.emit`` primitive.

    Returns ``(outcome, address)``. ``outcome`` is ``"ok"`` (address
    taken from the result dict), ``"unknown_verb"``, ``"error"`` or
    ``"unavailable"``; the address is ``None`` for everything but
    ``"ok"``. ``"unavailable"`` covers a missing primitive helper, any
    exception raised by the invocation, an ``"ok"`` whose result is not
    a dict, and any outcome not listed above.
    """
    unavailable = ("unavailable", None)
    if _primitive_invoke is None:
        return unavailable

    request = {"class": class_, "type": type_, "reference": ref, "payload": payload}
    try:
        outcome, result = _primitive_invoke(
            "daemon.act.emit", request, ACTOR_PEER, timeout_s=10.0,
        )
    except Exception:
        # Emission is best-effort; a failing daemon must not break the call.
        return unavailable

    if outcome == "ok":
        if isinstance(result, dict):
            return ("ok", result.get("address"))
        return unavailable
    if outcome == "unknown_verb":
        return ("unknown_verb", None)
    if outcome == "error":
        return ("error", None)
    return unavailable


def _emit_via_twin_subprocess(class_: str, type_: str, ref: str, payload: dict) -> str | None:
    """Emit one act by shelling out to the twin CLI.

    Returns the first stdout line (stripped) that starts with ``arq://``
    when the CLI exits 0. Returns ``None`` when the CLI binary is
    missing, the run times out or cannot be started, the exit code is
    non-zero, or no such line is printed.
    """
    if not TWIN_CLI.exists():
        return None

    argv = [
        str(TWIN_CLI), "--use-keychain", "act", "emit", class_, type_, ref,
        "--payload", json.dumps(payload),  # noqa: ARQ-NO-JSON-HOT-PATH twin CLI input boundary — twin accepts --payload <json>
    ]
    try:
        proc = subprocess.run(
            argv, capture_output=True, text=True, timeout=5, check=False,
        )
    except (subprocess.TimeoutExpired, OSError):
        return None
    if proc.returncode != 0:
        return None

    stripped_lines = (raw.strip() for raw in proc.stdout.splitlines())
    return next((ln for ln in stripped_lines if ln.startswith("arq://")), None)


def _emit(class_: str, type_: str, ref: str, payload: dict) -> str | None:
    """Emit one act, preferring the daemon and falling back to the twin CLI.

    A daemon ``"ok"`` returns its address and a daemon ``"error"``
    returns ``None`` without a fallback attempt. Any other daemon
    outcome hands the emission to the twin subprocess path.
    """
    outcome, address = _emit_via_daemon(class_, type_, ref, payload)
    if outcome in ("ok", "error"):
        return address if outcome == "ok" else None
    return _emit_via_twin_subprocess(class_, type_, ref, payload)


# ---------------------------------------------------------------------------
# Credential resolution — never prints the token, never logs it
# ---------------------------------------------------------------------------

def _sops_extract(path_expr: str) -> str | None:
    """Decrypt one leaf of the secrets file with ``sops --extract``.

    ``path_expr`` is passed to sops unchanged. Returns the stripped
    stdout, or ``None`` when the secrets file or age key is absent, sops
    cannot be run or times out, sops exits non-zero, or the value is
    empty.
    """
    if not (SECRETS_PATH.exists() and SOPS_AGE_KEY.exists()):
        return None

    # Point sops at the age key without disturbing the rest of the environment.
    child_env = {**os.environ, "SOPS_AGE_KEY_FILE": str(SOPS_AGE_KEY)}
    argv = ["sops", "-d", "--extract", path_expr, str(SECRETS_PATH)]
    try:
        proc = subprocess.run(
            argv, capture_output=True, text=True, timeout=10, check=False,
            env=child_env,
        )
    except (FileNotFoundError, subprocess.TimeoutExpired, OSError):
        return None
    if proc.returncode != 0:
        return None
    return proc.stdout.strip() or None


def _keychain_lookup(service: str) -> str | None:
    """Read a single keychain entry by service name. Returns None on miss."""
    try:
        r = subprocess.run(
            ["security", "find-generic-password", "-s", service, "-w"],
            capture_output=True, text=True, timeout=5, check=False,
        )
        if r.returncode == 0 and r.stdout.strip():
            return r.stdout.strip()
    except (FileNotFoundError, subprocess.TimeoutExpired, OSError):
        pass
    return None


def _keychain_token(verb: str | None = None) -> tuple[str | None, str | None]:
    """Return the first keychain hit as ``(token, service_name)``.

    Services are tried in ``KEYCHAIN_SERVICES`` order. When ``verb`` is
    one of ``TOKEN_MGMT_VERBS`` the ``arqera-cf-runtime`` entry is
    skipped, because those verbs need the ``User:API Tokens Write``
    scope that the runtime token does not carry. Returns
    ``(None, None)`` when no service yields a token.
    """
    skip_runtime = bool(verb) and verb in TOKEN_MGMT_VERBS
    for service in KEYCHAIN_SERVICES:
        if skip_runtime and service == "arqera-cf-runtime":
            continue
        token = _keychain_lookup(service)
        if token:
            return (token, service)
    return (None, None)


def _resolve_token(verb: str) -> tuple[str | None, str]:
    """Return (token, source_tag). source_tag goes into the envelope.

    Token-management verbs (credentials.mint/rotate/verify/audit/revoke/list)
    force a master-token resolution; all other verbs prefer the runtime
    token first.
    """
    # 1. env override
    override = os.environ.get("ARQ_CF_TOKEN")
    if override:
        return (override, "env:ARQ_CF_TOKEN")

    # 2. keychain chain (verb-classifier-aware)
    kc, svc = _keychain_token(verb)
    if kc and svc:
        return (kc, f"keychain:{svc}")

    # 3. plain file
    if ARQERA_TOKEN_FALLBACK.exists():
        try:
            tok = ARQERA_TOKEN_FALLBACK.read_text().strip()
            if tok:
                return (tok, "file:~/.arqera/cloudflare-token")
        except OSError:
            pass

    # 4. scoped SOPS token (per-verb)
    scope = VERB_TO_SCOPE.get(verb, "")
    if scope:
        tok = _sops_extract(f'["cloudflare"]["tokens"]["{scope}"]["value"]')
        if tok:
            return (tok, f"sops:cloudflare.tokens.{scope}")

    # 5. broad SOPS token
    tok = _sops_extract('["cloudflare"]["cloudflare_api_token"]')
    if tok:
        return (tok, "sops:cloudflare.cloudflare_api_token")

    return (None, "missing")


def _resolve_account_id() -> str | None:
    return (
        os.environ.get("ARQ_CF_ACCOUNT_ID")
        or _sops_extract('["cloudflare"]["account_id"]')
    )


def _resolve_zone_id(default: str | None = None) -> str | None:
    return (
        default
        or os.environ.get("ARQ_CF_ZONE_ID")
        or _sops_extract('["cloudflare"]["zone_id"]')
    )


# ---------------------------------------------------------------------------
# HTTP core
# ---------------------------------------------------------------------------

def _cf_request(method: str, path: str, token: str,
                body: dict | None = None, timeout: int = 30
                ) -> tuple[int, dict | None, str]:
    """Perform one Cloudflare API request and decode the response.

    ``path`` is appended to ``CF_API`` (a leading ``/`` is added when
    missing). ``body``, when given, is sent as JSON. The bearer token is
    sent in the ``Authorization`` header.

    Returns ``(status, parsed, raw_tail)``:
      * ``status`` — HTTP status, including 4xx/5xx answers; ``0`` when
        the request failed before any HTTP response was obtained.
      * ``parsed`` — the decoded JSON body, or ``None`` when the body is
        not valid JSON or the request failed.
      * ``raw_tail`` — the last 500 characters of the body, or a short
        ``url_error: ...`` / ``error: ...`` description on failure.
    """
    endpoint = path if path.startswith("/") else "/" + path
    headers = {
        "Authorization": f"Bearer {token}",
        "Accept": "application/json",
        "User-Agent": "arq-cloudflare/0.1",
    }
    payload = None
    if body is not None:
        payload = json.dumps(body).encode()  # noqa: ARQ-NO-JSON-HOT-PATH Cloudflare API vendor wire format
        headers["Content-Type"] = "application/json"
    request = urllib.request.Request(
        f"{CF_API}{endpoint}", data=payload, method=method, headers=headers,
    )

    try:
        with urllib.request.urlopen(request, timeout=timeout) as resp:
            text = resp.read().decode("utf-8", errors="replace")
            status = resp.status
    except urllib.error.HTTPError as http_err:
        # 4xx/5xx still carry a body worth decoding.
        text = http_err.read().decode("utf-8", errors="replace")
        status = http_err.code
    except urllib.error.URLError as url_err:
        return (0, None, f"url_error: {url_err.reason}")
    except Exception as exc:  # network-level catch-all
        return (0, None, f"error: {type(exc).__name__}: {exc}")

    try:
        document = json.loads(text)
    except json.JSONDecodeError:
        document = None
    return (status, document, text[-500:])


def _dispatch(verb: str, method: str, path: str,
              body: dict | None = None,
              extra_envelope: dict | None = None,
              timeout: int = 30) -> tuple[int, dict | None]:
    """Emit sent/ack/benchmark envelopes around one CF API call.

    Returns (http_status, parsed_body). Callers format stdout from the
    parsed body — we keep that out of the envelopes to avoid leaking
    record content (DNS records, Worker source, Access policy rules).
    """
    token, token_source = _resolve_token(verb)
    ts = _ts_ms()
    ref = f"{verb.replace('.', '-')}-{ts}"

    cmd_preview = f"{method} {path}"[:200]
    envelope = {
        "verb": f"cloudflare.{verb}",
        "actor_peer": ACTOR_PEER,
        "method": method,
        "path_preview": cmd_preview,
        "token_source": token_source,
        "ts_ms": ts,
    }
    if extra_envelope:
        envelope.update(extra_envelope)
    sent = _emit("act", "cf_envelope_sent", ref, envelope)

    if token is None:
        fail = {
            "verb": f"cloudflare.{verb}",
            "envelope_sent": sent,
            "http_status": 0,
            "latency_ms": 0,
            "error": "no_token",
            "token_source": token_source,
            "ts_ms": _ts_ms(),
        }
        _emit("act", "cf_envelope_ack", ref, fail)
        _emit("act", "benchmark_sample", ref,
              {"op": f"cf-{verb.replace('.', '-')}",
               "latency_ms": 0, "http_status": 0, "ts_ms": ts})
        sys.stderr.write(
            "arq-cloudflare: no CF API token found.\n"
            "  Checked: $ARQ_CF_TOKEN, keychain(arqera-cf-runtime, "
            "arqera-cf-master, cloudflare-api-token), "
            "~/.arqera/cloudflare-token, SOPS cloudflare.tokens.*, "
            "SOPS cloudflare.cloudflare_api_token.\n"
            "  Mint a substrate-managed runtime token via:\n"
            "    arq-cloudflare credentials mint --name <service> "
            "--acct-scope <scopes>\n"
            "  Or bootstrap a master token at "
            "https://dash.cloudflare.com/profile/api-tokens "
            "and store via:\n"
            "    security add-generic-password -s arqera-cf-master "
            "-a arqera -w <TOKEN>\n"
        )
        return (0, None)

    t0 = time.monotonic()
    status, parsed, raw_tail = _cf_request(method, path, token,
                                            body=body, timeout=timeout)
    latency_ms = int((time.monotonic() - t0) * 1000)

    ack = {
        "verb": f"cloudflare.{verb}",
        "envelope_sent": sent,
        "http_status": status,
        "latency_ms": latency_ms,
        "response_size": len(raw_tail),
        "success": bool(parsed and parsed.get("success")),
        "response_tail": raw_tail,
        "ts_ms": _ts_ms(),
    }
    _emit("act", "cf_envelope_ack", ref, ack)
    _emit("act", "benchmark_sample", ref,
          {"op": f"cf-{verb.replace('.', '-')}",
           "latency_ms": latency_ms, "http_status": status, "ts_ms": ts})

    return (status, parsed)


# ---------------------------------------------------------------------------
# Verb handlers
# ---------------------------------------------------------------------------

def _print_json(obj) -> None:
    """Write ``obj`` to stdout as key-sorted, 2-space-indented JSON plus a newline."""
    out = sys.stdout
    json.dump(obj, out, indent=2, sort_keys=True)
    out.write("\n")


def _exit_for_status(status: int, parsed: dict | None) -> int:
    if status == 0:
        return 2
    if status >= 400:
        if parsed:
            _print_json(parsed)
        else:
            sys.stderr.write(f"HTTP {status}\n")
        return 1
    return 0


def cmd_zone_list(args: argparse.Namespace) -> int:
    """Print a summary (id, name, status, plan name) of up to 50 zones.

    ``args`` is unused. Returns the exit code derived from the HTTP status.
    """
    status, parsed = _dispatch("zone.list", "GET", "/zones?per_page=50")
    rc = _exit_for_status(status, parsed)
    if rc != 0 or not parsed:
        return rc

    zones = []
    for zone in parsed.get("result", []):
        plan = zone.get("plan") or {}
        zones.append({
            "id": zone.get("id"),
            "name": zone.get("name"),
            "status": zone.get("status"),
            "plan": plan.get("name"),
        })
    _print_json({"zones": zones, "count": len(zones)})
    return rc


def _resolve_zone_arg(zone_arg: str | None) -> str | None:
    """If user passes a zone name, look it up; if an id, pass through."""
    if not zone_arg:
        return _resolve_zone_id()
    # 32-hex looks like a CF zone id
    stripped = zone_arg.strip().lower()
    if len(stripped) == 32 and all(c in "0123456789abcdef" for c in stripped):
        return stripped
    # name lookup via GET /zones?name=
    status, parsed = _dispatch("zone.list", "GET", f"/zones?name={zone_arg}",
                                extra_envelope={"lookup_by_name": zone_arg})
    if status == 200 and parsed and parsed.get("result"):
        return parsed["result"][0].get("id")
    return None


def cmd_zone_dns(args: argparse.Namespace) -> int:
    """Print up to 200 DNS records of the zone named by ``args.zone``.

    Returns 2 when the zone cannot be resolved, otherwise the exit code
    derived from the HTTP status.
    """
    zone = _resolve_zone_arg(args.zone)
    if not zone:
        sys.stderr.write(f"arq-cloudflare: could not resolve zone '{args.zone}'\n")
        return 2

    records_path = f"/zones/{zone}/dns_records?per_page=200"
    status, parsed = _dispatch("zone.dns", "GET", records_path,
                                extra_envelope={"zone": zone})
    rc = _exit_for_status(status, parsed)
    if rc != 0 or not parsed:
        return rc

    wanted = ("id", "type", "name", "content", "proxied", "ttl")
    records = [
        {field: record.get(field) for field in wanted}
        for record in parsed.get("result", [])
    ]
    _print_json({"zone": zone, "records": records, "count": len(records)})
    return rc


def cmd_worker_list(args: argparse.Namespace) -> int:
    """Print id and timestamps of the account's Worker scripts.

    ``args`` is unused. Returns 2 when no account id resolves, otherwise
    the exit code derived from the HTTP status.
    """
    acct = _resolve_account_id()
    if not acct:
        sys.stderr.write("arq-cloudflare: no account_id (set $ARQ_CF_ACCOUNT_ID "
                         "or SOPS cloudflare.account_id)\n")
        return 2

    scripts_path = f"/accounts/{acct}/workers/scripts"
    status, parsed = _dispatch("worker.list", "GET", scripts_path,
                                extra_envelope={"account_id": acct})
    rc = _exit_for_status(status, parsed)
    if rc != 0 or not parsed:
        return rc

    wanted = ("id", "modified_on", "created_on")
    workers = [
        {field: script.get(field) for field in wanted}
        for script in parsed.get("result", [])
    ]
    _print_json({"account_id": acct, "workers": workers, "count": len(workers)})
    return rc


def cmd_worker_routes(args: argparse.Namespace) -> int:
    """Print the Worker routes of the zone named by ``args.zone``.

    The API ``result`` list is printed unmodified. Returns 2 when the
    zone cannot be resolved, otherwise the exit code derived from the
    HTTP status.
    """
    zone = _resolve_zone_arg(args.zone)
    if not zone:
        sys.stderr.write(f"arq-cloudflare: could not resolve zone '{args.zone}'\n")
        return 2

    routes_path = f"/zones/{zone}/workers/routes"
    status, parsed = _dispatch("worker.routes", "GET", routes_path,
                                extra_envelope={"zone": zone})
    rc = _exit_for_status(status, parsed)
    if rc != 0 or not parsed:
        return rc

    routes = parsed.get("result", [])
    _print_json({"zone": zone, "routes": routes, "count": len(routes)})
    return rc


def cmd_access_apps(args: argparse.Namespace) -> int:
    """Print a summary of the account's Access applications.

    ``args`` is unused. Returns 2 when no account id resolves, otherwise
    the exit code derived from the HTTP status.
    """
    acct = _resolve_account_id()
    if not acct:
        sys.stderr.write("arq-cloudflare: no account_id\n")
        return 2

    apps_path = f"/accounts/{acct}/access/apps"
    status, parsed = _dispatch("access.apps", "GET", apps_path,
                                extra_envelope={"account_id": acct})
    rc = _exit_for_status(status, parsed)
    if rc != 0 or not parsed:
        return rc

    wanted = ("id", "name", "domain", "type", "session_duration")
    apps = [
        {field: app.get(field) for field in wanted}
        for app in parsed.get("result", [])
    ]
    _print_json({"account_id": acct, "apps": apps, "count": len(apps)})
    return rc


def cmd_access_policies(args: argparse.Namespace) -> int:
    """Print the policies attached to the Access app ``args.app_id``.

    The API ``result`` list is printed unmodified. Returns 2 when no
    account id resolves, otherwise the exit code derived from the HTTP
    status.
    """
    acct = _resolve_account_id()
    if not acct:
        sys.stderr.write("arq-cloudflare: no account_id\n")
        return 2

    app_id = args.app_id
    policies_path = f"/accounts/{acct}/access/apps/{app_id}/policies"
    status, parsed = _dispatch(
        "access.policies", "GET", policies_path,
        extra_envelope={"account_id": acct, "app_id": app_id},
    )
    rc = _exit_for_status(status, parsed)
    if rc != 0 or not parsed:
        return rc

    policies = parsed.get("result", [])
    _print_json({"app_id": args.app_id, "policies": policies, "count": len(policies)})
    return rc


def cmd_ai_gateway_list(args: argparse.Namespace) -> int:
    """Print the account's AI Gateway gateways.

    ``args`` is unused and the API ``result`` list is printed unmodified.
    Returns 2 when no account id resolves, otherwise the exit code
    derived from the HTTP status.
    """
    acct = _resolve_account_id()
    if not acct:
        sys.stderr.write("arq-cloudflare: no account_id\n")
        return 2

    gateways_path = f"/accounts/{acct}/ai-gateway/gateways"
    status, parsed = _dispatch(
        "ai-gateway.list", "GET", gateways_path,
        extra_envelope={"account_id": acct},
    )
    rc = _exit_for_status(status, parsed)
    if rc != 0 or not parsed:
        return rc

    gateways = parsed.get("result", [])
    _print_json({"account_id": acct, "gateways": gateways, "count": len(gateways)})
    return rc


def cmd_tunnel_list(args: argparse.Namespace) -> int:
    """Print a summary of up to 50 non-deleted Cloudflare tunnels.

    The listing goes through the CF API rather than the ``cloudflared``
    binary so that it is enveloped like every other verb. ``args`` is
    unused. Returns 2 when no account id resolves, otherwise the exit
    code derived from the HTTP status.
    """
    acct = _resolve_account_id()
    if not acct:
        sys.stderr.write("arq-cloudflare: no account_id\n")
        return 2

    tunnels_path = f"/accounts/{acct}/cfd_tunnel?is_deleted=false&per_page=50"
    status, parsed = _dispatch(
        "tunnel.list", "GET", tunnels_path,
        extra_envelope={"account_id": acct},
    )
    rc = _exit_for_status(status, parsed)
    if rc != 0 or not parsed:
        return rc

    tunnels = []
    for tunnel in parsed.get("result", []):
        summary = {
            field: tunnel.get(field)
            for field in ("id", "name", "status", "created_at", "deleted_at")
        }
        # Only the number of connections is reported, not their details.
        summary["connections"] = len(tunnel.get("connections") or [])
        tunnels.append(summary)
    _print_json({"account_id": acct, "tunnels": tunnels, "count": len(tunnels)})
    return rc


def cmd_tunnel_create(args: argparse.Namespace) -> int:
    """Create a named Cloudflare tunnel and print its id and credentials.

    Sends ``POST /accounts/{account_id}/cfd_tunnel`` with
    ``{"name": <name>, "config_src": "cloudflare"|"local"}`` taken from
    ``args.name`` and ``args.config_src``.

    On success the tunnel id, name, account tag, creation time and the
    ``credentials_file`` / ``token`` fields of the response are printed
    to stdout so the operator can capture them. The envelope metadata
    added by this handler is limited to the account id, tunnel name and
    config source.

    Returns 2 when no account id resolves, otherwise the exit code
    derived from the HTTP status.
    """
    acct = _resolve_account_id()
    if not acct:
        sys.stderr.write("arq-cloudflare: no account_id\n")
        return 2

    tunnel_name = args.name
    config_src = args.config_src
    status, parsed = _dispatch(
        "tunnel.create", "POST",
        f"/accounts/{acct}/cfd_tunnel",
        body={"name": tunnel_name, "config_src": config_src},
        extra_envelope={"account_id": acct,
                        "tunnel_name": tunnel_name,
                        "config_src": config_src},
    )
    rc = _exit_for_status(status, parsed)
    if rc != 0 or not parsed:
        return rc

    result = parsed.get("result") or {}
    # Selected response fields, keyed by the name used in the printed output.
    field_map = (
        ("tunnel_id", "id"),
        ("name", "name"),
        ("account_tag", "account_tag"),
        ("created_at", "created_at"),
        ("credentials_file", "credentials_file"),
        ("token", "token"),
    )
    _print_json({out_key: result.get(src_key) for out_key, src_key in field_map})
    return rc


def cmd_tunnel_route(args: argparse.Namespace) -> int:
    """Replace the ingress configuration of a cloudflare-managed tunnel.

    Sends ``PUT /accounts/{account_id}/cfd_tunnel/{tunnel_id}/configurations``
    with a ``{"config": {...}}`` body built from the JSON file named by
    ``args.ingress_json``. Three file shapes are accepted:

      * a bare ingress array — wrapped as ``{"config": {"ingress": [...]}}``
      * an object with an ``ingress`` key — wrapped as ``{"config": <object>}``
      * an object with a ``config`` key — sent as-is

    Returns 2 when no account id resolves, the file cannot be read or
    decoded, or its shape is not one of the above; otherwise the exit
    code derived from the HTTP status. The full response is printed on
    success.
    """
    acct = _resolve_account_id()
    if not acct:
        sys.stderr.write("arq-cloudflare: no account_id\n")
        return 2

    try:
        with open(args.ingress_json, "r", encoding="utf-8") as handle:
            document = json.load(handle)  # noqa: ARQ-NO-JSON-HOT-PATH operator CLI input boundary
    except (OSError, json.JSONDecodeError) as exc:
        sys.stderr.write(
            f"arq-cloudflare tunnel route: failed to read --ingress-json: {exc}\n"
        )
        return 2

    # Normalise the accepted input shapes into the API body.
    if isinstance(document, list):
        config_body = {"config": {"ingress": document}}
    elif isinstance(document, dict) and "ingress" in document:
        config_body = {"config": document}
    elif isinstance(document, dict) and "config" in document:
        config_body = document
    else:
        sys.stderr.write(
            "arq-cloudflare tunnel route: --ingress-json must contain "
            "an 'ingress' array, a bare array, or a 'config' object\n"
        )
        return 2

    config_section = config_body.get("config") or {}
    rules_count = len(config_section.get("ingress") or [])
    tunnel_id = args.tunnel_id
    status, parsed = _dispatch(
        "tunnel.route", "PUT",
        f"/accounts/{acct}/cfd_tunnel/{tunnel_id}/configurations",
        body=config_body,
        extra_envelope={"account_id": acct,
                        "tunnel_id": tunnel_id,
                        "ingress_rules_count": rules_count},
    )
    rc = _exit_for_status(status, parsed)
    if rc != 0 or not parsed:
        return rc
    _print_json(parsed)
    return rc


def cmd_dns_create(args: argparse.Namespace) -> int:
    """Create a proxied CNAME that points a hostname at a Cloudflare Tunnel.

    Sends ``POST /zones/{zone_id}/dns_records`` with a CNAME named
    ``args.hostname`` whose content is ``<args.tunnel_id>.cfargotunnel.com``.
    ``args.comment`` is used as the record comment when set, otherwise a
    default comment is applied.

    Returns 2 when ``args.zone`` cannot be resolved, otherwise the exit
    code derived from the HTTP status. A summary of the created record
    is printed on success.
    """
    zone = _resolve_zone_arg(args.zone)
    if not zone:
        sys.stderr.write(
            f"arq-cloudflare dns create: could not resolve zone '{args.zone}'\n"
        )
        return 2

    hostname = args.hostname
    tunnel_id = args.tunnel_id
    record = {
        "type": "CNAME",
        "name": hostname,
        "content": f"{tunnel_id}.cfargotunnel.com",
        "proxied": True,
        "ttl": 1,  # 1 = automatic when proxied
        "comment": args.comment or "arq-cloudflare dns create — tunnel CNAME",
    }
    # The envelope carries routing metadata only, not the request body.
    metadata = {
        "zone": zone,
        "hostname": hostname,
        "tunnel_id": tunnel_id,
        "record_type": "CNAME",
        "proxied": True,
    }
    status, parsed = _dispatch(
        "dns.create", "POST", f"/zones/{zone}/dns_records",
        body=record, extra_envelope=metadata,
    )
    rc = _exit_for_status(status, parsed)
    if rc != 0 or not parsed:
        return rc

    result = parsed.get("result") or {}
    summary = {"record_id": result.get("id")}
    for field in ("type", "name", "content", "proxied", "zone_id", "created_on"):
        summary[field] = result.get(field)
    _print_json(summary)
    return rc


def cmd_config_rule_create(args: argparse.Namespace) -> int:
    """Create or update an http_config_settings phase rule that disables
    Bot Fight Mode (bic) + lowers security_level for a specific hostname.

    Cloudflare's Configuration Rules phase ruleset lives at
    PUT /zones/{zone_id}/rulesets/phases/http_config_settings/entrypoint
    and is REPLACED in full on each PUT. We first GET the existing
    entrypoint, preserve any unrelated rules, and append/update the
    one targeting this hostname (idempotent — skip if already correct).

    Rule shape (per CF API docs):
      {"action": "set_config",
       "expression": "(http.host eq \"<host>\")",
       "description": "...",
       "action_parameters": {"bic": false, "security_level": "essentially_off"}}

    Args:
        args: parsed CLI namespace; reads ``zone`` (optional), ``hostname``,
            ``bic`` ("off" disables, anything else enables) and
            ``security_level``.

    Returns:
        2 when the zone cannot be resolved; 1 when the current ruleset
        cannot be read reliably; 0 on an idempotent skip; otherwise the
        exit code ``_exit_for_status`` derives from the PUT response.
    """
    zone = _resolve_zone_arg(getattr(args, "zone", None))
    if not zone:
        sys.stderr.write("arq-cloudflare config-rule create: could not resolve zone\n")
        return 2

    # bic flag: "off" -> False, anything else (e.g. "on") -> True
    bic_off = (args.bic or "").lower() == "off"
    sec_level = args.security_level

    # 1. GET current entrypoint ruleset. A 404 means no rules exist yet and
    #    is the ONLY case in which starting from an empty list is safe.
    get_path = f"/zones/{zone}/rulesets/phases/http_config_settings/entrypoint"
    get_status, get_parsed = _dispatch(
        "config-rule.list", "GET", get_path,
        extra_envelope={"zone": zone, "phase": "http_config_settings"},
    )
    existing_rules: list[dict] = []
    if get_status == 200 and get_parsed:
        existing_rules = ((get_parsed.get("result") or {}).get("rules")) or []
    elif get_status == 200:
        # 200 without a usable body: we do not know which rules exist, and
        # the PUT below replaces the whole ruleset — proceeding would
        # silently delete every unrelated rule. Fail loud instead.
        sys.stderr.write(
            "config-rule create: GET phase HTTP 200 but response body was "
            "empty/unparseable; refusing to PUT (would replace all rules)\n"
        )
        return 1
    elif get_status != 404:
        # Real auth/network error — fail loud rather than silently PUT.
        if get_parsed:
            _print_json(get_parsed)
        else:
            sys.stderr.write(f"config-rule create: GET phase HTTP {get_status}\n")
        return 1

    target_expression = f'(http.host eq "{args.hostname}")'
    desired_params = {
        "bic": not bic_off,  # bic_off=True means bic=False
        "security_level": sec_level,
    }

    def _desired_rule() -> dict:
        # Fresh dict per use; the old rule's "id" is dropped (CF re-assigns).
        return {
            "action": "set_config",
            "expression": target_expression,
            "description": f"arq-cloudflare: bot/security relax for {args.hostname}",
            "action_parameters": desired_params,
        }

    # 2. Rebuild the rule list: replace any rule for this hostname, keep the rest.
    new_rules: list[dict] = []
    found_match = False
    fully_idempotent = False
    for r in existing_rules:
        if r.get("expression") == target_expression and r.get("action") == "set_config":
            found_match = True
            params = r.get("action_parameters") or {}
            # A disabled rule does not take effect, so it is never treated
            # as "already correct"; the PUT below re-creates it without an
            # "enabled" key.
            if (params.get("bic") == desired_params["bic"]
                    and params.get("security_level") == desired_params["security_level"]
                    and r.get("enabled", True)):
                fully_idempotent = True
            new_rules.append(_desired_rule())
        else:
            # Preserve unrelated rules; drop CF-assigned read-only fields.
            new_rules.append({
                k: v for k, v in r.items()
                if k in ("action", "expression", "description",
                         "action_parameters", "enabled", "ref")
            })

    if not found_match:
        new_rules.append(_desired_rule())

    if fully_idempotent:
        _print_json({
            "outcome": "idempotent_skip",
            "zone": zone,
            "hostname": args.hostname,
            "rule_count": len(new_rules),
            "note": "matching rule already present; no PUT issued",
        })
        return 0

    # 3. PUT the full replacement ruleset.
    put_body = {"rules": new_rules}
    put_status, put_parsed = _dispatch(
        "config-rule.create", "PUT", get_path,
        body=put_body,
        extra_envelope={"zone": zone, "hostname": args.hostname,
                        "bic_off": bic_off, "security_level": sec_level,
                        "rule_count": len(new_rules)},
    )
    rc = _exit_for_status(put_status, put_parsed)
    if rc == 0 and put_parsed:
        result = put_parsed.get("result") or {}
        _print_json({
            "outcome": "applied",
            "zone": zone,
            "hostname": args.hostname,
            "ruleset_id": result.get("id"),
            "rule_count": len(result.get("rules") or []),
            "last_updated": result.get("last_updated"),
        })
    return rc


def cmd_access_app_create(args: argparse.Namespace) -> int:
    """Register a self-hosted application with Cloudflare Access.

    Issues ``POST /accounts/{account_id}/access/apps`` with the body
    ``{"name": <n>, "domain": <d>, "type": "self_hosted",
    "session_duration": <t>}``.

    Args:
        args: parsed CLI namespace; reads ``name``, ``domain`` and
            ``session_duration``.

    Returns:
        2 when no account id can be resolved, otherwise the exit code
        ``_exit_for_status`` derives from the response. On success a
        summary of the new application is printed as JSON.
    """
    account_id = _resolve_account_id()
    if not account_id:
        sys.stderr.write("arq-cloudflare access app create: no account_id\n")
        return 2

    app_name = args.name
    app_domain = args.domain
    duration = args.session_duration

    status, parsed = _dispatch(
        "access.app.create", "POST",
        f"/accounts/{account_id}/access/apps",
        body={
            "name": app_name,
            "domain": app_domain,
            "type": "self_hosted",
            "session_duration": duration,
        },
        extra_envelope={
            "account_id": account_id,
            "app_name": app_name,
            "app_domain": app_domain,
            "session_duration": duration,
        },
    )

    exit_code = _exit_for_status(status, parsed)
    if exit_code != 0 or not parsed:
        return exit_code

    app = parsed.get("result") or {}
    shown = ("id", "name", "domain", "type", "session_duration", "aud", "created_at")
    _print_json({field: app.get(field) for field in shown})
    return exit_code


def cmd_access_service_token_create(args: argparse.Namespace) -> int:
    """Create a Cloudflare Access service token.

    Issues ``POST /accounts/{account_id}/access/service_tokens`` with the
    body ``{"name": <n>}``.

    CRITICAL: ``client_secret`` comes back only in this creation response.
    It is printed to stdout for the operator/caller to capture and is never
    placed in the ``extra_envelope`` passed to ``_dispatch``; the envelope
    carries the account id, the token name and a ``secret_handling`` marker.

    Args:
        args: parsed CLI namespace; reads ``name``.

    Returns:
        2 when no account id can be resolved, otherwise the exit code
        ``_exit_for_status`` derives from the response.
    """
    account_id = _resolve_account_id()
    if not account_id:
        sys.stderr.write(
            "arq-cloudflare access service-token create: no account_id\n"
        )
        return 2

    token_name = args.name
    status, parsed = _dispatch(
        "access.service-token.create", "POST",
        f"/accounts/{account_id}/access/service_tokens",
        body={"name": token_name},
        extra_envelope={
            "account_id": account_id,
            "token_name": token_name,
            "secret_handling": "stdout_only_never_substrate",
        },
    )

    exit_code = _exit_for_status(status, parsed)
    if exit_code != 0 or not parsed:
        return exit_code

    token = parsed.get("result") or {}
    # The secret is echoed exactly once, here, and nowhere else in this function.
    shown = ("id", "name", "client_id", "client_secret", "created_at", "expires_at")
    report = {field: token.get(field) for field in shown}
    report["_note"] = (
        "client_secret is only returned at creation; capture "
        "now or lose it. Substrate envelopes carry only id + "
        "client_id."
    )
    _print_json(report)
    return exit_code


def cmd_access_policy_create(args: argparse.Namespace) -> int:
    """Attach a ``non_identity`` policy admitting one service token to an
    Access application.

    Issues ``POST /accounts/{account_id}/access/apps/{app_id}/policies``
    with the body ``{"name": <n>, "decision": "non_identity",
    "include": [{"service_token": {"token_id": <sid>}}]}``.

    Args:
        args: parsed CLI namespace; reads ``name``, ``app_id`` and
            ``service_token_id``.

    Returns:
        2 when no account id can be resolved, otherwise the exit code
        ``_exit_for_status`` derives from the response.
    """
    account_id = _resolve_account_id()
    if not account_id:
        sys.stderr.write("arq-cloudflare access policy create: no account_id\n")
        return 2

    policy_name = args.name
    service_token_id = args.service_token_id
    policy = {
        "name": policy_name,
        "decision": "non_identity",
        "include": [{"service_token": {"token_id": service_token_id}}],
    }
    app_id = args.app_id
    envelope = {
        "account_id": account_id,
        "app_id": app_id,
        "policy_name": policy_name,
        "service_token_id": service_token_id,
        "decision": "non_identity",
    }
    status, parsed = _dispatch(
        "access.policy.create", "POST",
        f"/accounts/{account_id}/access/apps/{app_id}/policies",
        body=policy, extra_envelope=envelope,
    )

    exit_code = _exit_for_status(status, parsed)
    if exit_code != 0 or not parsed:
        return exit_code

    created = parsed.get("result") or {}
    shown = ("id", "name", "decision", "include", "created_at")
    _print_json({field: created.get(field) for field in shown})
    return exit_code


# ---------------------------------------------------------------------------
# credentials.* — substrate-managed credential primitive
# ---------------------------------------------------------------------------
#
# Pattern:
#   - twin-master keychain entry mints/rotates/revokes other tokens
#   - Per-service runtime tokens live at keychain ``arqera-cf-<service>``
#   - Each lifecycle op emits a substrate body + act
#   - Tokens NEVER written to substrate payloads (only stdout-once on mint)


def _keychain_set(service: str, value: str, account: str = "arqera") -> bool:
    """Write *value* to the macOS keychain via the ``security`` CLI.

    Runs ``security add-generic-password -U -s <service> -a <account>
    -w <value>`` with a 5-second timeout.

    Caller MUST run `security find-generic-password` first to satisfy the
    credential-reuse-gate hook (see CLAUDE.md). This helper only does the
    write side.

    Returns:
        True when the command exits 0; False on a non-zero exit, a
        timeout, or when the ``security`` binary cannot be run.
    """
    argv = [
        "security", "add-generic-password", "-U",
        "-s", service,
        "-a", account,
        "-w", value,
    ]
    try:
        completed = subprocess.run(
            argv, capture_output=True, text=True, timeout=5, check=False,
        )
    except (FileNotFoundError, subprocess.TimeoutExpired, OSError):
        return False
    return completed.returncode == 0


def _keychain_delete(service: str) -> bool:
    """Remove a keychain entry via ``security delete-generic-password``.

    The entry is selected by service name only (no account filter is
    passed). The command runs with a 5-second timeout.

    Returns:
        True when the command exits 0; False on a non-zero exit, a
        timeout, or when the ``security`` binary cannot be run.
    """
    argv = ["security", "delete-generic-password", "-s", service]
    try:
        completed = subprocess.run(
            argv, capture_output=True, text=True, timeout=5, check=False,
        )
    except (FileNotFoundError, subprocess.TimeoutExpired, OSError):
        return False
    return completed.returncode == 0


def _date_utc() -> str:
    """Return today's UTC calendar date as ``YYYY-MM-DD``."""
    today = datetime.now(tz=timezone.utc).date()
    return today.isoformat()


def _ts_iso() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ`` (second precision)."""
    now = datetime.now(tz=timezone.utc)
    # isoformat() renders the UTC offset as "+00:00"; swap in the "Z" suffix.
    return now.isoformat(timespec="seconds").replace("+00:00", "Z")


def _lookup_permission_groups() -> dict[str, str]:
    """Fetch the CF permission-group catalogue as ``{name: id}``.

    Called by credentials.mint to resolve human-readable scope names
    (e.g. "Cloudflare Tunnel Write") to the permission_group IDs that the
    POST /user/tokens body requires.

    Returns:
        Mapping of group name to group id. Empty when the request does
        not return HTTP 200 with a parsed body. Entries lacking a name or
        an id are skipped; a repeated name keeps the last id seen.
    """
    status, parsed = _dispatch(
        "credentials.list", "GET",
        "/user/tokens/permission_groups?per_page=500",
        extra_envelope={"lookup": "permission_groups"},
    )
    if status != 200 or not parsed:
        return {}

    groups = parsed.get("result") or []
    return {
        group.get("name"): group.get("id")
        for group in groups
        if group.get("name") and group.get("id")
    }


def cmd_credentials_list(args: argparse.Namespace) -> int:
    """List CF API tokens (via master) as a compact JSON summary.

    Issues ``GET /user/tokens?per_page=50`` and prints
    ``{"tokens": [...], "count": N}``. Each entry carries the token's id,
    name, status, expiry, number of policies, last-used and issued
    timestamps; token values are not part of the output.

    Args:
        args: parsed CLI namespace (not read by this verb).

    Returns:
        The non-zero exit code from ``_exit_for_status`` on API failure,
        1 when the status is fine but no body was parsed, else 0.
    """
    status, parsed = _dispatch(
        "credentials.list", "GET", "/user/tokens?per_page=50",
        extra_envelope={"op": "list_user_tokens"},
    )
    exit_code = _exit_for_status(status, parsed)
    if exit_code != 0:
        return exit_code
    if not parsed:
        return 1

    summaries = [
        {
            "id": tok.get("id"),
            "name": tok.get("name"),
            "status": tok.get("status"),
            "expires_on": tok.get("expires_on"),
            "scope_count": len(tok.get("policies") or []),
            # The API field is "last_used_on"; it is reported as "last_used_at".
            "last_used_at": tok.get("last_used_on"),
            "issued_on": tok.get("issued_on"),
        }
        for tok in parsed.get("result") or []
    ]
    _print_json({"tokens": summaries, "count": len(summaries)})
    return 0


def cmd_credentials_mint(args: argparse.Namespace) -> int:
    """Mint a new CF API token via master, store in keychain, emit ledger body.

    --acct-scope / --zone-scope / --user-scope are comma-separated CF
    permission_group names (e.g. "Cloudflare Tunnel Write,DNS Read").
    User:API Tokens Write is BLOCKED (CF rejects it anyway in sub-tokens).

    Flow: resolve scope names to permission-group ids, build one policy per
    non-empty scope class, POST /user/tokens, write the returned value to
    keychain service ``arqera-cf-<name>``, emit a ``credential`` body and a
    ``credential_minted`` act (neither contains the token value), then print
    the result — including the token value, once — to stdout.

    Args:
        args: parsed CLI namespace; reads ``name``, ``acct_scope``,
            ``zone_scope``, ``user_scope`` and ``expires`` (YYYY-MM-DD).

    Returns:
        2 for local/usage problems (no account or zone id, permission
        groups unavailable, unknown or forbidden scope, no scopes,
        malformed ``--expires``); the ``_exit_for_status`` code when the
        API call fails; 1 when the API reports success but the response
        lacks the token id or value; 0 on success.
    """
    acct = _resolve_account_id()
    if not acct:
        sys.stderr.write("arq-cloudflare credentials mint: no account_id\n")
        return 2

    # 1. Resolve scope names → permission_group IDs.
    pg_map = _lookup_permission_groups()
    if not pg_map:
        sys.stderr.write(
            "arq-cloudflare credentials mint: could not load permission_groups "
            "(check master token has User:API Tokens Write)\n"
        )
        return 2

    def _parse_scopes(raw: str | None) -> list[str]:
        # Split a comma-separated list, dropping blanks and surrounding spaces.
        if not raw:
            return []
        return [s.strip() for s in raw.split(",") if s.strip()]

    acct_scopes = _parse_scopes(args.acct_scope)
    zone_scopes = _parse_scopes(args.zone_scope)
    user_scopes = _parse_scopes(args.user_scope)

    # Block User:API Tokens Write in sub-tokens (defence-in-depth).
    forbidden = {"API Tokens Write"}
    for s in user_scopes:
        if s in forbidden:
            sys.stderr.write(
                f"arq-cloudflare credentials mint: scope '{s}' is forbidden "
                "in sub-tokens (CF rejects + substrate policy)\n"
            )
            return 2

    def _resolve_pg_ids(names: list[str]) -> list[dict] | None:
        # Map names to [{"id": ...}]; None (after reporting) on the first unknown name.
        ids: list[dict] = []
        for n in names:
            pid = pg_map.get(n)
            if not pid:
                sys.stderr.write(
                    f"arq-cloudflare credentials mint: unknown permission group "
                    f"'{n}'. Available examples: {sorted(pg_map)[:10]}\n"
                )
                return None
            ids.append({"id": pid})
        return ids

    # 2. Build the policies array per CF /user/tokens schema.
    policies: list[dict] = []

    def _allow(resource_key: str, ids: list[dict]) -> None:
        # Append one "allow" policy covering everything under resource_key.
        policies.append({
            "effect": "allow",
            "resources": {resource_key: "*"},
            "permission_groups": ids,
        })

    if acct_scopes:
        ids = _resolve_pg_ids(acct_scopes)
        if ids is None:
            return 2
        _allow(f"com.cloudflare.api.account.{acct}", ids)
    if zone_scopes:
        ids = _resolve_pg_ids(zone_scopes)
        if ids is None:
            return 2
        zone = _resolve_zone_id()
        if not zone:
            sys.stderr.write(
                "arq-cloudflare credentials mint: zone-scope requested but "
                "no zone_id resolved (set $ARQ_CF_ZONE_ID or SOPS cloudflare.zone_id)\n"
            )
            return 2
        _allow(f"com.cloudflare.api.account.zone.{zone}", ids)
    if user_scopes:
        ids = _resolve_pg_ids(user_scopes)
        if ids is None:
            return 2
        _allow("com.cloudflare.api.user.*", ids)

    if not policies:
        sys.stderr.write(
            "arq-cloudflare credentials mint: no scopes provided "
            "(--acct-scope / --zone-scope / --user-scope all empty)\n"
        )
        return 2

    body: dict = {"name": args.name, "policies": policies}
    if args.expires:
        # CF expects expires_on in RFC3339; --expires accepts YYYY-MM-DD.
        # Validate locally so a typo fails here instead of producing a
        # malformed timestamp, and normalise to zero-padded form.
        try:
            expires_day = datetime.strptime(args.expires, "%Y-%m-%d")
        except ValueError:
            sys.stderr.write(
                "arq-cloudflare credentials mint: --expires must be "
                f"YYYY-MM-DD (got '{args.expires}')\n"
            )
            return 2
        body["expires_on"] = expires_day.strftime("%Y-%m-%dT00:00:00Z")

    # 3. Mint.
    status, parsed = _dispatch(
        "credentials.mint", "POST", "/user/tokens",
        body=body,
        extra_envelope={
            "op": "mint_user_token",
            "token_name": args.name,
            "policy_count": len(policies),
            "acct_scopes": acct_scopes,
            "zone_scopes": zone_scopes,
            "user_scopes": user_scopes,
            "secret_handling": "stdout_only_never_substrate",
        },
    )
    rc = _exit_for_status(status, parsed)
    if rc != 0:
        return rc

    # A success status with an empty/unparseable body must not exit 0
    # silently: the token may exist on CF's side while its value is lost.
    # Let it fall into the missing-field error below.
    result = (parsed or {}).get("result") or {}
    token_id = result.get("id")
    token_value = result.get("value")
    if not token_id or not token_value:
        sys.stderr.write(
            "arq-cloudflare credentials mint: CF response missing id or value\n"
        )
        if parsed:
            _print_json(parsed)
        return 1

    # 4. Store in keychain at service ``arqera-cf-<name>``.
    keychain_service = f"arqera-cf-{args.name}"
    stored = _keychain_set(keychain_service, token_value)
    if not stored:
        sys.stderr.write(
            "arq-cloudflare credentials mint: warning: keychain write to "
            f"'{keychain_service}' failed; the token_value printed below "
            "is the only copy\n"
        )

    # 5. Emit ledger body + act. NEVER include token value in payload.
    body_ref = f"cloudflare-{args.name}-{_date_utc()}"
    body_payload = {
        "title": f"Cloudflare API token: {args.name}",
        "provider": "cloudflare",
        "token_id": token_id,
        "token_name": args.name,
        "keychain_service": keychain_service,
        "scopes": {
            "acct": acct_scopes,
            "zone": zone_scopes,
            "user": user_scopes,
        },
        "expires_on": body.get("expires_on"),
        "status": "active",
        "ts": _ts_iso(),
    }
    body_addr = _emit("body", "credential", body_ref, body_payload)
    act_addr = _emit("act", "credential_minted",
                     f"{args.name}-{_ts_ms()}",
                     {"token_id": token_id,
                      "token_name": args.name,
                      "keychain_service": keychain_service,
                      "keychain_stored": stored,
                      "body_address": body_addr,
                      "ts": _ts_iso()})

    # 6. Report; this is the single place the token value is printed.
    _print_json({
        "outcome": "minted",
        "token_id": token_id,
        "token_name": args.name,
        "keychain_service": keychain_service,
        "keychain_stored": stored,
        "scopes": body_payload["scopes"],
        "expires_on": body.get("expires_on"),
        "body_address": body_addr,
        "act_address": act_addr,
        "token_value": token_value,
        "_note": (
            "token_value is returned only at creation; capture now or "
            "rotate. Substrate envelope carries id+scopes only."
        ),
    })
    return 0


def cmd_credentials_rotate(args: argparse.Namespace) -> int:
    """Placeholder for per-service token rotation (not implemented).

    The intended behaviour is to mint a replacement with the same scopes,
    swap the keychain entry and schedule a grace-revoke of the old token.
    For now this prints a workaround to stderr, emits a
    ``credential_rotate_skipped`` act so the substrate records the intent,
    and fails.

    Args:
        args: parsed CLI namespace; reads ``service``.

    Returns:
        Always 2.
    """
    notice = (
        "arq-cloudflare credentials rotate: not yet implemented "
        "(scope-mirror requires reading old token's /user/tokens/<id> policies).\n"
        "Workaround: arq-cloudflare credentials mint --name <name> "
        "--acct-scope <same> ... then revoke old via credentials revoke.\n"
    )
    sys.stderr.write(notice)

    service = args.service
    _emit(
        "act",
        "credential_rotate_skipped",
        f"{service}-{_ts_ms()}",
        {
            "keychain_service": service,
            "reason": "scope_mirror_not_implemented_yet",
            "ts": _ts_iso(),
        },
    )
    return 2


def cmd_credentials_verify(args: argparse.Namespace) -> int:
    """Verify one token, or every token listed by the master, against CF.

    Token selection:
      * ``--all``      — list tokens via ``GET /user/tokens?per_page=50`` and
        verify each id returned.
      * ``--service``  — ledger lookup is not wired yet; a notice is written
        to stderr and ``--token-id`` must accompany it.
      * ``--token-id`` — verify exactly that id.

    Each id is checked with ``GET /user/tokens/<id>/verify``; the batch is
    recorded in a ``credential_verified`` act and printed as JSON.

    Returns:
        2 when no usable selector is given; the ``_exit_for_status`` code
        when the ``--all`` pre-listing fails; otherwise 0 — individual
        verification failures are reported in the output, not in the exit
        code.
    """
    if not (args.all or args.service or args.token_id):
        sys.stderr.write(
            "arq-cloudflare credentials verify: pass --all, --service, or --token-id\n"
        )
        return 2

    if args.all:
        # Pull all known tokens via master.
        listing_status, listing = _dispatch(
            "credentials.list", "GET", "/user/tokens?per_page=50",
            extra_envelope={"op": "verify_all_pre_list"},
        )
        if listing_status != 200 or not listing:
            return _exit_for_status(listing_status, listing)
        token_ids = [
            tok.get("id")
            for tok in (listing.get("result") or [])
            if tok.get("id")
        ]
    else:
        if args.service:
            # No substrate-query helper yet, so the service name cannot be
            # mapped to a token id; the operator has to supply the id.
            sys.stderr.write(
                "arq-cloudflare credentials verify --service <name>: requires "
                "substrate body lookup (not yet wired). Use --all for now, or "
                "specify --token-id explicitly.\n"
            )
            if not args.token_id:
                return 2
        token_ids = [args.token_id]

    def _verify_one(tid):
        # NOTE(review): confirm CF exposes a per-id verify route; the public
        # API reference documents GET /user/tokens/verify (for the calling
        # token) and GET /user/tokens/<id> (token details).
        status, parsed = _dispatch(
            "credentials.verify", "GET", f"/user/tokens/{tid}/verify",
            extra_envelope={"op": "verify_user_token", "token_id": tid},
        )
        return {
            "token_id": tid,
            "http_status": status,
            "verified": bool(status == 200 and parsed and parsed.get("success")),
            "result_status": ((parsed or {}).get("result") or {}).get("status"),
        }

    outcomes = [_verify_one(tid) for tid in token_ids]

    _emit(
        "act", "credential_verified",
        f"batch-{_ts_ms()}",
        {
            "verified_count": sum(1 for entry in outcomes if entry["verified"]),
            "total": len(outcomes),
            "results": outcomes,
            "ts": _ts_iso(),
        },
    )

    _print_json({"verified": outcomes, "count": len(outcomes)})
    return 0


def cmd_credentials_audit(args: argparse.Namespace) -> int:
    """Snapshot CF token names and expected keychain entries into an audit body.

    Lists tokens via ``GET /user/tokens?per_page=50``, probes a fixed set of
    ``arqera-cf-*`` keychain services with ``_keychain_lookup``, emits a
    ``credential_audit`` body containing both lists, and prints the same
    data. The two lists are recorded side by side; this function does not
    itself cross-reference them.

    Args:
        args: parsed CLI namespace (not read by this verb).

    Returns:
        The ``_exit_for_status`` code when the listing fails, else 0.
    """
    status, parsed = _dispatch(
        "credentials.list", "GET", "/user/tokens?per_page=50",
        extra_envelope={"op": "audit_pre_list"},
    )
    if status != 200 or not parsed:
        return _exit_for_status(status, parsed)

    cf_tokens = parsed.get("result") or []
    distinct_names = {tok.get("name") for tok in cf_tokens if tok.get("name")}

    # Keychain doesn't easily enumerate by prefix, so probe the entries
    # this primitive's lifecycle is expected to have created.
    expected_services = (
        "arqera-cf-master",
        "arqera-cf-runtime",
        "arqera-cf-cloudflared-homelab",
        "arqera-cf-ci-cf",
    )
    present = [svc for svc in expected_services if _keychain_lookup(svc)]

    audit_payload = {
        "title": "Cloudflare credential audit",
        "provider": "cloudflare",
        "cf_token_count": len(cf_tokens),
        "cf_token_names": sorted(distinct_names),
        "keychain_services_present": present,
        "ts": _ts_iso(),
    }
    audit_addr = _emit(
        "body", "credential_audit", f"cloudflare-{_date_utc()}", audit_payload,
    )

    report = {"audit_address": audit_addr}
    report.update(audit_payload)
    _print_json(report)
    return 0


def cmd_credentials_revoke(args: argparse.Namespace) -> int:
    """Revoke a CF API token via master, immediately or after a grace window.

    With ``--grace-hours`` > 0 only a ``credential_revoke_scheduled`` act is
    emitted; nothing is deleted on CF and the operator/cron must follow up.
    With a grace of 0 (or unset) the token is removed with
    ``DELETE /user/tokens/<id>``, the keychain entry named by ``--service``
    is dropped when given, and a ``credential_revoked`` act is emitted.

    Args:
        args: parsed CLI namespace; reads ``token_id``, ``grace_hours`` and
            ``service``.

    Returns:
        2 when ``--token-id`` is missing; the ``_exit_for_status`` code when
        the DELETE fails; otherwise 0.
    """
    token_id = args.token_id
    if not token_id:
        sys.stderr.write(
            "arq-cloudflare credentials revoke: --token-id required\n"
        )
        return 2

    grace_hours = args.grace_hours or 0
    service = args.service

    if grace_hours > 0:
        # Deferred path: record the intent, leave the token alive.
        scheduled_addr = _emit(
            "act", "credential_revoke_scheduled",
            f"{service or token_id}-{_ts_ms()}",
            {
                "token_id": token_id,
                "keychain_service": service,
                "grace_hours": grace_hours,
                "scheduled_at": _ts_iso(),
            },
        )
        _print_json({
            "outcome": "scheduled",
            "token_id": token_id,
            "grace_hours": grace_hours,
            "act_address": scheduled_addr,
            "_note": "operator/cron must run credentials revoke --grace-hours 0 after grace window",
        })
        return 0

    # Immediate path: delete on CF first; touch local state only on success.
    status, parsed = _dispatch(
        "credentials.revoke", "DELETE", f"/user/tokens/{token_id}",
        extra_envelope={
            "op": "delete_user_token",
            "token_id": token_id,
            "keychain_service": service,
        },
    )
    exit_code = _exit_for_status(status, parsed)
    if exit_code != 0:
        return exit_code

    # Drop the keychain entry too, if a service name was supplied.
    keychain_dropped = _keychain_delete(service) if service else False

    revoked_addr = _emit(
        "act", "credential_revoked",
        f"{service or token_id}-{_ts_ms()}",
        {
            "token_id": token_id,
            "keychain_service": service,
            "keychain_dropped": keychain_dropped,
            "revoked_at": _ts_iso(),
        },
    )
    _print_json({
        "outcome": "revoked",
        "token_id": token_id,
        "keychain_service": service,
        "keychain_dropped": keychain_dropped,
        "act_address": revoked_addr,
    })
    return 0


def cmd_raw(args: argparse.Namespace) -> int:
    """Send an arbitrary method/path (and optional JSON body) through ``_dispatch``.

    Args:
        args: parsed CLI namespace; reads ``method``, ``path`` and ``data``
            (a JSON document, or empty for no body).

    Returns:
        2 when ``--data`` is not valid JSON, otherwise the exit code
        ``_exit_for_status`` derives from the response. On success the
        parsed response is printed unless it is ``None``.
    """
    payload = None
    if args.data:
        try:
            payload = json.loads(args.data)  # noqa: ARQ-NO-JSON-HOT-PATH operator CLI input boundary — --data arg is documented JSON
        except json.JSONDecodeError:
            sys.stderr.write("arq-cloudflare raw: --data must be valid JSON\n")
            return 2

    method = args.method.upper()
    path = args.path
    status, parsed = _dispatch(
        "raw", method, path, body=payload,
        # Only the first 120 characters of the path go into the envelope.
        extra_envelope={"raw_method": method, "raw_path_preview": path[:120]},
    )

    exit_code = _exit_for_status(status, parsed)
    if exit_code != 0 or parsed is None:
        return exit_code
    _print_json(parsed)
    return exit_code


# ---------------------------------------------------------------------------
# argparse wiring
# ---------------------------------------------------------------------------

from pathlib import Path as _ScopesPath
import sys as _scopes_sys
_scopes_sys.path.insert(0, str(_ScopesPath(__file__).parent))
from _arq_provider_base import handle_meta_flags  # noqa: E402

# Provider key passed to handle_meta_flags() at the top of main().
PROVIDER = "cloudflare"
# Per-verb scope table, passed to handle_meta_flags() together with PROVIDER.
# Keys are the CLI verb path joined with spaces (the words typed after
# ``arq-cloudflare``); values are the permission names that verb needs.
REQUIRED_SCOPES: dict[str, list[str]] = {
    # Cloudflare scoped-API-token permission names (dashboard "Create Token"
    # template strings). raw=unknown because the payload decides.
    # Read-only listing verbs.
    "zone list": ["Zone:Read"],
    "zone dns": ["Zone:DNS:Read"],
    "worker list": ["Account:Workers Scripts:Read"],
    "worker routes": ["Zone:Workers Routes:Read"],
    "access apps": ["Account:Access: Apps and Policies:Read"],
    "access policies": ["Account:Access: Apps and Policies:Read"],
    "ai-gateway list": ["Account:AI Gateway:Read"],
    "tunnel list": ["Account:Cloudflare Tunnel:Read"],
    # Mutating verbs.
    "tunnel create": ["Account:Cloudflare Tunnel:Edit"],
    "tunnel route": ["Account:Cloudflare Tunnel:Edit"],
    "dns create": ["Zone:DNS:Edit"],
    # NOTE(review): this verb writes the http_config_settings phase —
    # confirm Zone:WAF:Edit (rather than a Config Rules permission) is the
    # scope CF actually requires for it.
    "config-rule create": ["Zone:WAF:Edit"],
    "access app create": ["Account:Access: Apps and Policies:Edit"],
    "access service-token create": ["Account:Access: Service Tokens:Edit"],
    "access policy create": ["Account:Access: Apps and Policies:Edit"],
    # Credential lifecycle verbs (these go through /user/tokens).
    "credentials list": ["User:API Tokens Read"],
    "credentials mint": ["User:API Tokens Write"],
    "credentials rotate": ["User:API Tokens Write"],
    "credentials verify": ["User:API Tokens Read"],
    "credentials audit": ["User:API Tokens Read"],
    "credentials revoke": ["User:API Tokens Write"],
    # Scope depends entirely on the caller-supplied method and path.
    "raw": ["unknown"],
}

def _wire_zone(sub) -> None:
    """Register ``zone list`` and ``zone dns`` on the top-level subparsers."""
    zone = sub.add_parser("zone", help="zone operations")
    verbs = zone.add_subparsers(dest="zone_verb", required=True)

    verbs.add_parser("list").set_defaults(func=cmd_zone_list)

    dns = verbs.add_parser("dns")
    dns.add_argument("zone", nargs="?", default=None,
                     help="zone name or id (defaults to SOPS zone_id)")
    dns.set_defaults(func=cmd_zone_dns)


def _wire_worker(sub) -> None:
    """Register ``worker list`` and ``worker routes``."""
    worker = sub.add_parser("worker", help="worker operations")
    verbs = worker.add_subparsers(dest="worker_verb", required=True)

    verbs.add_parser("list").set_defaults(func=cmd_worker_list)

    routes = verbs.add_parser("routes")
    routes.add_argument("zone", nargs="?", default=None)
    routes.set_defaults(func=cmd_worker_routes)


def _wire_access(sub) -> None:
    """Register the ``access`` tree: apps, policies, app/service-token/policy create."""
    access = sub.add_parser("access", help="Cloudflare Access operations")
    verbs = access.add_subparsers(dest="access_verb", required=True)

    verbs.add_parser("apps").set_defaults(func=cmd_access_apps)

    policies = verbs.add_parser("policies")
    policies.add_argument("app_id")
    policies.set_defaults(func=cmd_access_policies)

    # access app create / access service-token create / access policy create
    # — Zero Trust writes (require Account:Access:* Edit scopes on the token).
    app = verbs.add_parser("app", help="Access application operations")
    app_verbs = app.add_subparsers(dest="app_verb", required=True)
    app_create = app_verbs.add_parser(
        "create", help="create a self-hosted Access application")
    app_create.add_argument(
        "--name", required=True,
        help="application display name (e.g. vllm-homelab)")
    app_create.add_argument(
        "--domain", required=True,
        help="hostname guarded by Access (e.g. vllm.arqera.io)")
    app_create.add_argument(
        "--session-duration", default="24h",
        help="session duration (e.g. 24h, 730h). Default: 24h.")
    app_create.set_defaults(func=cmd_access_app_create)

    token = verbs.add_parser("service-token",
                             help="Access service-token operations")
    token_verbs = token.add_subparsers(dest="st_verb", required=True)
    token_create = token_verbs.add_parser(
        "create", help="create a service token (client_id + client_secret)")
    token_create.add_argument(
        "--name", required=True,
        help="service token name (e.g. arqera-backend-staging)")
    token_create.set_defaults(func=cmd_access_service_token_create)

    policy = verbs.add_parser("policy", help="Access policy operations")
    policy_verbs = policy.add_subparsers(dest="policy_verb", required=True)
    policy_create = policy_verbs.add_parser(
        "create", help="attach a non-identity service-token policy to an app")
    policy_create.add_argument("--app-id", required=True,
                               help="Access application id")
    policy_create.add_argument("--name", required=True,
                               help="policy display name")
    policy_create.add_argument("--service-token-id", required=True,
                               help="service token id to allow")
    policy_create.set_defaults(func=cmd_access_policy_create)


def _wire_config_rule(sub) -> None:
    """Register ``config-rule create``."""
    # config-rule create — Zone:WAF:Edit writes a rule into the
    # http_config_settings phase entrypoint ruleset.
    cfg = sub.add_parser("config-rule",
                         help="Zone Configuration Rule operations")
    verbs = cfg.add_subparsers(dest="cfg_verb", required=True)

    create = verbs.add_parser(
        "create",
        help="disable Bot Fight Mode + lower security_level for a hostname",
    )
    create.add_argument("--hostname", required=True,
                        help="full hostname (e.g. vllm.arqera.io)")
    create.add_argument("--bic", choices=("on", "off"), default="off",
                        help="Bot Fight Mode for this hostname (default: off)")
    create.add_argument(
        "--security-level", default="essentially_off",
        choices=("off", "essentially_off", "low",
                 "medium", "high", "under_attack"),
        help="security level (default: essentially_off)")
    create.add_argument("--zone", default=None,
                        help="zone id or name (defaults to SOPS zone_id)")
    create.set_defaults(func=cmd_config_rule_create)


def _wire_ai_gateway(sub) -> None:
    """Register ``ai-gateway list``."""
    aigw = sub.add_parser("ai-gateway", help="AI Gateway operations")
    verbs = aigw.add_subparsers(dest="aigw_verb", required=True)
    verbs.add_parser("list").set_defaults(func=cmd_ai_gateway_list)


def _wire_tunnel(sub) -> None:
    """Register ``tunnel list``, ``tunnel create`` and ``tunnel route``."""
    tunnel = sub.add_parser("tunnel", help="cfd_tunnel operations")
    verbs = tunnel.add_subparsers(dest="tunnel_verb", required=True)

    verbs.add_parser("list").set_defaults(func=cmd_tunnel_list)

    create = verbs.add_parser("create", help="create a named cfd_tunnel")
    create.add_argument("--name", required=True,
                        help="tunnel name (e.g. arqera-homelab)")
    create.add_argument(
        "--config-src", default="cloudflare",
        choices=("cloudflare", "local"),
        help="ingress managed via API (cloudflare) "
             "or by local cloudflared config (local). "
             "Default: cloudflare.")
    create.set_defaults(func=cmd_tunnel_create)

    route = verbs.add_parser("route", help="set ingress rules on a tunnel")
    route.add_argument("--tunnel-id", required=True)
    route.add_argument(
        "--ingress-json", required=True,
        help="path to a JSON file containing the ingress "
             "rules (either bare array, "
             "{'ingress': [...]}, or {'config': {...}})")
    route.set_defaults(func=cmd_tunnel_route)


def _wire_dns(sub) -> None:
    """Register ``dns create``."""
    dns = sub.add_parser("dns", help="DNS record operations")
    verbs = dns.add_subparsers(dest="dns_verb", required=True)

    create = verbs.add_parser(
        "create",
        help="create a DNS CNAME pointing at "
             "a Cloudflare Tunnel")
    create.add_argument("--hostname", required=True,
                        help="full DNS name (e.g. vllm.arqera.io)")
    create.add_argument(
        "--tunnel-id", required=True,
        help="CF tunnel id (sets content to "
             "<tunnel-id>.cfargotunnel.com)")
    create.add_argument("--zone", required=True,
                        help="zone id or zone name (e.g. arqera.io)")
    create.add_argument("--comment", default=None,
                        help="optional record comment")
    create.set_defaults(func=cmd_dns_create)


def _wire_credentials(sub) -> None:
    """Register ``credentials list|mint|rotate|verify|audit|revoke``."""
    # credentials.* — substrate-managed credential primitive (2026-05-27).
    # Master keychain entry mints/rotates/audits/revokes per-service tokens.
    cred = sub.add_parser(
        "credentials",
        help="substrate-managed credential lifecycle (mint/rotate/audit/...)",
    )
    verbs = cred.add_subparsers(dest="cred_verb", required=True)

    listing = verbs.add_parser(
        "list", help="list all CF API tokens via master")
    listing.set_defaults(func=cmd_credentials_list)

    mint = verbs.add_parser(
        "mint",
        help="mint a per-service token, store in keychain, emit ledger body")
    mint.add_argument(
        "--name", required=True,
        help="service name; keychain entry will be arqera-cf-<name>")
    mint.add_argument(
        "--acct-scope", default=None,
        help="comma-separated account permission_group names "
             "(e.g. 'Cloudflare Tunnel Write,Workers Scripts Read')")
    mint.add_argument(
        "--zone-scope", default=None,
        help="comma-separated zone permission_group names")
    mint.add_argument(
        "--user-scope", default=None,
        help="comma-separated user permission_group names "
             "(API Tokens Write is forbidden)")
    mint.add_argument(
        "--expires", default=None,
        help="expiry date (YYYY-MM-DD); default: no expiry")
    mint.set_defaults(func=cmd_credentials_mint)

    rotate = verbs.add_parser(
        "rotate", help="rotate a per-service token (scope-mirror — WIP)")
    rotate.add_argument(
        "--service", required=True,
        help="keychain service name to rotate (e.g. arqera-cf-runtime)")
    rotate.set_defaults(func=cmd_credentials_rotate)

    verify = verbs.add_parser(
        "verify", help="verify CF tokens via /user/tokens/<id>/verify")
    verify.add_argument("--all", action="store_true",
                        help="verify every token the master can see")
    verify.add_argument(
        "--service", default=None,
        help="keychain service name (requires --token-id for now)")
    verify.add_argument("--token-id", default=None,
                        help="explicit CF token id to verify")
    verify.set_defaults(func=cmd_credentials_verify)

    audit = verbs.add_parser(
        "audit",
        help="emit credential_audit body cross-checking CF list vs keychain")
    audit.set_defaults(func=cmd_credentials_audit)

    revoke = verbs.add_parser(
        "revoke", help="schedule or execute a CF token revoke")
    revoke.add_argument("--token-id", required=True,
                        help="CF token id to revoke")
    revoke.add_argument(
        "--service", default=None,
        help="keychain service name to drop on immediate revoke")
    revoke.add_argument(
        "--grace-hours", type=int, default=24,
        help="schedule revoke for N hours from now (default: 24); "
             "pass 0 to revoke immediately")
    revoke.set_defaults(func=cmd_credentials_revoke)


def _wire_raw(sub) -> None:
    """Register the ``raw`` passthrough verb."""
    raw = sub.add_parser("raw", help="arbitrary /client/v4 passthrough")
    raw.add_argument("method", help="GET | POST | PUT | PATCH | DELETE")
    raw.add_argument("path",
                     help="path under /client/v4 (must start with '/')")
    raw.add_argument("--data", default=None,
                     help="JSON body for write methods")
    raw.set_defaults(func=cmd_raw)


def main() -> int:
    """Parse the command line and dispatch to the selected verb handler.

    Each leaf subcommand stores its handler as the ``func`` default; every
    subparser level is ``required=True``, so a successful parse always
    carries a ``func``.

    Returns:
        Whatever the selected ``cmd_*`` handler returns; the ``__main__``
        guard passes it to ``sys.exit`` as the process exit status.
    """
    # Runs before argparse sees the command line.
    # NOTE(review): presumably intercepts provider-level meta flags and may
    # exit on its own — confirm in _arq_provider_base.
    handle_meta_flags(PROVIDER, REQUIRED_SCOPES)

    ap = argparse.ArgumentParser(
        prog="arq-cloudflare",
        description="Cloudflare adapter — mesh-routed CF API v4 with substrate audit.",
    )
    sub = ap.add_subparsers(dest="verb", required=True)

    # Registration order is the order verbs appear in --help; keep it stable.
    for wire in (
        _wire_zone,
        _wire_worker,
        _wire_access,
        _wire_config_rule,
        _wire_ai_gateway,
        _wire_tunnel,
        _wire_dns,
        _wire_credentials,
        _wire_raw,
    ):
        wire(sub)

    args = ap.parse_args()
    return args.func(args)


if __name__ == "__main__":
    # Script entry point: main()'s return value becomes the exit status.
    raise SystemExit(main())
