# stream-csv-as-json

> A micro-library of stream components for building custom CSV processing pipelines with a minimal memory footprint, on Node.js or Web Streams. It can parse CSV files far exceeding available memory, streaming individual primitives through a SAX-inspired API. ESM-only, Node 22+. Runtime dependencies: `stream-chain` (pipeline composition + stream adapters) and `stream-json` (token infrastructure: `Assembler`, `emit`). Companion project to `stream-json`: it uses the same token protocol and integrates with its filters, streamers, and utilities.

- Streaming SAX-inspired CSV parser producing `{name, value}` tokens
- Parse CSV files far exceeding available memory
- Runs on both Node.js streams and Web Streams (browser-safe `/web` entry)
- Individual field values can be streamed piece-wise or packed into single tokens
- `asObjects` converts header + data rows into object token streams
- `stringer` converts token streams back to CSV text
- Token protocol compatible with `stream-json` — use its filters, streamers, and utilities downstream
- Proper backpressure handling via `stream-chain` (flushable functions, `gen()`, `asStream()`/`asWebStream()`)
- Components are substrate-free factory functions returning flushable closures, with stream adapters attached per entry
- TypeScript declarations included; tokens typed as discriminated unions

## Quick start

Install (requires Node.js 22+):

```bash
npm i stream-csv-as-json
```

Stream a huge CSV file as token objects (Node.js):

```js
import fs from 'node:fs';
import chain from 'stream-chain';
import {parser} from 'stream-csv-as-json';
import asObjects from 'stream-csv-as-json/as-objects.js';

const pipeline = chain([
  fs.createReadStream('data.csv'),
  parser(),
  asObjects()
]);

pipeline.on('data', token => console.log(token));
pipeline.on('end', () => console.log('done'));
```

The same pipeline on Web Streams:

```js
import {chain} from 'stream-chain/web';
import parser from 'stream-csv-as-json/web/parser.js';
import asObjects from 'stream-csv-as-json/web/as-objects.js';

const response = await fetch('/data.csv');
const pipeline = chain([response.body.pipeThrough(new TextDecoderStream()), parser(), asObjects()]);

for await (const token of pipeline.readable) console.log(token);
```

## Entry points and substrates

ESM-only (`"type": "module"`, Node 22+). Source is split into three layers, exposed through the `exports` map (`.`, `./web`, `./*`):

- **`core/`** — substrate-free factories. `import parser from 'stream-csv-as-json/core/parser.js'` returns the flushable factory with **no** `.asStream` / `.asWebStream` adapters. Imports only `stream-chain/core` — pulls in no `node:*` and no Web globals.
- **Node (`.` and per-component subpaths)** — `stream-csv-as-json`, `stream-csv-as-json/parser.js`, etc. Re-export the core factory with **both** `.asStream` (Node `Duplex`, via `stream-chain`) and `.asWebStream` (Web pair, via `stream-chain/web`) attached.
- **`web/`** — `stream-csv-as-json/web`, `stream-csv-as-json/web/parser.js`, etc. Browser-safe (no `node:*`); the factory carries only `.asWebStream`.

```js
// Node-flavored main API (parser + emit)
import make, {parser} from 'stream-csv-as-json';

// Node-flavored components (distinct local names so all imports can coexist)
import parserFactory from 'stream-csv-as-json/parser.js';
import asObjects from 'stream-csv-as-json/as-objects.js';
import stringer from 'stream-csv-as-json/stringer.js';
import withParser from 'stream-csv-as-json/utils/with-parser.js';

// Web Streams entry (browser-safe)
import webMake from 'stream-csv-as-json/web';
import webParser from 'stream-csv-as-json/web/parser.js';

// Substrate-free factory (no adapters)
import coreParser from 'stream-csv-as-json/core/parser.js';
```

`chain()` from `stream-chain` (Node) or `stream-chain/web` (Web) auto-wraps a returned flushable for its substrate, so component factories take identical call sites — `chain([source, parser(), stringer()])` works on either substrate.
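
To make "flushable closure" concrete, here is a hypothetical adapter that wraps such a stage in a standard Web `TransformStream`, whose `{readable, writable}` shape matches the Web pairs described above. The stage convention (one input in, an array of outputs back, plus a final call with a `FLUSH` marker) and every name here are assumptions for illustration; stream-chain's real flushable protocol and adapters differ in detail.

```javascript
// Hypothetical end-of-input marker (an assumption, not stream-chain's API).
const FLUSH = Symbol('flush');

// Wrap a stage in a standard TransformStream ({readable, writable}).
function toWebStream(stage) {
  return new TransformStream({
    transform(chunk, controller) {
      for (const value of stage(chunk)) controller.enqueue(value);
    },
    flush(controller) {
      // Give the stage a chance to emit anything it buffered.
      for (const value of stage(FLUSH)) controller.enqueue(value);
    }
  });
}

// Hypothetical example stage: pass chunks through, report a count at the end.
function counter() {
  let count = 0;
  return chunk => {
    if (chunk === FLUSH) return [`total: ${count}`];
    ++count;
    return [chunk];
  };
}
```

Because the stage itself touches no stream API, the same closure could just as well be wrapped in a Node `Duplex`; that separation is the point of the substrate-free `core/` layer.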

## Token protocol

The parser emits `{name, value}` tokens. All downstream components operate on this protocol. The tokens are compatible with `stream-json`.

| Token name    | Value  | Meaning                     |
| ------------- | ------ | --------------------------- |
| `startArray`  | —      | Start of a CSV row          |
| `endArray`    | —      | End of a CSV row            |
| `startString` | —      | Start of a field value      |
| `endString`   | —      | End of a field value        |
| `stringChunk` | string | Piece of a field value      |
| `stringValue` | string | Packed complete field value |

By default, the parser emits both streamed tokens (`startString`/`stringChunk`/`endString`) and packed tokens (`stringValue`). The `packStrings` and `streamStrings` options (see Parser API) select one representation or both.
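
For illustration, the sketch below builds the token sequence one already-split row would produce under each option combination. It is not the parser: `rowTokens` is a hypothetical helper, and it assumes each non-empty field arrives as a single `stringChunk` and that the packed `stringValue` follows `endString`, as in `stream-json`. The real parser may split a long value across several chunks.

```javascript
// Hypothetical helper: the tokens for one already-split CSV row.
// Assumptions: one stringChunk per non-empty field; stringValue after endString.
function rowTokens(fields, {packStrings = true, streamStrings = true} = {}) {
  if (!packStrings) streamStrings = true; // at least one representation is emitted
  const tokens = [{name: 'startArray'}];
  for (const value of fields) {
    if (streamStrings) {
      tokens.push({name: 'startString'});
      if (value) tokens.push({name: 'stringChunk', value});
      tokens.push({name: 'endString'});
    }
    if (packStrings) tokens.push({name: 'stringValue', value});
  }
  tokens.push({name: 'endArray'});
  return tokens;
}

console.log(rowTokens(['a', 'b']).map(t => t.name).join(' '));
// startArray startString stringChunk endString stringValue startString stringChunk endString stringValue endArray
```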

After `asObjects`, additional tokens appear:

| Token name    | Value  | Meaning                      |
| ------------- | ------ | ---------------------------- |
| `startObject` | —      | Start of a data row (object) |
| `endObject`   | —      | End of a data row (object)   |
| `startKey`    | —      | Start of field name          |
| `endKey`      | —      | End of field name            |
| `keyValue`    | string | Packed field name            |

Token types are exported as discriminated unions: `parser.Token` (the CSV subset above) and `asObjects.AsObjectsToken` (the parser tokens plus the object/key tokens). Narrowing on `token.name` tightens `token.value` per arm.
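
The same `name` switch drives any hand-written consumer. As a minimal sketch (plain JavaScript, hypothetical `collectRows` helper), here is one that rebuilds rows from packed `stringValue` tokens and ignores the streamed ones, so it assumes `packStrings` is left on:

```javascript
// Hypothetical consumer: rebuild rows (arrays of strings) from parser tokens.
// Relies on packed stringValue tokens; streamed string tokens are ignored.
function collectRows(tokens) {
  const rows = [];
  let row = null;
  for (const token of tokens) {
    switch (token.name) {
      case 'startArray':
        row = [];
        break;
      case 'stringValue':
        row.push(token.value); // value is a string in this arm
        break;
      case 'endArray':
        rows.push(row);
        row = null;
        break;
      // startString / stringChunk / endString: not needed when values are packed
    }
  }
  return rows;
}
```

In TypeScript, typing the parameter as `Iterable<parser.Token>` should let the compiler narrow `token.value` to `string` in the `stringValue` arm. For real pipelines, `stream-json`'s `Assembler` is the general-purpose way to rebuild values from tokens.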

## Main module

The Node default export creates a parser `Duplex` stream with `emit()` applied (from `stream-json/utils/emit`), so the returned stream emits a named event for each token type:

```js
import make from 'stream-csv-as-json';

const stream = make();
stream.on('startArray', () => { /* row start */ });
stream.on('stringValue', val => { /* field value */ });
stream.on('endArray', () => { /* row end */ });
```

Named export: `parser` (the factory function). The `emit()` event sugar is Node-only; the Web entry's default export is `parser.asWebStream` (returns a `{readable, writable}` pair).

## Parser API

`parser(options)` — factory returning a flushable function that consumes CSV text and produces `{name, value}` tokens. Internally, it is composed with `fixUtf8Stream()` via `gen()`, so multi-byte UTF-8 characters split across input chunks are decoded correctly.

- `parser(options)` — returns a flushable function for use in `chain()`.
- `parser.asStream(options)` — returns a Node `Duplex` stream (writableObjectMode: false, readableObjectMode: true).
- `parser.asWebStream(options)` — returns a Web `{readable, writable}` pair.

Options:

- `packStrings` (boolean, default: true) — emit `stringValue` tokens with the complete field value.
- `packValues` (boolean) — alias for `packStrings`.
- `streamStrings` (boolean, default: true) — emit `startString`/`stringChunk`/`endString` tokens.
- `streamValues` (boolean) — alias for `streamStrings`.
- `separator` (string, default: `','`) — field separator character.

If `packStrings` is false, `streamStrings` is forced to true (at least one representation must be emitted).
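
The option rules above can be summarized as a small normalization step. This is a hypothetical helper, not an export of the library; it assumes that when both a name and its alias are given, the specific name wins (modeled on how `stream-json`'s parser resolves its aliases).

```javascript
// Hypothetical helper: resolve aliases and defaults for the parser options.
// Assumption: the specific name (packStrings) overrides its alias (packValues).
function normalizeParserOptions(options = {}) {
  let packStrings = true;
  let streamStrings = true;
  if ('packValues' in options) packStrings = options.packValues;
  if ('packStrings' in options) packStrings = options.packStrings;
  if ('streamValues' in options) streamStrings = options.streamValues;
  if ('streamStrings' in options) streamStrings = options.streamStrings;
  // At least one representation must be emitted.
  if (!packStrings) streamStrings = true;
  return {packStrings, streamStrings, separator: options.separator ?? ','};
}
```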

CSV parsing:
- Handles quoted fields per RFC 4180: double-quote escaping (`""` → `"`), embedded separators, embedded newlines.
- Row terminator acceptance is lenient — CRLF (RFC 4180), LF, and bare CR all work.
- A leading UTF-8 BOM (`U+FEFF`) at the start of the input is stripped.
- Each row is represented as an array: `startArray`, field tokens, `endArray`.
- Uses sticky RegExp (`/y` flag) for performance; each parser instance owns its own pattern set.

Errors:
- `"Parser cannot parse input: expected a quoted value"` — input ends mid-quote (unterminated quoted field).
- `"Parser cannot parse input: unexpected character after a quoted value"` — content other than the separator, CR, LF, or another `"` appears immediately after a closing `"`.

Errors propagate as the stream's `'error'` event (Node) or reject the readable (Web).
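
To make these rules concrete, here is a non-streaming sketch that splits a whole string into rows using the same ideas: sticky `/y` patterns created per call, `""` unescaping, lenient CRLF/LF/CR terminators, BOM stripping, and the two documented error messages. `parseCsv` is a hypothetical name; the real parser works incrementally across chunk boundaries and emits tokens instead of arrays. Treating a `"` inside an unquoted field as a literal character is this sketch's own assumption.

```javascript
// Hypothetical, non-streaming sketch of the parsing rules (not the library's code).
function parseCsv(text, separator = ',') {
  if (text.charCodeAt(0) === 0xfeff) text = text.slice(1); // strip a leading BOM
  if (!text) return [];
  // Escape the (single-character) separator as \uXXXX so it is safe in a class.
  const sep = '\\u' + separator.charCodeAt(0).toString(16).padStart(4, '0');
  // Sticky (/y) patterns, owned by this call, that match at an exact position.
  const plain = new RegExp('[^' + sep + '\\r\\n]*', 'y'); // unquoted field
  const quoted = /(?:[^"]|"")*/y; // quoted body: non-quotes or escaped ""
  const rows = [];
  let row = [];
  let i = 0;
  for (;;) {
    let value;
    if (text[i] === '"') {
      quoted.lastIndex = i + 1;
      const body = quoted.exec(text)[0];
      i = quoted.lastIndex;
      if (text[i] !== '"') throw new Error('Parser cannot parse input: expected a quoted value');
      ++i; // past the closing quote
      const next = text[i];
      if (next !== undefined && next !== separator && next !== '\r' && next !== '\n')
        throw new Error('Parser cannot parse input: unexpected character after a quoted value');
      value = body.replace(/""/g, '"');
    } else {
      plain.lastIndex = i;
      value = plain.exec(text)[0];
      i = plain.lastIndex;
    }
    row.push(value);
    if (text[i] === separator) {
      ++i;
      continue;
    }
    rows.push(row); // end of row: a terminator or the end of input
    row = [];
    if (i >= text.length) break;
    i += text[i] === '\r' && text[i + 1] === '\n' ? 2 : 1; // CRLF, LF, or bare CR
    if (i >= text.length) break; // a trailing terminator does not start a new row
  }
  return rows;
}
```

Passing `'\t'` as the separator gives TSV behavior, mirroring the `separator` option.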

```js
import parser from 'stream-csv-as-json/parser.js';
import fs from 'node:fs';

fs.createReadStream('data.csv')
  .pipe(parser.asStream())
  .on('data', token => console.log(token.name, token.value))
  .on('end', () => console.log('done'));
```

With `stream-chain`:

```js
import fs from 'node:fs';
import chain from 'stream-chain';
import {parser} from 'stream-csv-as-json';

const pipeline = chain([
  fs.createReadStream('data.csv'),
  parser(),
  token => { console.log(token.name, token.value); return null; }
]);
```

On Web Streams:

```js
import {chain} from 'stream-chain/web';
import parser from 'stream-csv-as-json/web/parser.js';

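// webReadable: a ReadableStream of CSV text, e.g. response.body.pipeThrough(new TextDecoderStream())
const pipeline = chain([webReadable, parser()]);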
for await (const token of pipeline.readable) console.log(token.name, token.value);
```

## AsObjects API

`asObjects(options)` — factory returning a flushable function that uses the first CSV row as field names and converts subsequent rows from array tokens to object tokens.

- `asObjects(options)` — returns a flushable function for use in `chain()`.
- `asObjects.asStream(options)` — returns a Node `Duplex` stream (objectMode both sides).
- `asObjects.asWebStream(options)` — returns a Web `{readable, writable}` pair.
- `asObjects.withParser(options)` — creates a pipeline of CSV parser + asObjects via `gen()`.
- `asObjects.withParserAsStream(options)` — same, wrapped as a Node `Duplex` stream.
- `asObjects.withParserAsWebStream(options)` — same, wrapped as a Web `{readable, writable}` pair.

Options:

- `packKeys` (boolean, default: true) — emit `keyValue` tokens with the field name.
- `packValues` (boolean) — alias for `packKeys`.
- `streamKeys` (boolean, default: true) — emit `startKey`/`stringChunk`/`endKey` tokens for field names.
- `streamValues` (boolean) — alias for `streamKeys`.
- `fieldPrefix` (string, default: `'field'`) — prefix for unnamed fields when data has more columns than headers, or when a header cell is empty. Field name becomes `fieldPrefix + index`.
- `useStringValues` / `useValues` — deprecated no-ops kept for backward compatibility (see Header auto-detection below).

If `packKeys` is false, `streamKeys` is forced to true.

Behavior:
1. **Header phase**: Consumes the first row to build the field-name list. Auto-detects the parser's mode — captures from `startString` / `stringChunk` / `endString` when stream tokens are emitted, or from `stringValue` when only packed values are emitted. Works with every parser configuration without an explicit option.
2. **Data phase**: Converts subsequent rows:
   - `startArray` → `startObject`
   - Before each field value: emits key tokens (`startKey`/`stringChunk`/`endKey` and/or `keyValue`)
   - `endArray` → `endObject`
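
A rough model of the two phases, restricted to packed tokens (`stringValue` in, `keyValue` out), is sketched below. `toObjectTokens` is a hypothetical helper that takes an array instead of a stream; it assumes the input came from a parser run with `streamStrings: false`, and it applies the documented `fieldPrefix + index` fallback for missing or empty header names.

```javascript
// Hypothetical sketch of asObjects on packed tokens only (not the library's code).
function toObjectTokens(tokens, {fieldPrefix = 'field'} = {}) {
  const out = [];
  const names = [];
  let header = null;
  let index = 0;
  for (const token of tokens) {
    if (header === null) {
      // Header phase: collect field names from the first row.
      if (token.name === 'stringValue') names.push(token.value);
      else if (token.name === 'endArray') header = names;
      continue;
    }
    // Data phase: arrays become objects, each value gets a key first.
    switch (token.name) {
      case 'startArray':
        index = 0;
        out.push({name: 'startObject'});
        break;
      case 'stringValue':
        out.push({name: 'keyValue', value: header[index] || fieldPrefix + index});
        out.push(token);
        ++index;
        break;
      case 'endArray':
        out.push({name: 'endObject'});
        break;
    }
  }
  return out;
}
```

With headers `name,age` and a data row `Ann,30,x`, the keys come out as `name`, `age`, `field2`. The real component additionally handles streamed values and streamed keys (`startKey`/`stringChunk`/`endKey`), which this sketch leaves out.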

```js
import fs from 'node:fs';
import chain from 'stream-chain';
import {parser} from 'stream-csv-as-json';
import asObjects from 'stream-csv-as-json/as-objects.js';

const pipeline = chain([
  fs.createReadStream('data.csv'),
  parser(),
  asObjects()
]);

pipeline.on('data', token => console.log(token));
```

With `withParser` shortcut:

```js
import fs from 'node:fs';
import chain from 'stream-chain';
import asObjects from 'stream-csv-as-json/as-objects.js';

const pipeline = chain([
  fs.createReadStream('data.csv'),
  asObjects.withParser()
]);

pipeline.on('data', token => console.log(token));
```

Custom field prefix:

```js
import chain from 'stream-chain';
import {parser} from 'stream-csv-as-json';
import asObjects from 'stream-csv-as-json/as-objects.js';

// If headers are: name,age and data has 4 columns:
// Fields become: name, age, col2, col3
const pipeline = chain([
  parser(),
  asObjects({fieldPrefix: 'col'})
]);
```

## Stringer API

`stringer(options)` — factory returning a flushable function that converts a CSV token stream back to CSV text.

- `stringer(options)` — returns a flushable function for use in `chain()`.
- `stringer.asStream(options)` — returns a Node `Duplex` stream (writableObjectMode: true, readableObjectMode: false).
- `stringer.asWebStream(options)` — returns a Web `{readable, writable}` pair.

Options:

- `useStringValues` (boolean, default: false) — use `stringValue` tokens instead of `startString`/`stringChunk`/`endString`.
- `useValues` (boolean) — alias for `useStringValues`.
- `separator` (string, default: `','`) — field separator character.
- `rowTerminator` (string, default: `'\r\n'`) — row terminator string. CRLF per RFC 4180; override with `'\n'` for Unix-style output.

Two modes:

1. **Stream mode** (default, `useStringValues: false`): Consumes `startString`/`stringChunk`/`endString` tokens. Always quotes fields (wraps in `"`), escapes `"` as `""`.
2. **Value mode** (`useStringValues: true`): Consumes `stringValue` tokens. Quotes only when necessary (field contains separator, `"`, `\r`, or `\n`).

Rows are terminated with the configured `rowTerminator` (default `\r\n` per RFC 4180).
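
The difference between the two modes comes down to the quoting rule. In the sketch below (hypothetical helpers operating on whole rows rather than tokens), stream mode quotes every field; presumably this is because the stringer writes a field's chunks as they arrive and cannot know in advance whether quoting will be needed. Value mode sees the complete value and quotes only when it contains the separator, `"`, `\r`, or `\n`.

```javascript
// Hypothetical helpers illustrating the quoting rules (not the library's code).

// Stream mode: always quote, doubling any embedded quotes.
function quoteAlways(value) {
  return '"' + value.replace(/"/g, '""') + '"';
}

// Value mode: quote only if the value contains the separator, ", \r, or \n.
function quoteIfNeeded(value, separator = ',') {
  const needsQuotes = value.includes(separator) || /["\r\n]/.test(value);
  return needsQuotes ? quoteAlways(value) : value;
}

// Format one whole row the way each mode would.
function formatRow(fields, {useStringValues = false, separator = ',', rowTerminator = '\r\n'} = {}) {
  const quote = useStringValues ? value => quoteIfNeeded(value, separator) : quoteAlways;
  return fields.map(quote).join(separator) + rowTerminator;
}
```

For `['a', 'b,c', 'say "hi"']`, stream mode yields `"a","b,c","say ""hi"""` while value mode yields `a,"b,c","say ""hi"""`, each followed by the row terminator.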

```js
import fs from 'node:fs';
import chain from 'stream-chain';
import {parser} from 'stream-csv-as-json';
import stringer from 'stream-csv-as-json/stringer.js';

// Round-trip: parse CSV and write it back
chain([
  fs.createReadStream('input.csv'),
  parser(),
  stringer(),
  fs.createWriteStream('output.csv')
]);

// Value mode: quote only when needed (reads packed stringValue tokens)
chain([
  fs.createReadStream('input.csv'),
  parser({packValues: true, streamValues: false}),
  stringer({useStringValues: true}),
  fs.createWriteStream('output-values.csv')
]);
```

## with-parser utility

`stream-csv-as-json/utils/with-parser.js` (Node) / `stream-csv-as-json/web/utils/with-parser.js` (Web) — composes the CSV parser with another component factory via `gen()`. Components such as `asObjects` expose a `.withParser()` shortcut built on this utility.

- `withParser(fn, options)` — returns a flushable composing `parser(options)` then `fn(options)`.
- `withParser.asStream(fn, options)` — Node `Duplex`. `withParser.asWebStream(fn, options)` — Web pair.
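
Conceptually, `withParser(fn, options)` chains two stages so that every output of the first is fed to the second, and the end-of-input flush reaches both. The sketch below models that with plain synchronous functions; `compose`, `FLUSH`, and the two example stages are hypothetical names, and the real utility uses stream-chain's `gen()` rather than this code.

```javascript
// Hypothetical end-of-input marker (an assumption, not stream-chain's API).
const FLUSH = Symbol('flush');

// Run `first`, feed each of its outputs to `second`, and forward the flush.
function compose(first, second) {
  return input => {
    const out = [];
    for (const value of first(input)) out.push(...second(value));
    if (input === FLUSH) out.push(...second(FLUSH)); // flush downstream last
    return out;
  };
}

// Example stage 1: split text into lines, buffering a partial last line
// (a stand-in for a parser that holds incomplete input between chunks).
function lineSplitter() {
  let buffer = '';
  return input => {
    if (input === FLUSH) return buffer ? [buffer] : [];
    const parts = (buffer + input).split('\n');
    buffer = parts.pop();
    return parts;
  };
}

// Example stage 2: a stateless per-item transform.
const upper = () => input => (input === FLUSH ? [] : [input.toUpperCase()]);

const lines = compose(lineSplitter(), upper());
console.log([...lines('ab\ncd'), ...lines('\nef'), ...lines(FLUSH)]); // [ 'AB', 'CD', 'EF' ]
```

Without the final flush, `'ef'` would stay buffered in the first stage, which is why composed pipelines must propagate the end-of-input signal.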

## File components (Node-only)

File-edge stages under `stream-csv-as-json/file/` compose stream-chain's async block reader/writer with the core parser/stringer. They use `node:fs/promises` (Node-only) and resolve through the `./*` export.

- `parseFile(options)` (`stream-csv-as-json/file/parser.js`) — input-edge stage: `gen(asyncBlockReader(options), parser(options))`. Pass a file path as the input value; yields the CSV token stream. Options extend the parser options with `readBlockSize` (default 65536 / 64 KB). Also exported as the named `parser`.
- `stringerToFile(path, options)` (`stream-csv-as-json/file/stringer.js`) — output-edge sink: `gen(stringer(options), asyncBlockWriter(path, options))`. Writes the token stream to `path`. Options extend the stringer options with `writeBlockSize` (default 1048576 / 1 MB). Also exported as the named `stringer`. The writer opens lazily and closes its file handle on the flush signal, so the pipeline **must** be flushed.
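
The block sizes describe buffering: output is collected and written in large pieces rather than per token. Here is a minimal sketch of the write side, with a hypothetical `blockBuffer` helper standing in for stream-chain's actual writer; counting characters rather than encoded bytes is a simplification of this sketch.

```javascript
// Hypothetical sketch (not stream-chain's asyncBlockWriter): accumulate text and
// hand one joined block to write() once at least blockSize characters are pending.
// Assumption: size is counted in UTF-16 characters, not encoded bytes.
function blockBuffer(write, blockSize = 1048576) {
  let pending = [];
  let size = 0;
  const flush = () => {
    if (!size) return; // nothing buffered
    write(pending.join(''));
    pending = [];
    size = 0;
  };
  const push = text => {
    pending.push(text);
    size += text.length;
    if (size >= blockSize) flush();
  };
  return {push, flush};
}
```

Anything still pending when the input ends is written only by `flush()`, which illustrates why a file pipeline that is never flushed can lose its tail and leave the handle open.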

Drive a file pipeline with `pipe` + `drain` from `stream-chain/utils` (`pipe` runs the flush after the data pass, which is required for the writer to close):

```js
import pipe from 'stream-chain/utils/pipe.js';
import drain from 'stream-chain/utils/drain.js';
import parseFile from 'stream-csv-as-json/file/parser.js';
import stringerToFile from 'stream-csv-as-json/file/stringer.js';

// Parse a file into tokens:
await drain(
  pipe(parseFile(), token => {
    /* handle token */ return null;
  })('in.csv')
);

// Round-trip a file:
await drain(pipe(parseFile(), stringerToFile('out.csv', {useValues: true}))('in.csv'));
```

## Common patterns

### Stream a huge CSV as token objects

```js
import fs from 'node:fs';
import chain from 'stream-chain';
import {parser} from 'stream-csv-as-json';
import asObjects from 'stream-csv-as-json/as-objects.js';

const pipeline = chain([
  fs.createReadStream('huge.csv'),
  parser(),
  asObjects()
]);
let rows = 0;
pipeline.on('data', token => {
  if (token.name === 'endObject') ++rows; // a data row (object) just ended
});
pipeline.on('end', () => console.log(`done: ${rows} rows`));
```

### Compressed CSV processing

```js
import fs from 'node:fs';
import zlib from 'node:zlib';
import chain from 'stream-chain';
import {parser} from 'stream-csv-as-json';
import asObjects from 'stream-csv-as-json/as-objects.js';

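const pipeline = chain([
  fs.createReadStream('data.csv.gz'),
  zlib.createGunzip(),
  parser(),
  asObjects()
]);
// Consume the output so the pipeline keeps flowing
pipeline.on('data', token => { /* handle token */ });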
```

### CSV round-trip (parse and re-emit)

```js
import fs from 'node:fs';
import chain from 'stream-chain';
import {parser} from 'stream-csv-as-json';
import stringer from 'stream-csv-as-json/stringer.js';

chain([
  fs.createReadStream('input.csv'),
  parser(),
  stringer(),
  fs.createWriteStream('output.csv')
]);
```

### Custom separator (TSV)

```js
import fs from 'node:fs';
import chain from 'stream-chain';
import {parser} from 'stream-csv-as-json';
import asObjects from 'stream-csv-as-json/as-objects.js';

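const pipeline = chain([
  fs.createReadStream('data.tsv'),
  parser({separator: '\t'}),
  asObjects()
]);
// Consume the output so the pipeline keeps flowing
pipeline.on('data', token => { /* handle token */ });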
```

### Web Streams in the browser

```js
import {chain} from 'stream-chain/web';
import parser from 'stream-csv-as-json/web/parser.js';
import asObjects from 'stream-csv-as-json/web/as-objects.js';

const response = await fetch('/data.csv');
const pipeline = chain([response.body.pipeThrough(new TextDecoderStream()), parser(), asObjects()]);

for await (const token of pipeline.readable) {
  if (token.name === 'keyValue') console.log('field:', token.value);
}
```

### Event-based processing (Node)

```js
import fs from 'node:fs';
import make from 'stream-csv-as-json';

const stream = make();
let rowCount = 0;
stream.on('startArray', () => ++rowCount);
stream.on('end', () => console.log(`${rowCount} rows`));
fs.createReadStream('data.csv').pipe(stream);
```

## Links

- Docs: https://github.com/uhop/stream-csv-as-json/wiki
- npm: https://www.npmjs.com/package/stream-csv-as-json
- Repository: https://github.com/uhop/stream-csv-as-json
