# stream-chain

> A library for chaining functions, generators, and streams into a single Duplex stream with proper backpressure handling. Zero runtime dependencies. Works with Node.js and Bun. Supports both CommonJS and ESM consumers.

- Chain regular functions, async functions, generator functions, async generator functions, and existing streams
- Proper backpressure handling via Node.js stream infrastructure
- Object mode by default — ideal for data processing pipelines
- Web stream support (ReadableStream, WritableStream, duplex {readable, writable})
- Function grouping optimization for efficient pipelines
- Special return values for flow control: skip, stop, emit multiple, final value, flush at end
- Built-in JSONL (newline-delimited JSON) parser and stringer
- Utility functions for common stream operations: take, skip, fold, scan, batch, lines
- TypeScript support with typed streams

## Quick start

Install:

```bash
npm i stream-chain
```

Create a pipeline (`example.mjs`):

```js
import chain from 'stream-chain';
import {Readable} from 'node:stream';

const pipeline = chain([
  x => x * x,
  x => x % 2 ? x : null,
  x => x + 1
]);

const source = Readable.from([1, 2, 3, 4, 5]);
source.pipe(pipeline);
pipeline.on('data', x => console.log(x));
// Output: 2, 10, 26
```

Run: `node example.mjs`

## Importing

```js
// ESM (default export)
import chain from 'stream-chain';

// ESM (named exports)
import {chain, none, stop, many, finalValue, flushable, gen, asStream} from 'stream-chain';

// CommonJS
const chain = require('stream-chain');
const {none, stop, many} = require('stream-chain');

// Individual modules (ESM)
import gen from 'stream-chain/gen.js';
import fun from 'stream-chain/fun.js';
import asStream from 'stream-chain/asStream.js';
import {none, stop, many, finalValue, flushable} from 'stream-chain/defs.js';

// Utilities
import take from 'stream-chain/utils/take.js';
import takeWhile from 'stream-chain/utils/takeWhile.js';
import takeWithSkip from 'stream-chain/utils/takeWithSkip.js';
import skip from 'stream-chain/utils/skip.js';
import skipWhile from 'stream-chain/utils/skipWhile.js';
import fold from 'stream-chain/utils/fold.js';
import reduce from 'stream-chain/utils/reduce.js';
import reduceStream from 'stream-chain/utils/reduceStream.js';
import scan from 'stream-chain/utils/scan.js';
import batch from 'stream-chain/utils/batch.js';
import lines from 'stream-chain/utils/lines.js';
import fixUtf8Stream from 'stream-chain/utils/fixUtf8Stream.js';
import readableFrom from 'stream-chain/utils/readableFrom.js';

// JSONL
import parser from 'stream-chain/jsonl/parser.js';
import parserStream from 'stream-chain/jsonl/parserStream.js';
import stringerStream from 'stream-chain/jsonl/stringerStream.js';

// TypeScript helpers
import {TypedReadable, TypedWritable, TypedDuplex, TypedTransform} from 'stream-chain/typed-streams.js';
```

## chain() API

`chain(fns, options?)` — creates a Duplex stream from an array of pipeline steps.

Arguments:

- `fns` (array) — items can be:
  - **Functions**: regular, async, generator, async generator.
  - **Streams**: Transform, Duplex. First item can be Readable. Last item can be Writable.
  - **Web streams**: ReadableStream, WritableStream, `{readable, writable}` pair. Adapted automatically.
  - **Arrays**: flattened recursively; elements are treated as if listed inline.
  - **Falsy values**: ignored (filtered out).
- `options` (object, optional) — extends `DuplexOptions` with:
  - `noGrouping` (boolean, default: false) — if true, each function becomes a separate stream. If false, consecutive functions are grouped via `gen()` for efficiency.
  - `skipEvents` (boolean, default: false) — if true, error events from internal streams are not forwarded.
  - Default: `{writableObjectMode: true, readableObjectMode: true}`.

Returns: `Duplex` stream with additional properties:
- `streams` — array of all internal streams created by the chain.
- `input` — the first stream (write to it or attach event handlers).
- `output` — the last stream (read from it or attach event handlers).

### How functions are called

Functions receive `(chunk, encoding)`. The return value determines what happens:

- **`null`, `undefined`, or `none`** — no value passed downstream (filter).
- **`stop`** — no value passed and generator terminates (gen/fun only).
- **Regular value** — passed to next step.
- **`many(values)`** — all values in the array emitted sequentially.
- **`finalValue(value)`** — value emitted directly, remaining steps skipped (gen/fun only).
- **Promise** — awaited; resolved value handled as above.
- **Generator/iterator** — iterated; each yielded value handled as above.
- **Thrown exception** — caught, emitted as stream error event.

### chainUnchecked(fns, options?)

Same as `chain()` in JavaScript. In TypeScript, bypasses type checking on `fns`. Accepts optional `<W, R>` type parameters.

```ts
import {chainUnchecked} from 'stream-chain';
const pipeline = chainUnchecked<number, string>([x => String(x * x)]);
```

## Special values (defs.js)

All special values can be imported from `'stream-chain'` or `'stream-chain/defs.js'`.

### none

`Symbol.for('object-stream.none')` — return from a function to skip producing output for this input.

```js
chain([x => x > 0 ? x : none]); // filter: pass only positive values
```

### stop

`Symbol.for('object-stream.stop')` — return to skip and terminate the generator. Works only within `gen()` or `fun()` segments.

```js
chain([
  function* () { for (let i = 0; ; ++i) yield i; },
  n => n > 100 ? stop : n
]); // produces 0..100
```

### many(values)

Wraps an array to emit multiple values from a single input.

```js
chain([x => many([x - 1, x, x + 1])]); // 5 → 4, 5, 6
```

Helper functions:
- `isMany(value)` — check if value is a Many wrapper.
- `getManyValues(value)` — extract the array from a Many wrapper.
- `toMany(value)` — convert any value to Many: `none` → `many([])`, `x` → `many([x])`, `many(arr)` → `many(arr)`.
- `normalizeMany(value)` — `many([])` → `none`, `many([x])` → `x`, `many([...])` → `many([...])`.
- `combineMany(...args)` — merge any number of values (each `none`, a plain value, or a Many) into a single Many. Returns a new Many.
- `combineManyMut(a, ...args)` — like `combineMany` but may mutate `a` (the first argument) for performance.

### finalValue(value)

Wraps a value to skip all remaining functions in the current gen/fun segment. The value is emitted directly. Native streams do not recognize the wrapper and treat it as a regular value.

```js
chain([[
  x => x * x,
  x => finalValue(x), // skip the next step
  x => x + 1          // never called
]]);
```

Helper functions:
- `isFinalValue(value)` — check if value is a final wrapper.
- `getFinalValue(value)` — extract the value.

### flushable(fn, final?)

Marks a function to be called when the stream ends. When called at end-of-stream, `fn` receives `none` as its argument. Alternatively, provide a separate `final` function that is called with no arguments at end-of-stream.

```js
let sum = 0;
chain([
  flushable(x => {
    if (x === none) return sum; // emit accumulated value at end
    sum += x;
    return none;
  })
]); // input: 1, 2, 3 → output: 6
```

Equivalent with separate final function:

```js
let sum = 0;
chain([
  flushable(
    x => { sum += x; return none; },
    () => sum
  )
]);
```

Helper function:
- `isFlushable(fn)` — check if function is marked as flushable.

### Function lists

Used internally for optimization. When `chain()` encounters a function tagged with a function list, it can inline the underlying functions for better performance.

- `setFunctionList(fn, fns)` — tag a function with its underlying function array.
- `isFunctionList(fn)` — check if function has a function list.
- `getFunctionList(fn)` — extract the function array.
- `clearFunctionList(fn)` — remove the tag to prevent inlining.

```js
import {gen, clearFunctionList} from 'stream-chain';

const inlined = gen(x => x + 1, x => x * x);     // will be inlined by chain()
const opaque = clearFunctionList(gen(x => x + 1)); // will NOT be inlined
```

### Stop (exception class)

Can be thrown instead of returning `stop`:

```js
import {Stop} from 'stream-chain/defs.js';
chain([n => { if (n > 100) throw new Stop(); return n; }]);
```

## gen(...fns)

Creates an async generator pipeline from functions. Each input value is passed through all functions sequentially. Supports all special return values.

- Functions can be regular, async, generator, or async generator.
- Arrays in the argument list are flattened. Falsy values are ignored.
- If any function is flushable, the result is also flushable.
- The result is tagged with a function list for chain() optimization.

```js
import gen from 'stream-chain/gen.js';

const pipeline = gen(
  function* (n) { for (let i = 0; i < n; ++i) yield i; },
  x => x * x
);
for await (const v of pipeline(3)) {
  console.log(v); // 0, 1, 4
}
```

## fun(...fns)

Like `gen()` but returns an async function. Values from generators are collected into `many()` arrays.

```js
import fun from 'stream-chain/fun.js';
import {getManyValues} from 'stream-chain/defs.js';

const f = fun(
  function* (n) { for (let i = 0; i < n; ++i) yield i; },
  x => x * x
);
const result = await f(3);
console.log(getManyValues(result)); // [0, 1, 4]
```

## asStream(fn, options?)

Wraps any function as a Duplex stream. Supports regular, async, generator, and async generator functions. Handles all special return values.

- `fn` — any function.
- `options` — Duplex options (default: `{writableObjectMode: true, readableObjectMode: true}`).

```js
import asStream from 'stream-chain/asStream.js';

const stream = asStream(x => x * x);
source.pipe(stream).pipe(destination);
```

## dataSource(fn)

Takes a function or iterable and returns the underlying iterator function.

- Function → returns the function as-is.
- Async iterable → returns `Symbol.asyncIterator` bound to the object.
- Iterable → returns `Symbol.iterator` bound to the object.

```js
import {dataSource} from 'stream-chain';
const iter = dataSource([1, 2, 3]); // returns [].values bound to array
```

## Utilities

Most utilities return functions suitable for use in `chain()`; `reduceStream()` and `readableFrom()` return streams. Import from `'stream-chain/utils/<name>.js'`.

### take(n, finalValue?)

Take `n` items from the stream. After `n` items, returns `finalValue` (default: `none`). Use `stop` as `finalValue` to terminate the stream early.

```js
import take from 'stream-chain/utils/take.js';
import {stop} from 'stream-chain/defs.js';

chain([
  function* () { for (let i = 0; ; ++i) yield i; },
  take(5, stop) // take 5 items then stop
]);
```

### takeWhile(fn, finalValue?)

Take items while `fn(item)` returns truthy. `fn` can be async. Once `fn` returns falsy, returns `finalValue` for all subsequent items.

```js
import takeWhile from 'stream-chain/utils/takeWhile.js';
chain([takeWhile(x => x < 100, stop)]);
```

### takeWithSkip(n, skip?, finalValue?)

Skip `skip` items (default: 0), then take `n` items.

```js
import takeWithSkip from 'stream-chain/utils/takeWithSkip.js';
chain([takeWithSkip(5, 2, stop)]); // skip 2, take 5
```

### skip(n)

Skip `n` items from the beginning, pass all remaining.

```js
import skip from 'stream-chain/utils/skip.js';
chain([skip(5)]); // skip first 5 items
```

### skipWhile(fn)

Skip items while `fn(item)` returns truthy. `fn` can be async. Once `fn` returns falsy, pass all remaining items.

```js
import skipWhile from 'stream-chain/utils/skipWhile.js';
chain([skipWhile(x => x.status !== 'ready')]);
```

### fold(fn, initial)

Reduce the entire stream to a single value emitted at end. `fn(accumulator, item)` returns new accumulator. `fn` can be async.

```js
import fold from 'stream-chain/utils/fold.js';
chain([fold((acc, x) => acc + x, 0)]); // sum all values
```

### reduce(fn, initial)

Alias for `fold`.

### scan(fn, initial)

Running accumulator — emits the current accumulator after each item. `fn(accumulator, item)` returns new accumulator. `fn` can be async.

```js
import scan from 'stream-chain/utils/scan.js';
chain([scan((acc, x) => acc + x, 0)]); // input: 1,2,3 → output: 1,3,6
```

### reduceStream(fn, initial) / reduceStream(options)

Creates a Writable stream that reduces values. The current accumulator is available as `.accumulator`.

- `reduceStream(fn, initial)` — simple form.
- `reduceStream({reducer, initial, ...writableOptions})` — options form.

```js
import reduceStream from 'stream-chain/utils/reduceStream.js';
const r = reduceStream((acc, x) => acc + x, 0);
chain([r]);
// After stream ends: r.accumulator === sum
```

### batch(size)

Group items into fixed-size arrays. Last batch may be smaller.

```js
import batch from 'stream-chain/utils/batch.js';
chain([batch(3)]); // input: 1,2,3,4,5 → output: [1,2,3], [4,5]
```

### lines()

Split a byte/string stream into lines.

```js
import lines from 'stream-chain/utils/lines.js';
chain([lines()]); // input: "a\nb\nc" → output: "a", "b", "c"
```

### fixUtf8Stream()

Repartition byte chunks so multi-byte UTF-8 characters are not split across chunks.

```js
import fixUtf8Stream from 'stream-chain/utils/fixUtf8Stream.js';
chain([fixUtf8Stream(), lines()]);
```

### readableFrom(options)

Convert an iterable/iterator to a Readable stream.

- `options.iterable` — iterable or iterator to read from.
- `options.objectMode` — default: `true`.

```js
import readableFrom from 'stream-chain/utils/readableFrom.js';
chain([readableFrom({iterable: [1, 2, 3]}), x => x * 2]);
```

## JSONL support

### parser(reviver?)

Returns a `gen()` pipeline that parses JSONL: `fixUtf8Stream → lines → JSON.parse`.

- `reviver` — optional JSON.parse reviver function, or `{reviver, ignoreErrors}` object.
- `ignoreErrors` (boolean, default: false) — if true, silently skip lines that fail to parse.

```js
import chain from 'stream-chain';
import parser from 'stream-chain/jsonl/parser.js';
import fs from 'node:fs';

chain([
  fs.createReadStream('data.jsonl'),
  parser(),
  obj => console.log(obj)
]);
```

### parserStream(options?)

Wraps `parser()` with `asStream()`. Options extend Duplex options:
- `reviver` — JSON.parse reviver function.
- `ignoreErrors` (boolean) — silently skip parse errors.
- Default: `{writableObjectMode: false, readableObjectMode: true}`.

```js
import parserStream from 'stream-chain/jsonl/parserStream.js';
chain([fs.createReadStream('data.jsonl'), parserStream()]);
```

### stringerStream(options?)

Duplex stream that serializes objects to JSONL. Options extend Duplex options:
- `replacer` — JSON.stringify replacer function.
- `space` — JSON.stringify space argument.
- `prefix` (string, default: '') — prepended to output.
- `suffix` (string, default: '') — appended to output.
- `separator` (string, default: '\n') — between items.
- `emptyValue` (string, default: prefix + suffix) — emitted when the stream receives no values.
- Default: `{writableObjectMode: true, readableObjectMode: false}`.

```js
import stringerStream from 'stream-chain/jsonl/stringerStream.js';
chain([objectSource, stringerStream(), fs.createWriteStream('out.jsonl')]);
```

Output as JSON array:
```js
stringerStream({prefix: '[', suffix: ']', separator: ','});
// input: 1, 2, 3 → output: [1,2,3]
```

## TypeScript

Typed stream wrappers for precise type inference in chains:

- `TypedReadable<R>` — Readable with typed output.
- `TypedWritable<W>` — Writable with typed input.
- `TypedDuplex<W, R>` — Duplex with typed input and output.
- `TypedTransform<W, R>` — Transform with typed input and output.

```ts
import chain from 'stream-chain';
import {TypedTransform} from 'stream-chain/typed-streams.js';

const transform = new TypedTransform<number, string>({
  objectMode: true,
  transform(x, _, cb) { cb(null, String(x + 1)); }
});
const pipeline = chain([transform] as const); // ChainOutput<number, string>
```

## Common patterns

### Data processing pipeline

```js
import chain, {many} from 'stream-chain';
import fs from 'node:fs';
import zlib from 'node:zlib';

const pipeline = chain([
  x => x * x,
  x => many([x - 1, x, x + 1]),
  async x => await lookupInDb(x), // lookupInDb: your async lookup function
  function* (x) { for (let i = x; i > 0; --i) yield i; },
  x => x % 2 ? x : null,
  x => '' + x,
  zlib.createGzip()
]);
source.pipe(pipeline).pipe(fs.createWriteStream('output.gz'));
```

### Filter + transform

```js
chain([
  x => x > 0 ? x : null,  // filter: positive only
  x => x * x,              // transform: square
  x => ({value: x}),       // transform: wrap in object
]);
```

### Using input/output properties

```js
const pipeline = chain([x => x * x, x => String(x)]);
pipeline.output.pipe(destination);
source.pipe(pipeline.input);
```

### Web streams integration

```js
const readable = new ReadableStream({
  start(controller) {
    controller.enqueue(1);
    controller.enqueue(2);
    controller.close();
  }
});
chain([readable, x => x * 2]); // 2, 4
```

### JSONL file processing

```js
import chain from 'stream-chain';
import parser from 'stream-chain/jsonl/parser.js';
import stringerStream from 'stream-chain/jsonl/stringerStream.js';
import fs from 'node:fs';

chain([
  fs.createReadStream('input.jsonl'),
  parser(),
  obj => ({...obj, processed: true}),
  stringerStream(),
  fs.createWriteStream('output.jsonl')
]);
```

### Accumulate with flush

```js
import {none, many, flushable} from 'stream-chain/defs.js';

const items = [];
chain([
  flushable(x => {
    if (x === none) return many(items.sort());
    items.push(x);
    return none;
  })
]); // collects all items, sorts, emits at end
```

### Paginated API consumption

```js
import chain from 'stream-chain';
import take from 'stream-chain/utils/take.js';
import {stop} from 'stream-chain/defs.js';

chain([
  async function* () {
    let page = 1;
    while (true) {
      const data = await fetchPage(page++);
      if (!data.length) return;
      for (const item of data) yield item;
    }
  },
  take(100, stop),
  item => processItem(item)
]);
```

## Links

- Docs: https://github.com/uhop/stream-chain/wiki
- npm: https://www.npmjs.com/package/stream-chain
- Repository: https://github.com/uhop/stream-chain
