import { Worker } from 'node:worker_threads';
import { existsSync } from 'node:fs';
import type {
	ParseWorkerRequest,
	ParseWorkerResponse,
	EmbedWorkerRequest,
	EmbedWorkerResponse,
	WorkerInitData
} from './worker-types.js';

/** Construction options for {@link WorkerPool}. */
export interface WorkerPoolOptions {
	/** Initial number of parse worker threads; can be changed later with `setMaxConcurrency`. */
	concurrency: number;
	/** Entry script for parse workers. If it does not exist, the pool starts in fallback mode. */
	workerScript: string;
	/** Entry script for the embedding worker; only spawned when `embeddingProfileId` is set and the file exists. */
	embedWorkerScript: string;
	/** Passed to every worker as `workerData.dbPath`. */
	dbPath: string;
	/** Passed to the embedding worker; the embedding stage is enabled only when this is set. */
	embeddingProfileId?: string;
	/** Called for every `progress` message a parse worker posts. */
	onProgress: (jobId: string, msg: ParseWorkerResponse) => void;
	/** Called when a parse worker reports `done`. */
	onJobDone: (jobId: string) => void;
	/** Called when a parse job reports `failed`, or its worker exits before finishing it. */
	onJobFailed: (jobId: string, error: string) => void;
	/** Called when the embedding worker reports `embed-done`. */
	onEmbedDone: (jobId: string) => void;
	/** Called when the embedding worker reports `embed-failed`, or exits with embed jobs still pending. */
	onEmbedFailed: (jobId: string, error: string) => void;
}

interface QueuedJob {
	jobId: string;
	repositoryId: string;
	versionId?: string | null;
}

interface RunningJob {
	jobId: string;
	repositoryId: string;
	/** Carried through so the follow-up embed job targets the same version as the parse job. */
	versionId?: string | null;
}

interface EmbedQueuedJob {
	jobId: string;
	repositoryId: string;
	versionId: string | null;
}

/** Removes the first occurrence of `item` from `list` in place, if present. */
function removeItem<T>(list: T[], item: T): void {
	const idx = list.indexOf(item);
	if (idx !== -1) {
		list.splice(idx, 1);
	}
}

/**
 * Runs parse jobs on a pool of worker threads and, when an embedding profile is
 * configured, forwards each successfully parsed job to a single embedding worker.
 *
 * Scheduling: at most one job per `repositoryId` runs at a time. Jobs for the same
 * repository run in enqueue order; jobs for other repositories may overtake them.
 *
 * If the parse worker script does not exist, the pool enters fallback mode: no
 * workers are spawned and `enqueue` does nothing, so callers must check
 * `isFallbackMode` and run the job themselves.
 */
export class WorkerPool {
	/** Every live parse worker counted toward the pool size (busy + idle). */
	private workers: Worker[] = [];
	/** Subset of `workers` that can accept a job right now. */
	private idleWorkers: Worker[] = [];
	private embedWorker: Worker | null = null;
	/** Set once the embed worker posts `ready`; embed jobs are queued until then. */
	private embedReady = false;
	private jobQueue: QueuedJob[] = [];
	private runningJobs = new Map<Worker, RunningJob>();
	/** Repositories that currently have a running job; used to serialize jobs per repository. */
	private runningRepoIds = new Set<string>();
	private embedQueue: EmbedQueuedJob[] = [];
	/** Embed job ids posted to the embed worker that have not reported done/failed yet. */
	private embedInFlight = new Set<string>();
	private options: WorkerPoolOptions;
	/** Target pool size; starts at `options.concurrency` and is updated by `setMaxConcurrency`. */
	private maxConcurrency: number;
	private fallbackMode = false;
	private shuttingDown = false;

	constructor(options: WorkerPoolOptions) {
		this.options = options;
		this.maxConcurrency = options.concurrency;

		if (!existsSync(options.workerScript)) {
			console.warn(`Worker script not found at ${options.workerScript}, entering fallback mode`);
			this.fallbackMode = true;
			return;
		}

		for (let i = 0; i < options.concurrency; i++) {
			this.addParseWorker();
		}

		if (options.embeddingProfileId && existsSync(options.embedWorkerScript)) {
			this.embedWorker = this.spawnEmbedWorker();
		}
	}

	/** Spawns a parse worker and registers it as part of the pool and idle. */
	private addParseWorker(): void {
		const worker = this.spawnParseWorker();
		this.workers.push(worker);
		this.idleWorkers.push(worker);
	}

	private spawnParseWorker(): Worker {
		const worker = new Worker(this.options.workerScript, {
			workerData: { dbPath: this.options.dbPath } satisfies WorkerInitData
		});
		worker.on('message', (msg: ParseWorkerResponse) => this.onWorkerMessage(worker, msg));
		// Without an 'error' listener, an uncaught exception inside the worker is re-emitted
		// as an unhandled 'error' event and crashes the main process. Node always emits
		// 'exit' afterwards, and onWorkerExit does the job bookkeeping.
		worker.on('error', (err: Error) => {
			console.error('Parse worker error:', err);
		});
		worker.on('exit', (code: number) => this.onWorkerExit(worker, code));
		return worker;
	}

	private spawnEmbedWorker(): Worker {
		const worker = new Worker(this.options.embedWorkerScript, {
			workerData: {
				dbPath: this.options.dbPath,
				embeddingProfileId: this.options.embeddingProfileId
			} satisfies WorkerInitData
		});
		worker.on('message', (msg: EmbedWorkerResponse) => this.onEmbedWorkerMessage(msg));
		worker.on('error', (err: Error) => {
			console.error('Embed worker error:', err);
		});
		worker.on('exit', (code: number) => this.onEmbedWorkerExit(worker, code));
		return worker;
	}

	/**
	 * Queues a parse job. Ignored while shutting down. In fallback mode the job is not run;
	 * the caller is expected to handle it (see `isFallbackMode`).
	 */
	public enqueue(jobId: string, repositoryId: string, versionId?: string | null): void {
		if (this.shuttingDown) {
			console.warn('WorkerPool is shutting down, ignoring enqueue request');
			return;
		}
		if (this.fallbackMode) {
			console.warn(`WorkerPool in fallback mode for job ${jobId} - delegating to main thread`);
			return;
		}
		this.jobQueue.push({ jobId, repositoryId, versionId });
		this.dispatch();
	}

	/** Hands queued jobs to idle workers, skipping jobs whose repository already has a running job. */
	private dispatch(): void {
		while (this.idleWorkers.length > 0 && this.jobQueue.length > 0) {
			const jobIdx = this.jobQueue.findIndex((j) => !this.runningRepoIds.has(j.repositoryId));
			if (jobIdx === -1) {
				// Every queued job belongs to a repository that already has a running job.
				break;
			}
			const [job] = this.jobQueue.splice(jobIdx, 1);
			const worker = this.idleWorkers.pop();
			if (job === undefined || worker === undefined) {
				// Unreachable given the loop guard; keeps the types honest without `!`.
				break;
			}
			this.runningJobs.set(worker, {
				jobId: job.jobId,
				repositoryId: job.repositoryId,
				versionId: job.versionId
			});
			this.runningRepoIds.add(job.repositoryId);
			const msg: ParseWorkerRequest = { type: 'run', jobId: job.jobId };
			worker.postMessage(msg);
		}
	}

	/**
	 * Clears the worker's running-job bookkeeping and returns it to the idle list. If the pool
	 * is above its target size (after `setMaxConcurrency` shrank it), the worker is retired instead.
	 * Returns the job the worker was running, if any.
	 */
	private releaseWorker(worker: Worker): RunningJob | undefined {
		const runningJob = this.runningJobs.get(worker);
		if (runningJob) {
			this.runningJobs.delete(worker);
			this.runningRepoIds.delete(runningJob.repositoryId);
		}
		if (this.workers.length > this.maxConcurrency) {
			this.retireWorker(worker);
		} else {
			this.idleWorkers.push(worker);
		}
		return runningJob;
	}

	/** Removes the worker from the pool and asks it to shut down; its later exit will not respawn it. */
	private retireWorker(worker: Worker): void {
		removeItem(this.workers, worker);
		const msg: ParseWorkerRequest = { type: 'shutdown' };
		worker.postMessage(msg);
	}

	private onWorkerMessage(worker: Worker, msg: ParseWorkerResponse): void {
		switch (msg.type) {
			case 'progress':
				this.options.onProgress(msg.jobId, msg);
				return;
			case 'done': {
				const runningJob = this.releaseWorker(worker);
				this.options.onJobDone(msg.jobId);
				if (this.embedWorker && this.options.embeddingProfileId) {
					this.enqueueEmbed(
						msg.jobId,
						runningJob?.repositoryId ?? '',
						runningJob?.versionId ?? null
					);
				}
				this.dispatch();
				return;
			}
			case 'failed':
				this.releaseWorker(worker);
				this.options.onJobFailed(msg.jobId, msg.error);
				this.dispatch();
				return;
		}
	}

	private onWorkerExit(worker: Worker, code: number): void {
		// Always drop the dead worker from bookkeeping: shutdown() polls `workers` to detect exit.
		removeItem(this.idleWorkers, worker);
		removeItem(this.workers, worker);
		const runningJob = this.runningJobs.get(worker);
		if (runningJob) {
			this.runningJobs.delete(worker);
			this.runningRepoIds.delete(runningJob.repositoryId);
		}

		if (this.shuttingDown) {
			return;
		}

		if (runningJob) {
			// Node drains pending messages before emitting 'exit', so a job still tracked here
			// never reported done/failed, even when the exit code is 0.
			const reason =
				code !== 0
					? `Worker crashed with code ${code}`
					: `Worker exited with code ${code} before finishing the job`;
			this.options.onJobFailed(runningJob.jobId, reason);
		}

		// NOTE(review): a script that dies on startup is respawned immediately with no backoff.
		if (this.workers.length < this.maxConcurrency) {
			this.addParseWorker();
		}
		this.dispatch();
	}

	private onEmbedWorkerMessage(msg: EmbedWorkerResponse): void {
		switch (msg.type) {
			case 'ready':
				this.embedReady = true;
				this.processEmbedQueue();
				return;
			case 'embed-progress':
				// Progress is not tracked by the pool.
				return;
			case 'embed-done':
				this.embedInFlight.delete(msg.jobId);
				this.options.onEmbedDone(msg.jobId);
				return;
			case 'embed-failed':
				this.embedInFlight.delete(msg.jobId);
				this.options.onEmbedFailed(msg.jobId, msg.error);
				return;
		}
	}

	/**
	 * Handles the embed worker stopping. It is not respawned. Outside of shutdown, every
	 * queued or in-flight embed job is reported via `onEmbedFailed`.
	 */
	private onEmbedWorkerExit(worker: Worker, code: number): void {
		if (this.embedWorker !== worker) {
			return;
		}
		this.embedWorker = null;
		this.embedReady = false;
		if (this.shuttingDown) {
			return;
		}
		console.warn(`Embed worker exited with code ${code}; embedding disabled`);
		const reason = `Embed worker exited with code ${code}`;
		const pending = [...this.embedInFlight, ...this.embedQueue.map((j) => j.jobId)];
		this.embedInFlight.clear();
		this.embedQueue = [];
		for (const jobId of pending) {
			this.options.onEmbedFailed(jobId, reason);
		}
	}

	/** Posts one embed job to the embed worker and records it as in flight. */
	private postEmbed(job: EmbedQueuedJob): void {
		if (!this.embedWorker) {
			return;
		}
		const msg: EmbedWorkerRequest = {
			type: 'embed',
			jobId: job.jobId,
			repositoryId: job.repositoryId,
			versionId: job.versionId
		};
		this.embedInFlight.add(job.jobId);
		this.embedWorker.postMessage(msg);
	}

	private processEmbedQueue(): void {
		if (!this.embedWorker || !this.embedReady) {
			return;
		}
		const pending = this.embedQueue;
		this.embedQueue = [];
		for (const job of pending) {
			this.postEmbed(job);
		}
	}

	/** Sends an embed job now if the embed worker is ready, otherwise queues it. No-op without an embed worker. */
	public enqueueEmbed(jobId: string, repositoryId: string, versionId: string | null): void {
		if (!this.embedWorker) {
			return; // no-op if embedding not configured
		}
		const job: EmbedQueuedJob = { jobId, repositoryId, versionId };
		if (this.embedReady) {
			this.postEmbed(job);
		} else {
			this.embedQueue.push(job);
		}
	}

	/**
	 * Changes the target number of parse workers. Growing spawns workers immediately and
	 * dispatches queued jobs. Shrinking retires idle workers now; busy workers are retired
	 * as they finish. Ignored in fallback mode, while shutting down, or for non-finite `n`.
	 */
	public setMaxConcurrency(n: number): void {
		if (this.fallbackMode || this.shuttingDown || !Number.isFinite(n)) {
			return;
		}
		const target = Math.max(0, Math.floor(n));
		this.maxConcurrency = target;

		while (this.workers.length < target) {
			this.addParseWorker();
		}
		while (this.workers.length > target && this.idleWorkers.length > 0) {
			const worker = this.idleWorkers.pop();
			if (worker) {
				this.retireWorker(worker);
			}
		}
		this.dispatch();
	}

	/**
	 * Asks every worker to shut down and waits up to 5 seconds for them to exit.
	 * Any still running after that are terminated. Queued jobs are dropped.
	 */
	public async shutdown(): Promise<void> {
		this.shuttingDown = true;
		const msg: ParseWorkerRequest = { type: 'shutdown' };

		for (const worker of this.workers) {
			try {
				worker.postMessage(msg);
			} catch {
				// Worker might already be exited
			}
		}

		if (this.embedWorker) {
			try {
				const embedMsg: EmbedWorkerRequest = { type: 'shutdown' };
				this.embedWorker.postMessage(embedMsg);
			} catch {
				// Worker might already be exited
			}
		}

		// The exit handlers remove exited workers, so these drain to empty/null as threads stop.
		const timeoutMs = 5000;
		const deadline = Date.now() + timeoutMs;
		const allExited = (): boolean => this.workers.length === 0 && this.embedWorker === null;
		while (!allExited() && Date.now() < deadline) {
			await new Promise<void>((resolve) => setTimeout(resolve, 100));
		}

		const stragglers: Worker[] = [...this.workers];
		if (this.embedWorker) {
			stragglers.push(this.embedWorker);
		}
		// terminate() returns a promise; settle them all so a rejection cannot go unhandled.
		await Promise.allSettled(stragglers.map((worker) => worker.terminate()));

		this.workers = [];
		this.idleWorkers = [];
		this.embedWorker = null;
		this.embedReady = false;
	}

	public get isFallbackMode(): boolean {
		return this.fallbackMode;
	}
}