/**
 * SQLite-backed job queue for indexing jobs (TRUEREF-0009).
 *
 * Jobs are processed sequentially (one at a time) to avoid SQLite write
 * contention. The queue uses setImmediate to yield to the event loop between
 * jobs so that API requests remain responsive.
 */

import type Database from 'better-sqlite3';
import { IndexingJobMapper } from '$lib/server/mappers/indexing-job.mapper.js';
import { IndexingJob, IndexingJobEntity } from '$lib/server/models/indexing-job.js';
import type { WorkerPool } from './worker-pool.js';

// ---------------------------------------------------------------------------
// SQL projection + row mapper (mirrors repository.service.ts pattern)
// ---------------------------------------------------------------------------

const JOB_SELECT = `SELECT * FROM indexing_jobs`;

/** Page size used by listJobs() when the caller does not supply a limit. */
const DEFAULT_LIST_LIMIT = 20;

/** Hard upper bound on the number of rows listJobs() returns. */
const MAX_LIST_LIMIT = 200;

type JobStatus = IndexingJob['status'];

/** A single status or a list of statuses to filter on. */
type JobStatusFilter = JobStatus | JobStatus[];

/** Convert a raw `indexing_jobs` row into the domain model. */
function toJob(row: IndexingJobEntity): IndexingJob {
	return IndexingJobMapper.fromEntity(new IndexingJobEntity(row));
}

/**
 * Escape the LIKE wildcards (`%`, `_`) and the escape character itself so the
 * value matches literally when used with `LIKE ? ESCAPE '\'`.
 */
function escapeLikePattern(value: string): string {
	return value.replaceAll('\\', '\\\\').replaceAll('%', '\\%').replaceAll('_', '\\_');
}

/**
 * True when the id has at least two non-empty `/`-separated segments.
 * Such ids are matched exactly; shorter ids are also matched as a prefix
 * (see buildJobFilterQuery).
 */
function isSpecificRepositoryId(repositoryId: string): boolean {
	return repositoryId.split('/').filter(Boolean).length >= 2;
}

/** Normalize a status filter into a de-duplicated array (empty when absent). */
function normalizeStatuses(status?: JobStatusFilter): JobStatus[] {
	if (!status) {
		return [];
	}

	const statuses = Array.isArray(status) ? status : [status];
	return [...new Set(statuses)];
}

/**
 * Clamp a caller-supplied row limit to the range [0, MAX_LIST_LIMIT].
 *
 * SQLite treats a negative LIMIT as "no limit", so negative values must not
 * reach the query or they would bypass the cap. NaN falls back to the default
 * and fractional values are truncated so an integer is always bound.
 */
function clampLimit(limit: number | undefined): number {
	if (limit === undefined || Number.isNaN(limit)) {
		return DEFAULT_LIST_LIMIT;
	}
	return Math.min(Math.max(Math.trunc(limit), 0), MAX_LIST_LIMIT);
}

/**
 * Build the WHERE clause and bound parameters for the job list/count queries.
 *
 * @returns `where` is either an empty string or a complete `WHERE ...` clause;
 *          `params` holds the values for its placeholders, in order.
 */
function buildJobFilterQuery(options?: {
	repositoryId?: string;
	status?: JobStatusFilter;
}): { where: string; params: unknown[] } {
	const conditions: string[] = [];
	const params: unknown[] = [];

	if (options?.repositoryId) {
		if (isSpecificRepositoryId(options.repositoryId)) {
			conditions.push('repository_id = ?');
			params.push(options.repositoryId);
		} else {
			// Short id: match it exactly or as a path prefix ("<id>/...").
			conditions.push(`(repository_id = ? OR repository_id LIKE ? ESCAPE '\\')`);
			params.push(options.repositoryId, `${escapeLikePattern(options.repositoryId)}/%`);
		}
	}

	const statuses = normalizeStatuses(options?.status);
	if (statuses.length > 0) {
		conditions.push(`status IN (${statuses.map(() => '?').join(', ')})`);
		params.push(...statuses);
	}

	return {
		where: conditions.length > 0 ? `WHERE ${conditions.join(' AND ')}` : '',
		params
	};
}

export class JobQueue {
	private workerPool: WorkerPool | null = null;

	constructor(private readonly db: Database.Database) {}

	/**
	 * Inject the worker pool dependency (alternative to direct pipeline calling).
	 * When set, enqueue() will delegate to the pool instead of calling processNext().
	 */
	setWorkerPool(pool: WorkerPool): void {
		this.workerPool = pool;
	}

	/**
	 * Enqueue a new indexing job for the given repository and optional version.
	 *
	 * If a job for the same (repository, version) pair is already queued or
	 * running, that job is returned instead of creating a duplicate.
	 *
	 * @throws Error if the newly inserted row cannot be read back.
	 */
	enqueue(repositoryId: string, versionId?: string): IndexingJob {
		const resolvedVersionId = versionId ?? null;

		// Return early if there's already an active job for this exact (repo, version) pair.
		const activeRaw = this.db
			.prepare<[string, string | null, string | null], IndexingJobEntity>(
				`${JOB_SELECT}
				 WHERE repository_id = ?
				   AND (version_id = ? OR (version_id IS NULL AND ? IS NULL))
				   AND status IN ('queued', 'running')
				 ORDER BY created_at DESC
				 LIMIT 1`
			)
			.get(repositoryId, resolvedVersionId, resolvedVersionId);

		if (activeRaw) {
			// Ensure the queue is draining even if enqueue was called concurrently.
			if (!this.workerPool) {
				this.scheduleFallback();
			}
			return toJob(activeRaw);
		}

		// created_at is stored as whole seconds since the Unix epoch.
		const now = Math.floor(Date.now() / 1000);
		const id = crypto.randomUUID();
		const job = new IndexingJob({
			id,
			repositoryId,
			versionId: resolvedVersionId,
			status: 'queued',
			progress: 0,
			totalFiles: 0,
			processedFiles: 0,
			stage: 'queued',
			stageDetail: null,
			error: null,
			startedAt: null,
			completedAt: null,
			createdAt: new Date(now * 1000)
		});

		this.db
			.prepare(
				`INSERT INTO indexing_jobs
				   (id, repository_id, version_id, status, progress, total_files,
				    processed_files, stage, stage_detail, error, started_at, completed_at, created_at)
				 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
			)
			.run(
				id,
				job.repositoryId,
				job.versionId,
				job.status,
				job.progress,
				job.totalFiles,
				job.processedFiles,
				job.stage,
				job.stageDetail,
				job.error,
				job.startedAt,
				job.completedAt,
				now
			);

		// Delegate to worker pool if available, otherwise fall back to direct processing
		if (this.workerPool) {
			this.workerPool.enqueue(id, repositoryId, resolvedVersionId);
		} else {
			this.scheduleFallback();
		}

		// Re-read the row so the returned model reflects what is actually stored.
		const created = this.db
			.prepare<[string], IndexingJobEntity>(`${JOB_SELECT} WHERE id = ?`)
			.get(id);
		if (!created) {
			throw new Error(`[JobQueue] Job ${id} could not be read back after insert`);
		}
		return toJob(created);
	}

	/**
	 * Schedule processNext() on a later event-loop turn (fallback path only).
	 * A rejection is logged instead of being left as an unhandled rejection.
	 */
	private scheduleFallback(): void {
		setImmediate(() => {
			this.processNext().catch((err: unknown) => {
				console.error('[JobQueue] processNext failed:', err);
			});
		});
	}

	/**
	 * Look up the oldest queued job when no WorkerPool is set.
	 *
	 * This is a fallback path: no pipeline is wired in here, so the method only
	 * logs warnings and does not change any job. It is invoked via setImmediate
	 * so the caller's current turn of the event loop is not blocked.
	 */
	private async processNext(): Promise<void> {
		// Fallback path: no worker pool configured, run directly (used by tests and dev mode)
		console.warn('[JobQueue] Running in fallback mode (no worker pool) — direct pipeline execution.');

		const rawJob = this.db
			.prepare<[], IndexingJobEntity>(
				`${JOB_SELECT}
				 WHERE status = 'queued'
				 ORDER BY created_at ASC
				 LIMIT 1`
			)
			.get();

		if (!rawJob) return;

		console.warn('[JobQueue] processNext: no pipeline or pool configured — skipping job processing');
	}

	/**
	 * Retrieve a single job by ID, or null when no such job exists.
	 */
	getJob(id: string): IndexingJob | null {
		const raw = this.db
			.prepare<[string], IndexingJobEntity>(`${JOB_SELECT} WHERE id = ?`)
			.get(id);
		return raw ? toJob(raw) : null;
	}

	/**
	 * List recent jobs (newest first), optionally filtered by repository and/or status.
	 *
	 * `limit` defaults to 20 and is clamped to the range 0–200; negative values
	 * are treated as 0 so they cannot disable the cap.
	 */
	listJobs(options?: {
		repositoryId?: string;
		status?: JobStatusFilter;
		limit?: number;
	}): IndexingJob[] {
		const limit = clampLimit(options?.limit);
		const { where, params } = buildJobFilterQuery(options);

		const sql = `${JOB_SELECT} ${where} ORDER BY created_at DESC LIMIT ?`;

		return this.db
			.prepare<unknown[], IndexingJobEntity>(sql)
			.all(...params, limit)
			.map(toJob);
	}

	/**
	 * Trigger processing of any queued jobs (e.g. after server restart).
	 * If a worker pool is configured, every queued job is handed to it, oldest
	 * first. Otherwise falls back to direct processing.
	 *
	 * NOTE(review): each call re-submits all rows still marked 'queued'; whether
	 * repeated calls are harmless depends on WorkerPool.enqueue de-duplicating —
	 * confirm against the pool implementation.
	 */
	drainQueued(): void {
		if (!this.workerPool) {
			// Fallback: direct pipeline processing
			this.scheduleFallback();
			return;
		}

		// Oldest first, so dispatch order matches the order used by processNext().
		const queued = this.db
			.prepare<[], IndexingJobEntity>(
				`${JOB_SELECT} WHERE status = 'queued' ORDER BY created_at ASC`
			)
			.all();

		for (const rawJob of queued) {
			const job = toJob(rawJob);
			this.workerPool.enqueue(job.id, job.repositoryId, job.versionId);
		}
	}

	/**
	 * Count all jobs matching optional filters.
	 * Accepts the same repository/status filters as listJobs().
	 */
	countJobs(options?: { repositoryId?: string; status?: JobStatusFilter }): number {
		const { where, params } = buildJobFilterQuery(options);

		const sql = `SELECT COUNT(*) as n FROM indexing_jobs ${where}`;
		const row = this.db.prepare<unknown[], { n: number }>(sql).get(...params);
		return row?.n ?? 0;
	}

	/**
	 * Pause a job that is currently queued or running.
	 * Returns true if the job was successfully paused, false otherwise
	 * (unknown id, or the job is in any other state).
	 */
	pauseJob(id: string): boolean {
		// The status check lives in the UPDATE itself so a job whose status changed
		// since it was last read cannot be overwritten.
		const result = this.db
			.prepare<[string]>(
				`UPDATE indexing_jobs
				 SET status = 'paused'
				 WHERE id = ? AND status IN ('queued', 'running')`
			)
			.run(id);
		return result.changes > 0;
	}

	/**
	 * Resume a paused job by changing its status back to 'queued' and
	 * triggering the queue drain.
	 * Returns true if the job was successfully resumed, false otherwise
	 * (unknown id, or the job is not paused).
	 */
	resumeJob(id: string): boolean {
		const result = this.db
			.prepare<[string]>(
				`UPDATE indexing_jobs
				 SET status = 'queued'
				 WHERE id = ? AND status = 'paused'`
			)
			.run(id);

		if (result.changes === 0) {
			return false;
		}

		// Trigger queue processing in case the queue was idle
		this.drainQueued();
		return true;
	}

	/**
	 * Cancel a job if it's not already done or failed.
	 * Sets status to 'cancelled' and stamps completed_at with the current time
	 * (whole seconds since the Unix epoch).
	 * Returns true if the job was successfully cancelled, false otherwise.
	 */
	cancelJob(id: string): boolean {
		const now = Math.floor(Date.now() / 1000);

		// Conditional update: a job that reached 'done' or 'failed' in the meantime
		// keeps its terminal state.
		const result = this.db
			.prepare<[number, string]>(
				`UPDATE indexing_jobs
				 SET status = 'cancelled', completed_at = ?
				 WHERE id = ? AND status NOT IN ('done', 'failed')`
			)
			.run(now, id);
		return result.changes > 0;
	}
}