# stream-chain

> Chain functions, generators, and streams into a single Duplex stream with proper backpressure handling. Zero dependencies.

## Install

```sh
npm i stream-chain
```

## Quick start

```js
import chain from 'stream-chain';

const pipeline = chain([
  x => x * x,
  x => x % 2 ? x : null, // null/undefined results are skipped
  async x => await process(x)
]);

dataSource.pipe(pipeline).pipe(destination);
```

## API

### chain(fns[, options])

Creates a Duplex stream from an array of functions, streams, or arrays (flattened).

- `fns` (array) — functions, streams, or nested arrays. Falsy values are ignored.
- `options` (object, optional) — Duplex options plus:
  - `noGrouping` (boolean) — disable function grouping optimization (default: false).
  - `skipEvents` (boolean) — disable error event forwarding (default: false).
- Default options: `{writableObjectMode: true, readableObjectMode: true}`.
- Returns: a `Duplex` stream with `.streams`, `.input`, and `.output` properties.

Supported function types: regular, async, generator, async generator.

### chainUnchecked(fns[, options])

Same as `chain()` but bypasses TypeScript type checking on the `fns` parameter.

```js
import {chainUnchecked} from 'stream-chain';
const pipeline = chainUnchecked([x => x * x]);
```

### Special return values

- `none` — skip, produce no value (same as returning `null`/`undefined`).
- `stop` — skip and terminate the generator pipeline.
- `many(values)` — emit multiple values from a single input.
- `finalValue(value)` — skip the remaining chain steps and emit `value` directly (`gen()`/`fun()` pipelines only).
- `flushable(fn, final?)` — mark `fn` to be called one last time (with `none` as its argument) at stream end, so it can flush buffered state.

```js
import chain from 'stream-chain';
import {none, stop, many, finalValue, flushable} from 'stream-chain/defs.js';

chain([
  x => x % 2 ? x : none,
  x => many([x, x * 10]),
]);
```

### gen(...fns)

Creates an async generator pipeline from functions. Used internally by `chain()` for grouping.

```js
import gen from 'stream-chain/gen.js';

const g = gen(x => x + 1, x => x * x);
for await (const v of g(2)) console.log(v); // 9
```

### fun(...fns)

Like `gen()` but returns an async function. Generator results are collected into `many()`.

```js
import fun from 'stream-chain/fun.js';

const f = fun(x => x + 1, x => x * x);
console.log(await f(2)); // 9
```

### asStream(fn[, options])

Wraps any function as a Duplex stream.

```js
import asStream from 'stream-chain/asStream.js';
const stream = asStream(x => x * x);
```

### dataSource(fn)

Takes a function or iterable and returns the underlying iterator function.

```js
import {dataSource} from 'stream-chain';
const iter = dataSource([1, 2, 3]);
```

## Utilities

All utilities return functions for use in `chain()`.

### Slicing

- `take(n, finalValue?)` — take N items then stop.
- `takeWhile(fn, finalValue?)` — take while predicate is true.
- `takeWithSkip(n, skip?, finalValue?)` — skip then take.
- `skip(n)` — skip N items.
- `skipWhile(fn)` — skip while predicate is true.

### Folding

- `fold(fn, initial)` — reduce stream to single value at end.
- `reduce(fn, initial)` — alias for `fold()`.
- `scan(fn, initial)` — emit running accumulator after each item.
- `reduceStream(fn, initial)` — reduce as Writable stream with `.accumulator`.

### Stream helpers

- `batch(size)` — group items into arrays of `size`.
- `lines()` — split byte stream into lines.
- `fixUtf8Stream()` — repartition chunks for valid UTF-8.
- `readableFrom({iterable})` — convert iterable to Readable stream.

```js
import take from 'stream-chain/utils/take.js';
import fold from 'stream-chain/utils/fold.js';
import batch from 'stream-chain/utils/batch.js';

chain([
  take(10, stop),
  batch(3),
  fold((acc, x) => acc + x.length, 0)
]);
```

## JSONL support

- `parser(reviver?)` — JSONL parser function (returns a `gen()` pipeline).
- `parserStream(options?)` — JSONL parser as a stream.
- `stringerStream(options?)` — JSONL stringer as a stream.

```js
import chain from 'stream-chain';
import parser from 'stream-chain/jsonl/parser.js';
import fs from 'node:fs';

chain([
  fs.createReadStream('data.jsonl'),
  parser(),
  obj => console.log(obj)
]);
```

## Common patterns

### Object processing pipeline

```js
import chain from 'stream-chain';
import {many} from 'stream-chain/defs.js';

const pipeline = chain([
  x => x * x,
  x => many([x - 1, x, x + 1]),
  x => x % 2 ? x : null, // null results are skipped
]);
dataSource.pipe(pipeline);
pipeline.on('data', x => console.log(x));
```

### Async pipeline with filtering

```js
chain([
  async x => await fetchData(x),
  x => x.status === 200 ? x.body : null,
  x => JSON.parse(x),
]);
```

### Generator producing multiple values

```js
chain([
  function* (x) {
    for (let i = 0; i < x; ++i) yield i;
  },
  x => x * x,
]);
```

### Accumulate and emit at end

```js
import {none, flushable} from 'stream-chain/defs.js';

let sum = 0;
chain([
  flushable(x => {
    if (x === none) return sum;
    sum += x;
    return none;
  })
]);
```

### Web streams

```js
const readable = new ReadableStream({ /* ... */ });
const writable = new WritableStream({ /* ... */ });

chain([readable, x => x * 2, writable]);
```

## TypeScript

```ts
import chain from 'stream-chain';
import {TypedTransform} from 'stream-chain/typed-streams.js';

const transform = new TypedTransform<number, string>({
  objectMode: true,
  transform(x, _, cb) { cb(null, String(x)); }
});
const pipeline = chain([transform] as const);
```

## Links

- Docs: https://github.com/uhop/stream-chain/wiki
- npm: https://www.npmjs.com/package/stream-chain
- Full LLM reference: https://github.com/uhop/stream-chain/blob/master/llms-full.txt
