# stream-chain

> A library for chaining functions, generators, and streams into a single pipeline with proper per-item backpressure. Zero runtime dependencies. Works with Node.js (22, 24, 26), Bun, and Deno. ESM-only; CommonJS consumers must destructure the named exports from `require()`.

- Three substrate variants: `stream-chain` / `stream-chain/node` (Node Streams `Duplex`), `stream-chain/web` (native Web Streams `{readable, writable}`), `stream-chain/core` (substrate-free async-iterable factory).
- Chain regular functions, async functions, generator functions, async generator functions, and existing streams
- Per-item backpressure: the queue stays at `highWaterMark` + 1 items even under unbounded `many()` / generator expansion
- Object mode by default in the Node-streams variant — ideal for data processing pipelines
- Native Web Streams support throughout: `asWebStream()` adapter, web type guards, `/web` chain
- Function grouping optimization for efficient pipelines
- Special return values for flow control: skip, stop, emit multiple, final value, flush at end
- Built-in JSONL (line-separated JSON) parser and stringer
- Utility functions for common stream operations: take, skip, fold, scan, batch, lines
- Non-destructive async-iterator wrappers (`makeStreamPuller`, `makeWebStreamPuller`)
- TypeScript support with typed streams

## Quick start

Install:

```bash
npm i stream-chain
```

Create a pipeline (`example.mjs`):

```js
import chain from 'stream-chain';
import {Readable} from 'node:stream';

const pipeline = chain([
  x => x * x,
  x => x % 2 ? x : null,
  x => x + 1
]);

const source = Readable.from([1, 2, 3, 4, 5]);
source.pipe(pipeline);
pipeline.on('data', x => console.log(x));
// Output: 2, 10, 26
```

Run: `node example.mjs`

## Importing

```js
// ESM (shortest form is the default export)
import chain from 'stream-chain';
import {none, stop, many, finalValue, flushable} from 'stream-chain';

// CommonJS (4.x requires destructure — no bare-callable form)
const {chain, none, stop, many} = require('stream-chain');

// Substrate variants (each subpath's default is its own `chain`)
import chain from 'stream-chain/node';   // canonical Node Streams chain (same as default)
import chain from 'stream-chain/web';    // native Web Streams chain (browser-safe)
import chain from 'stream-chain/core';   // substrate-free async-iterable chain

// Individual modules (ESM)
import gen from 'stream-chain/gen.js';
import fun from 'stream-chain/fun.js';
import asStream from 'stream-chain/asStream.js';
import asWebStream from 'stream-chain/asWebStream.js';
import {none, stop, many, finalValue, flushable, isReadableWebStream, isWritableWebStream, isDuplexWebStream} from 'stream-chain/defs.js';

// Utilities
import take from 'stream-chain/utils/take.js';
import takeWhile from 'stream-chain/utils/takeWhile.js';
import takeWithSkip from 'stream-chain/utils/takeWithSkip.js';
import skip from 'stream-chain/utils/skip.js';
import skipWhile from 'stream-chain/utils/skipWhile.js';
import fold from 'stream-chain/utils/fold.js';
import reduce from 'stream-chain/utils/reduce.js';
import reduceStream from 'stream-chain/utils/reduceStream.js';
import scan from 'stream-chain/utils/scan.js';
import batch from 'stream-chain/utils/batch.js';
import lines from 'stream-chain/utils/lines.js';
import fixUtf8Stream from 'stream-chain/utils/fixUtf8Stream.js';
import readableFrom from 'stream-chain/utils/readableFrom.js';

// JSONL
import parser from 'stream-chain/jsonl/parser.js';
import parserStream from 'stream-chain/jsonl/parserStream.js';
import stringerStream from 'stream-chain/jsonl/stringerStream.js';

// TypeScript helpers
import {TypedReadable, TypedWritable, TypedDuplex, TypedTransform} from 'stream-chain/typed-streams.js';
```

## chain() API

`chain(fns, options?)` — creates a Duplex stream from an array of pipeline steps.

Arguments:

- `fns` (array) — items can be:
  - **Functions**: regular, async, generator, async generator.
  - **Streams**: Transform, Duplex. First item can be Readable. Last item can be Writable.
  - **Web streams**: ReadableStream, WritableStream, `{readable, writable}` pair. Adapted automatically.
  - **Arrays**: flattened recursively; elements included verbatim.
  - **Falsy values**: ignored (filtered out).
- `options` (object, optional) — extends `DuplexOptions` with:
  - `noGrouping` (boolean, default: false) — if true, each function becomes a separate stream. If false, consecutive functions are grouped via `gen()` for efficiency.
  - `skipEvents` (boolean, default: false) — if true, error events from internal streams are not forwarded.
  - Default: `{writableObjectMode: true, readableObjectMode: true}`.

Returns: `Duplex` stream with additional properties:
- `streams` — array of all internal streams created by the chain.
- `input` — the first stream (write to it or attach event handlers).
- `output` — the last stream (read from it or attach event handlers).

### How functions are called

Functions receive `(chunk, encoding)`. The return value determines what happens:

- **`null`, `undefined`, or `none`** — no value passed downstream (filter). Note: `null`/`undefined` are treated as `none` in `asStream()`/`chain()` because Node.js streams reserve them for end-of-stream signaling. `gen()` and `fun()` pass `null`/`undefined` through like any other value.
- **`stop`** — no value passed and generator terminates (gen/fun only).
- **Regular value** — passed to next step.
- **`many(values)`** — all values in the array emitted sequentially.
- **`finalValue(value)`** — value emitted directly, remaining steps skipped (gen/fun only).
- **Promise** — awaited; resolved value handled as above.
- **Generator/iterator** — iterated; each yielded value handled as above.
- **Thrown exception** — caught, emitted as stream error event.
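
As a rough mental model of the rules above, the following stand-in interpreter pushes one input through a list of synchronous steps. It is a sketch, not the library's implementation: the `many`/`finalValue` wrapper shapes and the `run` helper are hypothetical, only the `none` symbol key comes from this document, and promises, generators, and `stop` are left out.

```js
// Stand-in markers. The symbol key matches the one documented for `none`;
// the wrapper shapes for many/finalValue are hypothetical.
const none = Symbol.for('object-stream.none');
const many = values => ({kind: 'many', values});
const finalValue = value => ({kind: 'final', value});

// Push one value through the remaining steps and collect what comes out.
const run = (fns, value, index = 0, out = []) => {
  if (value === none) return out; // skip: nothing is emitted
  if (value && value.kind === 'final') {
    out.push(value.value); // final: bypass the remaining steps
    return out;
  }
  if (value && value.kind === 'many') {
    for (const v of value.values) run(fns, v, index, out); // fan out
    return out;
  }
  if (index === fns.length) {
    out.push(value); // past the last step: emit
    return out;
  }
  return run(fns, fns[index](value), index + 1, out);
};

const steps = [
  x => many([x - 1, x, x + 1]),
  x => (x % 2 ? x : none),
  x => (x > 5 ? finalValue('big') : x),
  x => x * 10
];

console.log(run(steps, 4)); // [ 30, 50 ]
console.log(run(steps, 6)); // [ 50, 'big' ]
```

For input `6`, the value `7` is wrapped by `finalValue` in the third step, so the last step never multiplies it.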

### chainUnchecked(fns, options?)

Same as `chain()` in JavaScript. In TypeScript, bypasses type checking on `fns`. Accepts optional `<W, R>` type parameters.

```ts
import {chainUnchecked} from 'stream-chain';
const pipeline = chainUnchecked<number, string>([x => String(x * x)]);
```

## Special values (defs.js)

All special values can be imported from `'stream-chain'` or `'stream-chain/defs.js'`.

**Convention:** the special markers below (`none`/`stop`/`many`/`finalValue`) are for **regular function returns only**. Generators (sync `function*` and async `async function*`) must yield plain values — express skip via `continue`, stop via `return`, emit multiple via separate `yield`s. Yielding a marker from a generator is unsupported and may pass through as opaque data.

### none

`Symbol.for('object-stream.none')` — return from a function to skip producing output for this input.

```js
chain([x => x > 0 ? x : none]); // filter: pass only positive values
```
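
Because the marker is created with `Symbol.for`, it lives in the global symbol registry, so any code that looks up the same key gets the identical symbol. A small check that uses only the language itself, not the library:

```js
const a = Symbol.for('object-stream.none');
const b = Symbol.for('object-stream.none');
const local = Symbol('object-stream.none'); // not registered, always unique

console.log(a === b);          // true
console.log(a === local);      // false
console.log(Symbol.keyFor(a)); // object-stream.none
```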

### stop

`Symbol.for('object-stream.stop')` — return to skip and terminate the generator. Works only within `gen()` or `fun()` segments.

```js
chain([
  function* () { for (let i = 0; ; ++i) yield i; },
  n => n > 100 ? stop : n
]); // produces 0..100
```

### many(values)

Wraps an array to emit multiple values from a single input.

```js
chain([x => many([x - 1, x, x + 1])]); // 5 → 4, 5, 6
```

Helper functions:
- `isMany(value)` — check if value is a Many wrapper.
- `getManyValues(value)` — extract the array from a Many wrapper.
- `toMany(value)` — convert any value to Many: `none` → `many([])`, `x` → `many([x])`, `many(arr)` → `many(arr)`.
- `normalizeMany(value)` — `many([])` → `none`, `many([x])` → `x`, `many([...])` → `many([...])`.
- `combineMany(...args)` — merge any number of values (any of none/value/many) into a single Many. Returns new Many.
- `combineManyMut(a, ...args)` — like `combineMany` but may mutate `a` (the first argument) for performance.
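
The conversions listed above can be sketched with stand-in definitions. This is an illustration of the documented mappings only: the `Many` class below is a hypothetical representation, and the real helpers should be imported from the library.

```js
const none = Symbol.for('object-stream.none');

// Hypothetical wrapper; the real Many representation may differ.
class Many {
  constructor(values) {
    this.values = values;
  }
}
const many = values => new Many(values);
const isMany = v => v instanceof Many;

// none -> many([]), x -> many([x]), many(arr) -> many(arr)
const toMany = v => (v === none ? many([]) : isMany(v) ? v : many([v]));

// many([]) -> none, many([x]) -> x, longer wrappers stay as they are
const normalizeMany = v => {
  if (!isMany(v)) return v;
  if (v.values.length === 0) return none;
  return v.values.length === 1 ? v.values[0] : v;
};

// Merge any mix of none / plain values / Many into a new Many.
const combineMany = (...args) => many(args.flatMap(a => toMany(a).values));

console.log(normalizeMany(many([])) === none);      // true
console.log(normalizeMany(many([7])));              // 7
console.log(combineMany(none, 1, many([2, 3])).values); // [ 1, 2, 3 ]
```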

### finalValue(value)

Wraps a value to skip all remaining functions in the current gen/fun segment. The value is emitted directly. Does not work in native streams (treated as regular value).

```js
chain([[
  x => x * x,
  x => finalValue(x), // skip the next step
  x => x + 1          // never called
]]);
```

Helper functions:
- `isFinalValue(value)` — check if value is a final wrapper.
- `getFinalValue(value)` — extract the value.

### flushable(fn, final?)

Marks a function to be called when the stream ends. When called at end-of-stream, `fn` receives `none` as its argument. Alternatively, provide a separate `final` function that is called with no arguments at end-of-stream.

```js
let sum = 0;
chain([
  flushable(x => {
    if (x === none) return sum; // emit accumulated value at end
    sum += x;
    return none;
  })
]); // input: 1, 2, 3 → output: 6
```

Equivalent with separate final function:

```js
let sum = 0;
chain([
  flushable(
    x => { sum += x; return none; },
    () => sum
  )
]);
```

Helper function:
- `isFlushable(fn)` — check if function is marked as flushable.
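
To make the end-of-stream call concrete, here is a minimal sketch of a driver that feeds every input to a function and then calls it once more with `none`. The `drive` helper is hypothetical and stands in for what a chain does with a flushable function; it is not the library's code.

```js
const none = Symbol.for('object-stream.none');

// Hypothetical driver: feed each input, then signal end-of-stream with `none`.
const drive = (fn, inputs) => {
  const out = [];
  for (const x of [...inputs, none]) {
    const result = fn(x);
    if (result !== none) out.push(result);
  }
  return out;
};

let sum = 0;
const total = x => {
  if (x === none) return sum; // the flush call: emit the accumulated value
  sum += x;
  return none;
};

const emitted = drive(total, [1, 2, 3]);
console.log(emitted); // [ 6 ]
```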

### Function lists

Used internally for optimization. When `chain()` encounters a function tagged with a function list, it can inline the underlying functions for better performance.

- `setFunctionList(fn, fns)` — tag a function with its underlying function array.
- `isFunctionList(fn)` — check if function has a function list.
- `getFunctionList(fn)` — extract the function array.
- `clearFunctionList(fn)` — remove the tag to prevent inlining.

```js
import {gen, clearFunctionList} from 'stream-chain';

const inlined = gen(x => x + 1, x => x * x);     // will be inlined by chain()
const opaque = clearFunctionList(gen(x => x + 1)); // will NOT be inlined
```

### Stop (exception class)

Can be thrown instead of returning `stop`:

```js
import {Stop} from 'stream-chain/defs.js';
chain([n => { if (n > 100) throw new Stop(); return n; }]);
```

## gen(...fns)

Creates an async generator pipeline from functions. Each input value is passed through all functions sequentially. Supports all special return values.

- Functions can be regular, async, generator, or async generator.
- Arrays in the argument list are flattened. Falsy values are ignored.
- If any function is flushable, the result is also flushable.
- The result is tagged with a function list for chain() optimization.
- Unlike `asStream()`/`chain()`, `gen()` passes `null`/`undefined` through the pipeline like any other value. Use `none` for consistent skip behavior.

```js
import gen from 'stream-chain/gen.js';

const pipeline = gen(
  function* (n) { for (let i = 0; i < n; ++i) yield i; },
  x => x * x
);
for await (const v of pipeline(3)) {
  console.log(v); // 0, 1, 4
}
```

## fun(...fns)

Like `gen()` but returns a function instead of a generator. Values from generators are collected into `many()` arrays. For purely synchronous pipelines it returns a synchronous result; for asynchronous pipelines it returns a `Promise`. Like `gen()`, passes `null`/`undefined` through the pipeline (unlike `asStream()`/`chain()`).

**Memory caveat:** `fun()` collects the entire output for a single input into one `Many` before returning. Memory scales with output-per-input. For unbounded pipelines (a stage that produces unbounded values from a single input), use `gen()` instead.

**Explicit-import policy:** `fun()` is intentionally NOT on the default `stream-chain` / `stream-chain/node` export. Import it from `stream-chain/fun.js` directly, or use the `/web` / `/core` chains where it's re-exported. The friction is deliberate — it makes the output-size profile explicit at the call site.

```js
import fun from 'stream-chain/fun.js';
import {getManyValues} from 'stream-chain/defs.js';

const f = fun(
  function* (n) { for (let i = 0; i < n; ++i) yield i; },
  x => x * x
);
const result = await f(3);
console.log(getManyValues(result)); // [0, 1, 4]
```

## asStream(fn, options?)

Wraps any function as a Node Duplex stream with per-item backpressure. Supports regular, async, generator, and async generator functions. Handles all special return values. Treats `null`/`undefined` as `none` (skip) because Node.js streams reserve them for end-of-stream signaling — this differs from `gen()`/`fun()` which pass them through.

- `fn` — any function.
- `options` — Duplex options (default: `{writableObjectMode: true, readableObjectMode: true}`).
- Returns: a `Duplex` stream.

```js
import asStream from 'stream-chain/asStream.js';

const stream = asStream(x => x * x);
source.pipe(stream).pipe(destination);
```

If `fn` is already a Node stream object (`Readable`/`Writable`/`Duplex`) or a Web Stream object, it is returned as-is (passthrough).

## asWebStream(fn, options?)

Wraps any function as a Web Streams `{readable, writable}` duplex pair with per-item backpressure. It has the same dual-role API as `asStream`: a Web Streams object is returned as-is, and a function is wrapped into a duplex pair. The result is not a `TransformStream`, because `transform()` cannot suspend mid-call to wait for a per-item drain.

- `fn` — any function (regular, async, generator, async generator).
- `options` — `{strategy?, readableStrategy?, writableStrategy?}`, each value in the standard Web Streams `QueuingStrategy` shape. `strategy` applies to both sides; `readableStrategy` / `writableStrategy` override it for their side.
- Returns: `{readable: ReadableStream<R>, writable: WritableStream<W>}`.

```js
import asWebStream from 'stream-chain/asWebStream.js';

const {readable, writable} = asWebStream(x => x * x);
sourceReadable.pipeTo(writable);
```

Lifecycle parity with `TransformStream`: `reader.cancel(reason)` propagates to the writable side as an error; `writer.abort(reason)` errors both sides and unblocks any pending backpressure await; user-function errors propagate to both sides.

## dataSource(fn)

Takes a function or iterable and returns the underlying iterator function. Substrate-agnostic — exported from all three subpaths (`stream-chain` / `/web` / `/core`) and attached as `chain.dataSource` on each.

- Function → returns the function as-is.
- Async iterable → returns `Symbol.asyncIterator` bound to the object.
- Iterable → returns `Symbol.iterator` bound to the object.

```js
import {dataSource} from 'stream-chain';        // /node
import {dataSource} from 'stream-chain/web';    // /web
import {dataSource} from 'stream-chain/core';   // /core
const iter = dataSource([1, 2, 3]); // returns [].values bound to array
```
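
The three resolution rules can be sketched without the library. The `resolveSource` name is hypothetical, and throwing on any other input is an assumption of this sketch rather than documented behavior.

```js
// Stand-in for the documented rules: function, async iterable, then iterable.
const resolveSource = source => {
  if (typeof source === 'function') return source;
  if (source && typeof source[Symbol.asyncIterator] === 'function')
    return source[Symbol.asyncIterator].bind(source);
  if (source && typeof source[Symbol.iterator] === 'function')
    return source[Symbol.iterator].bind(source);
  throw new TypeError('expected a function or an iterable'); // assumption
};

const fromArray = resolveSource([1, 2, 3]);
console.log([...fromArray()]); // [ 1, 2, 3 ]

const producer = () => 42;
console.log(resolveSource(producer) === producer); // true
```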

## Stream type guards

Shape-based predicates exported from `stream-chain/defs.js` (no `node:stream` import — browser-safe to call from any substrate):

```js
import {
  isReadableWebStream,
  isWritableWebStream,
  isDuplexWebStream,
  isReadableNodeStream,
  isWritableNodeStream,
  isDuplexNodeStream
} from 'stream-chain/defs.js';
```

Web guards check for `getReader`/`pipeTo` (readable) and `getWriter`/`abort` (writable). Node guards check for `.pipe`/`.on`/`_readableState` (readable) and `.write`/`.on`/`_writableState` (writable). The same guards are also available as `chain.X` static properties on the `/node` and `/web` entries.
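
A shape-based check of this kind can be sketched as follows. The function names are hypothetical stand-ins, not the exported guards, and the sketch assumes a runtime with global `ReadableStream` / `WritableStream`.

```js
// Duck-typing checks: look at method names, never at constructors.
const looksLikeReadableWeb = x =>
  !!x && typeof x.getReader === 'function' && typeof x.pipeTo === 'function';
const looksLikeWritableWeb = x =>
  !!x && typeof x.getWriter === 'function' && typeof x.abort === 'function';

console.log(looksLikeReadableWeb(new ReadableStream())); // true
console.log(looksLikeWritableWeb(new WritableStream())); // true
console.log(looksLikeReadableWeb({pipe() {}, on() {}})); // false
console.log(looksLikeWritableWeb(null));                 // false
```

Since nothing here imports `node:stream`, the same checks run unchanged in a browser.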

## Utilities

All utilities return functions suitable for use in `chain()`. Import from `'stream-chain/utils/<name>.js'`.

### take(n, finalValue?)

Take `n` items from the stream. After `n` items, returns `finalValue` (default: `none`). Use `stop` as `finalValue` to terminate the stream early.

```js
import take from 'stream-chain/utils/take.js';
import {stop} from 'stream-chain/defs.js';

chain([
  function* () { for (let i = 0; ; ++i) yield i; },
  take(5, stop) // take 5 items then stop
]);
```
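
The difference between the default `none` and an explicit `stop` can be shown with a stand-in. `makeTake` and `drive` are hypothetical helpers written for this sketch; they mimic the documented behavior but are not the library's code.

```js
const none = Symbol.for('object-stream.none');
const stop = Symbol.for('object-stream.stop');

// Stand-in for take(): pass the first n items, then return finalValue.
const makeTake = (n, finalValue = none) => {
  let seen = 0;
  return x => (seen++ < n ? x : finalValue);
};

// Hypothetical driver that counts how many inputs were actually consumed.
const drive = (fn, inputs) => {
  const out = [];
  let calls = 0;
  for (const x of inputs) {
    ++calls;
    const result = fn(x);
    if (result === stop) break; // terminate early
    if (result !== none) out.push(result);
  }
  return {out, calls};
};

const skipping = drive(makeTake(2), [1, 2, 3, 4, 5]);
const stopping = drive(makeTake(2, stop), [1, 2, 3, 4, 5]);
console.log(skipping); // { out: [ 1, 2 ], calls: 5 }
console.log(stopping); // { out: [ 1, 2 ], calls: 3 }
```

Both runs emit the same items, but only the `stop` variant ends the loop, which matters when the source is infinite.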

### takeWhile(fn, finalValue?)

Take items while `fn(item)` returns truthy. `fn` can be async. Once `fn` returns falsy, returns `finalValue` for all subsequent items.

```js
import takeWhile from 'stream-chain/utils/takeWhile.js';
import {stop} from 'stream-chain/defs.js';
chain([takeWhile(x => x < 100, stop)]); // pass items until one is >= 100, then stop
```

### takeWithSkip(n, skip?, finalValue?)

Skip `skip` items (default: 0), then take `n` items.

```js
import takeWithSkip from 'stream-chain/utils/takeWithSkip.js';
import {stop} from 'stream-chain/defs.js';
chain([takeWithSkip(5, 2, stop)]); // skip 2, take 5
```

### skip(n)

Skip `n` items from the beginning, pass all remaining.

```js
import skip from 'stream-chain/utils/skip.js';
chain([skip(5)]); // skip first 5 items
```

### skipWhile(fn)

Skip items while `fn(item)` returns truthy. `fn` can be async. Once `fn` returns falsy, pass all remaining items.

```js
import skipWhile from 'stream-chain/utils/skipWhile.js';
chain([skipWhile(x => x.status !== 'ready')]);
```

### fold(fn, initial)

Reduce the entire stream to a single value emitted at end. `fn(accumulator, item)` returns new accumulator. `fn` can be async.

```js
import fold from 'stream-chain/utils/fold.js';
chain([fold((acc, x) => acc + x, 0)]); // sum all values
```

### reduce(fn, initial)

Alias for `fold`.

### scan(fn, initial)

Running accumulator — emits the current accumulator after each item. `fn(accumulator, item)` returns new accumulator. `fn` can be async.

```js
import scan from 'stream-chain/utils/scan.js';
chain([scan((acc, x) => acc + x, 0)]); // input: 1,2,3 → output: 1,3,6
```
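
The contrast between `scan` and `fold` is easiest to see on a plain array. The two helpers below are hypothetical, synchronous stand-ins for the stream versions.

```js
// scan: one output per input (the running accumulator).
const scanAll = (fn, initial, inputs) => {
  let acc = initial;
  const out = [];
  for (const x of inputs) out.push((acc = fn(acc, x)));
  return out;
};

// fold: a single output once all inputs are consumed.
const foldAll = (fn, initial, inputs) => {
  let acc = initial;
  for (const x of inputs) acc = fn(acc, x);
  return acc;
};

const add = (acc, x) => acc + x;
console.log(scanAll(add, 0, [1, 2, 3])); // [ 1, 3, 6 ]
console.log(foldAll(add, 0, [1, 2, 3])); // 6
```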

### reduceStream(fn, initial) / reduceStream(options)

Creates a Node `Writable` that reduces values. The current accumulator is available as `.accumulator`.

- `reduceStream(fn, initial)` — simple form.
- `reduceStream({reducer, initial, ...writableOptions})` — options form.

Reducer is called with `this` bound to the writable (`reducer.call(this, this.accumulator, chunk)`). Explicit `null` / `false` / `0` / `''` / `undefined` for `initial` are respected (presence-check semantics).

```js
import reduceStream from 'stream-chain/utils/reduceStream.js';
const r = reduceStream((acc, x) => acc + x, 0);
chain([r]);
// After stream ends: r.accumulator === sum
```

### reduceWebStream(fn, initial) / reduceWebStream(options)

Web Streams counterpart of `reduceStream()`. Returns `{writable, result, accumulator}`:
- `writable` — `WritableStream<T>` you pipe input into.
- `result` — `Promise<A>` that resolves to the final accumulator on clean close, rejects on `abort` or reducer error.
- `accumulator` — live getter for the running accumulator value (mirrors Node's `stream.accumulator`).

Options shape: `{reducer, initial, strategy?, writableStrategy?}`. Reducer is called with `this` bound to the return object so `this.accumulator` works inside (parity with `reduceStream`).

```js
import reduceWebStream from 'stream-chain/utils/reduceWebStream.js';

const r = reduceWebStream((acc, x) => acc + x, 0);
source.pipeTo(r.writable);
console.log(await r.result);    // sum
console.log(r.accumulator);     // live read during the run
```

### batch(size)

Group items into fixed-size arrays. Last batch may be smaller.

```js
import batch from 'stream-chain/utils/batch.js';
chain([batch(3)]); // input: 1,2,3,4,5 → output: [1,2,3], [4,5]
```
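
A smaller last batch implies that something must run at end-of-stream. The sketch below assumes a flush call that passes `none`, as described for `flushable()`; `makeBatch` and `drive` are hypothetical stand-ins, not the library's implementation.

```js
const none = Symbol.for('object-stream.none');

// Stand-in for batch(): buffer items, emit full arrays, flush the remainder.
const makeBatch = size => {
  let buffer = [];
  return x => {
    if (x === none) {
      const rest = buffer; // end-of-stream: emit what is left, if anything
      buffer = [];
      return rest.length ? rest : none;
    }
    buffer.push(x);
    if (buffer.length < size) return none;
    const full = buffer;
    buffer = [];
    return full;
  };
};

// Hypothetical driver: feed inputs, then signal the end with `none`.
const drive = (fn, inputs) =>
  [...inputs, none].map(fn).filter(result => result !== none);

const batches = drive(makeBatch(3), [1, 2, 3, 4, 5]);
console.log(batches); // [ [ 1, 2, 3 ], [ 4, 5 ] ]
```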

### lines()

Split a byte/string stream into lines.

```js
import lines from 'stream-chain/utils/lines.js';
chain([lines()]); // input: "a\nb\nc" → output: "a", "b", "c"
```
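
Line splitting has to carry an unfinished tail from one chunk to the next. A minimal sketch of that bookkeeping, assuming `\n` separators only (the `makeLineSplitter` helper is hypothetical, and how the real utility treats `\r\n` is not covered here):

```js
const makeLineSplitter = () => {
  let tail = ''; // text after the last newline seen so far
  return {
    push(chunk) {
      const parts = (tail + chunk).split('\n');
      tail = parts.pop(); // the last piece may be an incomplete line
      return parts;
    },
    flush() {
      const rest = tail;
      tail = '';
      return rest ? [rest] : [];
    }
  };
};

const splitter = makeLineSplitter();
const result = [
  ...splitter.push('a\nb'), // 'b' is held back: its line may continue
  ...splitter.push('\nc'),
  ...splitter.flush()
];
console.log(result); // [ 'a', 'b', 'c' ]
```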

### fixUtf8Stream()

Repartition byte chunks so multi-byte UTF-8 characters are not split across chunks.
Uses `TextDecoder` by default (works in every runtime); on Node, asynchronously
upgrades to `node:string_decoder` for ≈2–4× faster decoding. Named export
`whenReady()` returns a `Promise` that resolves when the upgrade has landed
(or immediately on non-Node) — optional, only needed when the Node fast path
must be in effect on the first composition.

```js
import fixUtf8Stream, {whenReady} from 'stream-chain/utils/fixUtf8Stream.js';
chain([fixUtf8Stream(), lines()]);
await whenReady(); // optional, Node fast path
```
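
The problem being solved can be reproduced with the standard `TextDecoder` alone. This sketch decodes to strings for readability and does not use the library; it only shows why chunk boundaries inside a multi-byte character need special handling.

```js
const bytes = new TextEncoder().encode('h\u00e9llo'); // U+00E9 takes two bytes
const chunks = [bytes.subarray(0, 2), bytes.subarray(2)]; // split inside U+00E9

// Decoding each chunk on its own corrupts the split character.
const naive = chunks.map(chunk => new TextDecoder().decode(chunk));

// A streaming decoder buffers the incomplete sequence until it is whole.
const decoder = new TextDecoder();
const streamed = chunks.map(chunk => decoder.decode(chunk, {stream: true}));
streamed.push(decoder.decode()); // final flush, empty here

console.log(naive.join('') === 'h\u00e9llo');    // false
console.log(streamed.join('') === 'h\u00e9llo'); // true
```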

### readableFrom(options)

Convert an iterable / iterator / function to a Node `Readable` stream.

- `options.iterable` — iterable or iterator to read from (or pass an iterable/function directly without wrapping in options).
- `options.objectMode` — default: `true`.

Accepts a 0-ary function (sync or async) returning a value, generator, or async generator. Speaks the chain protocol: producer can return `none`/`null`/`undefined` (skip), `stop` (terminate), `many([...])` (fan-out), `finalValue(v)` (unwrap).

```js
import readableFrom from 'stream-chain/utils/readableFrom.js';
chain([readableFrom({iterable: [1, 2, 3]}), x => x * 2]);
```

### readableWebStreamFrom(options)

Web Streams counterpart of `readableFrom()` — same input shapes (iterable / iterator / function / options object), returns a `ReadableStream`. Per-item backpressure via `desiredSize`/`pull()`. For the plain iterable-to-stream case the platform's `ReadableStream.from()` suffices; reach for `readableWebStreamFrom` when you need function-source / promise-resolution / chain-protocol awareness on the producer side.

```js
import readableWebStreamFrom from 'stream-chain/utils/readableWebStreamFrom.js';
import chain from 'stream-chain/web';

const c = chain([x => x * x]);
readableWebStreamFrom([1, 2, 3]).pipeThrough(c).pipeTo(destination);
```

### makeStreamPuller(readable)

Wraps a Node `Readable` as a non-destructive async iterator. Thin facade over `readable.iterator({destroyOnReturn: false})`.

- `readable` — a Node `Readable`.
- Returns: an `AsyncIterableIterator<T>` exposing `next()`, `return()`, `[Symbol.asyncIterator]()`.

Properties: preserves the original `'error'` value (no `AbortError` wrapping); synthesizes `Error('Premature close')` on `destroy()` without `end`/`error`; breaking out of `for await` does NOT destroy the source.

```js
import makeStreamPuller from 'stream-chain/utils/streamPuller.js';

for await (const v of makeStreamPuller(readable)) {
  if (shouldStop(v)) break; // source remains alive
}
```

### makeWebStreamPuller(readable)

Wraps a Web Streams `ReadableStream` as a non-destructive async iterator. Built on `stream[Symbol.asyncIterator]({preventCancel: true})` plus a `cancel(reason)` extension (the iterator-protocol `return()` can't carry a cancel reason cleanly).

- `readable` — a Web Streams `ReadableStream`.
- Returns: `{next(), return(), cancel(reason?), [Symbol.asyncIterator]()}` — async-iterator + `cancel`.

```js
import makeWebStreamPuller from 'stream-chain/utils/webStreamPuller.js';

const puller = makeWebStreamPuller(readable);
for await (const v of puller) { /* ... */ }
await puller.cancel(new Error('user aborted')); // explicit cancel-with-reason
```
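
The platform behavior this builds on can be observed directly. The sketch below uses only the standard `ReadableStream` (no library code; `makeSource` and `demo` are hypothetical names) and relies on `values({preventCancel: true})`, which is the same method as `[Symbol.asyncIterator]`.

```js
// A small source that emits 0..4 and then closes.
const makeSource = () => {
  let i = 0;
  return new ReadableStream({
    pull(controller) {
      controller.enqueue(i++);
      if (i === 5) controller.close();
    }
  });
};

async function demo() {
  const stream = makeSource();
  const first = [];
  for await (const v of stream.values({preventCancel: true})) {
    first.push(v);
    if (v === 1) break; // releases the reader lock without cancelling
  }
  const rest = [];
  for await (const v of stream) rest.push(v); // the stream is still usable
  return {first, rest};
}

const result = demo();
result.then(r => console.log(r)); // { first: [ 0, 1 ], rest: [ 2, 3, 4 ] }
```

Without `preventCancel`, the `break` would cancel the stream and the second loop would see no items.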

## JSONL support

### parser(reviver?)

Returns a `gen()` pipeline that parses JSONL: `fixUtf8Stream → lines → JSON.parse`.

- `reviver` — optional JSON.parse reviver function, or `{reviver, ignoreErrors}` object.
- `ignoreErrors` (boolean, default: false) — if true, silently skip lines that fail to parse.

```js
import chain from 'stream-chain';
import parser from 'stream-chain/jsonl/parser.js';
import fs from 'node:fs';

chain([
  fs.createReadStream('data.jsonl'),
  parser(),
  obj => console.log(obj)
]);
```
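
The effect of `ignoreErrors` can be illustrated on an in-memory string. `parseJsonl` is a hypothetical stand-in, not the library's parser, and skipping blank lines is an assumption of this sketch.

```js
const parseJsonl = (text, {reviver, ignoreErrors = false} = {}) => {
  const out = [];
  for (const line of text.split('\n')) {
    if (!line.trim()) continue; // assumption: blank lines carry no record
    try {
      out.push(JSON.parse(line, reviver));
    } catch (error) {
      if (!ignoreErrors) throw error; // default: surface the parse error
    }
  }
  return out;
};

const text = '{"a":1}\nnot json\n{"b":2}\n';
const lenient = parseJsonl(text, {ignoreErrors: true});
console.log(lenient); // [ { a: 1 }, { b: 2 } ]

let strictError = null;
try {
  parseJsonl(text);
} catch (error) {
  strictError = error;
}
console.log(strictError instanceof SyntaxError); // true
```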

### parserStream(options?)

Wraps `parser()` with `asStream()`. Returns a Node `Duplex`. Options extend Duplex options:
- `reviver` — JSON.parse reviver function.
- `ignoreErrors` (boolean) — silently skip parse errors (bad lines are dropped; no envelope reaches the consumer).
- Default: `{writableObjectMode: false, readableObjectMode: true}`.

Emitted records are `{key, value}` where `key` is the zero-based input line index.

```js
import parserStream from 'stream-chain/jsonl/parserStream.js';
chain([fs.createReadStream('data.jsonl'), parserStream()]);
```

### parserWebStream(options?)

Web Streams counterpart of `parserStream()`. Wraps `parser()` with `asWebStream()`. Returns `{readable, writable}`. Options:
- `reviver` — JSON.parse reviver function.
- `ignoreErrors` (boolean) — silently skip parse errors.
- `strategy` / `readableStrategy` / `writableStrategy` — Web Streams `QueuingStrategy` options.

```js
import parserWebStream from 'stream-chain/jsonl/parserWebStream.js';
import chain from 'stream-chain/web';

const c = chain([parserWebStream(), obj => console.log(obj)]);
fetchResponse.body.pipeTo(c.writable);
```

### stringerStream(options?)

Node `Transform` that serializes objects to JSONL. Options extend Duplex options:
- `replacer` — JSON.stringify replacer function.
- `space` — JSON.stringify space argument.
- `prefix` (string, default: '') — prepended to output.
- `suffix` (string, default: '') — appended to output.
- `separator` (string, default: '\n') — between items.
- `emptyValue` (string, default: prefix + suffix) — output when no values.
- Default: `{writableObjectMode: true, readableObjectMode: false}`.

```js
import stringerStream from 'stream-chain/jsonl/stringerStream.js';
chain([objectSource, stringerStream(), fs.createWriteStream('out.jsonl')]);
```

Output as JSON array:
```js
stringerStream({prefix: '[', suffix: ']', separator: ','});
// input: 1, 2, 3 → output: [1,2,3]
```
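
How the options combine can be sketched as a plain function over an array. `stringifyAll` is a hypothetical stand-in that mirrors the documented defaults; the real stringer works incrementally on a stream.

```js
const stringifyAll = (
  values,
  {prefix = '', suffix = '', separator = '\n', emptyValue = prefix + suffix, replacer, space} = {}
) =>
  values.length
    ? prefix + values.map(v => JSON.stringify(v, replacer, space)).join(separator) + suffix
    : emptyValue;

const asArray = {prefix: '[', suffix: ']', separator: ','};
console.log(stringifyAll([1, 2, 3], asArray)); // [1,2,3]
console.log(stringifyAll([], asArray));        // []
console.log(stringifyAll([], {...asArray, emptyValue: 'null'})); // null
console.log(JSON.stringify(stringifyAll([{a: 1}, {b: 2}]))); // "{\"a\":1}\n{\"b\":2}"
```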

### stringerWebStream(options?)

Web Streams counterpart of `stringerStream()`. Implemented as a `TransformStream` — JSON.stringify is synchronous so the platform's transform machinery is sufficient (no need for a custom `{readable, writable}` pair like `asWebStream`). `flush()` emits the suffix (or `emptyValue` / `prefix+suffix` on an empty stream).

Options: same as `stringerStream` (`prefix`, `suffix`, `separator`, `emptyValue`, `replacer`, `space`) plus `strategy` / `readableStrategy` / `writableStrategy` for Web Streams `QueuingStrategy` configuration.

Returns `TransformStream<T, string>` — use via `pipeThrough()` or directly in a `/web` chain.

```js
import stringerWebStream from 'stream-chain/jsonl/stringerWebStream.js';

source.pipeThrough(stringerWebStream()).pipeTo(destination);
```

## TypeScript

Typed stream wrappers for precise type inference in chains:

- `TypedReadable<R>` — Readable with typed output.
- `TypedWritable<W>` — Writable with typed input.
- `TypedDuplex<W, R>` — Duplex with typed input and output.
- `TypedTransform<W, R>` — Transform with typed input and output.

```ts
import chain from 'stream-chain';
import {TypedTransform} from 'stream-chain/typed-streams.js';

const transform = new TypedTransform<number, string>({
  objectMode: true,
  transform(x, _, cb) { cb(null, String(x + 1)); }
});
const pipeline = chain([transform] as const); // ChainOutput<number, string>
```

## Common patterns

### Data processing pipeline

```js
import chain from 'stream-chain';
import fs from 'node:fs';
import zlib from 'node:zlib';

const pipeline = chain([
  x => x * x,
  x => chain.many([x - 1, x, x + 1]),
  async x => await lookupInDb(x),
  function* (x) { for (let i = x; i > 0; --i) yield i; },
  x => x % 2 ? x : null,
  x => '' + x,
  zlib.createGzip()
]);
source.pipe(pipeline).pipe(fs.createWriteStream('output.gz')); // source: any object-mode Readable
```

### Filter + transform

```js
chain([
  x => x > 0 ? x : null,  // filter: positive only
  x => x * x,              // transform: square
  x => ({value: x}),       // transform: wrap in object
]);
```

### Using input/output properties

```js
const pipeline = chain([x => x * x, x => String(x)]);
pipeline.output.pipe(destination);
source.pipe(pipeline.input);
```

### Web streams integration

```js
const readable = new ReadableStream({
  start(controller) {
    controller.enqueue(1);
    controller.enqueue(2);
    controller.close();
  }
});
chain([readable, x => x * 2]); // 2, 4
```

### JSONL file processing

```js
import chain from 'stream-chain';
import parser from 'stream-chain/jsonl/parser.js';
import stringerStream from 'stream-chain/jsonl/stringerStream.js';
import fs from 'node:fs';

chain([
  fs.createReadStream('input.jsonl'),
  parser(),
  obj => ({...obj, processed: true}),
  stringerStream(),
  fs.createWriteStream('output.jsonl')
]);
```

### Accumulate with flush

```js
import {none, many, flushable} from 'stream-chain/defs.js';

const items = [];
chain([
  flushable(x => {
    if (x === none) return many(items.sort());
    items.push(x);
    return none;
  })
]); // collects all items, sorts, emits at end
```

### Paginated API consumption

```js
import chain from 'stream-chain';
import take from 'stream-chain/utils/take.js';
import {stop} from 'stream-chain/defs.js';

chain([
  async function* () {
    let page = 1;
    while (true) {
      const data = await fetchPage(page++);
      if (!data.length) return;
      for (const item of data) yield item;
    }
  },
  take(100, stop),
  item => processItem(item)
]);
```

## Links

- Docs: https://github.com/uhop/stream-chain/wiki
- npm: https://www.npmjs.com/package/stream-chain
- Repository: https://github.com/uhop/stream-chain
