/**
 * SQLite-backed job queue for indexing jobs (TRUEREF-0009).
 *
 * Jobs are processed sequentially (one at a time) to avoid SQLite write
 * contention. The queue uses setImmediate to yield to the event loop between
 * jobs so that API requests remain responsive.
 */

import type Database from 'better-sqlite3';
import { IndexingJobMapper } from '$lib/server/mappers/indexing-job.mapper.js';
import { IndexingJob, IndexingJobEntity } from '$lib/server/models/indexing-job.js';
import type { IndexingPipeline } from './indexing.pipeline.js';

// ---------------------------------------------------------------------------
// SQL projection + row mapper (mirrors repository.service.ts pattern)
// ---------------------------------------------------------------------------

const JOB_SELECT = `SELECT * FROM indexing_jobs`;

/** Page size used by listJobs() when the caller does not pass a limit. */
const DEFAULT_LIST_LIMIT = 20;

/** Hard upper bound on the number of rows listJobs() will return. */
const MAX_LIST_LIMIT = 200;

/** Optional filters shared by listJobs() and countJobs(). */
interface JobFilter {
	repositoryId?: string;
	status?: IndexingJob['status'];
}

/** Render an arbitrary thrown value as a log-friendly message. */
function describeError(err: unknown): string {
	return err instanceof Error ? err.message : String(err);
}

export class JobQueue {
	/** True while a job is inside pipeline.run(); guards against overlapping runs. */
	private isRunning = false;
	private pipeline: IndexingPipeline | null = null;

	constructor(private readonly db: Database.Database) {}

	/**
	 * Inject the pipeline dependency (avoids circular construction order).
	 */
	setPipeline(pipeline: IndexingPipeline): void {
		this.pipeline = pipeline;
	}

	/**
	 * Enqueue a new indexing job for the given repository.
	 * If a job for this repository is already queued or running, returns the
	 * existing job instead of creating a duplicate (the lookup is by
	 * repository only; `versionId` is not part of the duplicate check).
	 *
	 * @param repositoryId Repository the job belongs to.
	 * @param versionId Optional version to index; stored as NULL when omitted.
	 * @returns The already-active job, or the newly inserted one.
	 */
	enqueue(repositoryId: string, versionId?: string): IndexingJob {
		// Return early if there's already an active job for this repo.
		const activeRaw = this.db
			.prepare<[string], IndexingJobEntity>(
				`${JOB_SELECT}
				 WHERE repository_id = ? AND status IN ('queued', 'running')
				 ORDER BY created_at DESC LIMIT 1`
			)
			.get(repositoryId);

		if (activeRaw) {
			// Make sure the queue is draining even though nothing new was inserted.
			this.scheduleProcessing();
			return this.toJob(activeRaw);
		}

		// created_at is stored as whole seconds; the in-memory Date is rounded to match.
		const now = Math.floor(Date.now() / 1000);
		const id = crypto.randomUUID();
		const job = new IndexingJob({
			id,
			repositoryId,
			versionId: versionId ?? null,
			status: 'queued',
			progress: 0,
			totalFiles: 0,
			processedFiles: 0,
			error: null,
			startedAt: null,
			completedAt: null,
			createdAt: new Date(now * 1000)
		});

		this.db
			.prepare(
				`INSERT INTO indexing_jobs
				 (id, repository_id, version_id, status, progress, total_files,
				  processed_files, error, started_at, completed_at, created_at)
				 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
			)
			.run(
				id,
				job.repositoryId,
				job.versionId,
				job.status,
				job.progress,
				job.totalFiles,
				job.processedFiles,
				job.error,
				job.startedAt,
				job.completedAt,
				now
			);

		// Kick off sequential processing if not already running.
		this.scheduleProcessing();

		// Re-read the row so the returned job reflects what is actually stored.
		const created = this.db
			.prepare<[string], IndexingJobEntity>(`${JOB_SELECT} WHERE id = ?`)
			.get(id);

		// The row was just inserted, so it should be present; fall back to the
		// in-memory job rather than dereferencing a missing row.
		return created ? this.toJob(created) : job;
	}

	/**
	 * Pick the oldest queued job and run it through the pipeline.
	 * Re-scheduled via setImmediate after each job so the event loop stays
	 * unblocked. Does nothing if a job is already running, no pipeline has been
	 * set, or no job is queued.
	 */
	private async processNext(): Promise<void> {
		if (this.isRunning) return;
		if (!this.pipeline) {
			console.warn('[JobQueue] No pipeline configured — cannot process jobs.');
			return;
		}

		const rawJob = this.db
			.prepare<[], IndexingJobEntity>(
				`${JOB_SELECT} WHERE status = 'queued' ORDER BY created_at ASC LIMIT 1`
			)
			.get();

		if (!rawJob) return;

		const job = this.toJob(rawJob);
		this.isRunning = true;
		try {
			await this.pipeline.run(job);
		} catch (err) {
			// Log and carry on so one failing job does not stall the queue.
			// NOTE(review): presumably pipeline.run() moves the job out of 'queued'
			// before it can throw — confirm; otherwise the same job is picked again.
			console.error(`[JobQueue] Job ${job.id} failed: ${describeError(err)}`);
		} finally {
			this.isRunning = false;

			// Check whether another job was queued while this one ran.
			const next = this.db
				.prepare<[], { id: string }>(
					`SELECT id FROM indexing_jobs WHERE status = 'queued' LIMIT 1`
				)
				.get();
			if (next) {
				this.scheduleProcessing();
			}
		}
	}

	/**
	 * Retrieve a single job by ID.
	 *
	 * @returns The job, or null when no row has that ID.
	 */
	getJob(id: string): IndexingJob | null {
		const raw = this.db
			.prepare<[string], IndexingJobEntity>(`${JOB_SELECT} WHERE id = ?`)
			.get(id);
		return raw ? this.toJob(raw) : null;
	}

	/**
	 * List recent jobs (newest first), optionally filtered by repository
	 * and/or status.
	 *
	 * `limit` defaults to 20 and is truncated to an integer and clamped to
	 * 0..200. The lower clamp matters: SQLite treats a negative LIMIT as
	 * "no limit", which would otherwise bypass the cap.
	 */
	listJobs(options?: {
		repositoryId?: string;
		status?: IndexingJob['status'];
		limit?: number;
	}): IndexingJob[] {
		const requested = options?.limit ?? DEFAULT_LIST_LIMIT;
		const limit = Number.isNaN(requested)
			? DEFAULT_LIST_LIMIT
			: Math.max(0, Math.min(Math.trunc(requested), MAX_LIST_LIMIT));

		const { where, params } = this.buildFilter(options);
		const sql = `${JOB_SELECT} ${where} ORDER BY created_at DESC LIMIT ?`;

		return this.db
			.prepare<unknown[], IndexingJobEntity>(sql)
			.all(...params, limit)
			.map((row) => this.toJob(row));
	}

	/**
	 * Trigger processing of any queued jobs (e.g. after server restart).
	 * Safe to call multiple times; a no-op if the queue is already running.
	 */
	drainQueued(): void {
		this.scheduleProcessing();
	}

	/**
	 * Count all jobs matching optional filters.
	 */
	countJobs(options?: { repositoryId?: string; status?: IndexingJob['status'] }): number {
		const { where, params } = this.buildFilter(options);
		const sql = `SELECT COUNT(*) as n FROM indexing_jobs ${where}`;
		const row = this.db.prepare<unknown[], { n: number }>(sql).get(...params);
		return row?.n ?? 0;
	}

	/**
	 * Schedule processNext() on the next event-loop turn unless a job is
	 * currently running. Any rejection is logged here so it cannot surface as
	 * an unhandled promise rejection.
	 */
	private scheduleProcessing(): void {
		if (this.isRunning) return;
		setImmediate(() => {
			this.processNext().catch((err: unknown) => {
				console.error(`[JobQueue] Queue processing failed: ${describeError(err)}`);
			});
		});
	}

	/** Convert a raw indexing_jobs row into the domain model. */
	private toJob(row: IndexingJobEntity): IndexingJob {
		return IndexingJobMapper.fromEntity(new IndexingJobEntity(row));
	}

	/**
	 * Build a parameterised WHERE clause from the optional filters.
	 * Falsy filter values (including an empty string) are ignored. `where` is
	 * the empty string when no filter applies.
	 */
	private buildFilter(filter?: JobFilter): { where: string; params: unknown[] } {
		const conditions: string[] = [];
		const params: unknown[] = [];

		if (filter?.repositoryId) {
			conditions.push('repository_id = ?');
			params.push(filter.repositoryId);
		}
		if (filter?.status) {
			conditions.push('status = ?');
			params.push(filter.status);
		}

		const where = conditions.length > 0 ? `WHERE ${conditions.join(' AND ')}` : '';
		return { where, params };
	}
}