Press n or j to go to the next uncovered block, b, p or k for the previous block.
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 | 23x 23x 23x 4x 21x 1x 20x 20x 2x 2x 2x 2x 4x 14x 7x 7x 7x 14x 2x 2x 2x 5x 5x 10x 4x 1x | /**
* Stream wrapper class inspired by OpenAI SDK
* Provides utilities for working with async iterables
*/
export class Stream<T> implements AsyncIterable<T> {
  /**
   * Set once the underlying iterator has been handed out, either to a direct
   * consumer via `[Symbol.asyncIterator]()` or to the branches created by
   * `tee()`.
   */
  private consumed = false;

  /**
   * @param iterator Factory invoked to obtain the underlying async iterator.
   * @param controller Optional controller that `abort()` signals.
   */
  constructor(
    private readonly iterator: () => AsyncIterator<T>,
    private readonly controller?: AbortController
  ) {}

  /**
   * Create a Stream from any async iterable (for example an async generator).
   *
   * @param iterable Source whose `[Symbol.asyncIterator]()` is called when the
   *   stream is first iterated or split.
   * @param controller Optional controller that `abort()` signals.
   * @returns A stream wrapping `iterable`.
   */
  static fromAsyncIterable<T>(iterable: AsyncIterable<T>, controller?: AbortController): Stream<T> {
    return new Stream(() => iterable[Symbol.asyncIterator](), controller);
  }

  /**
   * Hand out the underlying iterator. A stream can be iterated only once.
   *
   * @throws Error if the stream was already iterated or was split with `tee()`.
   */
  [Symbol.asyncIterator](): AsyncIterator<T> {
    if (this.consumed) {
      throw new Error("Cannot iterate over a consumed stream, use `.tee()` to split the stream.");
    }
    this.consumed = true;
    return this.iterator();
  }

  /**
   * Splits the stream into two streams which can be
   * independently read from at different speeds.
   *
   * Both branches pull from one shared underlying iterator; every result one
   * branch pulls is buffered for the other until that branch reads it. The
   * source stream is marked as consumed, because iterating it as well would
   * compete with the branches for items of a non-replayable source.
   *
   * @returns Two streams sharing this stream's controller.
   */
  tee(): [Stream<T>, Stream<T>] {
    const left: Array<Promise<IteratorResult<T>>> = [];
    const right: Array<Promise<IteratorResult<T>>> = [];
    const iterator = this.iterator();
    // The branches now own the underlying iterator; block direct iteration of
    // the source so it cannot steal items from them.
    this.consumed = true;

    const branch = (
      own: Array<Promise<IteratorResult<T>>>,
      other: Array<Promise<IteratorResult<T>>>
    ): AsyncIterator<T> => ({
      next: () => {
        // Serve results the sibling branch already pulled, in order.
        const buffered = own.shift();
        if (buffered !== undefined) {
          return buffered;
        }
        // Nothing buffered: advance the shared iterator and leave the same
        // pending result for the sibling.
        const pulled = iterator.next();
        other.push(pulled);
        return pulled;
      }
    });

    return [
      new Stream(() => branch(left, right), this.controller),
      new Stream(() => branch(right, left), this.controller)
    ];
  }

  /**
   * Collect all chunks into an array.
   *
   * Iterates this stream, so it counts as the stream's single consumption and
   * rejects if the stream was already consumed.
   *
   * @returns Every chunk, in the order it was produced.
   */
  async toArray(): Promise<T[]> {
    const result: T[] = [];
    for await (const chunk of this) {
      result.push(chunk);
    }
    return result;
  }

  /**
   * Abort the underlying stream by signalling the controller, if one was
   * supplied. Does nothing when the stream has no controller.
   */
  abort(): void {
    this.controller?.abort();
  }
}
|