feat(TRUEREF-0009): implement indexing pipeline and job queue

Implements the end-to-end indexing pipeline with a SQLite-backed job
queue, startup recovery, and REST API endpoints for job status.

- IndexingPipeline: orchestrates crawl → parse → atomic replace → embed
  → repo stats update with progress tracking at each stage
- JobQueue: sequential SQLite-backed queue (no external broker), deduplicates
  active jobs per repository, drains queued jobs on startup
- startup.ts: stale job recovery (running→failed), repo state reset, singleton
  initialization wired from hooks.server.ts
- GET /api/v1/jobs with repositoryId/status/limit filtering
- GET /api/v1/jobs/[id] single job lookup
- hooks.server.ts: initializes DB and pipeline on server start
- 18 unit tests covering queue, pipeline stages, recovery, and atomicity

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Giancarmine Salucci
2026-03-22 18:22:20 +01:00
parent bf4caf5e3b
commit 956b2a3a62
7 changed files with 1342 additions and 0 deletions

View File

@@ -0,0 +1,405 @@
/**
* IndexingPipeline — orchestrates the full crawl → parse → store → embed
* flow for a single repository indexing job (TRUEREF-0009).
*
* Atomicity guarantee:
* Old documents/snippets for changed files are deleted and replaced inside
* a single SQLite transaction. If anything fails after that transaction the
* already-committed data stays intact and the job is marked failed so
* callers can retry.
*
* Progress model:
* - Without embeddings: crawl+parse = 100 %
* - With embeddings : crawl+parse = 80 %, embeddings = 20 %
*/
import { createHash } from 'node:crypto';
import type Database from 'better-sqlite3';
import type { IndexingJob, NewDocument, NewSnippet, Repository } from '$lib/types';
import type { crawl as GithubCrawlFn } from '$lib/server/crawler/github.crawler.js';
import type { LocalCrawler } from '$lib/server/crawler/local.crawler.js';
import type { EmbeddingService } from '$lib/server/embeddings/embedding.service.js';
import { parseFile } from '$lib/server/parser/index.js';
import { computeTrustScore } from '$lib/server/search/trust-score.js';
// ---------------------------------------------------------------------------
// Progress calculation
// ---------------------------------------------------------------------------
/**
 * Computes a job's overall progress percentage (0–100, rounded).
 *
 * Weighting contract:
 * - no embedding provider: crawl+parse alone maps to the full 0–100 range
 * - embedding provider   : crawl+parse spans 0–80, embeddings span 80–100
 *
 * @param processedFiles  files parsed (or skipped as unchanged) so far
 * @param totalFiles      total files discovered by the crawl; 0 ⇒ progress 0
 * @param embeddingsDone  snippets embedded so far
 * @param embeddingsTotal snippets queued for embedding; 0 ⇒ no embed credit
 * @param hasEmbeddings   whether an embedding provider is configured
 */
function calculateProgress(
  processedFiles: number,
  totalFiles: number,
  embeddingsDone: number,
  embeddingsTotal: number,
  hasEmbeddings: boolean
): number {
  if (totalFiles === 0) return 0;
  const parseRatio = processedFiles / totalFiles;
  if (!hasEmbeddings) {
    return Math.round(parseRatio * 100);
  }
  const embedRatio = embeddingsTotal > 0 ? embeddingsDone / embeddingsTotal : 0;
  return Math.round(parseRatio * 80 + embedRatio * 20);
}
// ---------------------------------------------------------------------------
// SHA-256 helper
// ---------------------------------------------------------------------------
/**
 * Returns the hex-encoded SHA-256 digest of a UTF-8 string.
 * Used as a content checksum when the crawler does not supply one.
 */
function sha256(content: string): string {
  const hasher = createHash('sha256');
  hasher.update(content, 'utf-8');
  return hasher.digest('hex');
}
// ---------------------------------------------------------------------------
// IndexingPipeline
// ---------------------------------------------------------------------------
/**
 * Orchestrates a single repository indexing job end to end:
 * crawl → parse/diff → atomic document replacement → optional embeddings →
 * repository stats + trust-score refresh.
 *
 * All collaborators (SQLite handle, GitHub/local crawlers, embedding
 * service) are constructor-injected so they can be stubbed in tests.
 */
export class IndexingPipeline {
  constructor(
    private readonly db: Database.Database,
    private readonly githubCrawl: typeof GithubCrawlFn,
    private readonly localCrawler: LocalCrawler,
    // null ⇒ no embedding provider configured: the embeddings stage is
    // skipped and crawl+parse progress counts for the full 100 %.
    private readonly embeddingService: EmbeddingService | null
  ) {}
  // -------------------------------------------------------------------------
  // Public — run a job end to end
  // -------------------------------------------------------------------------
  /**
   * Runs the full pipeline for one job.
   *
   * Lifecycle: marks the job `running` (with `startedAt`) and the repo
   * `indexing` up front; on success marks the job `done` at progress 100 and
   * the repo `indexed` with refreshed stats and trust score; on any failure
   * records the error message on the job, sets the repo state to `error`,
   * and rethrows so the caller (the job queue) observes the failure.
   *
   * @param job Job row. May originate from Drizzle (camelCase keys) or raw
   *            SQL (snake_case keys); both are accepted and normalised.
   * @throws Error if the repository does not exist, or whatever a stage
   *         (crawl / parse / DB write / embedding) throws.
   */
  async run(job: IndexingJob): Promise<void> {
    // better-sqlite3 raw queries return snake_case keys; Drizzle types use camelCase.
    // Accept both so the pipeline works when called from raw SQL contexts.
    const raw = job as unknown as Record<string, unknown>;
    const repositoryId = (job.repositoryId ?? raw['repository_id']) as string;
    const versionId = (job.versionId ?? raw['version_id'] ?? null) as string | null;
    // Rebuild a normalised job view for the rest of this method.
    const normJob = { ...job, repositoryId, versionId };
    this.updateJob(job.id, { status: 'running', startedAt: Math.floor(Date.now() / 1000) });
    try {
      const repo = this.getRepository(repositoryId);
      if (!repo) throw new Error(`Repository ${repositoryId} not found`);
      // Mark repo as actively indexing.
      this.updateRepo(repo.id, { state: 'indexing' });
      // ---- Stage 1: Crawl -------------------------------------------------
      const crawlResult = await this.crawl(repo, normJob);
      const totalFiles = crawlResult.totalFiles;
      this.updateJob(job.id, { totalFiles });
      // ---- Stage 2: Parse & diff ------------------------------------------
      // Accumulate new documents/snippets; skip unchanged files (checksum match).
      const newDocuments: NewDocument[] = [];
      const newSnippets: NewSnippet[] = [];
      const changedDocIds: string[] = [];
      let processedFiles = 0;
      for (const file of crawlResult.files) {
        // Prefer the crawler-supplied SHA; fall back to hashing the content.
        const checksum = file.sha || sha256(file.content);
        // Check whether an identical document already exists.
        const existingDoc = this.db
          .prepare<[string, string], { id: string; checksum: string }>(
            `SELECT id, checksum FROM documents
             WHERE repository_id = ? AND file_path = ? LIMIT 1`
          )
          .get(repo.id, file.path);
        if (existingDoc && existingDoc.checksum === checksum) {
          // File unchanged — reuse existing snippets, nothing to do.
          processedFiles++;
          const progress = calculateProgress(
            processedFiles,
            totalFiles,
            0,
            0,
            this.embeddingService !== null
          );
          this.updateJob(job.id, { processedFiles, progress });
          continue;
        }
        // File is new or changed — schedule old doc for deletion.
        // (Actual delete happens inside the Stage 3 transaction.)
        if (existingDoc) {
          changedDocIds.push(existingDoc.id);
        }
        // Create new document record.
        const documentId = crypto.randomUUID();
        const now = new Date();
        const newDoc: NewDocument = {
          id: documentId,
          repositoryId: repo.id,
          versionId: normJob.versionId ?? null,
          filePath: file.path,
          title: null,
          language: file.language,
          tokenCount: 0, // filled in below from snippet totals
          checksum,
          indexedAt: now
        };
        // Parse into snippets.
        const snippets = parseFile(file, {
          repositoryId: repo.id,
          documentId,
          versionId: normJob.versionId ?? undefined
        });
        // Update document token count from snippet totals.
        const tokenCount = snippets.reduce((sum, s) => sum + (s.tokenCount ?? 0), 0);
        newDoc.tokenCount = tokenCount;
        newDocuments.push(newDoc);
        newSnippets.push(...snippets);
        processedFiles++;
        const progress = calculateProgress(
          processedFiles,
          totalFiles,
          0,
          0,
          this.embeddingService !== null
        );
        this.updateJob(job.id, { processedFiles, progress });
      }
      // ---- Stage 3: Atomic replacement ------------------------------------
      // Deletes changed docs and inserts replacements in ONE transaction, so
      // readers never observe a half-replaced repository.
      this.replaceSnippets(repo.id, changedDocIds, newDocuments, newSnippets);
      // ---- Stage 4: Embeddings (if provider is configured) ----------------
      if (this.embeddingService && newSnippets.length > 0) {
        // NOTE(review): non-null assertion assumes parseFile always assigns
        // snippet ids — confirm against the parser before relying on it.
        const snippetIds = newSnippets.map((s) => s.id!);
        const embeddingsTotal = snippetIds.length;
        await this.embeddingService.embedSnippets(snippetIds, (done) => {
          // processedFiles === totalFiles here, so parse contributes its full 80 %.
          const progress = calculateProgress(
            processedFiles,
            totalFiles,
            done,
            embeddingsTotal,
            true
          );
          this.updateJob(job.id, { progress });
        });
      }
      // ---- Stage 5: Update repository stats --------------------------------
      const stats = this.computeStats(repo.id);
      // Re-read the repo so the trust score sees any fields updated mid-run.
      const freshRepo = this.getRepository(repo.id)!;
      const trustScore = computeTrustScore({
        ...freshRepo,
        totalSnippets: stats.totalSnippets,
        totalTokens: stats.totalTokens,
        state: 'indexed'
      });
      this.updateRepo(repo.id, {
        state: 'indexed',
        totalSnippets: stats.totalSnippets,
        totalTokens: stats.totalTokens,
        trustScore,
        lastIndexedAt: Math.floor(Date.now() / 1000)
      });
      this.updateJob(job.id, {
        status: 'done',
        progress: 100,
        completedAt: Math.floor(Date.now() / 1000)
      });
    } catch (error) {
      const message = error instanceof Error ? error.message : String(error);
      console.error(`[IndexingPipeline] Job ${job.id} failed: ${message}`);
      this.updateJob(job.id, {
        status: 'failed',
        error: message,
        completedAt: Math.floor(Date.now() / 1000)
      });
      // Restore repo to error state but preserve any existing indexed data.
      // (If the repo row doesn't exist this UPDATE is a harmless no-op.)
      this.updateRepo(repositoryId, { state: 'error' });
      throw error;
    }
  }
  // -------------------------------------------------------------------------
  // Private — crawl
  // -------------------------------------------------------------------------
  /**
   * Dispatches to the GitHub or local-filesystem crawler based on
   * `repo.source` and normalises the result to `{ files, totalFiles }`.
   *
   * @throws Error if a GitHub repo's canonical id is not "/owner/repo".
   */
  private async crawl(
    repo: Repository,
    job: IndexingJob
  ): Promise<{ files: Array<{ path: string; content: string; sha: string; size: number; language: string }>; totalFiles: number }> {
    if (repo.source === 'github') {
      // Parse owner/repo from the canonical ID: "/owner/repo"
      const parts = repo.id.replace(/^\//, '').split('/');
      const owner = parts[0];
      const repoName = parts[1];
      if (!owner || !repoName) {
        throw new Error(`Cannot parse GitHub owner/repo from id: ${repo.id}`);
      }
      const result = await this.githubCrawl({
        owner,
        repo: repoName,
        ref: repo.branch ?? undefined,
        token: repo.githubToken ?? undefined
      });
      return { files: result.files, totalFiles: result.totalFiles };
    } else {
      // Local filesystem crawl.
      // NOTE(review): ref is only forwarded when branch !== 'main' —
      // presumably 'main' means "crawl the working tree as-is"; confirm
      // against LocalCrawler's contract.
      const result = await this.localCrawler.crawl({
        rootPath: repo.sourceUrl,
        ref: repo.branch !== 'main' ? (repo.branch ?? undefined) : undefined
      });
      return { files: result.files, totalFiles: result.totalFiles };
    }
  }
  // -------------------------------------------------------------------------
  // Private — atomic snippet replacement
  // -------------------------------------------------------------------------
  /**
   * Atomically swaps stale documents for their replacements: deletes the
   * changed docs (snippets follow via FK cascade) and inserts all new
   * documents and snippets inside a single better-sqlite3 transaction.
   * Either everything commits or nothing does.
   */
  private replaceSnippets(
    _repositoryId: string,
    changedDocIds: string[],
    newDocuments: NewDocument[],
    newSnippets: NewSnippet[]
  ): void {
    // Prepare once, reuse inside the loop.
    const insertDoc = this.db.prepare(
      `INSERT INTO documents
       (id, repository_id, version_id, file_path, title, language,
        token_count, checksum, indexed_at)
       VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`
    );
    const insertSnippet = this.db.prepare(
      `INSERT INTO snippets
       (id, document_id, repository_id, version_id, type, title,
        content, language, breadcrumb, token_count, created_at)
       VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
    );
    this.db.transaction(() => {
      // Delete stale documents (cascade deletes their snippets via FK).
      if (changedDocIds.length > 0) {
        const placeholders = changedDocIds.map(() => '?').join(',');
        this.db
          .prepare(`DELETE FROM documents WHERE id IN (${placeholders})`)
          .run(...changedDocIds);
      }
      // Insert new documents.
      for (const doc of newDocuments) {
        // Store timestamps as Unix seconds; fall back to "now" if not a Date.
        const indexedAtSeconds =
          doc.indexedAt instanceof Date
            ? Math.floor(doc.indexedAt.getTime() / 1000)
            : Math.floor(Date.now() / 1000);
        insertDoc.run(
          doc.id,
          doc.repositoryId,
          doc.versionId ?? null,
          doc.filePath,
          doc.title ?? null,
          doc.language ?? null,
          doc.tokenCount ?? 0,
          doc.checksum,
          indexedAtSeconds
        );
      }
      // Insert new snippets.
      for (const snippet of newSnippets) {
        const createdAtSeconds =
          snippet.createdAt instanceof Date
            ? Math.floor(snippet.createdAt.getTime() / 1000)
            : Math.floor(Date.now() / 1000);
        insertSnippet.run(
          snippet.id,
          snippet.documentId,
          snippet.repositoryId,
          snippet.versionId ?? null,
          snippet.type,
          snippet.title ?? null,
          snippet.content,
          snippet.language ?? null,
          snippet.breadcrumb ?? null,
          snippet.tokenCount ?? 0,
          createdAtSeconds
        );
      }
    })(); // transaction() returns a function; invoking it runs + commits.
  }
  // -------------------------------------------------------------------------
  // Private — stats
  // -------------------------------------------------------------------------
  /** Aggregates snippet count and total token count for a repository. */
  private computeStats(repositoryId: string): { totalSnippets: number; totalTokens: number } {
    const row = this.db
      .prepare<[string], { total_snippets: number; total_tokens: number }>(
        `SELECT COUNT(*) as total_snippets,
                COALESCE(SUM(token_count), 0) as total_tokens
         FROM snippets WHERE repository_id = ?`
      )
      .get(repositoryId);
    return {
      totalSnippets: row?.total_snippets ?? 0,
      totalTokens: row?.total_tokens ?? 0
    };
  }
  // -------------------------------------------------------------------------
  // Private — DB helpers
  // -------------------------------------------------------------------------
  /** Fetches a repository row by id, or null if it does not exist. */
  private getRepository(id: string): Repository | null {
    return (
      (this.db
        .prepare<[string], Repository>(`SELECT * FROM repositories WHERE id = ?`)
        .get(id) as Repository | undefined) ?? null
    );
  }
  /**
   * Partial UPDATE of an indexing_jobs row. camelCase field names are mapped
   * to snake_case columns. Column names are interpolated into the SQL, which
   * is safe only because every caller in this class passes hard-coded keys —
   * never pass user-controlled field names here.
   */
  private updateJob(id: string, fields: Record<string, unknown>): void {
    const sets = Object.keys(fields)
      .map((k) => `${toSnake(k)} = ?`)
      .join(', ');
    const values = [...Object.values(fields), id];
    this.db.prepare(`UPDATE indexing_jobs SET ${sets} WHERE id = ?`).run(...values);
  }
  /**
   * Partial UPDATE of a repositories row; always bumps updated_at to now.
   * Same hard-coded-keys caveat as updateJob.
   */
  private updateRepo(id: string, fields: Record<string, unknown>): void {
    const now = Math.floor(Date.now() / 1000);
    const allFields = { ...fields, updatedAt: now };
    const sets = Object.keys(allFields)
      .map((k) => `${toSnake(k)} = ?`)
      .join(', ');
    const values = [...Object.values(allFields), id];
    this.db.prepare(`UPDATE repositories SET ${sets} WHERE id = ?`).run(...values);
  }
}
// ---------------------------------------------------------------------------
// Utility
// ---------------------------------------------------------------------------
/**
 * Maps a camelCase identifier to its snake_case DB column name,
 * e.g. "repositoryId" → "repository_id". Only ASCII A–Z trigger a split.
 */
function toSnake(key: string): string {
  let column = '';
  for (const ch of key) {
    if (ch >= 'A' && ch <= 'Z') {
      column += `_${ch.toLowerCase()}`;
    } else {
      column += ch;
    }
  }
  return column;
}