# stream-chain

> Chain functions, generators, and streams into a single pipeline with proper per-item backpressure. Zero dependencies. 4.x is ESM-only and ships three substrate variants: `stream-chain` (default = Node Streams), `stream-chain/web` (native Web Streams), `stream-chain/core` (substrate-free async iterables).

## Install

`npm i stream-chain`

Requires Node 22, 24, or 26.

## Quick start

```js
import chain from 'stream-chain';

const pipeline = chain([
  x => x * x,
  x => x % 2 ? x : null,
  async x => await processItem(x) // processItem: placeholder for your own async function
]);

dataSource.pipe(pipeline).pipe(destination);
```

## API

### chain(fns[, options])

Creates a Duplex stream from an array of functions, streams, or arrays (flattened).

- `fns` (array) — functions, streams, or nested arrays. Falsy values are ignored.
- `options` (object, optional) — Duplex options plus:
  - `noGrouping` (boolean) — disable function grouping optimization (default: false).
  - `skipEvents` (boolean) — disable error event forwarding (default: false).
- Default options: `{writableObjectMode: true, readableObjectMode: true}`.
- Returns: `Duplex` stream with `.streams`, `.input`, `.output` properties.

Supported function types: regular, async, generator, async generator.

### chainUnchecked(fns[, options])

Same as `chain()` but bypasses TypeScript type checking on the `fns` parameter.

```js
import {chainUnchecked} from 'stream-chain';
const pipeline = chainUnchecked([x => x * x]);
```

### Special return values

- `none` — skip, produce no value (same as returning `null`/`undefined`).
- `stop` — skip and terminate the generator pipeline.
- `many(values)` — emit multiple values from a single input.
- `finalValue(value)` — skip remaining chain steps, emit value directly (gen/fun only).
- `flushable(fn, final?)` — mark function to be called at stream end.

**Convention:** Generators (sync `function*` and async `async function*`) must yield plain values only — not `none`/`stop`/`many(...)`/`finalValue(...)`. Use language constructs instead: skip with `continue`, stop with `return`, emit multiple via separate `yield`s. The markers are for regular-function returns only.
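
As a sketch of the convention above, the generator below uses `continue`, `return`, and separate `yield`s where a regular function would return a marker. It is plain JavaScript and runs without the library; the name `oddPairs` and the cutoff at 5 are illustrative assumptions.

```js
// Hypothetical generator step that yields plain values only.
function* oddPairs(limit) {
  for (let i = 0; i < limit; ++i) {
    if (i === 5) return;       // stop: end the generator (instead of `stop`)
    if (i % 2 === 0) continue; // skip this value (instead of `none`)
    yield i;                   // emit several values with separate yields
    yield i * 10;              // (instead of `many([i, i * 10])`)
  }
}

console.log([...oddPairs(10)]); // [ 1, 10, 3, 30 ]
```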

```js
import chain from 'stream-chain';
import {none, stop, many, finalValue, flushable} from 'stream-chain/defs.js';

chain([
  x => x % 2 ? x : none,
  x => many([x, x * 10]),
]);
```

### gen(...fns)

Creates an async generator pipeline from functions. Used internally by `chain()` for grouping.

```js
import gen from 'stream-chain/gen.js';

const g = gen(x => x + 1, x => x * x);
for await (const v of g(2)) console.log(v); // 9
```

### fun(...fns)

Like `gen()` but returns an async function. Generator results are collected into `many()`.

**Memory caveat:** `fun()` collects all outputs for a single input into one `Many` before returning. Memory scales with output-per-input. Unsafe for unbounded expansions; `gen()` is the safe default.

Intentionally NOT exported from the default `stream-chain` / `stream-chain/node` entry — requires an explicit import. Available from `stream-chain/fun.js` directly and re-exported by `/web` and `/core`.

```js
import fun from 'stream-chain/fun.js';

const f = fun(x => x + 1, x => x * x);
console.log(await f(2)); // 9
```
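
The memory caveat above can be illustrated without the library. In this sketch `expand` is a hypothetical one-to-many step, `collectAll` mimics the collect-then-return behavior, and the `for...of` loop consumes the same step lazily. None of these helpers are part of stream-chain.

```js
let produced = 0;

// Hypothetical step that expands one input into `n` outputs.
function* expand(n) {
  for (let i = 0; i < n; ++i) {
    ++produced;
    yield i;
  }
}

// Eager: every output for the input is materialized before anything is returned.
const collectAll = n => [...expand(n)];

produced = 0;
const all = collectAll(1000);
const eagerCount = produced; // 1000 values produced and held in one array

// Lazy: outputs are pulled one at a time, so the consumer can stop early.
produced = 0;
for (const v of expand(1000)) {
  if (v === 2) break;
}
const lazyCount = produced; // 3 values produced before the loop stopped

console.log(eagerCount, lazyCount); // 1000 3
```

With an unbounded `expand`, the eager version would never return, while the lazy loop would still stop after three values.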

### asStream(fn[, options])

Wraps any function as a Node Duplex stream with per-item backpressure.

```js
import asStream from 'stream-chain/asStream.js';
const stream = asStream(x => x * x);
```

### asWebStream(fn[, options])

Wraps any function as a Web Streams `{readable, writable}` duplex pair with per-item backpressure. NOT a TransformStream — `transform()` can't suspend mid-call for per-item drain.

```js
import asWebStream from 'stream-chain/asWebStream.js';
const {readable, writable} = asWebStream(x => x * x);
```

`options` accepts `{strategy, readableStrategy, writableStrategy}` — Web Streams' standard `QueuingStrategy` shape.
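
As a reminder of what a standard `QueuingStrategy` looks like, the sketch below builds two strategies with the platform's own classes and shows how a high-water mark surfaces as `desiredSize` on a plain `ReadableStream`. It does not call `asWebStream()`; it only illustrates the shape of the objects the options above expect.

```js
// A QueuingStrategy is an object with `highWaterMark` and an optional `size(chunk)`.
const countStrategy = new CountQueuingStrategy({highWaterMark: 2});
const byteStrategy = new ByteLengthQueuingStrategy({highWaterMark: 1024});

let controller;
const readable = new ReadableStream({start(c) { controller = c; }}, countStrategy);

console.log(controller.desiredSize); // 2: room for two chunks
controller.enqueue('a');
console.log(controller.desiredSize); // 1
controller.enqueue('b');
console.log(controller.desiredSize); // 0: the queue is full, producers should wait

console.log(byteStrategy.size(new Uint8Array(16))); // 16
```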

### Subpaths

```js
import chain from 'stream-chain';       // default — same as /node
import chain from 'stream-chain/node';  // canonical Node Streams chain
import chain from 'stream-chain/web';   // native Web Streams chain (browser-safe)
import chain from 'stream-chain/core';  // async-iterable chain (no streams at all)
```

The `/node` chain returns a `Duplex`. The `/web` chain returns `{readable, writable}`. The `/core` chain returns a callable: `(input?) => AsyncGenerator<R>`.
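
Because the `/web` chain is described as returning a `{readable, writable}` pair, it should slot into the standard `pipeThrough()` call, which only needs those two properties. The sketch below shows that mechanism with a stand-in pair built from a `TransformStream`. The stand-in is an assumption made so the example runs without the library; it is not the library's own object.

```js
// Stand-in for a `{readable, writable}` pair, used only to keep the sketch runnable.
const pair = new TransformStream({
  transform(x, controller) { controller.enqueue(x * x); }
});

const source = new ReadableStream({
  start(controller) {
    for (const x of [1, 2, 3]) controller.enqueue(x);
    controller.close();
  }
});

// pipeThrough() needs only the `readable` and `writable` properties.
const output = source.pipeThrough({readable: pair.readable, writable: pair.writable});

const results = [];
for await (const v of output) results.push(v);
console.log(results); // [ 1, 4, 9 ]
```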

### Web Streams type guards

```js
import {
  isReadableWebStream,
  isWritableWebStream,
  isDuplexWebStream
} from 'stream-chain/defs.js';
```

Also accessible as `chain.isReadableWebStream` etc.

### dataSource(fn)

Takes a function or iterable and returns the underlying iterator function.

```js
import {dataSource} from 'stream-chain';
const iter = dataSource([1, 2, 3]);
```

## Utilities

All utilities return functions for use in `chain()`.

### Slicing

- `take(n, finalValue?)` — take N items then stop.
- `takeWhile(fn, finalValue?)` — take while predicate is true.
- `takeWithSkip(n, skip?, finalValue?)` — skip then take.
- `skip(n)` — skip N items.
- `skipWhile(fn)` — skip while predicate is true.

### Folding

- `fold(fn, initial)` — reduce stream to single value at end.
- `reduce(fn, initial)` — alias for fold.
- `scan(fn, initial)` — emit running accumulator after each item.
- `reduceStream(fn, initial)` — reduce as Writable stream with `.accumulator`.
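
To make the difference between `fold` and `scan` concrete, here is a library-free sketch of the two behaviors as described above. `foldAll` and `scanAll` are hypothetical stand-ins written as plain generators, not the library's implementations.

```js
// Emits a single value: the accumulator after the last item.
function* foldAll(fn, initial, items) {
  let acc = initial;
  for (const item of items) acc = fn(acc, item);
  yield acc;
}

// Emits the running accumulator after every item.
function* scanAll(fn, initial, items) {
  let acc = initial;
  for (const item of items) {
    acc = fn(acc, item);
    yield acc;
  }
}

const add = (acc, x) => acc + x;
console.log([...foldAll(add, 0, [1, 2, 3, 4])]); // [ 10 ]
console.log([...scanAll(add, 0, [1, 2, 3, 4])]); // [ 1, 3, 6, 10 ]
```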

### Stream helpers

- `batch(size)` — group items into arrays of `size`.
- `lines()` — split byte stream into lines.
- `fixUtf8Stream()` — repartition chunks for valid UTF-8.
- `readableFrom({iterable})` — convert iterable to Readable stream.

### Async-iterator wrappers (4.x)

- `makeStreamPuller(readable)` — wrap a Node `Readable` as a non-destructive async iterator (`stream-chain/utils/streamPuller.js`).
- `makeWebStreamPuller(readable)` — wrap a Web `ReadableStream` as a non-destructive async iterator with `cancel(reason)` extension (`stream-chain/utils/webStreamPuller.js`).

Both implement the async iterator protocol, so they work with `for await` directly. They are used by stream-join and stream-sorting for downstream merge operations.

```js
import makeStreamPuller from 'stream-chain/utils/streamPuller.js';

for await (const v of makeStreamPuller(readable)) {
  if (shouldStop(v)) break;  // source remains alive — non-destructive
}
```

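Utilities combine like any other chain steps:

```js
import chain from 'stream-chain';
import {stop} from 'stream-chain/defs.js';
import take from 'stream-chain/utils/take.js';
import fold from 'stream-chain/utils/fold.js';
import batch from 'stream-chain/utils/batch.js';

chain([
  take(10, stop),                      // pass 10 items, then return `stop`
  batch(3),                            // group items into arrays of 3
  fold((acc, x) => acc + x.length, 0)  // sum the batch lengths, emit once at the end
]);
```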

## JSONL support

- `parser(reviver?)` — JSONL parser function (returns gen() pipeline).
- `parserStream(options?)` — JSONL parser as a stream.
- `stringerStream(options?)` — JSONL stringer as a stream.

```js
import chain from 'stream-chain';
import parser from 'stream-chain/jsonl/parser.js';
import fs from 'node:fs';

chain([
  fs.createReadStream('data.jsonl'),
  parser(),
  obj => console.log(obj)
]);
```

## Common patterns

### Object processing pipeline

```js
import chain from 'stream-chain';

const pipeline = chain([
  x => x * x,
  x => chain.many([x - 1, x, x + 1]),
  x => x % 2 ? x : null,
]);
dataSource.pipe(pipeline);
pipeline.on('data', x => console.log(x));
```

### Async pipeline with filtering

```js
import chain from 'stream-chain';

chain([
  async x => await fetchData(x),
  x => x.status === 200 ? x.body : null,
  x => JSON.parse(x),
]);
```

### Generator producing multiple values

```js
import chain from 'stream-chain';

chain([
  function* (x) {
    for (let i = 0; i < x; ++i) yield i;
  },
  x => x * x,
]);
```

### Accumulate and emit at end

```js
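import chain from 'stream-chain';
import {none, flushable} from 'stream-chain/defs.js';

let sum = 0;
chain([
  flushable(x => {
    if (x === none) return sum; // called with `none` at stream end: emit the total
    sum += x;
    return none; // emit nothing while accumulating
  })
]);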
```

### Web streams

```js
const readable = new ReadableStream({ /* ... */ });
const writable = new WritableStream({ /* ... */ });

chain([readable, x => x * 2, writable]);
```

## TypeScript

```ts
import chain from 'stream-chain';
import {TypedTransform} from 'stream-chain/typed-streams.js';

const transform = new TypedTransform<number, string>({
  objectMode: true,
  transform(x, _, cb) { cb(null, String(x)); }
});
const pipeline = chain([transform] as const);
```

## Links

- Docs: https://github.com/uhop/stream-chain/wiki
- npm: https://www.npmjs.com/package/stream-chain
- Full LLM reference: https://github.com/uhop/stream-chain/blob/master/llms-full.txt
