All files / src/utils async.ts

100% Statements 28/28
100% Branches 6/6
100% Functions 10/10
100% Lines 24/24

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83              40x         40x 40x       31x 13x 13x   18x 18x         31x 31x 18x   13x                                         69x 69x     93x 113x 106x 106x 106x       69x 93x     66x 103x                     1x    
/**
 * Counting semaphore that caps how many callers may use a shared resource
 * at the same time.
 *
 * A caller awaits `acquire()` before starting work and calls `release()`
 * afterwards. While no permit is free, the promise returned by `acquire()`
 * stays pending; pending callers are resumed one per `release()`, in the
 * order in which they asked.
 */
export class Semaphore {
  /** Number of permits that can currently be taken without waiting. */
  private permits: number;
  /** Resolve callbacks of callers parked in `acquire()`, oldest first. */
  private waiters: (() => void)[] = [];
  /** Permit count passed to the constructor; never modified afterwards. */
  readonly initialPermits: number;

  /**
   * @param permits Number of permits available before any `acquire()` call.
   */
  constructor(permits: number) {
    this.initialPermits = permits;
    this.permits = permits;
  }

  /**
   * Take one permit.
   *
   * @returns A promise that resolves once the caller holds a permit:
   * right away when one is free, otherwise after a later `release()`
   * reaches this caller in the queue.
   */
  async acquire(): Promise<void> {
    if (!(this.permits > 0)) {
      // Nothing free: park the resolver until release() wakes it up.
      return new Promise<void>((wake) => {
        this.waiters.push(wake);
      });
    }
    this.permits -= 1;
  }

  /**
   * Give one permit back.
   *
   * If a caller is waiting, the permit is handed straight to the
   * longest-waiting one and the free count is left untouched; otherwise
   * the free count grows by one.
   */
  release(): void {
    const wake = this.waiters.shift();
    if (wake === undefined) {
      this.permits += 1;
      return;
    }
    wake();
  }
}
 
/**
 * Run async work items with a sliding-window concurrency pool.
 * Spawns up to `concurrency` workers that each grab the next
 * queued item as soon as they finish, keeping all slots busy.
 *
 * Results are returned in input order regardless of completion order.
 * When `shouldAbort` is provided and returns true, workers stop
 * picking up new items; already-started items run to completion.
 * Only completed items appear in the returned array.
 *
 * If `fn` (or `shouldAbort`) throws or rejects, the returned promise
 * rejects with that first error and the remaining workers stop picking
 * up new items. Calls that are already in flight are not cancelled.
 *
 * @param items Work items; each is passed to `fn` at most once.
 * @param concurrency Maximum number of `fn` calls in flight. Fractional
 * values are rounded down; values below 1 and `NaN` fall back to a
 * single worker so that work is never silently skipped.
 * @param fn Async handler called with the item and its index in `items`.
 * @param options Optional `shouldAbort` predicate, checked by a worker
 * before it takes each new item.
 * @returns The values of the completed items, ordered by input index.
 */
export async function runPool<T, R>(
  items: T[],
  concurrency: number,
  fn: (item: T, index: number) => Promise<R>,
  options?: { shouldAbort?: () => boolean }
): Promise<R[]> {
  const completed: { index: number; value: R }[] = [];
  let nextIndex = 0;
  // Set by the first worker that fails so its siblings stop dispatching.
  let failed = false;

  async function worker(): Promise<void> {
    try {
      while (nextIndex < items.length) {
        if (failed || options?.shouldAbort?.()) break;
        // Claiming the index is synchronous, so no two workers share an item.
        const index = nextIndex++;
        const item = items[index] as T;
        completed.push({ index, value: await fn(item, index) });
      }
    } catch (error) {
      // Promise.all below rejects on this error and discards the results;
      // without the flag the other workers would still drain the queue.
      failed = true;
      throw error;
    }
  }

  // `requested >= 1` is false for 0, negatives and NaN, which all map to 1.
  const requested = Math.floor(concurrency);
  const workerCount = Math.min(items.length, requested >= 1 ? requested : 1);
  await Promise.all(Array.from({ length: workerCount }, () => worker()));

  // Return results in input order
  completed.sort((a, b) => a.index - b.index);
  return completed.map((entry) => entry.value);
}
 
/**
 * Map `fn` over `items` with at most `batchSize` calls in flight.
 *
 * Despite the name, items are not grouped into fixed batches: the work is
 * delegated to `runPool`, with `batchSize` passed as its concurrency limit.
 *
 * @param items Work items to process.
 * @param fn Async handler applied to each item.
 * @param batchSize Concurrency limit forwarded to `runPool`.
 * @returns Whatever `runPool` resolves to for these arguments.
 */
export async function processInBatches<T, R>(
  items: T[],
  fn: (item: T) => Promise<R>,
  batchSize: number
): Promise<R[]> {
  // `fn` is forwarded by reference, so it receives runPool's call arguments as-is.
  const pooled: Promise<R[]> = runPool<T, R>(items, batchSize, fn);
  return pooled;
}