All files / src/utils streamCoordination.ts

100% Statements 61/61
100% Branches 24/24
100% Functions 5/5
100% Lines 61/61

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121                        1x                                 1x 12x 4x 4x 12x 4x 4x 4x 4x     1x 4x 4x                 1x 3x 3x           1x 16x 16x 16x 16x 16x 20x 20x 17x 17x 6x 6x 17x 20x 16x 16x                             1x 2x 2x 2x 2x 2x 2x   2x 6x 6x 4x 4x 1x 4x 3x 3x 6x 2x 2x 6x   2x 3x 3x 2x 2x 1x 1x 2x 2x  
/**
 * Pure helpers for the Coordination surface: classifying IPC streams by kind
 * and attributing them to the task that owns their subscribers.
 *
 * No React or DOM dependencies — safe to unit test in isolation.
 *
 * @module
 */
 
import type { Session, StreamData } from "../hooks/types.js";
 
/** Internal IPC plumbing prefixes (mirrors the server's RESERVED_PREFIXES). */
export const INTERNAL_STREAM_PREFIXES: readonly string[] = ["lifecycle:", "pipe:", "stdin:"];
 
/** Display kind of a stream, derived from its shape. */
export type StreamKind = "chatroom" | "pipe" | "channel";
 
/** Ownership classification of a stream, derived from its subscribers' sessions. */
export type StreamOwnership =
  | { kind: "task"; taskId: string }
  | { kind: "unattached" }
  | { kind: "external" };
 
/**
 * Classify a stream's kind:
 * - `chatroom` — self-echo streams (N-party rooms where senders see their own messages).
 * - `pipe` — internal point-to-point pipes (`pipe:` prefix).
 * - `channel` — any other named stream.
 */
export function streamKind(stream: StreamData): StreamKind {
  if (stream.selfEcho) {
    return "chatroom";
  }
  if (stream.name.startsWith("pipe:")) {
    return "pipe";
  }
  return "channel";
}
 
/** Returns true when a stream is internal IPC plumbing (reserved prefix). */
export function isInternalStream(stream: StreamData): boolean {
  return INTERNAL_STREAM_PREFIXES.some((prefix) => stream.name.startsWith(prefix));
}
 
/**
 * Attribute a stream to the task that owns it, by resolving its subscribers'
 * sessions:
 * - the first subscriber whose session has a `taskId` wins → `{ kind: "task" }`;
 * - else if any subscriber's session is known (but task-less) → `unattached`;
 * - else (no subscriber session resolvable — e.g. CLI/MCP-created) → `external`.
 */
export function attributeStream(stream: StreamData, sessions: readonly Session[]): StreamOwnership {
  return attributeStreamWithMap(stream, new Map(sessions.map((s) => [s.id, s])));
}
 
/**
 * Attribute a stream to its owning task against a precomputed session map —
 * avoids rebuilding the map per stream when attributing many streams at once.
 */
export function attributeStreamWithMap(
  stream: StreamData,
  sessionsById: ReadonlyMap<string, Session>,
): StreamOwnership {
  let sawKnownSession = false;
  for (const sub of stream.subscribers) {
    const session = sessionsById.get(sub.sessionId);
    if (session) {
      sawKnownSession = true;
      if (session.taskId) {
        return { kind: "task", taskId: session.taskId };
      }
    }
  }
  return sawKnownSession ? { kind: "unattached" } : { kind: "external" };
}
 
/** A group of streams sharing an owning task (or the unattached/external bucket). */
export interface StreamGroup {
  /** Owning task id, or `undefined` for the combined unattached/external bucket. */
  taskId?: string;
  /** Streams in this group, in their incoming order. */
  streams: StreamData[];
}
 
/**
 * Group streams by owning task, preserving first-seen order of tasks. Streams
 * that are unattached or external are collected into a single trailing bucket
 * with `taskId === undefined`.
 */
export function groupStreamsByTask(
  streams: readonly StreamData[],
  sessions: readonly Session[],
): StreamGroup[] {
  const sessionsById = new Map(sessions.map((s) => [s.id, s]));
  const taskGroups = new Map<string, StreamData[]>();
  const orphans: StreamData[] = [];
 
  for (const stream of streams) {
    const ownership = attributeStreamWithMap(stream, sessionsById);
    if (ownership.kind === "task") {
      const existing = taskGroups.get(ownership.taskId);
      if (existing) {
        existing.push(stream);
      } else {
        taskGroups.set(ownership.taskId, [stream]);
      }
    } else {
      orphans.push(stream);
    }
  }
 
  const groups: StreamGroup[] = Array.from(taskGroups, ([taskId, groupStreams]) => ({
    taskId,
    streams: groupStreams,
  }));
  if (orphans.length > 0) {
    groups.push({ taskId: undefined, streams: orphans });
  }
  return groups;
}