#!/usr/bin/env python3
"""arq-vendor-stream-emit — flow vendor events into substrate as acts.

Task #38: flow Sentry/Cloudflare/GCP/Modal/PostHog events into substrate as
acts so that vendor-side state contributes to substrate truth. (GCP has no
read recipe in VENDOR_READS yet.)

Pattern: read each vendor via its existing arq-<vendor> primitive, classify
the events, and emit one substrate act per observed event. Dedup is meant to
use an in-memory seen-set (per process run), optionally backed by a substrate
seen-act lookup (not yet implemented).

v0 verbs:
  arq-vendor-stream-emit                   # one-shot all vendors
  arq-vendor-stream-emit --vendor sentry   # subset
  arq-vendor-stream-emit --daemon          # loop (60s default cadence)
  arq-vendor-stream-emit --dry-run         # print what would be emitted, don't emit

Substrate emission shape:
  arq://act/<vendor>_event_observed/<event-id>
  payload: {vendor, event_id, kind, observed_at, raw_summary}
"""
from __future__ import annotations
import argparse
import hashlib
import json
import subprocess
import sys
import time
from datetime import UTC, datetime
from pathlib import Path

# Directory containing this script. The sibling arq-<vendor> CLIs are
# resolved relative to it (see _read_vendor).
BRIDGE_DIR = Path(__file__).parent
# Put this directory first on sys.path so the sibling helper module below is
# importable when the file is run directly as a script. This must run before
# the import that follows.
sys.path.insert(0, str(BRIDGE_DIR))
from _arq_provider_base import emit_act  # noqa: E402 -- needs the sys.path tweak above


# Per-vendor read recipes, keyed by vendor name. Each recipe is a dict with:
#   verbs      -- argv appended after the sibling arq-<vendor> script path
#   list_path  -- top-level key of the script's JSON output that holds the
#                 event list, or None when the script prints plain text
#                 (Modal), which _read_vendor parses line by line
#   id_key     -- event field used as the event id; None means the id is
#                 derived from the event content
#   kind_key   -- event field used as the event kind; None means "unknown"
# NOTE(review): the module docstring lists GCP, but there is no "gcp" recipe.
VENDOR_READS = {
    "sentry": dict(
        verbs=["issue", "list", "--project", "arqera-prod"],
        list_path="data",
        id_key="id",
        kind_key="level",
    ),
    "cloudflare": dict(
        verbs=["zone", "list"],
        list_path="result",
        id_key="id",
        kind_key="status",
    ),
    "posthog": dict(
        verbs=["event", "count", "--days", "1"],
        list_path="results",
        id_key="id",
        kind_key="event",
    ),
    "modal": dict(
        verbs=["app", "list"],
        list_path=None,  # plain-text output; handled by a dedicated parser
        id_key=None,
        kind_key=None,
    ),
}


def _read_vendor(vendor: str, recipe: dict, timeout: int) -> list[dict]:
    """Run ``arq-<vendor> <verbs...>`` and return the events it reports.

    Args:
        vendor: vendor name; selects the sibling script ``BRIDGE_DIR/arq-<vendor>``.
        recipe: a ``VENDOR_READS`` entry (uses ``verbs`` and ``list_path``).
        timeout: seconds to wait for the script before giving up.

    Returns:
        A list of event dicts.

        Returns ``[]`` silently when the script does not exist. Also returns
        ``[]``, after writing a one-line note to stderr, when the script:
        cannot be started, times out, exits non-zero, or prints invalid JSON.

        Entries of the JSON event list that are not dicts are dropped,
        because callers treat every event as a mapping.
    """
    script = BRIDGE_DIR / f"arq-{vendor}"
    if not script.exists():
        # Vendor CLI not present next to this script: nothing to read.
        return []
    try:
        r = subprocess.run(
            [str(script), *recipe["verbs"]],
            capture_output=True, text=True, timeout=timeout, check=False,
        )
    except subprocess.TimeoutExpired:
        sys.stderr.write(f"  {vendor}: read timed out after {timeout}s\n")
        return []
    except OSError as e:
        # Covers FileNotFoundError and PermissionError (script not executable).
        sys.stderr.write(f"  {vendor}: cannot run {script}: {e}\n")
        return []

    if r.returncode != 0:
        detail = r.stderr.strip()[:200]
        suffix = f": {detail}" if detail else ""
        sys.stderr.write(f"  {vendor}: exit {r.returncode}{suffix}\n")
        return []

    # Modal special case: plain text. Every non-blank line after the first two
    # (treated as header lines) is one event, keyed by its first token.
    if recipe["list_path"] is None:
        events = []
        for line in r.stdout.splitlines()[2:]:
            text = line.strip()
            if text:
                events.append({"_raw": text, "id": text.split()[0]})
        return events

    try:
        data = json.loads(r.stdout)
    except json.JSONDecodeError:
        sys.stderr.write(f"  {vendor}: output is not valid JSON\n")
        return []
    events = data.get(recipe["list_path"], []) if isinstance(data, dict) else []
    if not isinstance(events, list):
        return []
    # Drop non-mapping entries (e.g. rows of a count query) instead of letting
    # them crash the emitter later.
    return [e for e in events if isinstance(e, dict)]


def _event_id(event: dict, recipe: dict) -> str:
    if recipe.get("id_key") and recipe["id_key"] in event:
        return str(event[recipe["id_key"]])
    return str(hash(json.dumps(event, sort_keys=True, default=str)))


def _emit_event(vendor: str, event: dict, recipe: dict, dry_run: bool) -> str:
    """Emit one ``<vendor>_event_observed`` act for ``event`` and return its ref.

    The ref is ``"<vendor>-<event id>"``, with the id from ``_event_id``. The
    kind is ``str(event[recipe["kind_key"]])``. It is ``"unknown"`` when the
    recipe has no kind key, or when the event lacks a non-null value for it.

    Keys beginning with ``_`` (such as the ``_raw`` line stored by the Modal
    parser) are left out of ``raw_summary``.

    With ``dry_run`` the act is printed instead of passed to ``emit_act``.
    """
    eid = _event_id(event, recipe)
    kind_key = recipe.get("kind_key")
    raw_kind = event.get(kind_key) if kind_key else None
    # A JSON null would otherwise be stringified to the Python-specific "None".
    kind = "unknown" if raw_kind is None else str(raw_kind)
    ref = f"{vendor}-{eid}"
    payload = {
        "vendor": vendor,
        "event_id": eid,
        "kind": kind,
        "observed_at": datetime.now(UTC).isoformat(),
        "raw_summary": {k: v for k, v in event.items() if not str(k).startswith("_")},
    }
    if dry_run:
        print(f"  [DRY-RUN] emit arq://act/{vendor}_event_observed/{ref} kind={kind}")
    else:
        emit_act("act", f"{vendor}_event_observed", ref, payload)
    return ref


# Refs already emitted (or printed in dry-run mode) by this process. It lives
# at module level so that repeated _run_once calls (one per daemon tick) share
# it, and each vendor event is emitted once per process rather than once per
# tick. It is never cleared, so it grows for the life of the process.
_SEEN_REFS: set[str] = set()


def _run_once(
    vendors: list[str],
    dry_run: bool,
    timeout: int,
    seen: set[str] | None = None,
) -> dict[str, int]:
    """Read each vendor once and emit acts for events not seen before.

    Args:
        vendors: vendor names; names without a ``VENDOR_READS`` recipe are
            reported and skipped.
        dry_run: print the acts instead of emitting them.
        timeout: per-vendor read timeout in seconds.
        seen: refs already emitted; updated in place. Defaults to the
            process-wide ``_SEEN_REFS``.

    Returns:
        Mapping of vendor name to the number of acts emitted (or printed)
        by this call.
    """
    if seen is None:
        seen = _SEEN_REFS
    counts: dict[str, int] = {}
    for v in vendors:
        recipe = VENDOR_READS.get(v)
        if not recipe:
            print(f"  no recipe for {v} · skipping")
            counts[v] = 0
            continue
        events = _read_vendor(v, recipe, timeout)
        emitted = 0
        for evt in events:
            # Same "<vendor>-<event id>" ref that _emit_event builds; checked
            # first so already-seen events are skipped without emitting.
            if f"{v}-{_event_id(evt, recipe)}" in seen:
                continue
            # Record only after a successful emit, so a failed one is retried
            # on the next call.
            seen.add(_emit_event(v, evt, recipe, dry_run))
            emitted += 1
        counts[v] = emitted
        mode = "(dry-run)" if dry_run else "emitted"
        print(f"  {v}: {emitted} events {mode} ({len(events) - emitted} already seen)")
    return counts


def main() -> int:
    """CLI entry point: parse flags, then run a single pass or loop as a daemon.

    Returns the process exit status: 0 after a one-shot run, or after Ctrl-C
    stops the daemon loop.
    """
    p = argparse.ArgumentParser(prog="arq-vendor-stream-emit")
    p.add_argument("--vendor", action="append", help="subset of vendors (default: all)")
    p.add_argument("--daemon", action="store_true", help="loop every --interval seconds")
    p.add_argument("--interval", type=int, default=60)
    p.add_argument("--dry-run", action="store_true", help="print only, don't emit")
    p.add_argument("--timeout", type=int, default=30)
    args = p.parse_args()
    if args.interval < 0:
        # time.sleep() raises ValueError for negative durations; fail at parse time.
        p.error("--interval must be >= 0")

    vendors = args.vendor or list(VENDOR_READS.keys())

    if not args.daemon:
        counts = _run_once(vendors, args.dry_run, args.timeout)
        total = sum(counts.values())
        print(f"\n  total events: {total} across {len(vendors)} vendors")
        return 0

    print(f"  daemon mode · vendors={vendors} · interval={args.interval}s")
    # The handler wraps the whole loop, not just _run_once: the loop spends
    # most of its time in time.sleep(), where Ctrl-C would otherwise escape
    # as a traceback.
    try:
        while True:
            tick = datetime.now(UTC).isoformat()
            print(f"\n[{tick}] tick")
            try:
                _run_once(vendors, args.dry_run, args.timeout)
            except Exception as e:
                # Keep the daemon alive across a failing tick; KeyboardInterrupt
                # is not an Exception subclass and still reaches the outer handler.
                sys.stderr.write(f"  tick err: {e}\n")
            time.sleep(args.interval)
    except KeyboardInterrupt:
        print("\n  shutting down")
    return 0


if __name__ == "__main__":
    sys.exit(main())
