All files / src/workers WorkerCommunicator.ts

61.22% Statements 30/49
20% Branches 3/15
62.5% Functions 10/16
63.04% Lines 29/46

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135                3x               3x     3x 3x 3x             1x       1x 1x                               1x 1x   1x 1x 1x                       1x 1x 1x     1x                         1x   1x           1x 1x                     1x 1x   1x 1x   1x       1x           1x 1x                          
import { EngineError } from "../errors/EngineError.js";
import { EngineErrorCode } from "../types.js";
 
/**
 * 2026 Zenith Tier: Worker との物理的な通信を管理。
 */
export class WorkerCommunicator {
  private worker: Worker;
  private pendingExpectations = new Map<
    string,
    {
      resolve: (data: unknown) => void;
      reject: (err: unknown) => void;
      predicate: (data: unknown) => boolean;
    }
  >();
  private messageListeners = new Set<(data: unknown) => void>();
 
  constructor(workerUrl: string) {
    this.worker = new Worker(workerUrl);
    this.worker.onmessage = (event) => this.handleMessage(event.data);
    this.worker.onerror = (error) => this.handleError(error);
  }
 
  /**
   * Worker へメッセージを送信します。
   */
  postMessage(message: unknown): void {
    this.worker.postMessage(message);
  }
 
  onMessage(callback: (data: unknown) => void): () => void {
    this.messageListeners.add(callback);
    return () => this.messageListeners.delete(callback);
  }
 
  /**
   * 特定の条件を満たすメッセージを待機します。
   */
  expectMessage<T>(
    predicate: (data: unknown) => boolean,
    options:
      | number
      | {
          timeoutMs?: number | undefined;
          signal?: AbortSignal | undefined;
        } = 5000,
  ): Promise<T> {
    const timeoutMs =
      typeof options === "number" ? options : (options.timeoutMs ?? 5000);
    const signal = typeof options === "number" ? undefined : options.signal;
 
    const id = Math.random().toString(36).substring(2);
    return new Promise<T>((resolve, reject) => {
      const timer = setTimeout(() => {
        if (this.pendingExpectations.has(id)) {
          this.pendingExpectations.delete(id);
          reject(
            new EngineError({
              code: EngineErrorCode.TIMEOUT,
              message: "Timed out waiting for worker message",
            }),
          );
        }
      }, timeoutMs);
 
      const cleanup = () => {
        clearTimeout(timer);
        signal?.removeEventListener("abort", abortHandler);
      };
 
      const abortHandler = () => {
        if (this.pendingExpectations.has(id)) {
          this.pendingExpectations.delete(id);
          cleanup();
          reject(
            new EngineError({
              code: EngineErrorCode.CANCELLED,
              message: "Operation cancelled",
            }),
          );
        }
      };
 
      signal?.addEventListener("abort", abortHandler);
 
      this.pendingExpectations.set(id, {
        resolve: (data) => {
          cleanup();
          resolve(data as T);
        },
        reject: (err) => {
          cleanup();
          reject(err);
        },
        predicate,
      });
    });
  }
 
  /**
   * Worker を物理的に終了し、保留中のタスクを全てリジェクトします。
   */
  async terminate(): Promise<void> {
    for (const expectation of this.pendingExpectations.values()) {
      expectation.reject(new Error("Worker terminated"));
    }
    this.pendingExpectations.clear();
    this.messageListeners.clear();
 
    this.worker.terminate();
  }
 
  private handleMessage(data: unknown): void {
    for (const [id, expectation] of this.pendingExpectations.entries()) {
      if (expectation.predicate(data)) {
        expectation.resolve(data);
        this.pendingExpectations.delete(id);
      }
    }
    for (const listener of this.messageListeners) {
      listener(data);
    }
  }
 
  private handleError(error: ErrorEvent): void {
    console.error("Worker error:", error);
    const err = new Error(error.message || "Worker error");
    for (const expectation of this.pendingExpectations.values()) {
      expectation.reject(err);
    }
    this.pendingExpectations.clear();
  }
}