All files / src/utils streamCoordination.ts

100% Statements 35/35
100% Branches 16/16
100% Functions 9/9
100% Lines 31/31

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121                        2x                                   12x 4x   8x 4x   4x         9x                     6x                     16x 16x 20x 20x 17x 17x 6x       10x                                       4x 2x 2x   2x 6x 6x 4x 4x 1x   3x     2x       3x       2x 1x   2x    
/**
 * Pure helpers for the Coordination surface: classifying IPC streams by kind
 * and attributing them to the task that owns their subscribers.
 *
 * No React or DOM dependencies — safe to unit test in isolation.
 *
 * @module
 */
 
import type { Session, StreamData } from "../hooks/types.js";
 
/**
 * Name prefixes reserved for internal IPC plumbing (mirrors the server's
 * RESERVED_PREFIXES). A stream whose name starts with any entry here is
 * reported as internal by `isInternalStream`.
 */
export const INTERNAL_STREAM_PREFIXES: readonly string[] = ["lifecycle:", "pipe:", "stdin:"];
 
/**
 * Display kind of a stream, derived from its shape:
 * - `"chatroom"` — the stream has `selfEcho` set;
 * - `"pipe"` — the stream's name starts with `pipe:`;
 * - `"channel"` — everything else.
 */
export type StreamKind = "chatroom" | "pipe" | "channel";
 
/**
 * Ownership classification of a stream, derived from its subscribers' sessions.
 * Discriminated on `kind`:
 * - `"task"` — a subscriber's session carries a task id, exposed as `taskId`;
 * - `"unattached"` — at least one subscriber's session is known, but none has a task id;
 * - `"external"` — no subscriber's session could be resolved.
 */
export type StreamOwnership =
  | { kind: "task"; taskId: string }
  | { kind: "unattached" }
  | { kind: "external" };
 
/**
 * Derive the display kind of a stream.
 *
 * Self-echo takes precedence over the name: a stream with `selfEcho` set is
 * always a `chatroom`, even when its name carries the `pipe:` prefix.
 *
 * @param stream - The stream to classify.
 * @returns `"chatroom"` for self-echo streams, `"pipe"` for names starting
 *   with `pipe:`, and `"channel"` for any other stream.
 */
export function streamKind(stream: StreamData): StreamKind {
  if (!stream.selfEcho) {
    // Only the name distinguishes the two non-chatroom kinds.
    return stream.name.startsWith("pipe:") ? "pipe" : "channel";
  }
  return "chatroom";
}
 
/**
 * Tell whether a stream is internal IPC plumbing.
 *
 * @param stream - The stream whose name is inspected.
 * @returns `true` when the name starts with one of `INTERNAL_STREAM_PREFIXES`,
 *   `false` otherwise.
 */
export function isInternalStream(stream: StreamData): boolean {
  for (const reserved of INTERNAL_STREAM_PREFIXES) {
    if (stream.name.startsWith(reserved)) {
      return true;
    }
  }
  return false;
}
 
/**
 * Work out which task owns a stream by looking up the sessions of its
 * subscribers. Subscribers are examined in order:
 * - the first one whose session has a `taskId` decides → `{ kind: "task" }`;
 * - otherwise, if any subscriber's session is known (but task-less) → `unattached`;
 * - otherwise (no subscriber session resolvable — e.g. CLI/MCP-created) → `external`.
 *
 * Indexes `sessions` by id on every call and delegates to
 * `attributeStreamWithMap`.
 *
 * @param stream - The stream to attribute.
 * @param sessions - Known sessions; when ids repeat, the later entry wins.
 * @returns The ownership classification for `stream`.
 */
export function attributeStream(stream: StreamData, sessions: readonly Session[]): StreamOwnership {
  const sessionsById = new Map<string, Session>();
  for (const session of sessions) {
    sessionsById.set(session.id, session);
  }
  return attributeStreamWithMap(stream, sessionsById);
}
 
/**
 * Same attribution as `attributeStream`, but against a session index the
 * caller has already built — so attributing many streams does not rebuild the
 * index once per stream.
 *
 * @param stream - The stream to attribute.
 * @param sessionsById - Sessions keyed by session id.
 * @returns `{ kind: "task", taskId }` for the first subscriber whose session
 *   has a task id; `{ kind: "unattached" }` when some subscriber's session is
 *   known but none has a task id; `{ kind: "external" }` when no subscriber's
 *   session is found in the index.
 */
export function attributeStreamWithMap(
  stream: StreamData,
  sessionsById: ReadonlyMap<string, Session>,
): StreamOwnership {
  // Result to use if no subscriber turns out to belong to a task.
  let fallback: StreamOwnership = { kind: "external" };
  for (const subscriber of stream.subscribers) {
    const owner = sessionsById.get(subscriber.sessionId);
    if (!owner) {
      continue;
    }
    if (owner.taskId) {
      return { kind: "task", taskId: owner.taskId };
    }
    // A known session without a task upgrades the fallback from "external".
    fallback = { kind: "unattached" };
  }
  return fallback;
}
 
/**
 * A group of streams sharing an owning task (or the unattached/external bucket),
 * as produced by `groupStreamsByTask`.
 */
export interface StreamGroup {
  /**
   * Id of the task that owns every stream in this group, or `undefined` for
   * the single combined bucket of unattached and external streams.
   */
  taskId?: string;
  /** Streams in this group, in the order they appeared in the input. */
  streams: StreamData[];
}
 
/**
 * Partition streams by the task that owns them.
 *
 * Task groups appear in the order their task was first encountered, and the
 * streams inside each group keep their input order. Streams classified as
 * unattached or external all land in one bucket with `taskId === undefined`,
 * which is appended last and omitted entirely when it would be empty.
 *
 * @param streams - Streams to group.
 * @param sessions - Known sessions, used to resolve each stream's subscribers.
 * @returns The task groups followed by the optional unattached/external bucket.
 */
export function groupStreamsByTask(
  streams: readonly StreamData[],
  sessions: readonly Session[],
): StreamGroup[] {
  // Index sessions once so every stream is attributed against the same map.
  const lookup = new Map<string, Session>();
  for (const session of sessions) {
    lookup.set(session.id, session);
  }

  // Map iteration follows insertion order, which yields first-seen task order.
  const buckets = new Map<string, StreamData[]>();
  const leftovers: StreamData[] = [];

  for (const stream of streams) {
    const owner = attributeStreamWithMap(stream, lookup);
    if (owner.kind !== "task") {
      leftovers.push(stream);
      continue;
    }
    const bucket = buckets.get(owner.taskId);
    if (bucket) {
      bucket.push(stream);
    } else {
      buckets.set(owner.taskId, [stream]);
    }
  }

  const result: StreamGroup[] = [];
  for (const [taskId, grouped] of buckets) {
    result.push({ taskId, streams: grouped });
  }
  if (leftovers.length > 0) {
    result.push({ taskId: undefined, streams: leftovers });
  }
  return result;
}