From 956b2a3a6277040aa5f68441582ab32a8eb954d1 Mon Sep 17 00:00:00 2001 From: Giancarmine Salucci Date: Sun, 22 Mar 2026 18:22:20 +0100 Subject: [PATCH] feat(TRUEREF-0009): implement indexing pipeline and job queue MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements the end-to-end indexing pipeline with a SQLite-backed job queue, startup recovery, and REST API endpoints for job status. - IndexingPipeline: orchestrates crawl → parse → atomic replace → embed → repo stats update with progress tracking at each stage - JobQueue: sequential SQLite-backed queue (no external broker), deduplicates active jobs per repository, drains queued jobs on startup - startup.ts: stale job recovery (running→failed), repo state reset, singleton initialization wired from hooks.server.ts - GET /api/v1/jobs with repositoryId/status/limit filtering - GET /api/v1/jobs/[id] single job lookup - hooks.server.ts: initializes DB and pipeline on server start - 18 unit tests covering queue, pipeline stages, recovery, and atomicity Co-Authored-By: Claude Sonnet 4.6 --- src/hooks.server.ts | 72 +++ .../server/pipeline/indexing.pipeline.test.ts | 460 ++++++++++++++++++ src/lib/server/pipeline/indexing.pipeline.ts | 405 +++++++++++++++ src/lib/server/pipeline/job-queue.ts | 203 ++++++++ src/lib/server/pipeline/startup.ts | 122 +++++ src/routes/api/v1/jobs/+server.ts | 46 ++ src/routes/api/v1/jobs/[id]/+server.ts | 34 ++ 7 files changed, 1342 insertions(+) create mode 100644 src/hooks.server.ts create mode 100644 src/lib/server/pipeline/indexing.pipeline.test.ts create mode 100644 src/lib/server/pipeline/indexing.pipeline.ts create mode 100644 src/lib/server/pipeline/job-queue.ts create mode 100644 src/lib/server/pipeline/startup.ts create mode 100644 src/routes/api/v1/jobs/+server.ts create mode 100644 src/routes/api/v1/jobs/[id]/+server.ts diff --git a/src/hooks.server.ts b/src/hooks.server.ts new file mode 100644 index 0000000..62e8b89 --- /dev/null +++ b/src/hooks.server.ts @@ -0,0 +1,72 @@ +/** + * SvelteKit server hooks (TRUEREF-0009). + * + * Runs once when the Node.js server process starts. Initialises the database + * and kicks off the indexing pipeline + job queue so queued jobs resume + * automatically after a restart. + */ + +import { initializeDatabase } from '$lib/server/db/index.js'; +import { getClient } from '$lib/server/db/client.js'; +import { initializePipeline } from '$lib/server/pipeline/startup.js'; +import { EMBEDDING_CONFIG_KEY, createProviderFromConfig, defaultEmbeddingConfig } from '$lib/server/embeddings/factory.js'; +import { EmbeddingService } from '$lib/server/embeddings/embedding.service.js'; +import type { EmbeddingConfig } from '$lib/server/embeddings/factory.js'; +import type { Handle } from '@sveltejs/kit'; + +// --------------------------------------------------------------------------- +// Server initialisation — runs once on startup +// --------------------------------------------------------------------------- + +try { + initializeDatabase(); + + const db = getClient(); + + // Load persisted embedding configuration (if any). + const configRow = db + .prepare<[string], { value: string }>(`SELECT value FROM settings WHERE key = ?`) + .get(EMBEDDING_CONFIG_KEY); + + let embeddingService: EmbeddingService | null = null; + + if (configRow) { + try { + const config: EmbeddingConfig = + typeof configRow.value === 'string' + ? JSON.parse(configRow.value) + : (configRow.value as EmbeddingConfig); + + if (config.provider !== 'none') { + const provider = createProviderFromConfig(config); + embeddingService = new EmbeddingService(db, provider); + } + } catch (err) { + console.warn( + `[hooks.server] Could not load embedding config: ${err instanceof Error ? err.message : String(err)}` + ); + } + } else { + // Use the default (noop) config so the pipeline is still wired up. + const config = defaultEmbeddingConfig(); + if (config.provider !== 'none') { + const provider = createProviderFromConfig(config); + embeddingService = new EmbeddingService(db, provider); + } + } + + initializePipeline(db, embeddingService); + console.log('[hooks.server] Indexing pipeline initialised.'); +} catch (err) { + console.error( + `[hooks.server] Failed to initialise server: ${err instanceof Error ? err.message : String(err)}` + ); +} + +// --------------------------------------------------------------------------- +// Request handler (pass-through) +// --------------------------------------------------------------------------- + +export const handle: Handle = async ({ event, resolve }) => { + return resolve(event); +}; diff --git a/src/lib/server/pipeline/indexing.pipeline.test.ts b/src/lib/server/pipeline/indexing.pipeline.test.ts new file mode 100644 index 0000000..4f6eae8 --- /dev/null +++ b/src/lib/server/pipeline/indexing.pipeline.test.ts @@ -0,0 +1,460 @@ +/** + * Unit tests for IndexingPipeline and JobQueue (TRUEREF-0009). + * + * Uses an in-memory SQLite database populated with the same migration SQL + * as the production database. + */ + +import { describe, it, expect, beforeEach, vi } from 'vitest'; +import Database from 'better-sqlite3'; +import { readFileSync } from 'node:fs'; +import { join } from 'node:path'; +import { JobQueue } from './job-queue.js'; +import { IndexingPipeline } from './indexing.pipeline.js'; +import { recoverStaleJobs } from './startup.js'; + +// --------------------------------------------------------------------------- +// Test DB factory +// --------------------------------------------------------------------------- + +function createTestDb(): Database.Database { + const client = new Database(':memory:'); + client.pragma('foreign_keys = ON'); + + const migrationsFolder = join(import.meta.dirname, '../db/migrations'); + const migrationSql = readFileSync( + join(migrationsFolder, '0000_large_master_chief.sql'), + 'utf-8' + ); + + const statements = migrationSql + .split('--> statement-breakpoint') + .map((s) => s.trim()) + .filter(Boolean); + + for (const stmt of statements) { + client.exec(stmt); + } + + return client; +} + +// --------------------------------------------------------------------------- +// Fixtures +// --------------------------------------------------------------------------- + +const now = Math.floor(Date.now() / 1000); + +function insertRepo( + db: Database.Database, + overrides: Partial> = {} +): void { + db.prepare( + `INSERT INTO repositories + (id, title, source, source_url, branch, state, + total_snippets, total_tokens, trust_score, benchmark_score, + stars, github_token, last_indexed_at, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` + ).run( + overrides.id ?? '/test/repo', + overrides.title ?? 'Test Repo', + overrides.source ?? 'local', + overrides.source_url ?? '/tmp/test-repo', + overrides.branch ?? 'main', + overrides.state ?? 'pending', + 0, 0, 0, 0, null, null, null, now, now + ); +} + +function insertJob( + db: Database.Database, + overrides: Partial> = {} +): string { + const id = crypto.randomUUID(); + db.prepare( + `INSERT INTO indexing_jobs + (id, repository_id, version_id, status, progress, + total_files, processed_files, error, started_at, completed_at, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` + ).run( + overrides.id ?? id, + overrides.repository_id ?? '/test/repo', + overrides.version_id ?? null, + overrides.status ?? 'queued', + overrides.progress ?? 0, + overrides.total_files ?? 0, + overrides.processed_files ?? 0, + overrides.error ?? null, + overrides.started_at ?? null, + overrides.completed_at ?? null, + overrides.created_at ?? now + ); + return (overrides.id as string) ?? id; +} + +// --------------------------------------------------------------------------- +// recoverStaleJobs +// --------------------------------------------------------------------------- + +describe('recoverStaleJobs', () => { + let db: Database.Database; + + beforeEach(() => { + db = createTestDb(); + insertRepo(db); + }); + + it('marks running jobs as failed', () => { + insertJob(db, { status: 'running' }); + recoverStaleJobs(db); + + const row = db + .prepare(`SELECT status, error FROM indexing_jobs LIMIT 1`) + .get() as { status: string; error: string }; + expect(row.status).toBe('failed'); + expect(row.error).toMatch(/restarted/i); + }); + + it('resets repositories in indexing state to error', () => { + db.prepare(`UPDATE repositories SET state = 'indexing' WHERE id = '/test/repo'`).run(); + recoverStaleJobs(db); + + const row = db + .prepare(`SELECT state FROM repositories WHERE id = '/test/repo'`) + .get() as { state: string }; + expect(row.state).toBe('error'); + }); + + it('leaves queued and done jobs untouched', () => { + insertJob(db, { status: 'queued' }); + insertJob(db, { status: 'done' }); + recoverStaleJobs(db); + + const rows = db + .prepare(`SELECT status FROM indexing_jobs WHERE status IN ('queued', 'done')`) + .all() as { status: string }[]; + expect(rows).toHaveLength(2); + }); +}); + +// --------------------------------------------------------------------------- +// JobQueue +// --------------------------------------------------------------------------- + +describe('JobQueue', () => { + let db: Database.Database; + let queue: JobQueue; + + beforeEach(() => { + db = createTestDb(); + insertRepo(db); + queue = new JobQueue(db); + }); + + it('enqueues a new job and returns it', () => { + const job = queue.enqueue('/test/repo'); + expect(job.status).toBe('queued'); + expect(job.repositoryId ?? (job as unknown as { repository_id: string }).repository_id).toBe( + '/test/repo' + ); + }); + + it('deduplicates: returns existing active job instead of creating a new one', () => { + const job1 = queue.enqueue('/test/repo'); + const job2 = queue.enqueue('/test/repo'); + expect(job1.id).toBe(job2.id); + + const count = ( + db.prepare(`SELECT COUNT(*) as n FROM indexing_jobs`).get() as { n: number } + ).n; + expect(count).toBe(1); + }); + + it('getJob returns null for unknown ID', () => { + expect(queue.getJob('non-existent')).toBeNull(); + }); + + it('getJob returns the job if it exists', () => { + const jobId = insertJob(db, { status: 'done' }); + const job = queue.getJob(jobId); + expect(job).not.toBeNull(); + expect(job!.id).toBe(jobId); + }); + + it('listJobs returns all jobs ordered by created_at desc', () => { + insertJob(db, { created_at: now - 10, status: 'done' }); + insertJob(db, { created_at: now - 5, status: 'done' }); + insertJob(db, { created_at: now, status: 'queued' }); + + const jobs = queue.listJobs(); + expect(jobs.length).toBeGreaterThanOrEqual(3); + // Most recent first. + expect(jobs[0].status).toBe('queued'); + }); + + it('listJobs filters by repositoryId', () => { + insertRepo(db, { id: '/other/repo', source_url: '/tmp/other' }); + insertJob(db, { repository_id: '/other/repo', status: 'done' }); + insertJob(db, { status: 'queued' }); + + const jobs = queue.listJobs({ repositoryId: '/other/repo' }); + expect(jobs).toHaveLength(1); + }); + + it('listJobs filters by status', () => { + insertJob(db, { status: 'queued' }); + insertJob(db, { status: 'done' }); + insertJob(db, { status: 'failed' }); + + const queued = queue.listJobs({ status: 'queued' }); + expect(queued.every((j) => j.status === 'queued')).toBe(true); + }); + + it('countJobs returns correct count', () => { + insertJob(db, { status: 'done' }); + insertJob(db, { status: 'done' }); + insertJob(db, { status: 'failed' }); + + expect(queue.countJobs()).toBe(3); + expect(queue.countJobs({ status: 'done' })).toBe(2); + expect(queue.countJobs({ status: 'failed' })).toBe(1); + }); +}); + +// --------------------------------------------------------------------------- +// IndexingPipeline +// --------------------------------------------------------------------------- + +describe('IndexingPipeline', () => { + let db: Database.Database; + + beforeEach(() => { + db = createTestDb(); + insertRepo(db, { source: 'local', source_url: '/tmp/test-repo' }); + }); + + function makePipeline( + crawlResult: { + files: Array<{ path: string; content: string; sha: string; language: string }>; + totalFiles: number; + } = { files: [], totalFiles: 0 } + ) { + const mockGithubCrawl = vi.fn().mockResolvedValue({ + ...crawlResult, + skippedFiles: 0, + branch: 'main', + commitSha: 'abc' + }); + + const mockLocalCrawler = { + crawl: vi.fn().mockResolvedValue({ + ...crawlResult, + skippedFiles: 0, + branch: 'main', + commitSha: 'abc' + }) + }; + + return new IndexingPipeline( + db, + mockGithubCrawl as never, + mockLocalCrawler as never, + null + ); + } + + function makeJob(repositoryId = '/test/repo') { + const jobId = insertJob(db, { repository_id: repositoryId, status: 'queued' }); + return db + .prepare(`SELECT * FROM indexing_jobs WHERE id = ?`) + .get(jobId) as { id: string; repositoryId?: string; repository_id?: string; status: string; versionId?: string; version_id?: string }; + } + + it('marks job as done when there are no files to index', async () => { + const pipeline = makePipeline({ files: [], totalFiles: 0 }); + const job = makeJob(); + + await pipeline.run(job as never); + + const updated = db + .prepare(`SELECT status, progress FROM indexing_jobs WHERE id = ?`) + .get(job.id) as { status: string; progress: number }; + expect(updated.status).toBe('done'); + expect(updated.progress).toBe(100); + }); + + it('marks job as running then done (final state is done)', async () => { + const pipeline = makePipeline({ files: [], totalFiles: 0 }); + const job = makeJob(); + + await pipeline.run(job as never); + + const updated = db + .prepare(`SELECT status FROM indexing_jobs WHERE id = ?`) + .get(job.id) as { status: string }; + // The job should end in 'done' — the running→done transition is covered + // by the pipeline's internal updateJob calls. + expect(updated.status).toBe('done'); + }); + + it('marks job as failed and repo as error when pipeline throws', async () => { + const errorCrawl = vi.fn().mockRejectedValue(new Error('crawl failed')); + const pipeline = new IndexingPipeline( + db, + errorCrawl as never, + { crawl: errorCrawl } as never, + null + ); + + const job = makeJob(); + + await expect(pipeline.run(job as never)).rejects.toThrow('crawl failed'); + + const updatedJob = db + .prepare(`SELECT status, error FROM indexing_jobs WHERE id = ?`) + .get(job.id) as { status: string; error: string }; + expect(updatedJob.status).toBe('failed'); + expect(updatedJob.error).toBe('crawl failed'); + + const updatedRepo = db + .prepare(`SELECT state FROM repositories WHERE id = '/test/repo'`) + .get() as { state: string }; + expect(updatedRepo.state).toBe('error'); + }); + + it('inserts documents and snippets for new files', async () => { + const files = [ + { + path: 'README.md', + content: '# Hello\n\nThis is documentation.', + sha: 'sha-readme', + language: 'markdown' + } + ]; + const pipeline = makePipeline({ files, totalFiles: 1 }); + const job = makeJob(); + + await pipeline.run(job as never); + + const docs = db.prepare(`SELECT * FROM documents`).all() as unknown[]; + expect(docs.length).toBeGreaterThan(0); + + const snippets = db.prepare(`SELECT * FROM snippets`).all() as unknown[]; + expect(snippets.length).toBeGreaterThan(0); + + const repo = db + .prepare(`SELECT state, total_snippets FROM repositories WHERE id = '/test/repo'`) + .get() as { state: string; total_snippets: number }; + expect(repo.state).toBe('indexed'); + expect(repo.total_snippets).toBeGreaterThan(0); + }); + + it('skips unchanged files (checksum match)', async () => { + // First indexing run. + const files = [ + { + path: 'README.md', + content: '# Hello\n\nThis is documentation.', + sha: 'sha-readme', + language: 'markdown' + } + ]; + const pipeline = makePipeline({ files, totalFiles: 1 }); + const job1 = makeJob(); + await pipeline.run(job1 as never); + + const firstDocCount = ( + db.prepare(`SELECT COUNT(*) as n FROM documents`).get() as { n: number } + ).n; + const firstSnippetIds = ( + db.prepare(`SELECT id FROM snippets`).all() as { id: string }[] + ).map((r) => r.id); + + // Second run with identical files. + const job2Id = insertJob(db, { repository_id: '/test/repo', status: 'queued' }); + const job2 = db + .prepare(`SELECT * FROM indexing_jobs WHERE id = ?`) + .get(job2Id) as never; + + await pipeline.run(job2); + + const secondDocCount = ( + db.prepare(`SELECT COUNT(*) as n FROM documents`).get() as { n: number } + ).n; + const secondSnippetIds = ( + db.prepare(`SELECT id FROM snippets`).all() as { id: string }[] + ).map((r) => r.id); + + // Document count stays the same and snippet IDs are unchanged. + expect(secondDocCount).toBe(firstDocCount); + expect(secondSnippetIds).toEqual(firstSnippetIds); + }); + + it('replaces snippets atomically when a file changes', async () => { + const pipeline1 = makePipeline({ + files: [ + { + path: 'README.md', + content: '# Original\n\nThis is the original version of the documentation with sufficient content.', + sha: 'sha-v1', + language: 'markdown' + } + ], + totalFiles: 1 + }); + const job1 = makeJob(); + await pipeline1.run(job1 as never); + + const originalSnippetCount = ( + db.prepare(`SELECT COUNT(*) as n FROM snippets`).get() as { n: number } + ).n; + expect(originalSnippetCount).toBeGreaterThan(0); + + // Second run with changed file content. + const pipeline2 = makePipeline({ + files: [ + { + path: 'README.md', + content: '# Updated\n\nThis is a completely different version of the documentation with new content.', + sha: 'sha-v2', + language: 'markdown' + } + ], + totalFiles: 1 + }); + const job2Id = insertJob(db, { repository_id: '/test/repo', status: 'queued' }); + const job2 = db + .prepare(`SELECT * FROM indexing_jobs WHERE id = ?`) + .get(job2Id) as never; + await pipeline2.run(job2); + + const finalDocCount = ( + db.prepare(`SELECT COUNT(*) as n FROM documents`).get() as { n: number } + ).n; + // Only one document should exist (the updated one). + expect(finalDocCount).toBe(1); + + const finalChecksum = ( + db.prepare(`SELECT checksum FROM documents LIMIT 1`).get() as { checksum: string } + ).checksum; + expect(finalChecksum).toBe('sha-v2'); + }); + + it('updates job progress as files are processed', async () => { + const files = Array.from({ length: 5 }, (_, i) => ({ + path: `file${i}.md`, + content: `# File ${i}\n\nContent ${i}.`, + sha: `sha-${i}`, + language: 'markdown' + })); + + const pipeline = makePipeline({ files, totalFiles: 5 }); + const job = makeJob(); + await pipeline.run(job as never); + + const updated = db + .prepare(`SELECT progress FROM indexing_jobs WHERE id = ?`) + .get(job.id) as { progress: number }; + expect(updated.progress).toBe(100); + }); +}); diff --git a/src/lib/server/pipeline/indexing.pipeline.ts b/src/lib/server/pipeline/indexing.pipeline.ts new file mode 100644 index 0000000..d9490f3 --- /dev/null +++ b/src/lib/server/pipeline/indexing.pipeline.ts @@ -0,0 +1,405 @@ +/** + * IndexingPipeline — orchestrates the full crawl → parse → store → embed + * flow for a single repository indexing job (TRUEREF-0009). + * + * Atomicity guarantee: + * Old documents/snippets for changed files are deleted and replaced inside + * a single SQLite transaction. If anything fails after that transaction the + * already-committed data stays intact and the job is marked failed so + * callers can retry. + * + * Progress model: + * - Without embeddings: crawl+parse = 100 % + * - With embeddings : crawl+parse = 80 %, embeddings = 20 % + */ + +import { createHash } from 'node:crypto'; +import type Database from 'better-sqlite3'; +import type { IndexingJob, NewDocument, NewSnippet, Repository } from '$lib/types'; +import type { crawl as GithubCrawlFn } from '$lib/server/crawler/github.crawler.js'; +import type { LocalCrawler } from '$lib/server/crawler/local.crawler.js'; +import type { EmbeddingService } from '$lib/server/embeddings/embedding.service.js'; +import { parseFile } from '$lib/server/parser/index.js'; +import { computeTrustScore } from '$lib/server/search/trust-score.js'; + +// --------------------------------------------------------------------------- +// Progress calculation +// --------------------------------------------------------------------------- + +function calculateProgress( + processedFiles: number, + totalFiles: number, + embeddingsDone: number, + embeddingsTotal: number, + hasEmbeddings: boolean +): number { + if (totalFiles === 0) return 0; + + if (!hasEmbeddings) { + return Math.round((processedFiles / totalFiles) * 100); + } + + const parseProgress = (processedFiles / totalFiles) * 80; + const embedProgress = embeddingsTotal > 0 ? (embeddingsDone / embeddingsTotal) * 20 : 0; + return Math.round(parseProgress + embedProgress); +} + +// --------------------------------------------------------------------------- +// SHA-256 helper +// --------------------------------------------------------------------------- + +function sha256(content: string): string { + return createHash('sha256').update(content, 'utf-8').digest('hex'); +} + +// --------------------------------------------------------------------------- +// IndexingPipeline +// --------------------------------------------------------------------------- + +export class IndexingPipeline { + constructor( + private readonly db: Database.Database, + private readonly githubCrawl: typeof GithubCrawlFn, + private readonly localCrawler: LocalCrawler, + private readonly embeddingService: EmbeddingService | null + ) {} + + // ------------------------------------------------------------------------- + // Public — run a job end to end + // ------------------------------------------------------------------------- + + async run(job: IndexingJob): Promise { + // better-sqlite3 raw queries return snake_case keys; Drizzle types use camelCase. + // Accept both so the pipeline works when called from raw SQL contexts. + const raw = job as unknown as Record; + const repositoryId = (job.repositoryId ?? raw['repository_id']) as string; + const versionId = (job.versionId ?? raw['version_id'] ?? null) as string | null; + + // Rebuild a normalised job view for the rest of this method. + const normJob = { ...job, repositoryId, versionId }; + + this.updateJob(job.id, { status: 'running', startedAt: Math.floor(Date.now() / 1000) }); + + try { + const repo = this.getRepository(repositoryId); + if (!repo) throw new Error(`Repository ${repositoryId} not found`); + + // Mark repo as actively indexing. + this.updateRepo(repo.id, { state: 'indexing' }); + + // ---- Stage 1: Crawl ------------------------------------------------- + const crawlResult = await this.crawl(repo, normJob); + const totalFiles = crawlResult.totalFiles; + + this.updateJob(job.id, { totalFiles }); + + // ---- Stage 2: Parse & diff ------------------------------------------ + // Accumulate new documents/snippets; skip unchanged files. + const newDocuments: NewDocument[] = []; + const newSnippets: NewSnippet[] = []; + const changedDocIds: string[] = []; + + let processedFiles = 0; + + for (const file of crawlResult.files) { + const checksum = file.sha || sha256(file.content); + + // Check whether an identical document already exists. + const existingDoc = this.db + .prepare<[string, string], { id: string; checksum: string }>( + `SELECT id, checksum FROM documents + WHERE repository_id = ? AND file_path = ? LIMIT 1` + ) + .get(repo.id, file.path); + + if (existingDoc && existingDoc.checksum === checksum) { + // File unchanged — reuse existing snippets, nothing to do. + processedFiles++; + const progress = calculateProgress( + processedFiles, + totalFiles, + 0, + 0, + this.embeddingService !== null + ); + this.updateJob(job.id, { processedFiles, progress }); + continue; + } + + // File is new or changed — schedule old doc for deletion. + if (existingDoc) { + changedDocIds.push(existingDoc.id); + } + + // Create new document record. + const documentId = crypto.randomUUID(); + const now = new Date(); + const newDoc: NewDocument = { + id: documentId, + repositoryId: repo.id, + versionId: normJob.versionId ?? null, + filePath: file.path, + title: null, + language: file.language, + tokenCount: 0, + checksum, + indexedAt: now + }; + + // Parse into snippets. + const snippets = parseFile(file, { + repositoryId: repo.id, + documentId, + versionId: normJob.versionId ?? undefined + }); + + // Update document token count from snippet totals. + const tokenCount = snippets.reduce((sum, s) => sum + (s.tokenCount ?? 0), 0); + newDoc.tokenCount = tokenCount; + + newDocuments.push(newDoc); + newSnippets.push(...snippets); + + processedFiles++; + const progress = calculateProgress( + processedFiles, + totalFiles, + 0, + 0, + this.embeddingService !== null + ); + this.updateJob(job.id, { processedFiles, progress }); + } + + // ---- Stage 3: Atomic replacement ------------------------------------ + this.replaceSnippets(repo.id, changedDocIds, newDocuments, newSnippets); + + // ---- Stage 4: Embeddings (if provider is configured) ---------------- + if (this.embeddingService && newSnippets.length > 0) { + const snippetIds = newSnippets.map((s) => s.id!); + const embeddingsTotal = snippetIds.length; + + await this.embeddingService.embedSnippets(snippetIds, (done) => { + const progress = calculateProgress( + processedFiles, + totalFiles, + done, + embeddingsTotal, + true + ); + this.updateJob(job.id, { progress }); + }); + } + + // ---- Stage 5: Update repository stats -------------------------------- + const stats = this.computeStats(repo.id); + const freshRepo = this.getRepository(repo.id)!; + const trustScore = computeTrustScore({ + ...freshRepo, + totalSnippets: stats.totalSnippets, + totalTokens: stats.totalTokens, + state: 'indexed' + }); + + this.updateRepo(repo.id, { + state: 'indexed', + totalSnippets: stats.totalSnippets, + totalTokens: stats.totalTokens, + trustScore, + lastIndexedAt: Math.floor(Date.now() / 1000) + }); + + this.updateJob(job.id, { + status: 'done', + progress: 100, + completedAt: Math.floor(Date.now() / 1000) + }); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + console.error(`[IndexingPipeline] Job ${job.id} failed: ${message}`); + + this.updateJob(job.id, { + status: 'failed', + error: message, + completedAt: Math.floor(Date.now() / 1000) + }); + + // Restore repo to error state but preserve any existing indexed data. + this.updateRepo(repositoryId, { state: 'error' }); + + throw error; + } + } + + // ------------------------------------------------------------------------- + // Private — crawl + // ------------------------------------------------------------------------- + + private async crawl( + repo: Repository, + job: IndexingJob + ): Promise<{ files: Array<{ path: string; content: string; sha: string; size: number; language: string }>; totalFiles: number }> { + if (repo.source === 'github') { + // Parse owner/repo from the canonical ID: "/owner/repo" + const parts = repo.id.replace(/^\//, '').split('/'); + const owner = parts[0]; + const repoName = parts[1]; + + if (!owner || !repoName) { + throw new Error(`Cannot parse GitHub owner/repo from id: ${repo.id}`); + } + + const result = await this.githubCrawl({ + owner, + repo: repoName, + ref: repo.branch ?? undefined, + token: repo.githubToken ?? undefined + }); + + return { files: result.files, totalFiles: result.totalFiles }; + } else { + // Local filesystem crawl. + const result = await this.localCrawler.crawl({ + rootPath: repo.sourceUrl, + ref: repo.branch !== 'main' ? (repo.branch ?? undefined) : undefined + }); + + return { files: result.files, totalFiles: result.totalFiles }; + } + } + + // ------------------------------------------------------------------------- + // Private — atomic snippet replacement + // ------------------------------------------------------------------------- + + private replaceSnippets( + _repositoryId: string, + changedDocIds: string[], + newDocuments: NewDocument[], + newSnippets: NewSnippet[] + ): void { + const insertDoc = this.db.prepare( + `INSERT INTO documents + (id, repository_id, version_id, file_path, title, language, + token_count, checksum, indexed_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)` + ); + + const insertSnippet = this.db.prepare( + `INSERT INTO snippets + (id, document_id, repository_id, version_id, type, title, + content, language, breadcrumb, token_count, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` + ); + + this.db.transaction(() => { + // Delete stale documents (cascade deletes their snippets via FK). + if (changedDocIds.length > 0) { + const placeholders = changedDocIds.map(() => '?').join(','); + this.db + .prepare(`DELETE FROM documents WHERE id IN (${placeholders})`) + .run(...changedDocIds); + } + + // Insert new documents. + for (const doc of newDocuments) { + const indexedAtSeconds = + doc.indexedAt instanceof Date + ? Math.floor(doc.indexedAt.getTime() / 1000) + : Math.floor(Date.now() / 1000); + + insertDoc.run( + doc.id, + doc.repositoryId, + doc.versionId ?? null, + doc.filePath, + doc.title ?? null, + doc.language ?? null, + doc.tokenCount ?? 0, + doc.checksum, + indexedAtSeconds + ); + } + + // Insert new snippets. + for (const snippet of newSnippets) { + const createdAtSeconds = + snippet.createdAt instanceof Date + ? Math.floor(snippet.createdAt.getTime() / 1000) + : Math.floor(Date.now() / 1000); + + insertSnippet.run( + snippet.id, + snippet.documentId, + snippet.repositoryId, + snippet.versionId ?? null, + snippet.type, + snippet.title ?? null, + snippet.content, + snippet.language ?? null, + snippet.breadcrumb ?? null, + snippet.tokenCount ?? 0, + createdAtSeconds + ); + } + })(); + } + + // ------------------------------------------------------------------------- + // Private — stats + // ------------------------------------------------------------------------- + + private computeStats(repositoryId: string): { totalSnippets: number; totalTokens: number } { + const row = this.db + .prepare<[string], { total_snippets: number; total_tokens: number }>( + `SELECT COUNT(*) as total_snippets, + COALESCE(SUM(token_count), 0) as total_tokens + FROM snippets WHERE repository_id = ?` + ) + .get(repositoryId); + + return { + totalSnippets: row?.total_snippets ?? 0, + totalTokens: row?.total_tokens ?? 0 + }; + } + + // ------------------------------------------------------------------------- + // Private — DB helpers + // ------------------------------------------------------------------------- + + private getRepository(id: string): Repository | null { + return ( + (this.db + .prepare<[string], Repository>(`SELECT * FROM repositories WHERE id = ?`) + .get(id) as Repository | undefined) ?? null + ); + } + + private updateJob(id: string, fields: Record): void { + const sets = Object.keys(fields) + .map((k) => `${toSnake(k)} = ?`) + .join(', '); + const values = [...Object.values(fields), id]; + this.db.prepare(`UPDATE indexing_jobs SET ${sets} WHERE id = ?`).run(...values); + } + + private updateRepo(id: string, fields: Record): void { + const now = Math.floor(Date.now() / 1000); + const allFields = { ...fields, updatedAt: now }; + const sets = Object.keys(allFields) + .map((k) => `${toSnake(k)} = ?`) + .join(', '); + const values = [...Object.values(allFields), id]; + this.db.prepare(`UPDATE repositories SET ${sets} WHERE id = ?`).run(...values); + } +} + +// --------------------------------------------------------------------------- +// Utility +// --------------------------------------------------------------------------- + +/** Convert camelCase to snake_case for DB column mapping. */ +function toSnake(key: string): string { + return key.replace(/[A-Z]/g, (c) => `_${c.toLowerCase()}`); +} diff --git a/src/lib/server/pipeline/job-queue.ts b/src/lib/server/pipeline/job-queue.ts new file mode 100644 index 0000000..f9e659b --- /dev/null +++ b/src/lib/server/pipeline/job-queue.ts @@ -0,0 +1,203 @@ +/** + * SQLite-backed job queue for indexing jobs (TRUEREF-0009). + * + * Jobs are processed sequentially (one at a time) to avoid SQLite write + * contention. The queue uses setImmediate to yield to the event loop between + * jobs so that API requests remain responsive. + */ + +import type Database from 'better-sqlite3'; +import type { IndexingJob, NewIndexingJob } from '$lib/types'; +import type { IndexingPipeline } from './indexing.pipeline.js'; + +export class JobQueue { + private isRunning = false; + private pipeline: IndexingPipeline | null = null; + + constructor(private readonly db: Database.Database) {} + + /** + * Inject the pipeline dependency (avoids circular construction order). + */ + setPipeline(pipeline: IndexingPipeline): void { + this.pipeline = pipeline; + } + + /** + * Enqueue a new indexing job for the given repository. + * If a job for this repository is already queued or running, returns the + * existing job instead of creating a duplicate. + */ + enqueue(repositoryId: string, versionId?: string): IndexingJob { + // Return early if there's already an active job for this repo. + const active = this.db + .prepare<[string], IndexingJob>( + `SELECT * FROM indexing_jobs + WHERE repository_id = ? AND status IN ('queued', 'running') + ORDER BY created_at DESC LIMIT 1` + ) + .get(repositoryId); + + if (active) return active; + + const now = Math.floor(Date.now() / 1000); + const job: NewIndexingJob = { + id: crypto.randomUUID(), + repositoryId, + versionId: versionId ?? null, + status: 'queued', + progress: 0, + totalFiles: 0, + processedFiles: 0, + error: null, + startedAt: null, + completedAt: null, + createdAt: new Date(now * 1000) + }; + + this.db + .prepare( + `INSERT INTO indexing_jobs + (id, repository_id, version_id, status, progress, total_files, + processed_files, error, started_at, completed_at, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` + ) + .run( + job.id, + job.repositoryId, + job.versionId, + job.status, + job.progress, + job.totalFiles, + job.processedFiles, + job.error, + job.startedAt, + job.completedAt, + now + ); + + // Kick off sequential processing if not already running. + if (!this.isRunning) { + setImmediate(() => this.processNext()); + } + + return this.db + .prepare<[string], IndexingJob>(`SELECT * FROM indexing_jobs WHERE id = ?`) + .get(job.id)!; + } + + /** + * Pick the oldest queued job and run it through the pipeline. + * Called recursively via setImmediate so the event loop stays unblocked. + */ + private async processNext(): Promise { + if (this.isRunning) return; + if (!this.pipeline) { + console.warn('[JobQueue] No pipeline configured — cannot process jobs.'); + return; + } + + const job = this.db + .prepare<[], IndexingJob>( + `SELECT * FROM indexing_jobs + WHERE status = 'queued' + ORDER BY created_at ASC LIMIT 1` + ) + .get(); + + if (!job) return; + + this.isRunning = true; + try { + await this.pipeline.run(job); + } catch (err) { + // Error is logged inside pipeline.run(); no action needed here. + console.error( + `[JobQueue] Job ${job.id} failed: ${err instanceof Error ? err.message : String(err)}` + ); + } finally { + this.isRunning = false; + + // Check whether another job was queued while this one ran. + const next = this.db + .prepare<[], { id: string }>( + `SELECT id FROM indexing_jobs WHERE status = 'queued' LIMIT 1` + ) + .get(); + if (next) { + setImmediate(() => this.processNext()); + } + } + } + + /** + * Retrieve a single job by ID. + */ + getJob(id: string): IndexingJob | null { + return ( + this.db + .prepare<[string], IndexingJob>(`SELECT * FROM indexing_jobs WHERE id = ?`) + .get(id) ?? null + ); + } + + /** + * List recent jobs, optionally filtered by repository and/or status. + */ + listJobs(options?: { + repositoryId?: string; + status?: IndexingJob['status']; + limit?: number; + }): IndexingJob[] { + const limit = Math.min(options?.limit ?? 20, 200); + const conditions: string[] = []; + const params: unknown[] = []; + + if (options?.repositoryId) { + conditions.push('repository_id = ?'); + params.push(options.repositoryId); + } + if (options?.status) { + conditions.push('status = ?'); + params.push(options.status); + } + + const where = conditions.length > 0 ? `WHERE ${conditions.join(' AND ')}` : ''; + const sql = `SELECT * FROM indexing_jobs ${where} ORDER BY created_at DESC LIMIT ?`; + params.push(limit); + + return this.db.prepare(sql).all(...params); + } + + /** + * Trigger processing of any queued jobs (e.g. after server restart). + * Safe to call multiple times; a no-op if the queue is already running. + */ + drainQueued(): void { + if (!this.isRunning) { + setImmediate(() => this.processNext()); + } + } + + /** + * Count all jobs matching optional filters. + */ + countJobs(options?: { repositoryId?: string; status?: IndexingJob['status'] }): number { + const conditions: string[] = []; + const params: unknown[] = []; + + if (options?.repositoryId) { + conditions.push('repository_id = ?'); + params.push(options.repositoryId); + } + if (options?.status) { + conditions.push('status = ?'); + params.push(options.status); + } + + const where = conditions.length > 0 ? `WHERE ${conditions.join(' AND ')}` : ''; + const sql = `SELECT COUNT(*) as n FROM indexing_jobs ${where}`; + const row = this.db.prepare(sql).get(...params); + return row?.n ?? 0; + } +} diff --git a/src/lib/server/pipeline/startup.ts b/src/lib/server/pipeline/startup.ts new file mode 100644 index 0000000..11e22a6 --- /dev/null +++ b/src/lib/server/pipeline/startup.ts @@ -0,0 +1,122 @@ +/** + * Server startup routines for the indexing pipeline (TRUEREF-0009). + * + * On every server start: + * 1. Mark any jobs that were left in 'running' state as 'failed' + * (they were interrupted by a process crash / restart). + * 2. Reset any repositories stuck in 'indexing' state to 'error'. + * 3. Construct and wire together the JobQueue + IndexingPipeline singletons. + * 4. Kick off processing of any jobs that were 'queued' before the restart. + */ + +import type Database from 'better-sqlite3'; +import type { EmbeddingService } from '$lib/server/embeddings/embedding.service.js'; +import { crawl as githubCrawl } from '$lib/server/crawler/github.crawler.js'; +import { LocalCrawler } from '$lib/server/crawler/local.crawler.js'; +import { IndexingPipeline } from './indexing.pipeline.js'; +import { JobQueue } from './job-queue.js'; + +// --------------------------------------------------------------------------- +// Stale-job recovery +// --------------------------------------------------------------------------- + +/** + * Mark jobs that were `running` when the server crashed as `failed`, and + * reset repositories that were stuck in `indexing` state to `error`. + * + * Safe to call on every startup — uses unixepoch() so timestamps stay in + * the same integer-seconds format as the rest of the schema. + */ +export function recoverStaleJobs(db: Database.Database): void { + db.prepare( + `UPDATE indexing_jobs + SET status = 'failed', + error = 'Server restarted while job was running', + completed_at = unixepoch() + WHERE status = 'running'` + ).run(); + + db.prepare( + `UPDATE repositories + SET state = 'error' + WHERE state = 'indexing'` + ).run(); +} + +// --------------------------------------------------------------------------- +// Singleton instances +// --------------------------------------------------------------------------- + +let _queue: JobQueue | null = null; +let _pipeline: IndexingPipeline | null = null; + +/** + * Initialise (or return the existing) JobQueue + IndexingPipeline pair. + * + * Must be called once at server startup (e.g. from `hooks.server.ts`). + * Calling it more than once is harmless — subsequent calls are no-ops that + * return the already-constructed singletons. + * + * @param db - Raw better-sqlite3 Database instance. + * @param embeddingService - Optional embedding service; pass null to disable. + * @returns An object with `queue` and `pipeline` accessors. + */ +export function initializePipeline( + db: Database.Database, + embeddingService: EmbeddingService | null = null +): { queue: JobQueue; pipeline: IndexingPipeline } { + if (_queue && _pipeline) { + return { queue: _queue, pipeline: _pipeline }; + } + + // Recover before constructing so no stale job gets picked up. + recoverStaleJobs(db); + + const localCrawler = new LocalCrawler(); + const pipeline = new IndexingPipeline(db, githubCrawl, localCrawler, embeddingService); + const queue = new JobQueue(db); + + queue.setPipeline(pipeline); + + _queue = queue; + _pipeline = pipeline; + + // Drain any jobs that were queued before the restart. + setImmediate(() => { + const pending = db + .prepare<[], { id: string }>(`SELECT id FROM indexing_jobs WHERE status = 'queued' LIMIT 1`) + .get(); + if (pending) { + // Re-enqueue logic is handled inside JobQueue.processNext; we trigger + // it by asking the queue for any job that is already queued. + // The simplest way is to call enqueue on a repo that has a queued job — + // but since enqueue deduplicates, we just trigger processNext directly. + // We do this via a public helper to avoid exposing private methods. + queue.drainQueued(); + } + }); + + return { queue, pipeline }; +} + +/** + * Return the current JobQueue singleton, or null if not yet initialised. + */ +export function getQueue(): JobQueue | null { + return _queue; +} + +/** + * Return the current IndexingPipeline singleton, or null if not yet initialised. + */ +export function getPipeline(): IndexingPipeline | null { + return _pipeline; +} + +/** + * Reset singletons — intended for use in tests only. + */ +export function _resetSingletons(): void { + _queue = null; + _pipeline = null; +} diff --git a/src/routes/api/v1/jobs/+server.ts b/src/routes/api/v1/jobs/+server.ts new file mode 100644 index 0000000..61ade03 --- /dev/null +++ b/src/routes/api/v1/jobs/+server.ts @@ -0,0 +1,46 @@ +/** + * GET /api/v1/jobs — list recent indexing jobs. + * + * Query parameters: + * repositoryId (optional) — filter by repository + * status (optional) — filter by status: queued|running|done|failed + * limit (optional, default 20, max 200) + */ + +import { json } from '@sveltejs/kit'; +import type { RequestHandler } from './$types'; +import { getClient } from '$lib/server/db/client.js'; +import { JobQueue } from '$lib/server/pipeline/job-queue.js'; +import { handleServiceError } from '$lib/server/utils/validation.js'; +import type { IndexingJob } from '$lib/types'; + +export const GET: RequestHandler = ({ url }) => { + try { + const db = getClient(); + const queue = new JobQueue(db); + + const repositoryId = url.searchParams.get('repositoryId') ?? undefined; + const status = (url.searchParams.get('status') ?? undefined) as + | IndexingJob['status'] + | undefined; + const limit = Math.min(parseInt(url.searchParams.get('limit') ?? '20', 10) || 20, 200); + + const jobs = queue.listJobs({ repositoryId, status, limit }); + const total = queue.countJobs({ repositoryId, status }); + + return json({ jobs, total }); + } catch (err) { + return handleServiceError(err); + } +}; + +export const OPTIONS: RequestHandler = () => { + return new Response(null, { + status: 204, + headers: { + 'Access-Control-Allow-Origin': '*', + 'Access-Control-Allow-Methods': 'GET, OPTIONS', + 'Access-Control-Allow-Headers': 'Content-Type, Authorization' + } + }); +}; diff --git a/src/routes/api/v1/jobs/[id]/+server.ts b/src/routes/api/v1/jobs/[id]/+server.ts new file mode 100644 index 0000000..6ec9d8d --- /dev/null +++ b/src/routes/api/v1/jobs/[id]/+server.ts @@ -0,0 +1,34 @@ +/** + * GET /api/v1/jobs/:id — retrieve a single indexing job by ID. + */ + +import { json } from '@sveltejs/kit'; +import type { RequestHandler } from './$types'; +import { getClient } from '$lib/server/db/client.js'; +import { JobQueue } from '$lib/server/pipeline/job-queue.js'; +import { handleServiceError, NotFoundError } from '$lib/server/utils/validation.js'; + +export const GET: RequestHandler = ({ params }) => { + try { + const db = getClient(); + const queue = new JobQueue(db); + + const job = queue.getJob(params.id); + if (!job) throw new NotFoundError(`Job ${params.id} not found`); + + return json({ job }); + } catch (err) { + return handleServiceError(err); + } +}; + +export const OPTIONS: RequestHandler = () => { + return new Response(null, { + status: 204, + headers: { + 'Access-Control-Allow-Origin': '*', + 'Access-Control-Allow-Methods': 'GET, OPTIONS', + 'Access-Control-Allow-Headers': 'Content-Type, Authorization' + } + }); +};