All files / src batcher.ts

84% Statements 42/50
70% Branches 14/20
75% Functions 6/8
89.36% Lines 42/47

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101    2x 2x 2x   2x 14x 14x 14x                 14x 14x 14x 14x   14x 14x 14x   14x 14x       12x 12x 12x 1x               9x 9x 1x     8x 8x   8x 8x                 7x 2x 2x     5x 5x   1x 1x   8x 8x                       15x 14x 14x   15x       8x         1x    
import type { EvalyticOptions, FlushResult, LLMCallRecord } from './types.js';
 
const DEFAULT_ENDPOINT = 'https://api.evalytic.dev';
const DEFAULT_BATCH_SIZE = 10;
const DEFAULT_FLUSH_INTERVAL_MS = 5000;
 
export class CallBatcher {
  private buffer: LLMCallRecord[] = [];
  private timer: ReturnType<typeof setInterval> | null = null;
  private flushing = false;
 
  private readonly apiKey: string;
  private readonly endpoint: string;
  private readonly batchSize: number;
  private readonly debug: boolean;
  private readonly beforeExitHandler: () => void;
 
  constructor(opts: EvalyticOptions) {
    this.apiKey = opts.apiKey ?? '';
    this.endpoint = (opts.endpoint ?? DEFAULT_ENDPOINT).replace(/\/+$/, '');
    this.batchSize = opts.batchSize ?? DEFAULT_BATCH_SIZE;
    this.debug = opts.debug ?? false;
 
    const intervalMs = opts.flushIntervalMs ?? DEFAULT_FLUSH_INTERVAL_MS;
    this.timer = setInterval(() => void this.flush(), intervalMs);
    this.timer.unref();
 
    this.beforeExitHandler = () => void this.flush();
    process.on('beforeExit', this.beforeExitHandler);
  }
 
  push(record: LLMCallRecord): void {
    try {
      this.buffer.push(record);
      if (this.buffer.length >= this.batchSize) {
        void this.flush();
      }
    } catch (error) {
      this.log(`push error: ${stringifyError(error)}`);
    }
  }
 
  async flush(): Promise<FlushResult> {
    try {
      if (this.flushing || this.buffer.length === 0) {
        return { sent: 0, failed: 0 };
      }
 
      this.flushing = true;
      const batch = this.buffer.splice(0);
 
      try {
        const response = await fetch(`${this.endpoint}/v1/calls/batch`, {
          method: 'POST',
          headers: {
            'Content-Type': 'application/json',
            'X-Evalytic-Key': this.apiKey,
          },
          body: JSON.stringify({ calls: batch }),
        });
 
        if (!response.ok) {
          this.log(`flush failed: HTTP ${response.status}`);
          return { sent: 0, failed: batch.length };
        }
 
        this.log(`flushed ${batch.length} calls`);
        return { sent: batch.length, failed: 0 };
      } catch (error) {
        this.log(`flush error: ${stringifyError(error)}`);
        return { sent: 0, failed: batch.length };
      } finally {
        this.flushing = false;
        Iif (this.buffer.length >= this.batchSize) {
          void this.flush();
        }
      }
    } catch (error) {
      this.flushing = false;
      this.log(`flush internal error: ${stringifyError(error)}`);
      return { sent: 0, failed: 0 };
    }
  }
 
  destroy(): void {
    if (this.timer) {
      clearInterval(this.timer);
      this.timer = null;
    }
    process.off('beforeExit', this.beforeExitHandler);
  }
 
  private log(msg: string): void {
    Iif (this.debug) console.debug(`[evalytic] ${msg}`);
  }
}
 
function stringifyError(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}