/**
 * Server startup routines for the indexing pipeline (TRUEREF-0009).
 *
 * On every server start:
 *   1. Mark any jobs that were left in 'running' state as 'failed'
 *      (they were interrupted by a process crash / restart).
 *   2. Reset any repositories stuck in 'indexing' state to 'error'.
 *   3. Construct and wire together the JobQueue + IndexingPipeline singletons.
 *   4. Kick off processing of any jobs that were 'queued' before the restart.
 */

import type Database from 'better-sqlite3';
import type { EmbeddingService } from '$lib/server/embeddings/embedding.service.js';
import { crawl as githubCrawl } from '$lib/server/crawler/github.crawler.js';
import { LocalCrawler } from '$lib/server/crawler/local.crawler.js';
import { IndexingPipeline } from './indexing.pipeline.js';
import { JobQueue } from './job-queue.js';
import { WorkerPool } from './worker-pool.js';
import type { ParseWorkerResponse } from './worker-types.js';
import { initBroadcaster } from './progress-broadcaster.js';
import type { ProgressBroadcaster } from './progress-broadcaster.js';
import path from 'node:path';
import { fileURLToPath } from 'node:url';

// ---------------------------------------------------------------------------
// Stale-job recovery
// ---------------------------------------------------------------------------

/**
 * Mark jobs that were `running` when the server crashed as `failed`, and
 * reset repositories that were stuck in `indexing` state to `error`.
 *
 * Safe to call on every startup — uses unixepoch() so timestamps stay in
 * the same integer-seconds format as the rest of the schema.
 *
 * @param db - Raw better-sqlite3 Database instance to run the two UPDATEs on.
 */
export function recoverStaleJobs(db: Database.Database): void {
	db.prepare(
		`UPDATE indexing_jobs SET status = 'failed', error = 'Server restarted while job was running', completed_at = unixepoch() WHERE status = 'running'`
	).run();

	db.prepare(`UPDATE repositories SET state = 'error' WHERE state = 'indexing'`).run();
}

// ---------------------------------------------------------------------------
// Singleton instances
// ---------------------------------------------------------------------------

let _queue: JobQueue | null = null;
let _pipeline: IndexingPipeline | null = null;
let _pool: WorkerPool | null = null;
let _broadcaster: ProgressBroadcaster | null = null;

/**
 * Schedule a one-shot check (via `setImmediate`) for jobs still marked
 * `queued` and, if at least one exists, ask the queue to drain them.
 *
 * The callback runs outside the caller's stack, so anything it throws
 * synchronously would surface as an uncaught exception. Failures are therefore
 * caught and logged here instead of being allowed to escape.
 */
function scheduleQueuedDrain(db: Database.Database, queue: JobQueue): void {
	setImmediate(() => {
		try {
			// Cheap existence probe: only one row is needed to know there is work.
			const pending = db
				.prepare<[], { id: string }>(`SELECT id FROM indexing_jobs WHERE status = 'queued' LIMIT 1`)
				.get();

			if (pending) {
				// Re-enqueue logic is handled inside JobQueue.drainQueued; we trigger it here.
				queue.drainQueued();
			}
		} catch (err) {
			console.error(
				'[startup] Failed to drain queued jobs:',
				err instanceof Error ? err.message : String(err)
			);
		}
	});
}

/**
 * Initialise (or return the existing) JobQueue + IndexingPipeline pair.
 *
 * Must be called once at server startup (e.g. from `hooks.server.ts`).
 * Calling it more than once is harmless — subsequent calls are no-ops that
 * return the already-constructed singletons (their arguments are ignored).
 *
 * A WorkerPool is only created when `options.dbPath` is set; supplying
 * `concurrency` alone does not create one. If constructing the pool throws,
 * a warning is logged and the queue is left without a pool.
 *
 * @param db - Raw better-sqlite3 Database instance.
 * @param embeddingService - Optional embedding service; pass null to disable.
 * @param options - Optional configuration for worker pool (concurrency, dbPath).
 *   `concurrency` defaults to 2 when a pool is created.
 * @returns An object with `queue` and `pipeline` accessors.
 */
export function initializePipeline(
	db: Database.Database,
	embeddingService: EmbeddingService | null = null,
	options?: { concurrency?: number; dbPath?: string }
): { queue: JobQueue; pipeline: IndexingPipeline } {
	if (_queue && _pipeline) {
		return { queue: _queue, pipeline: _pipeline };
	}

	// Recover before constructing so no stale job gets picked up.
	recoverStaleJobs(db);

	const localCrawler = new LocalCrawler();
	const pipeline = new IndexingPipeline(db, githubCrawl, localCrawler, embeddingService);
	const queue = new JobQueue(db);

	// If a database path is provided, create and wire the worker pool.
	if (options?.dbPath) {
		_broadcaster = initBroadcaster();

		// Resolve worker script paths relative to this file (build/workers/ directory)
		const __filename = fileURLToPath(import.meta.url);
		const __dirname = path.dirname(__filename);
		const workerScript = path.join(__dirname, '../../../build/workers/worker-entry.mjs');
		const embedWorkerScript = path.join(__dirname, '../../../build/workers/embed-worker-entry.mjs');

		try {
			_pool = new WorkerPool({
				concurrency: options.concurrency ?? 2,
				workerScript,
				embedWorkerScript,
				dbPath: options.dbPath,
				onProgress: (jobId, msg) => {
					// Update DB with progress
					db.prepare(
						`UPDATE indexing_jobs SET stage = ?, stage_detail = ?, progress = ?, processed_files = ?, total_files = ? WHERE id = ?`
					).run(
						msg.stage,
						msg.stageDetail ?? null,
						msg.progress,
						msg.processedFiles,
						msg.totalFiles,
						jobId
					);

					// Broadcast progress event
					if (_broadcaster) {
						_broadcaster.broadcast(jobId, '', 'progress', msg);
					}
				},
				onJobDone: (jobId: string) => {
					// Update job status to done
					db.prepare(
						`UPDATE indexing_jobs SET status = 'done', completed_at = unixepoch() WHERE id = ?`
					).run(jobId);

					// Broadcast done event
					if (_broadcaster) {
						_broadcaster.broadcast(jobId, '', 'job-done', { jobId });
					}
				},
				onJobFailed: (jobId: string, error: string) => {
					// Update job status to failed with error message
					db.prepare(
						`UPDATE indexing_jobs SET status = 'failed', error = ?, completed_at = unixepoch() WHERE id = ?`
					).run(error, jobId);

					// Broadcast failed event
					if (_broadcaster) {
						_broadcaster.broadcast(jobId, '', 'job-failed', { jobId, error });
					}
				},
				onEmbedDone: (jobId: string) => {
					console.log('[WorkerPool] Embedding complete for job:', jobId);
				},
				onEmbedFailed: (jobId: string, error: string) => {
					console.error('[WorkerPool] Embedding failed for job:', jobId, error);
				}
			});

			queue.setWorkerPool(_pool);
		} catch (err) {
			// Best effort: without a pool the queue is simply left un-wired.
			console.warn(
				'[startup] Failed to create WorkerPool (worker scripts may not exist yet):',
				err instanceof Error ? err.message : String(err)
			);
		}
	}

	_queue = queue;
	_pipeline = pipeline;

	// Drain any jobs that were queued before the restart.
	scheduleQueuedDrain(db, queue);

	return { queue, pipeline };
}

/**
 * Accessor for the JobQueue singleton.
 *
 * @returns The queue, or null if `initializePipeline` has not run (or the
 *   singletons were reset).
 */
export function getQueue(): JobQueue | null {
	return _queue;
}

/**
 * Accessor for the IndexingPipeline singleton.
 *
 * @returns The pipeline, or null if `initializePipeline` has not run (or the
 *   singletons were reset).
 */
export function getPipeline(): IndexingPipeline | null {
	return _pipeline;
}

/**
 * Accessor for the WorkerPool singleton.
 *
 * @returns The pool, or null when none was created (no `dbPath` supplied, or
 *   pool construction failed).
 */
export function getPool(): WorkerPool | null {
	return _pool;
}

/**
 * Accessor for the ProgressBroadcaster singleton.
 *
 * @returns The broadcaster, or null when `initializePipeline` was never called
 *   with a `dbPath`.
 */
export function getBroadcaster(): ProgressBroadcaster | null {
	return _broadcaster;
}

/**
 * Reset singletons (for testing).
 *
 * Only clears the module-level references; it does not call anything on the
 * previously stored instances.
 */
export function _resetSingletons(): void {
	_queue = null;
	_pipeline = null;
	_pool = null;
	_broadcaster = null;
}