Press n or j to go to the next uncovered block, b, p or k for the previous block.
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 | 19x 120x 40x 80x 9x 71x 15x 96x 96x 96x 96x 87x 87x 69x 27x 63x 21x 21x 21x 54x 54x 36x 36x 18x 18x 18x 21x 21x 18x 21x | /**
* Pure helpers for the Coordination surface: classifying IPC streams by kind
* and attributing them to the task that owns their subscribers.
*
* No React or DOM dependencies — safe to unit test in isolation.
*
* @module
*/
import type { Session, StreamData } from "../hooks/types.js";
/** Internal IPC plumbing prefixes (mirrors the server's RESERVED_PREFIXES). */
export const INTERNAL_STREAM_PREFIXES: readonly string[] = ["lifecycle:", "pipe:", "stdin:"];
/** Display kind of a stream, derived from its shape. */
export type StreamKind = "chatroom" | "pipe" | "channel";
/** Ownership classification of a stream, derived from its subscribers' sessions. */
export type StreamOwnership =
| { kind: "task"; taskId: string }
| { kind: "unattached" }
| { kind: "external" };
/**
* Classify a stream's kind:
* - `chatroom` — self-echo streams (N-party rooms where senders see their own messages).
* - `pipe` — internal point-to-point pipes (`pipe:` prefix).
* - `channel` — any other named stream.
*/
export function streamKind(stream: StreamData): StreamKind {
if (stream.selfEcho) {
return "chatroom";
}
if (stream.name.startsWith("pipe:")) {
return "pipe";
}
return "channel";
}
/** Returns true when a stream is internal IPC plumbing (reserved prefix). */
export function isInternalStream(stream: StreamData): boolean {
return INTERNAL_STREAM_PREFIXES.some((prefix) => stream.name.startsWith(prefix));
}
/**
* Attribute a stream to the task that owns it, by resolving its subscribers'
* sessions:
* - the first subscriber whose session has a `taskId` wins → `{ kind: "task" }`;
* - else if any subscriber's session is known (but task-less) → `unattached`;
* - else (no subscriber session resolvable — e.g. CLI/MCP-created) → `external`.
*/
export function attributeStream(stream: StreamData, sessions: readonly Session[]): StreamOwnership {
return attributeStreamWithMap(stream, new Map(sessions.map((s) => [s.id, s])));
}
/**
* Attribute a stream to its owning task against a precomputed session map —
* avoids rebuilding the map per stream when attributing many streams at once.
*/
export function attributeStreamWithMap(
stream: StreamData,
sessionsById: ReadonlyMap<string, Session>,
): StreamOwnership {
let sawKnownSession = false;
for (const sub of stream.subscribers) {
const session = sessionsById.get(sub.sessionId);
if (session) {
sawKnownSession = true;
if (session.taskId) {
return { kind: "task", taskId: session.taskId };
}
}
}
return sawKnownSession ? { kind: "unattached" } : { kind: "external" };
}
/** A group of streams sharing an owning task (or the unattached/external bucket). */
export interface StreamGroup {
/** Owning task id, or `undefined` for the combined unattached/external bucket. */
taskId?: string;
/** Streams in this group, in their incoming order. */
streams: StreamData[];
}
/**
* Group streams by owning task, preserving first-seen order of tasks. Streams
* that are unattached or external are collected into a single trailing bucket
* with `taskId === undefined`.
*/
export function groupStreamsByTask(
streams: readonly StreamData[],
sessions: readonly Session[],
): StreamGroup[] {
const sessionsById = new Map(sessions.map((s) => [s.id, s]));
const taskGroups = new Map<string, StreamData[]>();
const orphans: StreamData[] = [];
for (const stream of streams) {
const ownership = attributeStreamWithMap(stream, sessionsById);
if (ownership.kind === "task") {
const existing = taskGroups.get(ownership.taskId);
if (existing) {
existing.push(stream);
} else {
taskGroups.set(ownership.taskId, [stream]);
}
} else {
orphans.push(stream);
}
}
const groups: StreamGroup[] = Array.from(taskGroups, ([taskId, groupStreams]) => ({
taskId,
streams: groupStreams,
}));
if (orphans.length > 0) {
groups.push({ taskId: undefined, streams: orphans });
}
return groups;
}
|