All files / src/utils streamCoordination.ts

88.57% Statements 31/35
100% Branches 11/11
55.55% Functions 5/9
93.54% Lines 29/31

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121                        18x                                   100x 35x   65x 6x   59x                                                     79x 79x 79x 79x 74x 74x 56x       23x                                       63x 21x 21x   21x 54x 54x 36x 36x 18x   18x     18x       21x       21x 18x   21x    
/**
 * Pure helpers for the Coordination surface: classifying IPC streams by kind
 * and attributing them to the task that owns their subscribers.
 *
 * No React or DOM dependencies — safe to unit test in isolation.
 *
 * @module
 */
 
import type { Session, StreamData } from "../hooks/types.js";
 
/**
 * Name prefixes that mark a stream as internal IPC plumbing (mirrors the
 * server's RESERVED_PREFIXES).
 *
 * `isInternalStream` matches these against the start of a stream's name with
 * a case-sensitive `startsWith`, so each entry includes its trailing colon.
 *
 * NOTE(review): the server-side list is not visible from this module —
 * presumably the two must be kept in sync by hand; confirm.
 */
export const INTERNAL_STREAM_PREFIXES: readonly string[] = ["lifecycle:", "pipe:", "stdin:"];
 
/**
 * Display kind of a stream, derived from its shape (see `streamKind`):
 * - `"chatroom"` — the stream has `selfEcho` set;
 * - `"pipe"` — the stream's name starts with `pipe:`;
 * - `"channel"` — anything else.
 */
export type StreamKind = "chatroom" | "pipe" | "channel";
 
/**
 * Ownership classification of a stream, derived from its subscribers' sessions:
 * - `task` — a subscriber's session carries a `taskId`, reported in `taskId`;
 * - `unattached` — at least one subscriber's session is known, but none has a `taskId`;
 * - `external` — none of the subscribers' sessions could be resolved.
 */
export type StreamOwnership =
  | { kind: "task"; taskId: string }
  | { kind: "unattached" }
  | { kind: "external" };
 
/**
 * Derive the display kind of a stream from its shape.
 *
 * The checks are ordered, and the first match wins:
 * 1. `chatroom` — the stream has `selfEcho` set (senders see their own messages);
 * 2. `pipe` — the stream's name starts with the `pipe:` prefix;
 * 3. `channel` — every other named stream.
 *
 * @param stream - The stream to classify.
 * @returns The stream's display kind.
 */
export function streamKind(stream: StreamData): StreamKind {
  // Self-echo takes precedence: a self-echo stream named `pipe:…` is still a chatroom.
  if (stream.selfEcho) return "chatroom";
  return stream.name.startsWith("pipe:") ? "pipe" : "channel";
}
 
/**
 * Report whether a stream is internal IPC plumbing.
 *
 * @param stream - The stream whose name is inspected.
 * @returns `true` when the stream's name starts with any entry of
 *   `INTERNAL_STREAM_PREFIXES`; `false` otherwise.
 */
export function isInternalStream(stream: StreamData): boolean {
  for (const reserved of INTERNAL_STREAM_PREFIXES) {
    if (stream.name.startsWith(reserved)) {
      return true;
    }
  }
  return false;
}
 
/**
 * Attribute a stream to the task that owns it by resolving the sessions of
 * its subscribers.
 *
 * Builds an id → session lookup from `sessions` and delegates to
 * `attributeStreamWithMap`, which yields:
 * - `{ kind: "task" }` for the first subscriber whose session has a `taskId`;
 * - `unattached` when some subscriber's session is known but none has a task;
 * - `external` when no subscriber session can be resolved (e.g. CLI/MCP-created).
 *
 * @param stream - The stream to attribute.
 * @param sessions - Known sessions; if two share an id, the later one wins.
 * @returns The ownership classification for `stream`.
 */
export function attributeStream(stream: StreamData, sessions: readonly Session[]): StreamOwnership {
  const lookup = new Map<string, Session>();
  for (const session of sessions) {
    lookup.set(session.id, session);
  }
  return attributeStreamWithMap(stream, lookup);
}
 
/**
 * Attribute a stream to its owning task using a precomputed session lookup,
 * so callers attributing many streams need not rebuild the map for each one.
 *
 * Subscribers are visited in order:
 * - the first one whose session has a truthy `taskId` decides the result
 *   (`{ kind: "task", taskId }`);
 * - otherwise, if at least one subscriber's session was found, the stream is
 *   `unattached`;
 * - otherwise (no subscribers, or none resolvable) it is `external`.
 *
 * @param stream - The stream whose subscribers are resolved.
 * @param sessionsById - Sessions keyed by session id.
 * @returns The ownership classification for `stream`.
 */
export function attributeStreamWithMap(
  stream: StreamData,
  sessionsById: ReadonlyMap<string, Session>,
): StreamOwnership {
  let resolvedAny = false;

  for (const { sessionId } of stream.subscribers) {
    const owner = sessionsById.get(sessionId);
    if (!owner) {
      // Unknown session: contributes nothing to the classification.
      continue;
    }
    if (owner.taskId) {
      return { kind: "task", taskId: owner.taskId };
    }
    // Known session without a task: remember it, but keep looking for a task.
    resolvedAny = true;
  }

  if (resolvedAny) {
    return { kind: "unattached" };
  }
  return { kind: "external" };
}
 
/**
 * A group of streams sharing an owning task (or the unattached/external bucket).
 *
 * Produced by `groupStreamsByTask`: one group per distinct owning task, plus
 * at most one group without a task id.
 */
export interface StreamGroup {
  /** Owning task id, or `undefined` for the combined unattached/external bucket. */
  taskId?: string;
  /** Streams in this group, in their incoming order. */
  streams: StreamData[];
}
 
/**
 * Partition streams by the task that owns them.
 *
 * Each stream is attributed via `attributeStreamWithMap` against a session
 * lookup built once from `sessions`. Task groups appear in the order their
 * task was first encountered, and streams keep their incoming order within a
 * group. Streams that are unattached or external are gathered into a single
 * bucket with `taskId === undefined`, appended last and omitted when empty.
 *
 * @param streams - Streams to group.
 * @param sessions - Known sessions used to resolve subscriber ownership.
 * @returns One group per owning task, followed by the optional orphan bucket.
 */
export function groupStreamsByTask(
  streams: readonly StreamData[],
  sessions: readonly Session[],
): StreamGroup[] {
  const sessionLookup = new Map<string, Session>();
  for (const session of sessions) {
    sessionLookup.set(session.id, session);
  }

  // Map iteration follows insertion order, which gives first-seen task ordering.
  const byTask = new Map<string, StreamData[]>();
  const unowned: StreamData[] = [];

  for (const stream of streams) {
    const owner = attributeStreamWithMap(stream, sessionLookup);
    if (owner.kind !== "task") {
      unowned.push(stream);
      continue;
    }
    const bucket = byTask.get(owner.taskId);
    if (bucket === undefined) {
      byTask.set(owner.taskId, [stream]);
    } else {
      bucket.push(stream);
    }
  }

  const result: StreamGroup[] = [];
  for (const [taskId, grouped] of byTask) {
    result.push({ taskId, streams: grouped });
  }
  if (unowned.length > 0) {
    result.push({ taskId: undefined, streams: unowned });
  }
  return result;
}