#!/usr/bin/env bash
# orch-subscribe — give a worker pane push-notifications when listed peers
# fire Stop events. Spawns a long-running daemon that fswatches the marker dir
# and injects a prompt into the calling pane via `orch-tell` on each event.
#
# Usage (run from inside the worker pane, or from the orchestrator with
# ORCH_PANE_ID set to the worker's pane):
#   orch-subscribe <peer_pane> [<peer_pane> ...]   # add subscriptions
#   orch-subscribe --list                          # show active subs
#   orch-subscribe --unsub <peer_pane>             # remove one sub
#   orch-subscribe --cancel                        # remove all subs
#
# State: ~/.cache/orch-subs/<self>.<peer>.pid (one pidfile per sub).
# Daemon exits when it is signalled (--unsub / --cancel send TERM and remove the
# pidfile) OR the self-pane disappears.
#
# Loop-break: injected prompts are prefixed `[peer event]` and the fleet-prompt
# addendum tells workers to treat them as read-only context. This is a social
# guard. If A subs to B and B subs to A AND a worker writes back on every
# event, you will ping-pong. Don't do that.
set -euo pipefail

# Preflight: we need to know which pane we act for, and both helper tools
# must be resolvable before any subscription work starts.
if [ -z "${ORCH_PANE_ID:-}" ]; then
    echo "orch-subscribe: ORCH_PANE_ID not set — am I in a worker pane?" >&2
    exit 1
fi
if ! command -v fswatch >/dev/null 2>&1; then
    echo "fswatch missing — brew install fswatch" >&2
    exit 1
fi
if ! command -v orch-tell >/dev/null 2>&1; then
    echo "orch-tell not on PATH" >&2
    exit 1
fi

# SELF    — pane receiving the notifications.
# DIR     — directory holding the per-peer marker files (<peer>.event).
# SUB_DIR — directory holding one pidfile per (self, peer) subscription.
SELF=$ORCH_PANE_ID
DIR="${ORCH_STOP_DIR:-$HOME/.cache/orch-stop}"
SUB_DIR="${ORCH_SUB_DIR:-$HOME/.cache/orch-subs}"
mkdir -p "$DIR" "$SUB_DIR"

# Print one "<peer>  pid=<pid>" line per live subscription owned by $SELF and
# prune pidfiles whose recorded process is no longer running.
# Globals: SUB_DIR, SELF (read)
# Outputs: subscription lines on stdout, or a "(no active subscriptions …)"
#          notice when none are alive
# Returns: 0 always
list_subs() {
    local f pid peer found=0
    for f in "$SUB_DIR/$SELF".*.pid; do
        [ -e "$f" ] || continue   # unmatched glob stays literal
        # The pidfile can vanish between the glob and the read (its daemon
        # removes it on exit); treat an unreadable/empty file as stale rather
        # than letting `set -e` abort the listing.
        pid=$(cat "$f" 2>/dev/null) || pid=""
        peer=${f##*/}
        peer=${peer%.pid}
        peer=${peer#"$SELF."}
        if [ -n "$pid" ] && kill -0 "$pid" 2>/dev/null; then
            echo "$peer  pid=$pid"
            found=1
        else
            rm -f "$f"  # stale
        fi
    done
    # Explicit `if` instead of `[ … ] && echo`: the short-circuit form makes
    # the function return 1 whenever subscriptions exist, which aborts the
    # script under `set -e` before the caller's `exit 0` is reached.
    if [ "$found" -eq 0 ]; then
        echo "(no active subscriptions for $SELF)"
    fi
    return 0
}

# Drop the subscription of $SELF to a single peer.
# Globals:   SUB_DIR, SELF (read)
# Arguments: $1 - peer pane id
# Outputs:   "<peer>: not subscribed" or "<peer>: unsubscribed" on stdout
unsub_one() {
    local target=$1
    local pf="$SUB_DIR/$SELF.$target.pid"
    if [ ! -f "$pf" ]; then
        echo "$target: not subscribed"
        return
    fi
    # Best effort: the recorded process may already be gone.
    kill "$(cat "$pf")" 2>/dev/null || true
    rm -f "$pf"
    echo "$target: unsubscribed"
}

# Drop every subscription owned by $SELF: signal each recorded daemon pid and
# delete its pidfile.
# Globals: SUB_DIR, SELF (read)
# Outputs: a single confirmation line on stdout
cancel_all() {
    for f in "$SUB_DIR/$SELF".*.pid; do
        if [ -e "$f" ]; then   # skips the literal pattern when nothing matched
            # Best effort: the recorded process may already be gone.
            kill "$(cat "$f")" 2>/dev/null || true
            rm -f "$f"
        fi
    done
    echo "all subscriptions for $SELF cancelled"
}

# Management sub-commands are handled here and always terminate the script;
# any other first argument falls through to the subscribe loop below.
if [ "${1:-}" = "--list" ]; then
    list_subs
    exit 0
elif [ "${1:-}" = "--cancel" ]; then
    cancel_all
    exit 0
elif [ "${1:-}" = "--unsub" ]; then
    if [ -z "${2:-}" ]; then
        echo "usage: orch-subscribe --unsub <peer_pane>" >&2
        exit 1
    fi
    unsub_one "$2"
    exit 0
elif [ -z "${1:-}" ]; then
    echo "usage: orch-subscribe <peer_pane>... | --list | --unsub <peer> | --cancel" >&2
    exit 1
fi

# Daemon body — spawned per (self, peer) pair.
# Killable via TERM (--unsub / --cancel signals the pid, trap cleans up pidfile).
#
# Globals:   DIR, SELF (read)
# Arguments: $1 - peer pane id; its marker file is $DIR/<peer>.event
#            $2 - pidfile path that receives the daemon's pid
# Outputs:   none (the daemon's stdio is detached to /dev/null)
spawn_daemon() {
    local peer=$1
    local pidfile=$2
    local marker="$DIR/$peer.event"

    (
        # pid of the in-flight `fswatch -1`, empty while none is running.
        local watcher_pid=""

        # TERM/INT just request an exit; the EXIT trap does the cleanup on
        # every path: reap a still-running watcher, then drop the pidfile.
        trap 'exit 0' TERM INT
        trap '[ -z "$watcher_pid" ] || kill "$watcher_pid" 2>/dev/null || true; rm -f "$pidfile"' EXIT

        # Dedup on ts_ns from marker content (set once per logical event by the
        # producer hook). mtime-based dedup fails because a shell `> file`
        # redirect writes incrementally — multiple kernel events, multiple
        # mtime ticks, but ONE logical Stop event.
        local last_ts_ns=""
        [ -e "$marker" ] && last_ts_ns=$(grep '^ts_ns=' "$marker" 2>/dev/null | cut -d= -f2- || echo "")

        local panes cur_ts_ns ts_iso cwd

        # Process-then-block ordering: check the marker at the top of each
        # iteration BEFORE entering fswatch. Without this pre-check there's a
        # race: while orch-tell runs (~1s) the next event can land before
        # `fswatch -1` re-arms; the next `fswatch -1` will only wake on a
        # FUTURE event, dropping the queued one. Pre-check catches any change
        # that occurred during the previous fire.
        while :; do
            # Self-pane disappeared → exit. Cheap check on each iteration.
            # The pane list is captured first and matched afterwards: piping
            # tmux straight into `grep -q` can fail under `pipefail` (SIGPIPE
            # when grep exits early), which would look like a missing pane.
            panes=$(tmux list-panes -a -F '#{pane_id}' 2>/dev/null) || panes=""
            grep -qxF -- "$SELF" <<<"$panes" || exit 0

            cur_ts_ns=""
            [ -e "$marker" ] && cur_ts_ns=$(grep '^ts_ns=' "$marker" 2>/dev/null | cut -d= -f2- || echo "")

            if [ -n "$cur_ts_ns" ] && [ "$cur_ts_ns" != "$last_ts_ns" ]; then
                last_ts_ns=$cur_ts_ns
                ts_iso=$(grep '^ts_iso=' "$marker" 2>/dev/null | cut -d= -f2- || echo "")
                cwd=$(grep '^cwd=' "$marker" 2>/dev/null | cut -d= -f2- || echo "")
                orch-tell "$SELF" "[peer event] $peer fired Stop at $ts_iso (cwd=$cwd) — read-only context, do not auto-reply unless instructed." >/dev/null 2>&1 || true
                continue   # immediately re-check before blocking — drains queued events
            fi

            # No new event → block until any change in marker dir.
            # fswatch runs in the background and we `wait` on it: bash defers
            # traps while a foreground command runs, but `wait` returns as soon
            # as a trapped signal arrives, so TERM stops the daemon right away
            # instead of at the next filesystem event.
            fswatch -1 "$DIR" >/dev/null 2>&1 &
            watcher_pid=$!
            wait "$watcher_pid" || sleep 1   # back off if the watcher failed
            watcher_pid=""
        done
    ) </dev/null >/dev/null 2>&1 &
    # ^ stdio detached so a caller capturing or piping our output is not kept
    #   waiting on file descriptors held open by the long-running daemon.
    local pid=$!
    # Write pidfile immediately so the caller can read it without racing the
    # subshell. Subshell's EXIT trap will clean it up if/when the daemon ends.
    echo "$pid" > "$pidfile"
}

# Subscribe $SELF to every pane id given on the command line.
# Exit status: 1 on an invalid or self-referencing pane id (aborts at once),
# 2 when at least one subscription was refused as mutual, 0 otherwise.
REFUSED=0
for peer in "$@"; do
    if ! [[ "$peer" =~ ^%[0-9]+$ ]]; then
        echo "orch-subscribe: invalid pane id: $peer" >&2
        exit 1
    fi
    if [ "$peer" = "$SELF" ]; then
        echo "orch-subscribe: refusing to subscribe pane to itself ($peer)" >&2
        exit 1
    fi

    # Refuse mutual subscriptions: if `peer` is already subscribed to `SELF`,
    # adding the reverse direction would create a cascade loop (every Stop on
    # one side fires a turn on the other, whose Stop fires back, forever). The
    # right place for bidirectional coordination is the orchestrator.
    reverse_pidfile="$SUB_DIR/${peer}.${SELF}.pid"
    if [ -f "$reverse_pidfile" ]; then
        rev_pid=$(cat "$reverse_pidfile" 2>/dev/null)
        if [ -z "$rev_pid" ] || ! kill -0 "$rev_pid" 2>/dev/null; then
            # Reverse pidfile exists but its process is gone — stale, clean up.
            rm -f "$reverse_pidfile"
        else
            echo "orch-subscribe: refusing — $peer is already subscribed to $SELF (mutual subscriptions cause cascade loops; route bidirectional coordination through the orchestrator instead)" >&2
            REFUSED=$((REFUSED + 1))
            continue
        fi
    fi

    pidfile="$SUB_DIR/$SELF.$peer.pid"
    if [ -f "$pidfile" ] && kill -0 "$(cat "$pidfile")" 2>/dev/null; then
        # A live daemon already covers this pair; nothing to do.
        echo "$peer: already subscribed (pid $(cat "$pidfile"))"
        continue
    fi
    # Whatever is left at this path belongs to a dead daemon.
    rm -f "$pidfile"

    spawn_daemon "$peer" "$pidfile"
    echo "$peer: subscribed (pid $(cat "$pidfile"))"
done

if [ "$REFUSED" -gt 0 ]; then
    exit 2
fi
exit 0
