# Warlock Herald — full skills

> Package: `@warlock.js/herald`

> Generated artifact. Concatenates every SKILL.md and reference file under `@warlock.js/herald/skills/`. Re-run `node scripts/generate-llms.mjs` after any change.

## consume-message  `@warlock.js/herald/consume-message/SKILL.md`

---
name: consume-message
description: 'Subscribe to a channel via .subscribe(handler, options?) — handler receives (message, ctx). Control flow via ctx.ack / ctx.nack(requeue?) / ctx.reject / ctx.retry(delayMs). Configure prefetch, retry policy, dead-letter, consumer groups. Triggers: `subscribe`, `ctx.ack`, `ctx.nack`, `ctx.reject`, `ctx.retry`, `prefetch`, `group`, `retry`, `deadLetter`, `@Consumable`, `EventConsumer`, `defineConsumer`, `autoAck`; "consume messages from a queue", "implement a worker", "background queue processor", "set up retry and dead-letter", "class-based event consumer"; typical import `import { herald, Consumable, EventConsumer } from "@warlock.js/herald"`. Skip: setup — `@warlock.js/herald/herald-basics/SKILL.md`; producing — `@warlock.js/herald/publish-message/SKILL.md`; RPC reply — `@warlock.js/herald/request-and-respond/SKILL.md`; competing libs `amqplib`, `bullmq`, `kafkajs`; NestJS `@MessagePattern` / `@EventPattern`.'
---

# Consume messages

Subscribe to a channel; receive each message with a flow-control `ctx`.

## Minimal subscribe

```ts
import { herald } from "@warlock.js/herald";

herald()
  .channel<{ userId: number; email: string }>("user.created")
  .subscribe(async (message, ctx) => {
    try {
      await sendWelcomeEmail(message.payload.email);
      await ctx.ack();          // success — message removed from queue
    } catch (error) {
      await ctx.nack(true);     // failure — requeue for redelivery
    }
  });
```

**Smart auto-ack is on by default** (when `autoAck` is unset or `false`). The consumer runs with manual-ack enabled, and herald acks/nacks based on how your handler ends:

- Handler returns without calling any `ctx` method → herald **auto-acks**.
- Handler **throws** → herald **auto-nacks** (requeue, or DLQ/reject once retries are exhausted — see below).
- Handler calls `ctx.ack()` / `ctx.nack()` / `ctx.reject()` / `ctx.retry()` → herald respects that and does nothing further.

So the `try/catch` above is optional — letting the error throw produces the same auto-nack. Call `ctx` methods explicitly only when you want a *non-default* outcome (reject without requeue, route to DLQ, delayed retry). The one mode that loses messages is `autoAck: true`, where the broker acks on delivery before your handler runs.

## Message context — flow control

```ts
ctx.ack();              // acknowledge — message removed from the queue
ctx.nack(requeue?);     // negative ack — requeue (true) or send to DLQ (false)
ctx.reject();           // shorthand for nack(false)
ctx.retry(delayMs);     // delayed retry — requeue after delay
ctx.reply(payload);     // for request-response pattern (see request-and-respond skill)
```

Pick by intent:

| Intent | Use |
| --- | --- |
| Processed cleanly | `ctx.ack()` |
| Transient failure, try again | `ctx.nack(true)` or `ctx.retry(5000)` |
| Permanent failure, send to DLQ | `ctx.nack(false)` or `ctx.reject()` |
| Validation failure (bad message) | `ctx.reject()` — don't requeue |

## Subscribe options

```ts
await channel.subscribe(handler, {
  group: "email-workers",      // consumer group / tag — multiple consumers share work
  prefetch: 10,                 // concurrency — how many in-flight messages this consumer holds
  autoAck: false,               // default; keep it false in production
  exclusive: false,             // single consumer only on this channel?
  retry: {
    maxRetries: 3,              // redelivery ceiling on handler throw (delay not auto-applied — see below)
    delay: 1000,
  },
  deadLetter: {
    channel: "user.created.failed",
    preserveOriginal: true,
  },
});
```

`group` is the unit of "share work" — N consumers in the same group split messages across them. Different groups each receive every message (fanout).

`prefetch` is the per-consumer concurrency cap. Higher = throughput; lower = even spread. For CPU-bound handlers, set ~= CPU cores. For IO-bound, can go higher (50+).

## Retry policy

```ts
retry: {
  maxRetries: 3,
  delay: 1000,                  // see the caveat below — delay is not auto-applied
}
```

`maxRetries` is the part that does the work: when a handler **throws**, herald reads the message's `x-retry-count` header and, while it's under `maxRetries`, nacks with requeue so the broker redelivers. Once `x-retry-count` reaches `maxRetries`, it nacks-without-requeue (→ DLQ if configured) or rejects outright.

**Caveat on `delay`.** `RetryOptions.delay` (number or `(attempt) => number`) is **not applied on the automatic throw path** — a thrown handler requeues immediately, with no wait. The only place a delay takes effect is the explicit `ctx.retry(delayMs)` call, which republishes the message with an `x-delay` header — and even that needs the RabbitMQ delayed-message-exchange plugin installed, or the delay is ignored. So if you need real backoff, call `ctx.retry(ms)` from inside the handler and install the plugin; don't rely on the channel-level `retry.delay` for timing.

## Dead-letter queue

```ts
deadLetter: {
  channel: "user.created.failed",
  preserveOriginal: true,        // accepted, but currently a no-op (see note)
}
```

The DLQ always forwards the full original payload plus the message metadata, so `preserveOriginal` makes no observable difference today — the field is part of the type but the RabbitMQ driver doesn't branch on it yet. Set it or omit it; behaviour is the same.

Messages that exhausted retries land in `user.created.failed`. Subscribe to it separately for alerting / manual inspection:

```ts
herald().channel("user.created.failed").subscribe(async (message, ctx) => {
  await alerts.notify(`Failed: ${JSON.stringify(message.payload)}`);
  await ctx.ack();
});
```

## Decorator-style consumers — `@Consumable` + `EventConsumer`

For class-based consumers that pair with the [`EventMessage`](@warlock.js/herald/publish-message/SKILL.md) producer layer. This is a **different handler shape** from the raw `.subscribe()` above — ack/nack is automatic, and you get the unwrapped payload plus an event-metadata object (no `ctx`):

```ts
import { Consumable, EventConsumer } from "@warlock.js/herald";
import type { ConsumedEventMessage } from "@warlock.js/herald";

@Consumable()  // or @Consumable({ broker: "analytics" }) to target a non-default broker
export class UserCreatedConsumer extends EventConsumer<{ id: number; email: string }> {
  public static eventName = "user.created";

  // handle(payload, event) — NOT (message, ctx). No ctx.ack() here.
  public async handle(payload: { id: number; email: string }, event: ConsumedEventMessage) {
    await sendWelcomeEmail(payload.email);
    // return cleanly → herald acks. throw → herald nacks (requeue).
  }
}
```

The handler receives the **already-unwrapped payload** as the first argument and a `ConsumedEventMessage` as the second — `{ messageId, eventName, payload, version?, occurredAt?, metadata?, message }` (where `message` is the raw `Message` if you need `message.metadata.headers`). There is **no `ctx`**: the framework auto-acks when `handle` resolves and auto-nacks-with-requeue when it throws, so you never call `ack`/`nack` yourself in this style.

Wiring: the channel name comes from `static eventName`, and `@Consumable` self-registers the moment the class module is **imported** — if a broker is already connected it subscribes immediately, otherwise it buffers and subscribes once `connectToBroker` fires. So the only wiring you need is to import the consumer file on the boot path (e.g. your module's `main.ts`).

**Pair it with the producer:** an `EventConsumer` expects the envelope shape that `publishEvent(new SomeEvent(...))` emits — it reads `envelope.payload`. Don't point one at a channel fed by a bare `channel(name).publish(rawBody)`; the payload nesting won't match.

### Validation + version gating

Two optional gates run before `handle`, both driven by fields on the class:

```ts
import { Consumable, EventConsumer } from "@warlock.js/herald";
import { v } from "@warlock.js/seal";

@Consumable()
export class UserCreatedConsumer extends EventConsumer {
  public static eventName = "user.created";

  // Only accept events whose `version` falls in [minVersion, maxVersion].
  // Out-of-range events are acked and skipped (not requeued).
  public static minVersion = 2;
  public static maxVersion = 3;

  // Seal schema — invalid payloads are nacked before handle() runs.
  public schema = v.object({ id: v.int(), email: v.string().email() });

  public async handle(payload, event) {
    // payload is validated + version-accepted here
  }
}
```

### `defineConsumer` — no-class shorthand

```ts
import { defineConsumer } from "@warlock.js/herald";

export const userCreatedConsumer = defineConsumer<{ id: number; email: string }>("user.created", {
  schema: userCreatedSchema,                 // optional seal validation
  handle: async (payload, event) => {
    await sendWelcomeEmail(payload.email);
  },
});
```

`defineConsumer` self-registers via `@Consumable` internally, so importing the module is all the wiring it needs — same as the class form. (`subscribeConsumer(SomeConsumerClass)` is the imperative equivalent if you'd rather register explicitly.)

## Subscription handle

`subscribe()` resolves to a `Subscription` you can manage:

```ts
const subscription = await channel.subscribe(handler);

subscription.id;            // the consumer id (string)
subscription.isActive();    // boolean

await subscription.unsubscribe();   // cancel the consumer on the broker
await subscription.pause();          // also cancels the consumer (RabbitMQ has no native pause)
```

On the RabbitMQ driver, `pause()` and `unsubscribe()` both cancel the consumer; `resume()` is **not supported and throws** — create a fresh subscription to start consuming again. You can also cancel by id from the channel: `await channel.unsubscribeById(consumerId)`, or drop every consumer on the channel with `await channel.stopConsuming()`.

## Channel admin

The channel exposes queue-management helpers beyond pub/sub:

```ts
const { messageCount, consumerCount } = await channel.stats(); // queue depth + live consumers
const purged = await channel.purge();    // drop all pending messages, returns the count
const ok = await channel.exists();        // does the queue exist on the broker?
await channel.assert();                   // create the queue with its options (idempotent)
await channel.delete();                   // remove the queue entirely
```

`assert()` runs lazily on the first `publish` / `subscribe`, so you rarely call it directly — reach for it only to pre-create a queue before any traffic.

## Things NOT to do

- Don't catch the handler's error and return normally. Smart auto-ack reads a clean return as success and **acks** — silently dropping the work. Let it throw (auto-nack) or call `ctx.reject()` / `ctx.nack(false)` deliberately.
- Don't `ctx.nack(true)` in an infinite loop. With no `retry` policy, a perpetually-failing message ping-pongs forever. Set `retry.maxRetries` + a `deadLetter` so failures eventually move out.
- Don't ignore the `deadLetter` channel. Subscribe to it (or at least monitor its depth) — DLQ growth means real failures piling up.
- Don't use `autoAck: true` for messages that matter. The broker acks on delivery, so a crash mid-handling loses the message.

## See also

- [`@warlock.js/herald/publish-message/SKILL.md`](@warlock.js/herald/publish-message/SKILL.md) — the producing side
- [`@warlock.js/herald/request-and-respond/SKILL.md`](@warlock.js/herald/request-and-respond/SKILL.md) — when you need to reply to a message


## herald-basics  `@warlock.js/herald/herald-basics/SKILL.md`

---
name: herald-basics
description: 'Start with @warlock.js/herald — connectToBroker config, herald() factory, channel concept, multi-broker support. Triggers: `connectToBroker`, `herald`, `channel`, `isDefault`, `autoAck`; "set up herald", "wire connectToBroker at boot", "configure multiple brokers", "notifications + analytics + events"; typical import `import { connectToBroker, herald } from "@warlock.js/herald"`. Skip: publishing — `@warlock.js/herald/publish-message/SKILL.md`; consuming — `@warlock.js/herald/consume-message/SKILL.md`; RPC — `@warlock.js/herald/request-and-respond/SKILL.md`; competing libs `amqplib`, `kafkajs`, `bullmq`, `nats`; NestJS messaging; native `EventEmitter`.'
---

# Herald basics

Message bus library — wraps RabbitMQ (Kafka WIP) behind a unified pub/sub API. `herald()` returns a broker, `.channel(name)` returns a pub/sub interface, type-safe via TypeScript generics.

> This skill is the herald **map** — read it first, then load the specific skill for the task.

## Install

```bash
yarn add @warlock.js/herald amqplib   # amqplib for RabbitMQ
```

## Foundations

1. **`connectToBroker(config)` is the bootstrap.** Call once per broker at app startup, before any publish/subscribe.
2. **`herald()` returns the default broker.** `herald("name")` returns a named one. Most apps have one broker; multi-broker is for "notifications + analytics + events" scale.
3. **`.channel(name)` is the queue / topic.** Publish into it from producer, subscribe to it from consumer.
4. **Channels are typed.** `channel<UserPayload>("user.created")` gives full TS inference on publish and subscribe.
5. **`@warlock.js/seal` schemas validate on publish + receive.** Pass `{ schema }` to `.channel(name, { schema })`.
6. **Subscribers control message flow** via `ctx.ack()` / `ctx.nack()` / `ctx.reject()` / `ctx.retry(ms)`.
7. **Smart auto-ack is the default** (`autoAck` unset/`false`). The consumer runs with manual-ack enabled, but herald acks for you when the handler returns cleanly and nacks-with-requeue when it throws — so a crash mid-handling re-delivers, and a clean handler that forgot `ctx.ack()` is still acked. Call `ctx` methods explicitly only when you need a non-default outcome (reject, DLQ, delayed retry). `autoAck: true` is the dangerous mode: the broker acks on delivery, so a crash loses the message.

## Minimal example

```ts
import { connectToBroker, herald } from "@warlock.js/herald";

// Boot
await connectToBroker({
  driver: "rabbitmq",
  host: "localhost",
  port: 5672,
  username: "guest",
  password: "guest",
});

// Produce
await herald().channel("user.created").publish({ userId: 1, email: "ada@example.com" });

// Consume
herald()
  .channel<{ userId: number; email: string }>("user.created")
  .subscribe(async (message, ctx) => {
    console.log("New user:", message.payload.userId);
    await ctx.ack();
  });
```

## Multi-broker

```ts
await connectToBroker({
  driver: "rabbitmq",
  name: "notifications",
  isDefault: true,
  host: process.env.NOTIFICATIONS_HOST,
});

await connectToBroker({
  driver: "rabbitmq",
  name: "analytics",
  isDefault: false, // ← required: `isDefault` defaults to true, so omitting it makes analytics the default
  host: process.env.ANALYTICS_HOST,
});

herald().channel("emails").publish({ /* ... */ });           // → notifications
herald("analytics").channel("events").publish({ /* ... */ }); // → analytics
```

`connectToBroker` defaults `isDefault` to `true`, and the registry promotes the most-recently-registered default. So when you register more than one broker, mark the secondaries `isDefault: false` — otherwise the last one wins and `herald()` returns the wrong broker.

## Pick a skill

| If the task is about… | Load |
| --- | --- |
| Publishing messages — single + batch + publish options (priority, ttl, delay, persistent, headers) | [`@warlock.js/herald/publish-message/SKILL.md`](@warlock.js/herald/publish-message/SKILL.md) |
| Subscribing to messages — handler signature, message context (ack/nack/retry), prefetch, retry policy, dead-letter | [`@warlock.js/herald/consume-message/SKILL.md`](@warlock.js/herald/consume-message/SKILL.md) |
| Request/response (RPC) — `channel.request(...)` + `channel.respond(...)` for synchronous-style calls over a message bus | [`@warlock.js/herald/request-and-respond/SKILL.md`](@warlock.js/herald/request-and-respond/SKILL.md) |
| Schema-validated channels — `.channel(name, { schema })` with `@warlock.js/seal` | See [`@warlock.js/seal/seal-basics/SKILL.md`](@warlock.js/seal/seal-basics/SKILL.md) + skills below |

## Things NOT to do

- Don't `connectToBroker` from inside a request handler. Call once at boot.
- Don't catch a handler error and swallow it — under smart auto-ack a thrown handler auto-nacks (re-delivers), but a caught-and-ignored error looks like success and gets auto-acked, silently dropping the work. Either let it throw or call `ctx.reject()` / `ctx.nack(false)` deliberately.
- Don't use `autoAck: true` in production. The default smart auto-ack re-delivers on a mid-handling crash; `autoAck: true` acks on delivery, so a crash loses the message.
- Don't share a typed channel across producer and consumer code without a shared type / schema file. Drift between sides causes silent payload corruption.

## See also

- README at `@warlock.js/herald/README.md` for the full API surface and RabbitMQ / Kafka driver config
- [`@warlock.js/seal/seal-basics/SKILL.md`](@warlock.js/seal/seal-basics/SKILL.md) — schema validation


## publish-message  `@warlock.js/herald/publish-message/SKILL.md`

---
name: publish-message
description: 'Publish messages to a channel — .publish(payload, options?) for single, .publishBatch([...], options?) for batch, with priority / ttl / delay / persistent / correlationId / headers options. Optional schema validation via .channel(name, {schema}). Triggers: `publish`, `publishBatch`, `channel`, `priority`, `ttl`, `delay`, `persistent`, `correlationId`, `headers`, `schema`; "publish a message", "emit an event after a domain change", "fan out a notification", "schedule delayed work", "batch publish"; typical import `import { herald } from "@warlock.js/herald"`. Skip: setup — `@warlock.js/herald/herald-basics/SKILL.md`; consuming — `@warlock.js/herald/consume-message/SKILL.md`; RPC — `@warlock.js/herald/request-and-respond/SKILL.md`; competing libs `amqplib`, `kafkajs`, `bullmq`; NestJS `ClientProxy.emit`.'
---

# Publish messages

Push messages into a channel. The consumer side picks them up via [`@warlock.js/herald/consume-message/SKILL.md`](@warlock.js/herald/consume-message/SKILL.md).

## Single publish

```ts
import { herald } from "@warlock.js/herald";

await herald().channel("user.created").publish({
  userId: 1,
  email: "ada@example.com",
});
```

Returns when the broker has accepted the message (not when a consumer has handled it — that's `.request()` territory).

## Typed channels

```ts
type UserPayload = { userId: number; email: string };

const channel = herald().channel<UserPayload>("user.created");

await channel.publish({ userId: 1, email: "test@example.com" });   // ✅ typed
await channel.publish({ userId: "1" } as never);                    // ❌ compile error
```

Share the payload type between producer and consumer via a common `types/` file.

## Schema-validated publish

```ts
import { v } from "@warlock.js/seal";

const userSchema = v.object({
  userId: v.int(),
  email: v.string().email(),
});

const channel = herald().channel("user.created", { schema: userSchema });

await channel.publish({ userId: 1, email: "invalid" });   // Throws — fails .email()
```

The schema runs `validate()` before the message hits the broker. Invalid payloads never leave the producer. See [`@warlock.js/seal/seal-basics/SKILL.md`](@warlock.js/seal/seal-basics/SKILL.md).

## Publish options

```ts
await channel.publish(payload, {
  priority: 5,           // 0-9, higher = served first
  expiration: 60_000,    // ms — message expires if not consumed (RabbitMQ uses `expiration`, not `ttl`)
  delay: 5_000,          // ms — delayed delivery (requires RabbitMQ delay plugin)
  persistent: true,      // survive broker restart (default true)
  correlationId: "uuid", // for tracking across services
  headers: {
    tenantId: "42",
    source: "billing-service",
  },
});
```

The headers field is free-form — consumers read `message.metadata.headers?.tenantId` to route or filter (headers live under `message.metadata`, alongside `messageId`, `correlationId`, `timestamp`, and `retryCount`).

## Batch publishing

```ts
await channel.publishBatch([
  { userId: 1, email: "a@example.com" },
  { userId: 2, email: "b@example.com" },
  { userId: 3, email: "c@example.com" },
], {
  // Options apply to every message in the batch
  persistent: true,
});
```

Under the hood `publishBatch` iterates and calls `.publish()` per message — there's no AMQP-level batching today. It's an ergonomic shape for "emit these N items" (bulk import, fanout to many recipients), not a throughput optimization. The same `options` apply to every message.

## Publish + transaction (outbox pattern)

Don't publish inside a database transaction — if the transaction rolls back but the publish already happened, you've emitted an event for a state that never persisted.

The outbox pattern: write to an `outbox` table inside the transaction, dispatch from the outbox via a worker after commit:

```ts
await transaction(async () => {
  await Order.create(orderData);
  await Outbox.create({
    channel: "order.created",
    payload: { orderId: order.id, ... },
    status: "pending",
  });
});

// In a separate worker:
const pending = await Outbox.where("status", "pending").get();
for (const row of pending) {
  await herald().channel(row.get("channel")).publish(row.get("payload"));
  await row.merge({ status: "sent", sent_at: new Date() }).save();
}
```

Full recipe at `domains/cascade/docs/recipes/outbox-pattern.md`.

## Broadcasting use-case results

`@warlock.js/herald` ships `heraldBroadcast()` — a channel adapter that lets a `@warlock.js/core` `useCase()` publish its result to the bus automatically on success. Register it once in the use-cases config; each use case opts in with `broadcast: true`.

```ts title="src/config/use-cases.ts"
import { type UseCaseConfigurations } from "@warlock.js/core";
import { heraldBroadcast } from "@warlock.js/herald";

export default {
  broadcast: {
    enabled: true,
    channels: [heraldBroadcast({ broker: "default" })], // omit broker for the default
  },
} satisfies UseCaseConfigurations;
```

```ts
// The use case opts in — channel name defaults to the use case name.
export const createUserUseCase = useCase({
  name: "users.create",      // → publishes to channel "users.create"
  schema: createUserSchema,
  handler: async (data) => User.create(data),
  broadcast: true,
});
```

The adapter publishes the broadcast envelope's `payload` to `channel(event.event)`. It's structurally typed, so herald keeps no dependency on core. See [`@warlock.js/core/write-use-case/SKILL.md`](@warlock.js/core/write-use-case/SKILL.md) for the `broadcast` option (payload projection, custom event name, payload-safety rules).

## Event classes (`EventMessage`) — the typed event layer

Raw `channel(name).publish(payload)` is the low-level path. For domain events that travel between modules, herald ships a higher-level pair: define the event once as a class, publish instances of it, and consume them with an [`EventConsumer`](@warlock.js/herald/consume-message/SKILL.md). The channel name comes from the event's `eventName` — no string to keep in sync across producer and consumer.

```ts
import { defineEvent, publishEvent } from "@warlock.js/herald";

// `defineEvent<IncomingData, OutgoingData>` — `toJSON` projects the wire payload.
const UserCreatedEvent = defineEvent<User, { id: number; email: string }>("user.created", {
  toJSON: (user) => ({ id: user.id, email: user.email }),
  // schema?: an @warlock.js/seal ObjectValidator (held on the instance; the consumer side validates)
});

// `publishEvent` serializes the instance and publishes to channel "user.created" on the default broker.
await publishEvent(new UserCreatedEvent(user));
```

`publishEvent(event)` is shorthand for `herald().publish(event)` — it calls `event.serialize()` and sends an **envelope**: `{ payload, metadata, messageId, eventName, version, occurredAt }`. That nesting is why an `EventConsumer` reads `envelope.payload`, not the raw body — pair this only with the consumer side, not a bare `.subscribe()`.

Set an optional `version` on the event (instance field) to drive the consumer's `minVersion` / `maxVersion` acceptance gate. The class form (`class X extends EventMessage`) is equivalent — override `toJSON()` and set `eventName`, `version`, `schema` as fields.

## Things NOT to do

- Don't publish inside a transaction. Use the outbox pattern.
- Don't pass non-JSON-serializable values (functions, `BigInt`, class instances with methods). Serialization happens at the broker boundary; functions go silent, BigInt throws.
- Don't `expiration: 0` — set a real expiry or omit. `0` means "expire immediately."
- Don't omit `persistent: true` for messages where loss matters. Default is `true` in this lib, but worth being explicit when the message represents money / customer state.
- Don't put secrets in headers. Headers travel in plaintext (encrypted only by TLS in transit, not at rest in the broker).

## See also

- [`@warlock.js/herald/consume-message/SKILL.md`](@warlock.js/herald/consume-message/SKILL.md) — the receiving side
- [`@warlock.js/herald/request-and-respond/SKILL.md`](@warlock.js/herald/request-and-respond/SKILL.md) — when you need a reply


## request-and-respond  `@warlock.js/herald/request-and-respond/SKILL.md`

---
name: request-and-respond
description: 'Synchronous-style RPC over the message bus — channel.request<R>(payload, {timeout}) waits for a reply, channel.respond(handler) registers the responder, ctx.reply(response) sends the answer. Triggers: `channel.request`, `channel.respond`, `ctx.reply`, `timeout`, `correlationId`, `headers.correlationId`; "RPC over message bus", "request-response across services", "wait for a reply", "internal service-to-service call instead of HTTP"; typical import `import { herald } from "@warlock.js/herald"`. Skip: setup — `@warlock.js/herald/herald-basics/SKILL.md`; fire-and-forget — `@warlock.js/herald/publish-message/SKILL.md`; consumer ctx flow — `@warlock.js/herald/consume-message/SKILL.md`; competing libs `amqplib` RPC, `nats` request/reply; NestJS `ClientProxy.send`; gRPC; HTTP.'
---

# Request-response over the bus

Most messaging is fire-and-forget. When you need a reply, use the request/respond pair — a reply-queue + correlation-id pattern under the hood, surfaced as a typed promise.

## Shape

```ts
// Caller (client)
const response = await herald()
  .channel<RequestPayload>("compute.tax")
  .request<TaxResponse>({ amount: 1000, country: "US" }, { timeout: 30_000 });

// Responder (server)
herald()
  .channel<RequestPayload>("compute.tax")
  .respond(async (message, ctx) => {
    const tax = await computeTax(message.payload);
    return { tax, currency: "USD" }; // ← the return value IS the reply
  });
```

`request` returns a promise that resolves with the responder's return value. `respond` registers a handler and **automatically replies with whatever the handler returns** (then acks) — so just `return` the response; you don't call `ctx.reply()` yourself inside a `respond` handler. (`ctx.reply` is the lower-level primitive used when you wire a plain `.subscribe()` as a responder by hand.)

## Timeout

```ts
await channel.request(payload, { timeout: 30_000 });
```

The promise rejects if no reply arrives within `timeout` ms. Pick a sane number — too short rejects mid-work; too long lets a hung responder block the caller.

## When to use it vs HTTP

| Use HTTP | Use request/respond |
| --- | --- |
| Stateless, fast, idempotent ops | Slow / queued ops where the caller can wait |
| Public API surface | Internal service-to-service |
| Frontend consumption | Backend orchestration |
| Synchronous user-facing flow | Async-but-needs-result patterns |

The bus adds queue persistence and retry. HTTP is faster for sub-100ms ops; the bus shines when the operation has variable duration or needs a queue's backpressure.

## When NOT to use it

- "I need a result in under 50ms" — too much overhead on the bus.
- "The responder might be down for hours" — request will time out repeatedly; consider `publish` + write the result somewhere the caller polls.
- "Many callers, the response is the same for all" — cache the result and use `publish` for invalidation.

## Multiple responders

`respond(handler)` takes no options — there's no `group` knob here. If multiple consumers `respond()` to the same channel, they all sit on the same queue, so RabbitMQ round-robins between them: **one responder handles each request, and the caller gets exactly one reply.** That's the "share work across responders" pattern — a synchronous worker pool. Make every responder functionally identical, since the caller has no say in which one answers.

For "multiple replies expected" patterns, use a regular `subscribe` + a request id, then the caller listens on a result channel.

## Correlation across replies

Each `request()` generates its own correlation ID and an exclusive reply queue under the hood. The reply carries the same ID; the client matches it to the awaiting promise and resolves. You don't manage correlation IDs manually — and you shouldn't pass your own `correlationId` on a `request()`, since herald overwrites it with the one it uses to match the reply. To thread your own trace id (e.g. a transaction id for logs) through to the responder, put it in `headers` instead — the responder reads it from `message.metadata.headers`.

## Things NOT to do

- Don't use request/respond for high-throughput, low-latency ops. The reply-queue round-trip adds at least one broker hop. HTTP is the right tool.
- Don't return huge payloads from `respond`. Reply messages travel through the broker; a multi-MB response is a load on the bus and on memory. For big results, write to S3 / cache and reply with a reference.
- Don't forget to `respond` in long-lived consumers. If the responder crashes, every caller's request will time out — they don't know why.
- Don't set timeout to `Infinity`. A stuck request becomes a leaked promise — set a real timeout and handle the rejection.

## See also

- [`@warlock.js/herald/publish-message/SKILL.md`](@warlock.js/herald/publish-message/SKILL.md) — fire-and-forget pattern
- [`@warlock.js/herald/consume-message/SKILL.md`](@warlock.js/herald/consume-message/SKILL.md) — subscribe + ctx flow control (including `ctx.reply`)


