// Files
// trueref-legacy/src/lib/server/pipeline/indexing.pipeline.ts
//
// 722 lines
// 25 KiB
// TypeScript

/**
* IndexingPipeline — orchestrates the full crawl → parse → store → embed
* flow for a single repository indexing job (TRUEREF-0009).
*
* Atomicity guarantee:
* Old documents/snippets for changed files are deleted and replaced inside
* a single SQLite transaction. If anything fails after that transaction the
* already-committed data stays intact and the job is marked failed so
* callers can retry.
*
* Progress model:
* - Without embeddings: crawl+parse = 100 %
* - With embeddings : crawl+parse = 80 %, embeddings = 20 %
*/
import { createHash, randomUUID } from 'node:crypto';
import type Database from 'better-sqlite3';
import type { Document, NewDocument, NewSnippet, TrueRefConfig, IndexingStage } from '$lib/types';
import type { crawl as GithubCrawlFn } from '$lib/server/crawler/github.crawler.js';
import type { LocalCrawler } from '$lib/server/crawler/local.crawler.js';
import type { EmbeddingService } from '$lib/server/embeddings/embedding.service.js';
import { RepositoryMapper } from '$lib/server/mappers/repository.mapper.js';
import { IndexingJob } from '$lib/server/models/indexing-job.js';
import { Repository, RepositoryEntity } from '$lib/server/models/repository.js';
import { SqliteVecStore } from '$lib/server/search/sqlite-vec.store.js';
import { resolveConfig, type ParsedConfig } from '$lib/server/config/config-parser.js';
import { parseFile } from '$lib/server/parser/index.js';
import { computeTrustScore } from '$lib/server/search/trust-score.js';
import { computeDiff } from './diff.js';
import { buildDifferentialPlan, type DifferentialPlan } from './differential-strategy.js';
import {
cloneFromAncestor as cloneFromAncestorInDatabase,
replaceSnippets as replaceSnippetsInDatabase,
updateRepo as updateRepoInDatabase,
updateVersion as updateVersionInDatabase,
type CloneFromAncestorRequest
} from './write-operations.js';
import type { SerializedFields } from './worker-types.js';
// ---------------------------------------------------------------------------
// Progress calculation
// ---------------------------------------------------------------------------
/**
 * Maps file and embedding counters onto a 0–100 job progress percentage.
 *
 * Without embeddings the file ratio covers the whole range; with embeddings
 * the file ratio contributes up to 80 points and the embedding ratio up to 20.
 *
 * @param processedFiles - Files handled so far.
 * @param totalFiles - Files in the job; `0` short-circuits to `0`.
 * @param embeddingsDone - Embeddings produced so far.
 * @param embeddingsTotal - Embeddings to produce; `0` contributes nothing.
 * @param hasEmbeddings - Whether the embedding stage is part of this job.
 * @returns The rounded percentage.
 */
function calculateProgress(
	processedFiles: number,
	totalFiles: number,
	embeddingsDone: number,
	embeddingsTotal: number,
	hasEmbeddings: boolean
): number {
	if (totalFiles === 0) {
		return 0;
	}

	const fileRatio = processedFiles / totalFiles;

	if (hasEmbeddings) {
		const filePoints = fileRatio * 80;
		let embedPoints = 0;
		if (embeddingsTotal > 0) {
			embedPoints = (embeddingsDone / embeddingsTotal) * 20;
		}
		return Math.round(filePoints + embedPoints);
	}

	return Math.round(fileRatio * 100);
}
// ---------------------------------------------------------------------------
// SHA-256 helper
// ---------------------------------------------------------------------------
/**
 * Hashes a string with SHA-256.
 *
 * @param content - Text to hash; encoded as UTF-8 before hashing.
 * @returns The digest as a lowercase hexadecimal string.
 */
function sha256(content: string): string {
	const hasher = createHash('sha256');
	hasher.update(content, 'utf-8');
	return hasher.digest('hex');
}
// ---------------------------------------------------------------------------
// IndexingPipeline
// ---------------------------------------------------------------------------
/**
 * Orchestrates one repository indexing job end to end: optional differential
 * clone from an ancestor version, crawl, diff against stored documents, parse,
 * store, optional embedding, and final stats / rules persistence.
 *
 * Every write except job-row updates can be redirected through an optional
 * `writeDelegate` hook; when a hook is absent the write goes straight to `db`.
 */
export class IndexingPipeline {
	/** Vector store bound to `db`; created once per pipeline instance. */
	private readonly sqliteVecStore: SqliteVecStore;

	/**
	 * @param db - SQLite connection used for all reads and for any write that
	 *   has no matching `writeDelegate` hook.
	 * @param githubCrawl - Crawl function used when `repo.source === 'github'`.
	 * @param localCrawler - Crawler used for every other repository source.
	 * @param embeddingService - Embedding provider, or `null` to skip the
	 *   embedding stage entirely.
	 * @param writeDelegate - Optional hooks that take over individual writes.
	 *   Setting `persistJobUpdates` to `false` turns job-row updates into no-ops.
	 */
	constructor(
		private readonly db: Database.Database,
		private readonly githubCrawl: typeof GithubCrawlFn,
		private readonly localCrawler: LocalCrawler,
		private readonly embeddingService: EmbeddingService | null,
		private readonly writeDelegate?: {
			persistJobUpdates?: boolean;
			replaceSnippets?: (
				changedDocIds: string[],
				newDocuments: NewDocument[],
				newSnippets: NewSnippet[]
			) => Promise<void>;
			cloneFromAncestor?: (request: CloneFromAncestorRequest) => Promise<void>;
			updateRepo?: (repositoryId: string, fields: SerializedFields) => Promise<void>;
			updateVersion?: (versionId: string, fields: SerializedFields) => Promise<void>;
			upsertRepoConfig?: (
				repositoryId: string,
				versionId: string | null,
				rules: string[]
			) => Promise<void>;
		}
	) {
		this.sqliteVecStore = new SqliteVecStore(db);
	}

	// -------------------------------------------------------------------------
	// Public — run a job end to end
	// -------------------------------------------------------------------------

	/**
	 * Runs one indexing job from start to finish.
	 *
	 * Stages, in order: differential plan + clone (version jobs only), crawl,
	 * diff + parse, store, embeddings (when a service is configured),
	 * repository / version stats, rules persistence.
	 *
	 * On failure the job row is marked `failed`, the repository (and version,
	 * if any) is moved to state `error`, and the error that aborted the job is
	 * rethrown. A failure while recording that state is logged and never
	 * replaces the original error.
	 *
	 * @param job - Job to run. Repository / version ids are read from either
	 *   camelCase or snake_case keys.
	 * @param onStageChange - Optional callback invoked on every stage report.
	 * @throws The error that aborted the job.
	 */
	async run(
		job: IndexingJob,
		onStageChange?: (
			stage: IndexingStage,
			detail?: string,
			progress?: number,
			processedFiles?: number,
			totalFiles?: number
		) => void
	): Promise<void> {
		// better-sqlite3 raw queries return snake_case keys; Drizzle types use camelCase.
		// Accept both so the pipeline works when called from raw SQL contexts.
		const raw = job as unknown as Record<string, unknown>;
		const repositoryId = (job.repositoryId ?? raw['repository_id']) as string;
		const versionId = (job.versionId ?? raw['version_id'] ?? null) as string | null;

		// Persist the stage on the job row, then notify the optional callback.
		const reportStage = (
			stage: IndexingStage,
			detail?: string,
			progress?: number,
			processed?: number,
			total?: number
		) => {
			this.updateJob(job.id, { stage, stageDetail: detail ?? null });
			onStageChange?.(stage, detail, progress, processed, total);
		};

		this.updateJob(job.id, { status: 'running', startedAt: Math.floor(Date.now() / 1000) });

		try {
			const repo = this.getRepository(repositoryId);
			if (!repo) throw new Error(`Repository ${repositoryId} not found`);

			// Mark repo (and version) as actively indexing.
			await this.updateRepo(repo.id, { state: 'indexing' });
			if (versionId) {
				await this.updateVersion(versionId, { state: 'indexing' });
			}

			const versionTag = versionId ? this.getVersionTag(versionId) : undefined;

			// ---- Stage 0: Differential strategy (TRUEREF-0021) ----------------------
			// When indexing a tagged version, check if we can inherit unchanged files
			// from an already-indexed ancestor version instead of crawling everything.
			let differentialPlan: DifferentialPlan | null = null;
			if (versionId && versionTag) {
				reportStage('differential');
				differentialPlan = await buildDifferentialPlan({
					repo,
					targetTag: versionTag,
					db: this.db
				}).catch((err) => {
					// Planning is an optimisation only; fall back to a full crawl.
					console.warn(
						`[IndexingPipeline] Differential plan failed, falling back to full crawl: ${err instanceof Error ? err.message : String(err)}`
					);
					return null;
				});
			}

			// If a differential plan exists, clone unchanged files from ancestor.
			// A plan is only ever built when `versionId` is set, so the extra
			// `versionId` check just narrows the type.
			if (differentialPlan && versionId && differentialPlan.unchangedPaths.size > 0) {
				reportStage('cloning');
				await this.cloneFromAncestor({
					ancestorVersionId: differentialPlan.ancestorVersionId,
					targetVersionId: versionId,
					repositoryId: repo.id,
					unchangedPaths: [...differentialPlan.unchangedPaths]
				});
				console.info(
					`[IndexingPipeline] Differential indexing: cloned ${differentialPlan.unchangedPaths.size} unchanged files from ${differentialPlan.ancestorTag}`
				);
			}

			// ---- Stage 1: Crawl -------------------------------------------------
			// Pass changedPaths as allowlist so crawl only fetches/returns changed files.
			reportStage('crawling');
			const crawlAllowedPaths = differentialPlan ? differentialPlan.changedPaths : undefined;
			const crawlResult = await this.crawl(repo, versionTag, crawlAllowedPaths);

			// Resolve trueref.json / context7.json configuration.
			// Prefer the pre-parsed config carried in the CrawlResult (set by
			// LocalCrawler so it is available even when a `folders` allowlist
			// excludes the repo root and trueref.json never appears in files[]).
			// Fall back to locating the file in crawlResult.files for GitHub crawls
			// which do not yet populate CrawlResult.config.
			let parsedConfig: ReturnType<typeof resolveConfig> | null = null;
			if (crawlResult.config) {
				// Config was pre-parsed by the crawler — wrap it in a ParsedConfig
				// shell so the rest of the pipeline can use it uniformly.
				parsedConfig = {
					config: crawlResult.config,
					source: 'trueref.json',
					warnings: []
				} satisfies ParsedConfig;
			} else {
				const configFile = crawlResult.files.find(
					(f) => f.path === 'trueref.json' || f.path === 'context7.json'
				);
				parsedConfig = configFile
					? resolveConfig([{ filename: configFile.path, content: configFile.content }])
					: null;
			}

			const excludeFiles: string[] = parsedConfig?.config.excludeFiles ?? [];

			// Paths already cloned from the ancestor version (empty when no clone ran).
			const clonedPaths = differentialPlan?.unchangedPaths ?? new Set<string>();

			// Drop excluded files and files that were already cloned. The second
			// filter matters when the plan's changedPaths is empty: crawl() then
			// treats the allowlist as absent and returns every file, and without
			// this guard each cloned file would be parsed and stored a second time.
			const filteredFiles = crawlResult.files.filter(
				(f) =>
					!clonedPaths.has(f.path) &&
					!excludeFiles.some((pattern) =>
						IndexingPipeline.matchesExcludePattern(f.path, pattern)
					)
			);

			const totalFiles = filteredFiles.length;
			this.updateJob(job.id, { totalFiles });

			// ---- Stage 2: Parse & diff ------------------------------------------
			// Load all existing documents for this repo so computeDiff can
			// classify every crawled file and detect deletions.
			const existingDocs = this.getExistingDocuments(repo.id, versionId);

			// Exclude files that were cloned from the ancestor — they are not candidates
			// for deletion or re-processing (computeDiff must not see them in existingDocs).
			const existingDocsForDiff =
				clonedPaths.size > 0
					? existingDocs.filter((d) => !clonedPaths.has(d.filePath))
					: existingDocs;

			const diff = computeDiff(filteredFiles, existingDocsForDiff);

			// Accumulate new documents/snippets; skip unchanged files.
			const newDocuments: NewDocument[] = [];
			const newSnippets: NewSnippet[] = [];
			const changedDocIds: string[] = [];

			// Path → document id, keeping the first document seen for each path.
			const docIdByPath = new Map<string, string>();
			for (const doc of existingDocsForDiff) {
				if (!docIdByPath.has(doc.filePath)) docIdByPath.set(doc.filePath, doc.id);
			}

			// Schedule stale documents (modified + deleted) for deletion.
			for (const file of diff.modified) {
				const staleId = docIdByPath.get(file.path);
				if (staleId !== undefined) changedDocIds.push(staleId);
			}
			for (const filePath of diff.deleted) {
				const staleId = docIdByPath.get(filePath);
				if (staleId !== undefined) changedDocIds.push(staleId);
			}

			// Only parse and embed files that are new or have changed.
			const filesToProcess = [...diff.added, ...diff.modified];
			let processedFiles = diff.unchanged.length; // unchanged files count as processed

			// Report unchanged files as already processed so the progress bar
			// immediately reflects real work done (especially on incremental re-index
			// where most or all files are unchanged).
			if (processedFiles > 0) {
				const initialProgress = calculateProgress(
					processedFiles,
					totalFiles,
					0,
					0,
					this.embeddingService !== null
				);
				this.updateJob(job.id, { processedFiles, progress: initialProgress });
			}

			// Yield the event loop and flush progress every N files.
			// Lower = more responsive UI; higher = less overhead.
			const YIELD_EVERY = 20;

			reportStage('parsing', `0 / ${totalFiles} files`);

			for (const [i, file] of filesToProcess.entries()) {
				// Yield the Node.js event loop periodically so the HTTP server can
				// handle incoming requests (navigation, polling) between file parses.
				// Without this, the synchronous parse + SQLite work blocks the thread
				// entirely and the UI becomes unresponsive during indexing.
				if (i > 0 && i % YIELD_EVERY === 0) {
					await new Promise<void>((resolve) => setImmediate(resolve));
				}

				// Prefer the crawler-supplied sha; hash the content only when it is empty.
				const checksum = file.sha || sha256(file.content);

				// Create new document record.
				const documentId = randomUUID();
				const now = new Date();
				const newDoc: NewDocument = {
					id: documentId,
					repositoryId: repo.id,
					versionId,
					filePath: file.path,
					title: null,
					language: file.language,
					tokenCount: 0,
					checksum,
					indexedAt: now
				};

				// Parse into snippets.
				const snippets = parseFile(file, {
					repositoryId: repo.id,
					documentId,
					versionId: versionId ?? undefined
				});

				// Update document token count from snippet totals.
				const tokenCount = snippets.reduce((sum, s) => sum + (s.tokenCount ?? 0), 0);
				newDoc.tokenCount = tokenCount;

				newDocuments.push(newDoc);
				newSnippets.push(...snippets);

				// Write progress to the DB only on yield boundaries or the final file.
				// Avoids a synchronous SQLite UPDATE on every single iteration.
				const totalProcessed = diff.unchanged.length + i + 1;
				const isLast = i === filesToProcess.length - 1;
				if (isLast || i % YIELD_EVERY === YIELD_EVERY - 1) {
					const progress = calculateProgress(
						totalProcessed,
						totalFiles,
						0,
						0,
						this.embeddingService !== null
					);
					this.updateJob(job.id, { processedFiles: totalProcessed, progress });
					reportStage(
						'parsing',
						`${totalProcessed} / ${totalFiles} files`,
						progress,
						totalProcessed,
						totalFiles
					);
				}
			}

			// After the loop processedFiles should reflect the full count.
			processedFiles = diff.unchanged.length + filesToProcess.length;

			// ---- Stage 3: Atomic replacement ------------------------------------
			reportStage('storing');
			await this.replaceSnippets(repo.id, changedDocIds, newDocuments, newSnippets);

			// ---- Stage 4: Embeddings (if provider is configured) ----------------
			if (this.embeddingService) {
				reportStage('embedding');
				const snippetIds = this.embeddingService.findSnippetIdsMissingEmbeddings(
					repo.id,
					versionId
				);

				// With nothing missing for the active profile, parsing progress is final.
				if (snippetIds.length > 0) {
					const embeddingsTotal = snippetIds.length;
					await this.embeddingService.embedSnippets(snippetIds, (done) => {
						const progress = calculateProgress(
							processedFiles,
							totalFiles,
							done,
							embeddingsTotal,
							true
						);
						this.updateJob(job.id, { progress });
					});
				}
			}

			// ---- Stage 5: Update repository stats --------------------------------
			const stats = this.computeStats(repo.id);
			// Re-read the row; fall back to the snapshot from the start of the run
			// if it can no longer be found.
			const freshRepo = this.getRepository(repo.id) ?? repo;
			const trustScore = computeTrustScore({
				...freshRepo,
				totalSnippets: stats.totalSnippets,
				totalTokens: stats.totalTokens,
				state: 'indexed'
			});

			await this.updateRepo(repo.id, {
				state: 'indexed',
				totalSnippets: stats.totalSnippets,
				totalTokens: stats.totalTokens,
				trustScore,
				lastIndexedAt: Math.floor(Date.now() / 1000)
			});

			if (versionId) {
				const versionStats = this.computeVersionStats(versionId);
				await this.updateVersion(versionId, {
					state: 'indexed',
					totalSnippets: versionStats.totalSnippets,
					indexedAt: Math.floor(Date.now() / 1000)
				});
			}

			// ---- Stage 6: Persist rules from config ----------------------------
			if (parsedConfig?.config.rules?.length) {
				if (!versionId) {
					// Main-branch job: write the repo-wide entry only.
					await this.upsertRepoConfig(repo.id, null, parsedConfig.config.rules);
				} else {
					// Version job: write only the version-specific entry.
					// Writing to the NULL row here would overwrite repo-wide rules
					// with whatever the last-indexed version happened to carry.
					await this.upsertRepoConfig(repo.id, versionId, parsedConfig.config.rules);
				}
			}

			reportStage('done');
			this.updateJob(job.id, {
				status: 'done',
				progress: 100,
				completedAt: Math.floor(Date.now() / 1000)
			});
		} catch (error) {
			const message = error instanceof Error ? error.message : String(error);
			console.error(`[IndexingPipeline] Job ${job.id} failed: ${message}`);

			// Record the failure. This bookkeeping is guarded so that a secondary
			// error (e.g. a throwing callback or delegate) cannot mask the root cause.
			try {
				reportStage('failed');
				this.updateJob(job.id, {
					status: 'failed',
					error: message,
					completedAt: Math.floor(Date.now() / 1000)
				});

				// Restore repo to error state but preserve any existing indexed data.
				await this.updateRepo(repositoryId, { state: 'error' });
				if (versionId) {
					await this.updateVersion(versionId, { state: 'error' });
				}
			} catch (cleanupError) {
				const cleanupMessage =
					cleanupError instanceof Error ? cleanupError.message : String(cleanupError);
				console.error(
					`[IndexingPipeline] Job ${job.id} could not record failure state: ${cleanupMessage}`
				);
			}

			throw error;
		}
	}

	// -------------------------------------------------------------------------
	// Private — crawl
	// -------------------------------------------------------------------------

	/**
	 * Fetches the repository's files from GitHub or from the local crawler.
	 *
	 * @param repo - Repository to crawl; `repo.source === 'github'` selects the
	 *   GitHub crawl function, anything else the local crawler.
	 * @param ref - Tag or branch to crawl. When omitted, GitHub crawls use
	 *   `repo.branch`; local crawls use `repo.branch` unless it is `'main'`.
	 * @param allowedPaths - Optional allowlist applied to the crawl result.
	 *   An empty set is treated like no allowlist: every file is kept.
	 * @returns The (possibly filtered) files, the crawler's own `totalFiles`
	 *   count, and — for local crawls — the pre-parsed config if any.
	 * @throws If a GitHub repository id does not have the `/owner/repo` shape.
	 */
	private async crawl(
		repo: Repository,
		ref?: string,
		allowedPaths?: Set<string>
	): Promise<{
		files: Array<{ path: string; content: string; sha: string; size: number; language: string }>;
		totalFiles: number;
		/** Pre-parsed trueref.json / context7.json, or undefined when absent. */
		config?: TrueRefConfig;
	}> {
		// Apply allowedPaths filter for differential indexing.
		const applyAllowlist = <T extends { path: string }>(files: T[]): T[] =>
			allowedPaths && allowedPaths.size > 0
				? files.filter((f) => allowedPaths.has(f.path))
				: files;

		if (repo.source === 'github') {
			// Parse owner/repo from the canonical ID: "/owner/repo"
			const parts = repo.id.replace(/^\//, '').split('/');
			const owner = parts[0];
			const repoName = parts[1];

			if (!owner || !repoName) {
				throw new Error(`Cannot parse GitHub owner/repo from id: ${repo.id}`);
			}

			const result = await this.githubCrawl({
				owner,
				repo: repoName,
				ref: ref ?? repo.branch ?? undefined,
				token: repo.githubToken ?? undefined
			});

			return { files: applyAllowlist(result.files), totalFiles: result.totalFiles };
		}

		// Local filesystem crawl.
		const result = await this.localCrawler.crawl({
			rootPath: repo.sourceUrl,
			ref: ref ?? (repo.branch !== 'main' ? (repo.branch ?? undefined) : undefined)
		});

		return {
			files: applyAllowlist(result.files),
			totalFiles: result.totalFiles,
			config: result.config
		};
	}

	/**
	 * Looks up the tag of a repository version.
	 *
	 * @param versionId - Primary key in `repository_versions`.
	 * @returns The tag, or `undefined` when no such row exists.
	 */
	private getVersionTag(versionId: string): string | undefined {
		const row = this.db
			.prepare<[string], { tag: string }>(`SELECT tag FROM repository_versions WHERE id = ?`)
			.get(versionId);
		return row?.tag;
	}

	// -------------------------------------------------------------------------
	// Private — differential clone (TRUEREF-0021)
	// -------------------------------------------------------------------------

	/**
	 * Clone documents, snippets, and embeddings from an ancestor version into
	 * the target version for all unchanged file paths.
	 *
	 * Accepts either a single request object or the legacy positional form
	 * `(ancestorVersionId, targetVersionId, repositoryId, unchangedPaths)`.
	 * Does nothing when there are no unchanged paths. The write goes through
	 * `writeDelegate.cloneFromAncestor` when provided, otherwise through the
	 * database helper from `./write-operations.js` (documented by the original
	 * author as running in a single SQLite transaction).
	 */
	private async cloneFromAncestor(
		requestOrAncestorVersionId: CloneFromAncestorRequest | string,
		targetVersionId?: string,
		repositoryId?: string,
		unchangedPaths?: Set<string>
	): Promise<void> {
		// Normalise the positional form into a request object.
		const request: CloneFromAncestorRequest =
			typeof requestOrAncestorVersionId === 'string'
				? {
						ancestorVersionId: requestOrAncestorVersionId,
						targetVersionId: targetVersionId!,
						repositoryId: repositoryId!,
						unchangedPaths: [...(unchangedPaths ?? new Set<string>())]
					}
				: requestOrAncestorVersionId;

		if (request.unchangedPaths.length === 0) {
			return;
		}

		if (this.writeDelegate?.cloneFromAncestor) {
			await this.writeDelegate.cloneFromAncestor(request);
			return;
		}

		cloneFromAncestorInDatabase(this.db, request);
	}

	// -------------------------------------------------------------------------
	// Private — atomic snippet replacement
	// -------------------------------------------------------------------------

	/**
	 * Removes the stale documents and stores the freshly parsed ones.
	 *
	 * @param _repositoryId - Unused; kept for call-site symmetry.
	 * @param changedDocIds - Ids of documents to remove (modified + deleted).
	 * @param newDocuments - Documents to insert.
	 * @param newSnippets - Snippets to insert.
	 */
	private async replaceSnippets(
		_repositoryId: string,
		changedDocIds: string[],
		newDocuments: NewDocument[],
		newSnippets: NewSnippet[]
	): Promise<void> {
		if (this.writeDelegate?.replaceSnippets) {
			await this.writeDelegate.replaceSnippets(changedDocIds, newDocuments, newSnippets);
			return;
		}

		replaceSnippetsInDatabase(this.db, changedDocIds, newDocuments, newSnippets);
	}

	// -------------------------------------------------------------------------
	// Private — stats
	// -------------------------------------------------------------------------

	/**
	 * Counts snippets and sums their token counts across the whole repository
	 * (all versions).
	 */
	private computeStats(repositoryId: string): { totalSnippets: number; totalTokens: number } {
		const row = this.db
			.prepare<[string], { total_snippets: number; total_tokens: number }>(
				`SELECT COUNT(*) as total_snippets,
				        COALESCE(SUM(token_count), 0) as total_tokens
				 FROM snippets WHERE repository_id = ?`
			)
			.get(repositoryId);

		return {
			totalSnippets: row?.total_snippets ?? 0,
			totalTokens: row?.total_tokens ?? 0
		};
	}

	/** Counts the snippets stored for a single version. */
	private computeVersionStats(versionId: string): { totalSnippets: number } {
		const row = this.db
			.prepare<
				[string],
				{ total_snippets: number }
			>(`SELECT COUNT(*) as total_snippets FROM snippets WHERE version_id = ?`)
			.get(versionId);

		return { totalSnippets: row?.total_snippets ?? 0 };
	}

	// -------------------------------------------------------------------------
	// Private — DB helpers
	// -------------------------------------------------------------------------

	/**
	 * Loads the stored documents for a repository, scoped to one version or —
	 * when `versionId` is null/empty — to the rows whose `version_id` is NULL.
	 */
	private getExistingDocuments(repositoryId: string, versionId: string | null): Document[] {
		// Column aliases map snake_case columns onto the camelCase Document shape.
		const columns = `id, repository_id as repositoryId, version_id as versionId,
				file_path as filePath, title, language, token_count as tokenCount,
				checksum, indexed_at as indexedAt`;

		if (versionId) {
			return this.db
				.prepare<[string, string], Document>(
					`SELECT ${columns}
					 FROM documents WHERE repository_id = ? AND version_id = ?`
				)
				.all(repositoryId, versionId) as Document[];
		}

		return this.db
			.prepare<[string], Document>(
				`SELECT ${columns}
				 FROM documents WHERE repository_id = ? AND version_id IS NULL`
			)
			.all(repositoryId) as Document[];
	}

	/** Loads a repository row and maps it to the domain model; `null` if absent. */
	private getRepository(id: string): Repository | null {
		const raw = this.db
			.prepare<[string], RepositoryEntity>(`SELECT * FROM repositories WHERE id = ?`)
			.get(id);
		return raw ? RepositoryMapper.fromEntity(new RepositoryEntity(raw)) : null;
	}

	/**
	 * Updates columns on the job row. Keys are camelCase and converted to
	 * snake_case column names; values are bound as parameters.
	 *
	 * No-op when `writeDelegate.persistJobUpdates` is `false` or when `fields`
	 * is empty (an empty SET clause would be invalid SQL).
	 */
	private updateJob(id: string, fields: Record<string, unknown>): void {
		if (this.writeDelegate?.persistJobUpdates === false) {
			return;
		}

		const entries = Object.entries(fields);
		if (entries.length === 0) {
			return;
		}

		const assignments = entries.map(([key]) => `${toSnake(key)} = ?`).join(', ');
		const values = [...entries.map(([, value]) => value), id];
		this.db.prepare(`UPDATE indexing_jobs SET ${assignments} WHERE id = ?`).run(...values);
	}

	/** Updates repository fields via the delegate hook, or directly when absent. */
	private async updateRepo(id: string, fields: SerializedFields): Promise<void> {
		if (this.writeDelegate?.updateRepo) {
			await this.writeDelegate.updateRepo(id, fields);
			return;
		}

		updateRepoInDatabase(this.db, id, fields);
	}

	/** Updates version fields via the delegate hook, or directly when absent. */
	private async updateVersion(id: string, fields: SerializedFields): Promise<void> {
		if (this.writeDelegate?.updateVersion) {
			await this.writeDelegate.updateVersion(id, fields);
			return;
		}

		updateVersionInDatabase(this.db, id, fields);
	}

	/**
	 * Replaces the stored rules for a repository (`versionId === null`) or for
	 * one specific version.
	 *
	 * The direct-to-database path runs DELETE + INSERT inside one transaction
	 * so a failed INSERT cannot leave the repository without its previous rules.
	 */
	private async upsertRepoConfig(
		repositoryId: string,
		versionId: string | null,
		rules: string[]
	): Promise<void> {
		if (this.writeDelegate?.upsertRepoConfig) {
			await this.writeDelegate.upsertRepoConfig(repositoryId, versionId, rules);
			return;
		}

		const now = Math.floor(Date.now() / 1000);

		// Use DELETE + INSERT because ON CONFLICT … DO UPDATE doesn't work reliably
		// with partial unique indexes in all SQLite versions.
		const replaceRow = this.db.transaction(() => {
			if (versionId === null) {
				this.db
					.prepare(`DELETE FROM repository_configs WHERE repository_id = ? AND version_id IS NULL`)
					.run(repositoryId);
			} else {
				this.db
					.prepare(`DELETE FROM repository_configs WHERE repository_id = ? AND version_id = ?`)
					.run(repositoryId, versionId);
			}

			this.db
				.prepare(
					`INSERT INTO repository_configs (repository_id, version_id, rules, updated_at)
					 VALUES (?, ?, ?, ?)`
				)
				.run(repositoryId, versionId, JSON.stringify(rules), now);
		});
		replaceRow();
	}

	// -------------------------------------------------------------------------
	// Private — static helpers
	// -------------------------------------------------------------------------

	/**
	 * Returns true when `filePath` matches the given exclude `pattern`.
	 *
	 * Supported patterns:
	 * - Plain filename: `migration-guide.md` matches any path ending in `/migration-guide.md`
	 *   or equal to `migration-guide.md`.
	 * - Wildcard: each `*` matches any run of characters (including `/`), and the
	 *   pattern must cover the whole path. `docs/migration*` matches paths that
	 *   start with `docs/migration`; `*.md` matches paths that end with `.md`;
	 *   `docs/*.md` matches paths that start with `docs/` and end with `.md`.
	 * - Exact path: `src/legacy/old-api.ts` matches exactly that path.
	 */
	private static matchesExcludePattern(filePath: string, pattern: string): boolean {
		if (pattern.includes('*')) {
			// Split into the literal pieces around each '*': a required prefix,
			// a required suffix, and zero or more pieces that must appear in order
			// in between.
			const [head = '', ...rest] = pattern.split('*');
			const tail = rest.pop() ?? '';

			if (!filePath.startsWith(head)) return false;

			let cursor = head.length;
			for (const piece of rest) {
				const at = filePath.indexOf(piece, cursor);
				if (at === -1) return false;
				cursor = at + piece.length;
			}

			// The suffix must not overlap text already consumed by earlier pieces.
			return filePath.length - tail.length >= cursor && filePath.endsWith(tail);
		}

		// No wildcard — treat as plain name or exact path.
		if (!pattern.includes('/')) {
			// Plain filename: match basename (path ends with /<pattern> or equals pattern).
			return filePath === pattern || filePath.endsWith('/' + pattern);
		}

		// Contains a slash — exact path match.
		return filePath === pattern;
	}
}
// ---------------------------------------------------------------------------
// Utility
// ---------------------------------------------------------------------------
/**
 * Turns a camelCase key into its snake_case column name by prefixing every
 * ASCII uppercase letter with `_` and lowercasing it.
 */
function toSnake(key: string): string {
	let column = '';
	for (const ch of key) {
		const isAsciiUpper = ch >= 'A' && ch <= 'Z';
		column += isAsciiUpper ? `_${ch.toLowerCase()}` : ch;
	}
	return column;
}