TRUEREF-0023 rewrite indexing pipeline - parallel reads - serialized writes

This commit is contained in:
Giancarmine Salucci
2026-04-02 09:49:38 +02:00
parent 9525c58e9a
commit f86be4106b
68 changed files with 5042 additions and 3131 deletions

View File

@@ -44,7 +44,10 @@ function createTestDb(): Database.Database {
'0004_complete_sentry.sql'
]) {
const sql = readFileSync(join(migrationsFolder, migrationFile), 'utf-8');
for (const stmt of sql.split('--> statement-breakpoint').map((s) => s.trim()).filter(Boolean)) {
for (const stmt of sql
.split('--> statement-breakpoint')
.map((s) => s.trim())
.filter(Boolean)) {
client.exec(stmt);
}
}
@@ -113,9 +116,10 @@ function insertDocument(db: Database.Database, versionId: string, filePath: stri
.run(
id,
db
.prepare<[string], { repository_id: string }>(
`SELECT repository_id FROM repository_versions WHERE id = ?`
)
.prepare<
[string],
{ repository_id: string }
>(`SELECT repository_id FROM repository_versions WHERE id = ?`)
.get(versionId)?.repository_id ?? '/test/repo',
versionId,
filePath,
@@ -280,9 +284,9 @@ describe('buildDifferentialPlan', () => {
insertDocument(db, v1Id, 'packages/react/index.js');
insertDocument(db, v1Id, 'packages/react-dom/index.js');
const fetchFn = vi.fn().mockResolvedValue([
{ path: 'packages/react/index.js', status: 'modified' as const }
]);
const fetchFn = vi
.fn()
.mockResolvedValue([{ path: 'packages/react/index.js', status: 'modified' as const }]);
const plan = await buildDifferentialPlan({
repo,
@@ -292,13 +296,7 @@ describe('buildDifferentialPlan', () => {
});
expect(fetchFn).toHaveBeenCalledOnce();
expect(fetchFn).toHaveBeenCalledWith(
'facebook',
'react',
'v18.0.0',
'v18.1.0',
'ghp_test123'
);
expect(fetchFn).toHaveBeenCalledWith('facebook', 'react', 'v18.0.0', 'v18.1.0', 'ghp_test123');
expect(plan).not.toBeNull();
expect(plan!.changedPaths.has('packages/react/index.js')).toBe(true);

View File

@@ -41,9 +41,7 @@ export async function buildDifferentialPlan(params: {
try {
// 1. Load all indexed versions for this repository
const rows = db
.prepare(
`SELECT * FROM repository_versions WHERE repository_id = ? AND state = 'indexed'`
)
.prepare(`SELECT * FROM repository_versions WHERE repository_id = ? AND state = 'indexed'`)
.all(repo.id) as RepositoryVersionEntity[];
const indexedVersions: RepositoryVersion[] = rows.map((row) =>

View File

@@ -1,10 +1,19 @@
import { workerData, parentPort } from 'node:worker_threads';
import Database from 'better-sqlite3';
import { EmbeddingService } from '$lib/server/embeddings/embedding.service.js';
import { applySqlitePragmas } from '$lib/server/db/connection.js';
import { createProviderFromProfile } from '$lib/server/embeddings/registry.js';
import { EmbeddingProfileMapper } from '$lib/server/mappers/embedding-profile.mapper.js';
import { EmbeddingProfileEntity, type EmbeddingProfileEntityProps } from '$lib/server/models/embedding-profile.js';
import type { EmbedWorkerRequest, EmbedWorkerResponse, WorkerInitData } from './worker-types.js';
import {
EmbeddingProfileEntity,
type EmbeddingProfileEntityProps
} from '$lib/server/models/embedding-profile.js';
import type {
EmbedWorkerRequest,
EmbedWorkerResponse,
SerializedEmbedding,
WorkerInitData
} from './worker-types.js';
const { dbPath, embeddingProfileId } = workerData as WorkerInitData;
@@ -18,17 +27,12 @@ if (!embeddingProfileId) {
}
const db = new Database(dbPath);
db.pragma('journal_mode = WAL');
db.pragma('foreign_keys = ON');
db.pragma('busy_timeout = 5000');
db.pragma('synchronous = NORMAL');
db.pragma('cache_size = -65536');
db.pragma('temp_store = MEMORY');
db.pragma('mmap_size = 268435456');
db.pragma('wal_autocheckpoint = 1000');
applySqlitePragmas(db);
// Load the embedding profile from DB
const rawProfile = db.prepare('SELECT * FROM embedding_profiles WHERE id = ?').get(embeddingProfileId);
const rawProfile = db
.prepare('SELECT * FROM embedding_profiles WHERE id = ?')
.get(embeddingProfileId);
if (!rawProfile) {
db.close();
@@ -43,9 +47,55 @@ if (!rawProfile) {
const profileEntity = new EmbeddingProfileEntity(rawProfile as EmbeddingProfileEntityProps);
const profile = EmbeddingProfileMapper.fromEntity(profileEntity);
/**
 * The single write request currently awaiting an acknowledgement from the
 * parent thread, or `null` when no write is outstanding. The wrapped
 * `resolve` / `reject` callbacks clear this slot before settling the promise.
 */
let pendingWrite: {
  jobId: string;
  resolve: () => void;
  reject: (error: Error) => void;
} | null = null;

/** Job id of the `embed` request being processed, or `null` when idle. */
let currentJobId: string | null = null;

/**
 * Post a `write_embeddings` message to the parent thread and wait for it to
 * be settled through `pendingWrite` (the message handler resolves it on
 * `write_ack` and rejects it on `write_error` for the matching job id).
 *
 * Only one write may be outstanding at a time.
 *
 * @param message - The `write_embeddings` payload to forward to the parent.
 * @returns A promise that resolves once the write is acknowledged.
 * @throws (as a rejection) when another write is already in flight, when the
 *   parent reports a write error, or when posting the message itself fails.
 */
function requestWrite(
  message: Extract<EmbedWorkerResponse, { type: 'write_embeddings' }>
): Promise<void> {
  if (pendingWrite) {
    return Promise.reject(new Error(`write request already in flight for ${pendingWrite.jobId}`));
  }

  return new Promise((resolve, reject) => {
    pendingWrite = {
      jobId: message.jobId,
      resolve: () => {
        pendingWrite = null;
        resolve();
      },
      reject: (error: Error) => {
        pendingWrite = null;
        reject(error);
      }
    };

    try {
      parentPort!.postMessage(message);
    } catch (error) {
      // If the message never left this thread no ack can ever arrive. Free the
      // slot so later writes are not rejected as "already in flight" forever.
      pendingWrite = null;
      reject(error instanceof Error ? error : new Error(String(error)));
    }
  });
}
// Create provider and embedding service
const provider = createProviderFromProfile(profile);
const embeddingService = new EmbeddingService(db, provider, embeddingProfileId);
// Embedding service whose persistence step is delegated to the parent thread:
// each batch is converted to SerializedEmbedding records and sent through
// requestWrite() as a `write_embeddings` message instead of being written here.
const embeddingService = new EmbeddingService(db, provider, embeddingProfileId, {
persistEmbeddings: async (embeddings) => {
const serializedEmbeddings: SerializedEmbedding[] = embeddings.map((item) => ({
snippetId: item.snippetId,
profileId: item.profileId,
model: item.model,
dimensions: item.dimensions,
// NOTE(review): Uint8Array.from() converts element by element. This is a plain
// byte copy only if item.embedding is already a byte array (Buffer/Uint8Array);
// a Float32Array would have every value coerced to 0-255 — confirm the type.
embedding: Uint8Array.from(item.embedding)
}));
await requestWrite({
type: 'write_embeddings',
// Falls back to 'unknown' when no embed request is being processed.
jobId: currentJobId ?? 'unknown',
embeddings: serializedEmbeddings
});
}
});
// Signal ready after service initialization
parentPort!.postMessage({
@@ -53,12 +103,27 @@ parentPort!.postMessage({
} satisfies EmbedWorkerResponse);
parentPort!.on('message', async (msg: EmbedWorkerRequest) => {
if (msg.type === 'write_ack') {
if (pendingWrite?.jobId === msg.jobId) {
pendingWrite.resolve();
}
return;
}
if (msg.type === 'write_error') {
if (pendingWrite?.jobId === msg.jobId) {
pendingWrite.reject(new Error(msg.error));
}
return;
}
if (msg.type === 'shutdown') {
db.close();
process.exit(0);
}
if (msg.type === 'embed') {
currentJobId = msg.jobId;
try {
const snippetIds = embeddingService.findSnippetIdsMissingEmbeddings(
msg.repositoryId,
@@ -84,6 +149,8 @@ parentPort!.on('message', async (msg: EmbedWorkerRequest) => {
jobId: msg.jobId,
error: err instanceof Error ? err.message : String(err)
} satisfies EmbedWorkerResponse);
} finally {
currentJobId = null;
}
}
});

View File

@@ -466,12 +466,15 @@ describe('IndexingPipeline', () => {
const job1 = makeJob();
await pipeline.run(job1 as never);
const firstSnippetIds = (db.prepare(`SELECT id FROM snippets ORDER BY id`).all() as { id: string }[])
.map((row) => row.id);
const firstSnippetIds = (
db.prepare(`SELECT id FROM snippets ORDER BY id`).all() as { id: string }[]
).map((row) => row.id);
expect(firstSnippetIds.length).toBeGreaterThan(0);
const firstEmbeddingCount = (
db.prepare(`SELECT COUNT(*) as n FROM snippet_embeddings WHERE profile_id = 'local-default'`).get() as {
db
.prepare(`SELECT COUNT(*) as n FROM snippet_embeddings WHERE profile_id = 'local-default'`)
.get() as {
n: number;
}
).n;
@@ -483,11 +486,15 @@ describe('IndexingPipeline', () => {
const job2 = db.prepare(`SELECT * FROM indexing_jobs WHERE id = ?`).get(job2Id) as never;
await pipeline.run(job2);
const secondSnippetIds = (db.prepare(`SELECT id FROM snippets ORDER BY id`).all() as {
id: string;
}[]).map((row) => row.id);
const secondSnippetIds = (
db.prepare(`SELECT id FROM snippets ORDER BY id`).all() as {
id: string;
}[]
).map((row) => row.id);
const secondEmbeddingCount = (
db.prepare(`SELECT COUNT(*) as n FROM snippet_embeddings WHERE profile_id = 'local-default'`).get() as {
db
.prepare(`SELECT COUNT(*) as n FROM snippet_embeddings WHERE profile_id = 'local-default'`)
.get() as {
n: number;
}
).n;
@@ -918,9 +925,9 @@ describe('IndexingPipeline', () => {
await pipeline.run(job as never);
const docs = db
.prepare(`SELECT file_path FROM documents ORDER BY file_path`)
.all() as { file_path: string }[];
const docs = db.prepare(`SELECT file_path FROM documents ORDER BY file_path`).all() as {
file_path: string;
}[];
const filePaths = docs.map((d) => d.file_path);
// migration-guide.md and docs/legacy-api.md must be absent.
@@ -956,7 +963,10 @@ describe('IndexingPipeline', () => {
expect(row).toBeDefined();
const rules = JSON.parse(row!.rules);
expect(rules).toEqual(['Always use TypeScript strict mode', 'Prefer async/await over callbacks']);
expect(rules).toEqual([
'Always use TypeScript strict mode',
'Prefer async/await over callbacks'
]);
});
it('persists version-specific rules under (repositoryId, versionId) when job has versionId', async () => {
@@ -1219,12 +1229,7 @@ describe('differential indexing', () => {
insertSnippet(db, doc1Id, { repository_id: '/test/repo', version_id: ancestorVersionId });
insertSnippet(db, doc2Id, { repository_id: '/test/repo', version_id: ancestorVersionId });
const pipeline = new IndexingPipeline(
db,
vi.fn() as never,
{ crawl: vi.fn() } as never,
null
);
const pipeline = new IndexingPipeline(db, vi.fn() as never, { crawl: vi.fn() } as never, null);
(pipeline as unknown as PipelineInternals).cloneFromAncestor(
ancestorVersionId,
targetVersionId,
@@ -1236,9 +1241,7 @@ describe('differential indexing', () => {
.prepare(`SELECT * FROM documents WHERE version_id = ?`)
.all(targetVersionId) as { id: string; file_path: string }[];
expect(targetDocs).toHaveLength(2);
expect(targetDocs.map((d) => d.file_path).sort()).toEqual(
['README.md', 'src/index.ts'].sort()
);
expect(targetDocs.map((d) => d.file_path).sort()).toEqual(['README.md', 'src/index.ts'].sort());
// New IDs must differ from ancestor doc IDs.
const targetDocIds = targetDocs.map((d) => d.id);
expect(targetDocIds).not.toContain(doc1Id);
@@ -1261,12 +1264,7 @@ describe('differential indexing', () => {
checksum: 'sha-main'
});
const pipeline = new IndexingPipeline(
db,
vi.fn() as never,
{ crawl: vi.fn() } as never,
null
);
const pipeline = new IndexingPipeline(db, vi.fn() as never, { crawl: vi.fn() } as never, null);
(pipeline as unknown as PipelineInternals).cloneFromAncestor(
ancestorVersionId,
targetVersionId,
@@ -1323,9 +1321,9 @@ describe('differential indexing', () => {
await pipeline.run(job);
const updatedJob = db
.prepare(`SELECT status FROM indexing_jobs WHERE id = ?`)
.get(jobId) as { status: string };
const updatedJob = db.prepare(`SELECT status FROM indexing_jobs WHERE id = ?`).get(jobId) as {
status: string;
};
expect(updatedJob.status).toBe('done');
const docs = db
@@ -1375,9 +1373,7 @@ describe('differential indexing', () => {
deletedPaths: new Set<string>(),
unchangedPaths: new Set(['unchanged.md'])
};
const spy = vi
.spyOn(diffStrategy, 'buildDifferentialPlan')
.mockResolvedValueOnce(mockPlan);
const spy = vi.spyOn(diffStrategy, 'buildDifferentialPlan').mockResolvedValueOnce(mockPlan);
const pipeline = new IndexingPipeline(
db,
@@ -1398,9 +1394,9 @@ describe('differential indexing', () => {
spy.mockRestore();
// 6. Assert job completed and both docs exist under the target version.
const finalJob = db
.prepare(`SELECT status FROM indexing_jobs WHERE id = ?`)
.get(jobId) as { status: string };
const finalJob = db.prepare(`SELECT status FROM indexing_jobs WHERE id = ?`).get(jobId) as {
status: string;
};
expect(finalJob.status).toBe('done');
const targetDocs = db

View File

@@ -28,6 +28,14 @@ import { parseFile } from '$lib/server/parser/index.js';
import { computeTrustScore } from '$lib/server/search/trust-score.js';
import { computeDiff } from './diff.js';
import { buildDifferentialPlan, type DifferentialPlan } from './differential-strategy.js';
import {
cloneFromAncestor as cloneFromAncestorInDatabase,
replaceSnippets as replaceSnippetsInDatabase,
updateRepo as updateRepoInDatabase,
updateVersion as updateVersionInDatabase,
type CloneFromAncestorRequest
} from './write-operations.js';
import type { SerializedFields } from './worker-types.js';
// ---------------------------------------------------------------------------
// Progress calculation
@@ -70,7 +78,23 @@ export class IndexingPipeline {
private readonly db: Database.Database,
private readonly githubCrawl: typeof GithubCrawlFn,
private readonly localCrawler: LocalCrawler,
private readonly embeddingService: EmbeddingService | null
private readonly embeddingService: EmbeddingService | null,
private readonly writeDelegate?: {
persistJobUpdates?: boolean;
replaceSnippets?: (
changedDocIds: string[],
newDocuments: NewDocument[],
newSnippets: NewSnippet[]
) => Promise<void>;
cloneFromAncestor?: (request: CloneFromAncestorRequest) => Promise<void>;
updateRepo?: (repositoryId: string, fields: SerializedFields) => Promise<void>;
updateVersion?: (versionId: string, fields: SerializedFields) => Promise<void>;
upsertRepoConfig?: (
repositoryId: string,
versionId: string | null,
rules: string[]
) => Promise<void>;
}
) {
this.sqliteVecStore = new SqliteVecStore(db);
}
@@ -117,14 +141,12 @@ export class IndexingPipeline {
if (!repo) throw new Error(`Repository ${repositoryId} not found`);
// Mark repo as actively indexing.
this.updateRepo(repo.id, { state: 'indexing' });
await this.updateRepo(repo.id, { state: 'indexing' });
if (normJob.versionId) {
this.updateVersion(normJob.versionId, { state: 'indexing' });
await this.updateVersion(normJob.versionId, { state: 'indexing' });
}
const versionTag = normJob.versionId
? this.getVersionTag(normJob.versionId)
: undefined;
const versionTag = normJob.versionId ? this.getVersionTag(normJob.versionId) : undefined;
// ---- Stage 0: Differential strategy (TRUEREF-0021) ----------------------
// When indexing a tagged version, check if we can inherit unchanged files
@@ -147,12 +169,12 @@ export class IndexingPipeline {
// If a differential plan exists, clone unchanged files from ancestor.
if (differentialPlan && differentialPlan.unchangedPaths.size > 0) {
reportStage('cloning');
this.cloneFromAncestor(
differentialPlan.ancestorVersionId,
normJob.versionId!,
repo.id,
differentialPlan.unchangedPaths
);
await this.cloneFromAncestor({
ancestorVersionId: differentialPlan.ancestorVersionId,
targetVersionId: normJob.versionId!,
repositoryId: repo.id,
unchangedPaths: [...differentialPlan.unchangedPaths]
});
console.info(
`[IndexingPipeline] Differential indexing: cloned ${differentialPlan.unchangedPaths.size} unchanged files from ${differentialPlan.ancestorTag}`
);
@@ -174,7 +196,11 @@ export class IndexingPipeline {
if (crawlResult.config) {
// Config was pre-parsed by the crawler — wrap it in a ParsedConfig
// shell so the rest of the pipeline can use it uniformly.
parsedConfig = { config: crawlResult.config, source: 'trueref.json', warnings: [] } satisfies ParsedConfig;
parsedConfig = {
config: crawlResult.config,
source: 'trueref.json',
warnings: []
} satisfies ParsedConfig;
} else {
const configFile = crawlResult.files.find(
(f) => f.path === 'trueref.json' || f.path === 'context7.json'
@@ -189,7 +215,10 @@ export class IndexingPipeline {
const filteredFiles =
excludeFiles.length > 0
? crawlResult.files.filter(
(f) => !excludeFiles.some((pattern) => IndexingPipeline.matchesExcludePattern(f.path, pattern))
(f) =>
!excludeFiles.some((pattern) =>
IndexingPipeline.matchesExcludePattern(f.path, pattern)
)
)
: crawlResult.files;
@@ -303,7 +332,13 @@ export class IndexingPipeline {
this.embeddingService !== null
);
this.updateJob(job.id, { processedFiles: totalProcessed, progress });
reportStage('parsing', `${totalProcessed} / ${totalFiles} files`, progress, totalProcessed, totalFiles);
reportStage(
'parsing',
`${totalProcessed} / ${totalFiles} files`,
progress,
totalProcessed,
totalFiles
);
}
}
@@ -312,7 +347,7 @@ export class IndexingPipeline {
// ---- Stage 3: Atomic replacement ------------------------------------
reportStage('storing');
this.replaceSnippets(repo.id, changedDocIds, newDocuments, newSnippets);
await this.replaceSnippets(repo.id, changedDocIds, newDocuments, newSnippets);
// ---- Stage 4: Embeddings (if provider is configured) ----------------
if (this.embeddingService) {
@@ -325,7 +360,7 @@ export class IndexingPipeline {
if (snippetIds.length === 0) {
// No missing embeddings for the active profile; parsing progress is final.
} else {
const embeddingsTotal = snippetIds.length;
const embeddingsTotal = snippetIds.length;
await this.embeddingService.embedSnippets(snippetIds, (done) => {
const progress = calculateProgress(
@@ -350,7 +385,7 @@ export class IndexingPipeline {
state: 'indexed'
});
this.updateRepo(repo.id, {
await this.updateRepo(repo.id, {
state: 'indexed',
totalSnippets: stats.totalSnippets,
totalTokens: stats.totalTokens,
@@ -360,7 +395,7 @@ export class IndexingPipeline {
if (normJob.versionId) {
const versionStats = this.computeVersionStats(normJob.versionId);
this.updateVersion(normJob.versionId, {
await this.updateVersion(normJob.versionId, {
state: 'indexed',
totalSnippets: versionStats.totalSnippets,
indexedAt: Math.floor(Date.now() / 1000)
@@ -371,12 +406,12 @@ export class IndexingPipeline {
if (parsedConfig?.config.rules?.length) {
if (!normJob.versionId) {
// Main-branch job: write the repo-wide entry only.
this.upsertRepoConfig(repo.id, null, parsedConfig.config.rules);
await this.upsertRepoConfig(repo.id, null, parsedConfig.config.rules);
} else {
// Version job: write only the version-specific entry.
// Writing to the NULL row here would overwrite repo-wide rules
// with whatever the last-indexed version happened to carry.
this.upsertRepoConfig(repo.id, normJob.versionId, parsedConfig.config.rules);
await this.upsertRepoConfig(repo.id, normJob.versionId, parsedConfig.config.rules);
}
}
@@ -398,9 +433,9 @@ export class IndexingPipeline {
});
// Restore repo to error state but preserve any existing indexed data.
this.updateRepo(repositoryId, { state: 'error' });
await this.updateRepo(repositoryId, { state: 'error' });
if (normJob.versionId) {
this.updateVersion(normJob.versionId, { state: 'error' });
await this.updateVersion(normJob.versionId, { state: 'error' });
}
throw error;
@@ -411,7 +446,11 @@ export class IndexingPipeline {
// Private — crawl
// -------------------------------------------------------------------------
private async crawl(repo: Repository, ref?: string, allowedPaths?: Set<string>): Promise<{
private async crawl(
repo: Repository,
ref?: string,
allowedPaths?: Set<string>
): Promise<{
files: Array<{ path: string; content: string; sha: string; size: number; language: string }>;
totalFiles: number;
/** Pre-parsed trueref.json / context7.json, or undefined when absent. */
@@ -473,219 +512,50 @@ export class IndexingPipeline {
*
* Runs in a single SQLite transaction for atomicity.
*/
private cloneFromAncestor(
ancestorVersionId: string,
targetVersionId: string,
repositoryId: string,
unchangedPaths: Set<string>
): void {
this.db.transaction(() => {
const pathList = [...unchangedPaths];
const placeholders = pathList.map(() => '?').join(',');
const ancestorDocs = this.db
.prepare(
`SELECT * FROM documents WHERE version_id = ? AND file_path IN (${placeholders})`
)
.all(ancestorVersionId, ...pathList) as Array<{
id: string;
repository_id: string;
file_path: string;
title: string | null;
language: string | null;
token_count: number;
checksum: string;
indexed_at: number;
}>;
private async cloneFromAncestor(
requestOrAncestorVersionId: CloneFromAncestorRequest | string,
targetVersionId?: string,
repositoryId?: string,
unchangedPaths?: Set<string>
): Promise<void> {
const request: CloneFromAncestorRequest =
typeof requestOrAncestorVersionId === 'string'
? {
ancestorVersionId: requestOrAncestorVersionId,
targetVersionId: targetVersionId!,
repositoryId: repositoryId!,
unchangedPaths: [...(unchangedPaths ?? new Set<string>())]
}
: requestOrAncestorVersionId;
const docIdMap = new Map<string, string>();
const nowEpoch = Math.floor(Date.now() / 1000);
if (request.unchangedPaths.length === 0) {
return;
}
for (const doc of ancestorDocs) {
const newDocId = randomUUID();
docIdMap.set(doc.id, newDocId);
this.db
.prepare(
`INSERT INTO documents (id, repository_id, version_id, file_path, title, language, token_count, checksum, indexed_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`
)
.run(
newDocId,
repositoryId,
targetVersionId,
doc.file_path,
doc.title,
doc.language,
doc.token_count,
doc.checksum,
nowEpoch
);
}
if (this.writeDelegate?.cloneFromAncestor) {
await this.writeDelegate.cloneFromAncestor(request);
return;
}
if (docIdMap.size === 0) return;
const oldDocIds = [...docIdMap.keys()];
const snippetPlaceholders = oldDocIds.map(() => '?').join(',');
const ancestorSnippets = this.db
.prepare(
`SELECT * FROM snippets WHERE document_id IN (${snippetPlaceholders})`
)
.all(...oldDocIds) as Array<{
id: string;
document_id: string;
repository_id: string;
version_id: string | null;
type: string;
title: string | null;
content: string;
language: string | null;
breadcrumb: string | null;
token_count: number;
created_at: number;
}>;
const snippetIdMap = new Map<string, string>();
for (const snippet of ancestorSnippets) {
const newSnippetId = randomUUID();
snippetIdMap.set(snippet.id, newSnippetId);
const newDocId = docIdMap.get(snippet.document_id)!;
this.db
.prepare(
`INSERT INTO snippets (id, document_id, repository_id, version_id, type, title, content, language, breadcrumb, token_count, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
)
.run(
newSnippetId,
newDocId,
repositoryId,
targetVersionId,
snippet.type,
snippet.title,
snippet.content,
snippet.language,
snippet.breadcrumb,
snippet.token_count,
snippet.created_at
);
}
if (snippetIdMap.size > 0) {
const oldSnippetIds = [...snippetIdMap.keys()];
const embPlaceholders = oldSnippetIds.map(() => '?').join(',');
const ancestorEmbeddings = this.db
.prepare(
`SELECT * FROM snippet_embeddings WHERE snippet_id IN (${embPlaceholders})`
)
.all(...oldSnippetIds) as Array<{
snippet_id: string;
profile_id: string;
model: string;
dimensions: number;
embedding: Buffer;
created_at: number;
}>;
for (const emb of ancestorEmbeddings) {
const newSnippetId = snippetIdMap.get(emb.snippet_id)!;
this.db
.prepare(
`INSERT INTO snippet_embeddings (snippet_id, profile_id, model, dimensions, embedding, created_at)
VALUES (?, ?, ?, ?, ?, ?)`
)
.run(
newSnippetId,
emb.profile_id,
emb.model,
emb.dimensions,
emb.embedding,
emb.created_at
);
this.sqliteVecStore.upsertEmbeddingBuffer(
emb.profile_id,
newSnippetId,
emb.embedding,
emb.dimensions
);
}
}
})();
cloneFromAncestorInDatabase(this.db, request);
}
// -------------------------------------------------------------------------
// Private — atomic snippet replacement
// -------------------------------------------------------------------------
private replaceSnippets(
private async replaceSnippets(
_repositoryId: string,
changedDocIds: string[],
newDocuments: NewDocument[],
newSnippets: NewSnippet[]
): void {
const insertDoc = this.db.prepare(
`INSERT INTO documents
(id, repository_id, version_id, file_path, title, language,
token_count, checksum, indexed_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`
);
): Promise<void> {
if (this.writeDelegate?.replaceSnippets) {
await this.writeDelegate.replaceSnippets(changedDocIds, newDocuments, newSnippets);
return;
}
const insertSnippet = this.db.prepare(
`INSERT INTO snippets
(id, document_id, repository_id, version_id, type, title,
content, language, breadcrumb, token_count, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
);
this.db.transaction(() => {
this.sqliteVecStore.deleteEmbeddingsForDocumentIds(changedDocIds);
// Delete stale documents (cascade deletes their snippets via FK).
if (changedDocIds.length > 0) {
const placeholders = changedDocIds.map(() => '?').join(',');
this.db
.prepare(`DELETE FROM documents WHERE id IN (${placeholders})`)
.run(...changedDocIds);
}
// Insert new documents.
for (const doc of newDocuments) {
const indexedAtSeconds =
doc.indexedAt instanceof Date
? Math.floor(doc.indexedAt.getTime() / 1000)
: Math.floor(Date.now() / 1000);
insertDoc.run(
doc.id,
doc.repositoryId,
doc.versionId ?? null,
doc.filePath,
doc.title ?? null,
doc.language ?? null,
doc.tokenCount ?? 0,
doc.checksum,
indexedAtSeconds
);
}
// Insert new snippets.
for (const snippet of newSnippets) {
const createdAtSeconds =
snippet.createdAt instanceof Date
? Math.floor(snippet.createdAt.getTime() / 1000)
: Math.floor(Date.now() / 1000);
insertSnippet.run(
snippet.id,
snippet.documentId,
snippet.repositoryId,
snippet.versionId ?? null,
snippet.type,
snippet.title ?? null,
snippet.content,
snippet.language ?? null,
snippet.breadcrumb ?? null,
snippet.tokenCount ?? 0,
createdAtSeconds
);
}
})();
replaceSnippetsInDatabase(this.db, changedDocIds, newDocuments, newSnippets);
}
// -------------------------------------------------------------------------
@@ -709,9 +579,10 @@ export class IndexingPipeline {
private computeVersionStats(versionId: string): { totalSnippets: number } {
const row = this.db
.prepare<[string], { total_snippets: number }>(
`SELECT COUNT(*) as total_snippets FROM snippets WHERE version_id = ?`
)
.prepare<
[string],
{ total_snippets: number }
>(`SELECT COUNT(*) as total_snippets FROM snippets WHERE version_id = ?`)
.get(versionId);
return { totalSnippets: row?.total_snippets ?? 0 };
@@ -750,6 +621,10 @@ export class IndexingPipeline {
}
private updateJob(id: string, fields: Record<string, unknown>): void {
if (this.writeDelegate?.persistJobUpdates === false) {
return;
}
const sets = Object.keys(fields)
.map((k) => `${toSnake(k)} = ?`)
.join(', ');
@@ -757,43 +632,44 @@ export class IndexingPipeline {
this.db.prepare(`UPDATE indexing_jobs SET ${sets} WHERE id = ?`).run(...values);
}
private updateRepo(id: string, fields: Record<string, unknown>): void {
const now = Math.floor(Date.now() / 1000);
const allFields = { ...fields, updatedAt: now };
const sets = Object.keys(allFields)
.map((k) => `${toSnake(k)} = ?`)
.join(', ');
const values = [...Object.values(allFields), id];
this.db.prepare(`UPDATE repositories SET ${sets} WHERE id = ?`).run(...values);
private async updateRepo(id: string, fields: SerializedFields): Promise<void> {
if (this.writeDelegate?.updateRepo) {
await this.writeDelegate.updateRepo(id, fields);
return;
}
updateRepoInDatabase(this.db, id, fields);
}
private updateVersion(id: string, fields: Record<string, unknown>): void {
const sets = Object.keys(fields)
.map((k) => `${toSnake(k)} = ?`)
.join(', ');
const values = [...Object.values(fields), id];
this.db.prepare(`UPDATE repository_versions SET ${sets} WHERE id = ?`).run(...values);
private async updateVersion(id: string, fields: SerializedFields): Promise<void> {
if (this.writeDelegate?.updateVersion) {
await this.writeDelegate.updateVersion(id, fields);
return;
}
updateVersionInDatabase(this.db, id, fields);
}
private upsertRepoConfig(
private async upsertRepoConfig(
repositoryId: string,
versionId: string | null,
rules: string[]
): void {
): Promise<void> {
if (this.writeDelegate?.upsertRepoConfig) {
await this.writeDelegate.upsertRepoConfig(repositoryId, versionId, rules);
return;
}
const now = Math.floor(Date.now() / 1000);
// Use DELETE + INSERT because ON CONFLICT … DO UPDATE doesn't work reliably
// with partial unique indexes in all SQLite versions.
if (versionId === null) {
this.db
.prepare(
`DELETE FROM repository_configs WHERE repository_id = ? AND version_id IS NULL`
)
.prepare(`DELETE FROM repository_configs WHERE repository_id = ? AND version_id IS NULL`)
.run(repositoryId);
} else {
this.db
.prepare(
`DELETE FROM repository_configs WHERE repository_id = ? AND version_id = ?`
)
.prepare(`DELETE FROM repository_configs WHERE repository_id = ? AND version_id = ?`)
.run(repositoryId, versionId);
}
this.db

View File

@@ -36,10 +36,10 @@ function normalizeStatuses(status?: JobStatusFilter): Array<IndexingJob['status'
return [...new Set(statuses)];
}
function buildJobFilterQuery(options?: {
repositoryId?: string;
status?: JobStatusFilter;
}): { where: string; params: unknown[] } {
function buildJobFilterQuery(options?: { repositoryId?: string; status?: JobStatusFilter }): {
where: string;
params: unknown[];
} {
const conditions: string[] = [];
const params: unknown[] = [];
@@ -164,7 +164,9 @@ export class JobQueue {
*/
private async processNext(): Promise<void> {
// Fallback path: no worker pool configured, run directly (used by tests and dev mode)
console.warn('[JobQueue] Running in fallback mode (no worker pool) — direct pipeline execution.');
console.warn(
'[JobQueue] Running in fallback mode (no worker pool) — direct pipeline execution.'
);
const rawJob = this.db
.prepare<[], IndexingJobEntity>(
@@ -176,7 +178,9 @@ export class JobQueue {
if (!rawJob) return;
console.warn('[JobQueue] processNext: no pipeline or pool configured — skipping job processing');
console.warn(
'[JobQueue] processNext: no pipeline or pool configured — skipping job processing'
);
}
/**

View File

@@ -181,7 +181,9 @@ describe('ProgressBroadcaster', () => {
concurrency: 2,
active: 1,
idle: 1,
workers: [{ index: 0, state: 'running', jobId: 'job-1', repositoryId: '/repo/1', versionId: null }]
workers: [
{ index: 0, state: 'running', jobId: 'job-1', repositoryId: '/repo/1', versionId: null }
]
});
const { value } = await reader.read();

View File

@@ -19,6 +19,7 @@ import { WorkerPool } from './worker-pool.js';
import { initBroadcaster } from './progress-broadcaster.js';
import type { ProgressBroadcaster } from './progress-broadcaster.js';
import path from 'node:path';
import { existsSync } from 'node:fs';
import { fileURLToPath } from 'node:url';
// ---------------------------------------------------------------------------
@@ -57,6 +58,21 @@ let _pipeline: IndexingPipeline | null = null;
let _pool: WorkerPool | null = null;
let _broadcaster: ProgressBroadcaster | null = null;
/**
 * Locate a worker script from its path segments.
 *
 * Two locations are probed, in order: the segments resolved against the
 * current working directory, then the segments resolved four directories
 * above this module's own directory. The first one that exists on disk is
 * returned; when neither exists, the working-directory path is returned.
 *
 * @param segments - Path segments of the script, e.g. `'build', 'workers', 'x.mjs'`.
 * @returns An absolute path to the script (which may not exist).
 */
function resolveWorkerScript(...segments: string[]): string {
  const fromCwd = path.resolve(process.cwd(), ...segments);
  const moduleDir = path.dirname(fileURLToPath(import.meta.url));
  const fromModule = path.resolve(moduleDir, '../../../../', ...segments);

  if (existsSync(fromCwd)) {
    return fromCwd;
  }

  // Fall back to the working-directory path when neither location exists.
  return existsSync(fromModule) ? fromModule : fromCwd;
}
/**
* Initialise (or return the existing) JobQueue + IndexingPipeline pair.
*
@@ -91,19 +107,17 @@ export function initializePipeline(
const getRepositoryIdForJob = (jobId: string): string => {
const row = db
.prepare<[string], { repository_id: string }>(
`SELECT repository_id FROM indexing_jobs WHERE id = ?`
)
.prepare<
[string],
{ repository_id: string }
>(`SELECT repository_id FROM indexing_jobs WHERE id = ?`)
.get(jobId);
return row?.repository_id ?? '';
};
// Resolve worker script paths under build/workers/, trying the process working
// directory first and then a location relative to this file.
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
const workerScript = path.join(__dirname, '../../../build/workers/worker-entry.mjs');
const embedWorkerScript = path.join(__dirname, '../../../build/workers/embed-worker-entry.mjs');
const writeWorkerScript = path.join(__dirname, '../../../build/workers/write-worker-entry.mjs');
const workerScript = resolveWorkerScript('build', 'workers', 'worker-entry.mjs');
const embedWorkerScript = resolveWorkerScript('build', 'workers', 'embed-worker-entry.mjs');
const writeWorkerScript = resolveWorkerScript('build', 'workers', 'write-worker-entry.mjs');
try {
_pool = new WorkerPool({
@@ -113,13 +127,6 @@ export function initializePipeline(
writeWorkerScript,
dbPath: options.dbPath,
onProgress: (jobId, msg) => {
// Update DB with progress
db.prepare(
`UPDATE indexing_jobs
SET stage = ?, stage_detail = ?, progress = ?, processed_files = ?, total_files = ?
WHERE id = ?`
).run(msg.stage, msg.stageDetail ?? null, msg.progress, msg.processedFiles, msg.totalFiles, jobId);
// Broadcast progress event
if (_broadcaster) {
_broadcaster.broadcast(jobId, getRepositoryIdForJob(jobId), 'job-progress', {
@@ -129,11 +136,6 @@ export function initializePipeline(
}
},
onJobDone: (jobId: string) => {
// Update job status to done
db.prepare(`UPDATE indexing_jobs SET status = 'done', completed_at = unixepoch() WHERE id = ?`).run(
jobId
);
// Broadcast done event
if (_broadcaster) {
_broadcaster.broadcast(jobId, getRepositoryIdForJob(jobId), 'job-done', {
@@ -143,11 +145,6 @@ export function initializePipeline(
}
},
onJobFailed: (jobId: string, error: string) => {
// Update job status to failed with error message
db.prepare(
`UPDATE indexing_jobs SET status = 'failed', error = ?, completed_at = unixepoch() WHERE id = ?`
).run(error, jobId);
// Broadcast failed event
if (_broadcaster) {
_broadcaster.broadcast(jobId, getRepositoryIdForJob(jobId), 'job-failed', {
@@ -231,5 +228,3 @@ export function _resetSingletons(): void {
_pool = null;
_broadcaster = null;
}

View File

@@ -5,24 +5,175 @@ import { crawl as githubCrawl } from '$lib/server/crawler/github.crawler.js';
import { LocalCrawler } from '$lib/server/crawler/local.crawler.js';
import { IndexingJobMapper } from '$lib/server/mappers/indexing-job.mapper.js';
import { IndexingJobEntity, type IndexingJobEntityProps } from '$lib/server/models/indexing-job.js';
import type { ParseWorkerRequest, ParseWorkerResponse, WorkerInitData } from './worker-types.js';
import { applySqlitePragmas } from '$lib/server/db/connection.js';
import type {
ParseWorkerRequest,
ParseWorkerResponse,
SerializedDocument,
SerializedSnippet,
WorkerInitData
} from './worker-types.js';
import type { IndexingStage } from '$lib/types.js';
const { dbPath } = workerData as WorkerInitData;
const db = new Database(dbPath);
db.pragma('journal_mode = WAL');
db.pragma('foreign_keys = ON');
db.pragma('busy_timeout = 5000');
db.pragma('synchronous = NORMAL');
db.pragma('cache_size = -65536');
db.pragma('temp_store = MEMORY');
db.pragma('mmap_size = 268435456');
db.pragma('wal_autocheckpoint = 1000');
applySqlitePragmas(db);
const pipeline = new IndexingPipeline(db, githubCrawl, new LocalCrawler(), null);
// The write request currently awaiting a reply from the parent thread, or null when
// none is in flight. requestWrite() populates it; the message handler calls `resolve`
// on a matching `write_ack` and `reject` on a matching `write_error`.
let pendingWrite: {
jobId: string;
resolve: () => void;
reject: (error: Error) => void;
} | null = null;
/**
 * Flatten a document record into the plain shape sent to other threads.
 *
 * Optional fields that are `undefined` or `null` become `null` (or `0` for
 * `tokenCount`), and `indexedAt` is converted from a `Date` to whole epoch seconds.
 *
 * @param document - Document record with `indexedAt` as a `Date`.
 * @returns The serialized document with a numeric `indexedAt`.
 */
function serializeDocument(document: {
  id: string;
  repositoryId: string;
  versionId?: string | null;
  filePath: string;
  title?: string | null;
  language?: string | null;
  tokenCount?: number | null;
  checksum: string;
  indexedAt: Date;
}): SerializedDocument {
  const { id, repositoryId, filePath, checksum } = document;
  // Milliseconds -> whole seconds, rounded down.
  const indexedAtSeconds = Math.floor(document.indexedAt.getTime() / 1000);

  const serialized: SerializedDocument = {
    id,
    repositoryId,
    versionId: document.versionId ?? null,
    filePath,
    title: document.title ?? null,
    language: document.language ?? null,
    tokenCount: document.tokenCount ?? 0,
    checksum,
    indexedAt: indexedAtSeconds
  };
  return serialized;
}
/**
 * Flatten a snippet record into the plain shape sent to other threads.
 *
 * Optional fields that are `undefined` or `null` become `null` (or `0` for
 * `tokenCount`), and `createdAt` is converted from a `Date` to whole epoch seconds.
 *
 * @param snippet - Snippet record with `createdAt` as a `Date`.
 * @returns The serialized snippet with a numeric `createdAt`.
 */
function serializeSnippet(snippet: {
  id: string;
  documentId: string;
  repositoryId: string;
  versionId?: string | null;
  type: 'code' | 'info';
  title?: string | null;
  content: string;
  language?: string | null;
  breadcrumb?: string | null;
  tokenCount?: number | null;
  createdAt: Date;
}): SerializedSnippet {
  const { id, documentId, repositoryId, type, content } = snippet;
  // Milliseconds -> whole seconds, rounded down.
  const createdAtSeconds = Math.floor(snippet.createdAt.getTime() / 1000);

  const serialized: SerializedSnippet = {
    id,
    documentId,
    repositoryId,
    versionId: snippet.versionId ?? null,
    type,
    title: snippet.title ?? null,
    content,
    language: snippet.language ?? null,
    breadcrumb: snippet.breadcrumb ?? null,
    tokenCount: snippet.tokenCount ?? 0,
    createdAt: createdAtSeconds
  };
  return serialized;
}
/**
 * Post a write request to the parent thread and wait for it to be settled.
 *
 * Only one request may be outstanding at a time: if `pendingWrite` is already set the
 * call rejects immediately without posting anything. Otherwise `pendingWrite` is
 * populated and the message is posted; the returned promise settles when the stored
 * `resolve` / `reject` callback is invoked, and either callback clears `pendingWrite`
 * first.
 *
 * @param message - One of the `write_*` messages a parse worker may emit.
 * @returns A promise that resolves on acknowledgement and rejects on a write error.
 */
function requestWrite(
  message: Extract<
    ParseWorkerResponse,
    {
      type:
        | 'write_replace'
        | 'write_clone'
        | 'write_repo_update'
        | 'write_version_update'
        | 'write_repo_config';
    }
  >
): Promise<void> {
  const inFlight = pendingWrite;
  if (inFlight) {
    return Promise.reject(new Error(`write request already in flight for ${inFlight.jobId}`));
  }

  return new Promise((resolve, reject) => {
    // Free the slot before settling so the continuation can issue the next write.
    const releaseThen = (settle: () => void): void => {
      pendingWrite = null;
      settle();
    };

    pendingWrite = {
      jobId: message.jobId,
      resolve: () => releaseThen(resolve),
      reject: (error: Error) => releaseThen(() => reject(error))
    };
    parentPort!.postMessage(message);
  });
}
// Pipeline instance used by this worker. Each persistence hook below builds the matching
// `write_*` message, hands it to requestWrite() and awaits the result, so a hook only
// returns once the parent has acknowledged (or rejected) the write.
// Every message is tagged with currentJobId; 'unknown' is used when no job id is set.
// NOTE(review): `persistJobUpdates: false` presumably stops the pipeline from updating
// job rows itself — confirm against IndexingPipeline.
const pipeline = new IndexingPipeline(db, githubCrawl, new LocalCrawler(), null, {
persistJobUpdates: false,
// Replace the documents/snippets of changed files; Dates are serialized to epoch seconds.
replaceSnippets: async (changedDocIds, newDocuments, newSnippets) => {
await requestWrite({
type: 'write_replace',
jobId: currentJobId ?? 'unknown',
changedDocIds,
documents: newDocuments.map(serializeDocument),
snippets: newSnippets.map(serializeSnippet)
});
},
// Copy data for unchanged paths from an ancestor version into the target version.
cloneFromAncestor: async (request) => {
await requestWrite({
type: 'write_clone',
jobId: currentJobId ?? 'unknown',
ancestorVersionId: request.ancestorVersionId,
targetVersionId: request.targetVersionId,
repositoryId: request.repositoryId,
unchangedPaths: request.unchangedPaths
});
},
// Forward a field update for a repository.
updateRepo: async (repositoryId, fields) => {
await requestWrite({
type: 'write_repo_update',
jobId: currentJobId ?? 'unknown',
repositoryId,
fields
});
},
// Forward a field update for a repository version.
updateVersion: async (versionId, fields) => {
await requestWrite({
type: 'write_version_update',
jobId: currentJobId ?? 'unknown',
versionId,
fields
});
},
// Forward the rules for a repository (optionally scoped to a version).
upsertRepoConfig: async (repositoryId, versionId, rules) => {
await requestWrite({
type: 'write_repo_config',
jobId: currentJobId ?? 'unknown',
repositoryId,
versionId,
rules
});
}
});
let currentJobId: string | null = null;
parentPort!.on('message', async (msg: ParseWorkerRequest) => {
if (msg.type === 'write_ack') {
if (pendingWrite?.jobId === msg.jobId) {
pendingWrite.resolve();
}
return;
}
if (msg.type === 'write_error') {
if (pendingWrite?.jobId === msg.jobId) {
pendingWrite.reject(new Error(msg.error));
}
return;
}
if (msg.type === 'shutdown') {
db.close();
process.exit(0);
@@ -35,11 +186,19 @@ parentPort!.on('message', async (msg: ParseWorkerRequest) => {
if (!rawJob) {
throw new Error(`Job ${msg.jobId} not found`);
}
const job = IndexingJobMapper.fromEntity(new IndexingJobEntity(rawJob as IndexingJobEntityProps));
const job = IndexingJobMapper.fromEntity(
new IndexingJobEntity(rawJob as IndexingJobEntityProps)
);
await pipeline.run(
job,
(stage: IndexingStage, detail?: string, progress?: number, processedFiles?: number, totalFiles?: number) => {
(
stage: IndexingStage,
detail?: string,
progress?: number,
processedFiles?: number,
totalFiles?: number
) => {
parentPort!.postMessage({
type: 'progress',
jobId: msg.jobId,

View File

@@ -8,7 +8,6 @@
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import { writeFileSync, unlinkSync, existsSync } from 'node:fs';
import { EventEmitter } from 'node:events';
// ---------------------------------------------------------------------------
// Hoist FakeWorker + registry so vi.mock can reference them.
@@ -36,7 +35,7 @@ const { createdWorkers, FakeWorker } = vi.hoisted(() => {
this.threadId = 0;
});
constructor(_script: string, _opts?: unknown) {
constructor() {
super();
createdWorkers.push(this);
}
@@ -67,6 +66,7 @@ function makeOpts(overrides: Partial<WorkerPoolOptions> = {}): WorkerPoolOptions
concurrency: 2,
workerScript: FAKE_SCRIPT,
embedWorkerScript: MISSING_SCRIPT,
writeWorkerScript: MISSING_SCRIPT,
dbPath: ':memory:',
onProgress: vi.fn(),
onJobDone: vi.fn(),
@@ -142,6 +142,12 @@ describe('WorkerPool normal mode', () => {
expect(createdWorkers).toHaveLength(3);
});
// With concurrency 2 and an existing write worker script, three workers are expected
// in total (makeOpts points embedWorkerScript at a missing file by default).
it('spawns a write worker when writeWorkerScript exists', () => {
new WorkerPool(makeOpts({ concurrency: 2, writeWorkerScript: FAKE_SCRIPT }));
expect(createdWorkers).toHaveLength(3);
});
// -------------------------------------------------------------------------
// enqueue dispatches to an idle worker
// -------------------------------------------------------------------------
@@ -208,8 +214,12 @@ describe('WorkerPool normal mode', () => {
const runCalls = createdWorkers.flatMap((w) =>
w.postMessage.mock.calls.filter((c) => (c[0] as { type: string })?.type === 'run')
);
expect(runCalls.filter((c) => (c[0] as unknown as { jobId: string }).jobId === 'job-1')).toHaveLength(1);
expect(runCalls.filter((c) => (c[0] as unknown as { jobId: string }).jobId === 'job-2')).toHaveLength(0);
expect(
runCalls.filter((c) => (c[0] as unknown as { jobId: string }).jobId === 'job-1')
).toHaveLength(1);
expect(
runCalls.filter((c) => (c[0] as unknown as { jobId: string }).jobId === 'job-2')
).toHaveLength(0);
});
it('starts jobs for different repos concurrently', () => {
@@ -227,6 +237,83 @@ describe('WorkerPool normal mode', () => {
expect(dispatchedIds).toContain('job-beta');
});
// Two jobs for the same repository but different versionIds must both receive a `run`
// message right away when two workers are available.
it('dispatches same-repo jobs concurrently when versionIds differ', () => {
const pool = new WorkerPool(makeOpts({ concurrency: 2 }));
pool.enqueue('job-v1', '/repo/same', 'v1');
pool.enqueue('job-v2', '/repo/same', 'v2');
// Collect every `run` message posted to any fake worker.
const runCalls = createdWorkers.flatMap((w) =>
w.postMessage.mock.calls.filter((c) => (c[0] as { type: string })?.type === 'run')
);
const dispatchedIds = runCalls.map((c) => (c[0] as unknown as { jobId: string }).jobId);
expect(dispatchedIds).toContain('job-v1');
expect(dispatchedIds).toContain('job-v2');
});
// A `write_replace` emitted by a parse worker must be posted unchanged to the write
// worker, and the write worker's `write_ack` must be posted back to that parse worker.
// NOTE(review): assumes createdWorkers[0] is the parse worker and createdWorkers[1] the
// write worker when the embed script is missing — depends on WorkerPool's creation order.
it('forwards write worker acknowledgements back to the originating parse worker', () => {
new WorkerPool(makeOpts({ concurrency: 1, writeWorkerScript: FAKE_SCRIPT }));
const parseWorker = createdWorkers[0];
const writeWorker = createdWorkers[1];
// The write worker has to report ready before requests are forwarded to it.
writeWorker.emit('message', { type: 'ready' });
parseWorker.emit('message', {
type: 'write_replace',
jobId: 'job-write',
changedDocIds: [],
documents: [],
snippets: []
});
writeWorker.emit('message', { type: 'write_ack', jobId: 'job-write' });
expect(writeWorker.postMessage).toHaveBeenCalledWith({
type: 'write_replace',
jobId: 'job-write',
changedDocIds: [],
documents: [],
snippets: []
});
expect(parseWorker.postMessage).toHaveBeenCalledWith({ type: 'write_ack', jobId: 'job-write' });
});
// A `write_embeddings` request from the embed worker must reach the write worker, and the
// resulting `write_ack` must go back to the embed worker only — not to a parse worker.
// NOTE(review): assumes creation order parse -> embed -> write for createdWorkers[0..2];
// depends on WorkerPool's constructor.
it('forwards write worker acknowledgements back to the embed worker', () => {
new WorkerPool(
makeOpts({
concurrency: 1,
writeWorkerScript: FAKE_SCRIPT,
embedWorkerScript: FAKE_SCRIPT,
embeddingProfileId: 'local-default'
})
);
const parseWorker = createdWorkers[0];
const embedWorker = createdWorkers[1];
const writeWorker = createdWorkers[2];
writeWorker.emit('message', { type: 'ready' });
embedWorker.emit('message', { type: 'ready' });
embedWorker.emit('message', {
type: 'write_embeddings',
jobId: 'job-embed',
embeddings: []
});
writeWorker.emit('message', { type: 'write_ack', jobId: 'job-embed', embeddingCount: 0 });
expect(parseWorker.postMessage).not.toHaveBeenCalledWith({
type: 'write_ack',
jobId: 'job-embed'
});
expect(writeWorker.postMessage).toHaveBeenCalledWith({
type: 'write_embeddings',
jobId: 'job-embed',
embeddings: []
});
expect(embedWorker.postMessage).toHaveBeenCalledWith({
type: 'write_ack',
jobId: 'job-embed',
embeddingCount: 0
});
});
// -------------------------------------------------------------------------
// Worker crash (exit code != 0)
// -------------------------------------------------------------------------
@@ -248,7 +335,7 @@ describe('WorkerPool normal mode', () => {
it('does NOT call onJobFailed when a worker exits cleanly (code 0)', () => {
const opts = makeOpts({ concurrency: 1 });
const pool = new WorkerPool(opts);
new WorkerPool(opts);
// Exit without any running job
const worker = createdWorkers[0];

View File

@@ -6,9 +6,12 @@ import type {
EmbedWorkerRequest,
EmbedWorkerResponse,
WorkerInitData,
WriteWorkerRequest,
WriteWorkerResponse
} from './worker-types.js';
// Any message the write worker accepts except `shutdown`; every remaining variant carries a jobId.
type InFlightWriteRequest = Exclude<WriteWorkerRequest, { type: 'shutdown' }>;
export interface WorkerPoolOptions {
concurrency: number;
workerScript: string;
@@ -68,6 +71,7 @@ export class WorkerPool {
private runningJobs = new Map<Worker, RunningJob>();
private runningJobKeys = new Set<string>();
private embedQueue: EmbedQueuedJob[] = [];
private pendingWriteWorkers = new Map<string, Worker>();
private options: WorkerPoolOptions;
private fallbackMode = false;
private shuttingDown = false;
@@ -179,7 +183,11 @@ export class WorkerPool {
const job = this.jobQueue.splice(jobIdx, 1)[0];
const worker = this.idleWorkers.pop()!;
this.runningJobs.set(worker, { jobId: job.jobId, repositoryId: job.repositoryId, versionId: job.versionId });
this.runningJobs.set(worker, {
jobId: job.jobId,
repositoryId: job.repositoryId,
versionId: job.versionId
});
this.runningJobKeys.add(WorkerPool.jobKey(job.repositoryId, job.versionId));
statusChanged = true;
@@ -192,14 +200,66 @@ export class WorkerPool {
}
}
/**
 * Forward a write request to the write worker.
 *
 * When the write worker is missing or has not reported ready, nothing is forwarded; if
 * an originating `worker` was given it receives a `write_error` for the request's job
 * instead. Otherwise the request is posted to the write worker and, when `worker` is
 * given, that worker is remembered under the request's jobId in `pendingWriteWorkers`.
 *
 * NOTE(review): the originating worker is keyed by jobId alone, and requests sent
 * without a `worker` (e.g. `write_job_update`) use the same jobId. It looks like a
 * reply to such a request could be matched to a worker's pending entry for that job —
 * confirm against the write-worker reply handler.
 *
 * @param request - The write request to forward.
 * @param worker - Worker that issued the request, if it expects the reply.
 */
private postWriteRequest(request: InFlightWriteRequest, worker?: Worker): void {
  const writer = this.writeWorker;

  if (writer && this.writeReady) {
    if (worker) {
      this.pendingWriteWorkers.set(request.jobId, worker);
    }
    writer.postMessage(request);
    return;
  }

  // No usable write worker: fail the request back to its originator, if any.
  if (worker) {
    worker.postMessage({
      type: 'write_error',
      jobId: request.jobId,
      error: 'Write worker is not ready'
    } satisfies ParseWorkerRequest);
  }
}
private onWorkerMessage(worker: Worker, msg: ParseWorkerResponse): void {
if (msg.type === 'progress') {
this.postWriteRequest({
type: 'write_job_update',
jobId: msg.jobId,
fields: {
status: 'running',
startedAt: Math.floor(Date.now() / 1000),
stage: msg.stage,
stageDetail: msg.stageDetail ?? null,
progress: msg.progress,
processedFiles: msg.processedFiles,
totalFiles: msg.totalFiles
}
});
this.options.onProgress(msg.jobId, msg);
} else if (
msg.type === 'write_replace' ||
msg.type === 'write_clone' ||
msg.type === 'write_repo_update' ||
msg.type === 'write_version_update' ||
msg.type === 'write_repo_config'
) {
this.postWriteRequest(msg, worker);
} else if (msg.type === 'done') {
const runningJob = this.runningJobs.get(worker);
this.postWriteRequest({
type: 'write_job_update',
jobId: msg.jobId,
fields: {
status: 'done',
stage: 'done',
progress: 100,
completedAt: Math.floor(Date.now() / 1000)
}
});
if (runningJob) {
this.runningJobs.delete(worker);
this.runningJobKeys.delete(WorkerPool.jobKey(runningJob.repositoryId, runningJob.versionId));
this.runningJobKeys.delete(
WorkerPool.jobKey(runningJob.repositoryId, runningJob.versionId)
);
}
this.idleWorkers.push(worker);
this.options.onJobDone(msg.jobId);
@@ -207,20 +267,32 @@ export class WorkerPool {
// If embedding configured, enqueue embed request
if (this.embedWorker && this.options.embeddingProfileId) {
const runningJobData = runningJob || { jobId: msg.jobId, repositoryId: '', versionId: null };
this.enqueueEmbed(
msg.jobId,
runningJobData.repositoryId,
runningJobData.versionId ?? null
);
const runningJobData = runningJob || {
jobId: msg.jobId,
repositoryId: '',
versionId: null
};
this.enqueueEmbed(msg.jobId, runningJobData.repositoryId, runningJobData.versionId ?? null);
}
this.dispatch();
} else if (msg.type === 'failed') {
const runningJob = this.runningJobs.get(worker);
this.postWriteRequest({
type: 'write_job_update',
jobId: msg.jobId,
fields: {
status: 'failed',
stage: 'failed',
error: msg.error,
completedAt: Math.floor(Date.now() / 1000)
}
});
if (runningJob) {
this.runningJobs.delete(worker);
this.runningJobKeys.delete(WorkerPool.jobKey(runningJob.repositoryId, runningJob.versionId));
this.runningJobKeys.delete(
WorkerPool.jobKey(runningJob.repositoryId, runningJob.versionId)
);
}
this.idleWorkers.push(worker);
this.options.onJobFailed(msg.jobId, msg.error);
@@ -273,6 +345,22 @@ export class WorkerPool {
this.embedReady = true;
// Process any queued embed requests
this.processEmbedQueue();
} else if (msg.type === 'write_embeddings') {
const embedWorker = this.embedWorker;
if (!embedWorker) {
return;
}
if (!this.writeWorker || !this.writeReady) {
embedWorker.postMessage({
type: 'write_error',
jobId: msg.jobId,
error: 'Write worker is not ready'
} satisfies EmbedWorkerRequest);
return;
}
this.postWriteRequest(msg, embedWorker);
} else if (msg.type === 'embed-progress') {
// Progress message - could be tracked but not strictly required
} else if (msg.type === 'embed-done') {
@@ -288,6 +376,12 @@ export class WorkerPool {
return;
}
const worker = this.pendingWriteWorkers.get(msg.jobId);
if (worker) {
this.pendingWriteWorkers.delete(msg.jobId);
worker.postMessage(msg satisfies ParseWorkerRequest);
}
if (msg.type === 'write_error') {
console.error('[WorkerPool] Write worker failed for job:', msg.jobId, msg.error);
}
@@ -433,6 +527,7 @@ export class WorkerPool {
this.idleWorkers = [];
this.embedWorker = null;
this.writeWorker = null;
this.pendingWriteWorkers.clear();
this.emitStatusChanged();
}

View File

@@ -2,29 +2,58 @@ import type { IndexingStage } from '$lib/types.js';
export type ParseWorkerRequest =
| { type: 'run'; jobId: string }
| { type: 'write_ack'; jobId: string }
| { type: 'write_error'; jobId: string; error: string }
| { type: 'shutdown' };
export type ParseWorkerResponse =
| { type: 'progress'; jobId: string; stage: IndexingStage; stageDetail?: string; progress: number; processedFiles: number; totalFiles: number }
| {
type: 'progress';
jobId: string;
stage: IndexingStage;
stageDetail?: string;
progress: number;
processedFiles: number;
totalFiles: number;
}
| { type: 'done'; jobId: string }
| { type: 'failed'; jobId: string; error: string };
| { type: 'failed'; jobId: string; error: string }
| WriteReplaceRequest
| WriteCloneRequest
| WriteRepoUpdateRequest
| WriteVersionUpdateRequest
| WriteRepoConfigRequest;
export type EmbedWorkerRequest =
| { type: 'embed'; jobId: string; repositoryId: string; versionId: string | null }
| {
type: 'write_ack';
jobId: string;
documentCount?: number;
snippetCount?: number;
embeddingCount?: number;
}
| { type: 'write_error'; jobId: string; error: string }
| { type: 'shutdown' };
export type EmbedWorkerResponse =
| { type: 'ready' }
| { type: 'embed-progress'; jobId: string; done: number; total: number }
| { type: 'embed-done'; jobId: string }
| { type: 'embed-failed'; jobId: string; error: string };
| { type: 'embed-failed'; jobId: string; error: string }
| WriteEmbeddingsRequest;
export type WriteWorkerRequest = WriteRequest | { type: 'shutdown' };
export type WriteWorkerRequest =
| ReplaceWriteRequest
| CloneWriteRequest
| JobUpdateWriteRequest
| RepoUpdateWriteRequest
| VersionUpdateWriteRequest
| RepoConfigWriteRequest
| EmbeddingsWriteRequest
| { type: 'shutdown' };
export type WriteWorkerResponse =
| { type: 'ready' }
| WriteAck
| WriteError;
export type WriteWorkerResponse = { type: 'ready' } | WriteAck | WriteError;
export interface WorkerInitData {
dbPath: string;
@@ -58,18 +87,84 @@ export interface SerializedSnippet {
createdAt: number;
}
export type WriteRequest = {
type: 'write';
// One snippet embedding in the form exchanged between workers: identifying fields plus
// the embedding bytes as a Uint8Array.
export interface SerializedEmbedding {
snippetId: string;
profileId: string;
model: string;
dimensions: number;
embedding: Uint8Array;
}
// A single field value that can travel in an update message.
export type SerializedFieldValue = string | number | null;
// Field-name -> value map carried by the repo / version / job update requests.
export type SerializedFields = Record<string, SerializedFieldValue>;
// Request to replace indexed content for a job: `changedDocIds` names the existing
// documents being replaced, `documents` / `snippets` are the new rows.
export type ReplaceWriteRequest = {
type: 'write_replace';
jobId: string;
changedDocIds: string[];
documents: SerializedDocument[];
snippets: SerializedSnippet[];
};
// Request to copy the data of `unchangedPaths` from `ancestorVersionId` into
// `targetVersionId` of the given repository.
export type CloneWriteRequest = {
type: 'write_clone';
jobId: string;
ancestorVersionId: string;
targetVersionId: string;
repositoryId: string;
unchangedPaths: string[];
};
// Aliases under the Write*Request naming used by the ParseWorkerResponse union.
export type WriteReplaceRequest = ReplaceWriteRequest;
export type WriteCloneRequest = CloneWriteRequest;
// Request to persist a batch of snippet embeddings for a job.
export type EmbeddingsWriteRequest = {
type: 'write_embeddings';
jobId: string;
embeddings: SerializedEmbedding[];
};
// Request to update fields of the repository identified by `repositoryId`.
export type RepoUpdateWriteRequest = {
type: 'write_repo_update';
jobId: string;
repositoryId: string;
fields: SerializedFields;
};
// Request to update fields of the repository version identified by `versionId`.
export type VersionUpdateWriteRequest = {
type: 'write_version_update';
jobId: string;
versionId: string;
fields: SerializedFields;
};
// Request to store the rule list for a repository; `versionId` is null for the
// configuration that is not tied to a specific version.
export type RepoConfigWriteRequest = {
type: 'write_repo_config';
jobId: string;
repositoryId: string;
versionId: string | null;
rules: string[];
};
// Request to update fields of the indexing job identified by `jobId`.
export type JobUpdateWriteRequest = {
type: 'write_job_update';
jobId: string;
fields: SerializedFields;
};
// Aliases under the Write*Request naming used by the worker response unions.
export type WriteEmbeddingsRequest = EmbeddingsWriteRequest;
export type WriteRepoUpdateRequest = RepoUpdateWriteRequest;
export type WriteVersionUpdateRequest = VersionUpdateWriteRequest;
export type WriteRepoConfigRequest = RepoConfigWriteRequest;
export type WriteAck = {
type: 'write_ack';
jobId: string;
documentCount: number;
snippetCount: number;
documentCount?: number;
snippetCount?: number;
embeddingCount?: number;
};
export type WriteError = {

View File

@@ -0,0 +1,343 @@
import { randomUUID } from 'node:crypto';
import type Database from 'better-sqlite3';
import type { NewDocument, NewSnippet } from '$lib/types';
import { SqliteVecStore } from '$lib/server/search/sqlite-vec.store.js';
import type {
SerializedDocument,
SerializedEmbedding,
SerializedFields,
SerializedSnippet
} from './worker-types.js';
// Document fields needed to insert a row. `indexedAt` may be a Date or an already
// numeric timestamp, so both in-process and serialized documents fit this shape.
type DocumentLike = Pick<
NewDocument,
| 'id'
| 'repositoryId'
| 'versionId'
| 'filePath'
| 'title'
| 'language'
| 'tokenCount'
| 'checksum'
> & {
indexedAt: Date | number;
};
// Snippet fields needed to insert a row. `createdAt` may be a Date or an already
// numeric timestamp, so both in-process and serialized snippets fit this shape.
type SnippetLike = Pick<
NewSnippet,
| 'id'
| 'documentId'
| 'repositoryId'
| 'versionId'
| 'type'
| 'title'
| 'content'
| 'language'
| 'breadcrumb'
| 'tokenCount'
> & {
createdAt: Date | number;
};
// Parameters for cloneFromAncestor: which ancestor version to copy from, which version
// (and repository) to copy into, and the file paths whose data should be copied.
export interface CloneFromAncestorRequest {
ancestorVersionId: string;
targetVersionId: string;
repositoryId: string;
unchangedPaths: string[];
}
// One embedding row to persist. `embedding` may arrive as a Buffer or a plain
// Uint8Array; upsertEmbeddings converts the latter to a Buffer before writing.
export interface PersistedEmbedding {
snippetId: string;
profileId: string;
model: string;
dimensions: number;
embedding: Buffer | Uint8Array;
}
/**
 * Normalize a timestamp to a number.
 *
 * @param value - A `Date`, or a number that is returned unchanged.
 * @returns Whole seconds since the epoch (rounded down) for a `Date`; the input itself otherwise.
 */
function toEpochSeconds(value: Date | number): number {
  if (!(value instanceof Date)) {
    return value;
  }
  const millis = value.getTime();
  return Math.floor(millis / 1000);
}
/**
 * Convert a camelCase key to snake_case by replacing every ASCII uppercase letter
 * with an underscore followed by its lowercase form.
 *
 * @param key - Field name, e.g. `stageDetail`.
 * @returns The converted name, e.g. `stage_detail`. A leading capital yields a leading underscore.
 */
function toSnake(key: string): string {
  let snake = '';
  for (const ch of key) {
    // Only A-Z are rewritten; every other character is copied through unchanged.
    const isAsciiUpper = ch >= 'A' && ch <= 'Z';
    snake += isAsciiUpper ? `_${ch.toLowerCase()}` : ch;
  }
  return snake;
}
/**
 * Replace the stored documents and snippets for a set of changed documents.
 *
 * Inside a single transaction this removes the embeddings for `changedDocIds` via the
 * sqlite-vec store, deletes those document rows, then inserts `newDocuments` followed by
 * `newSnippets`. Missing optional values are written as NULL (or 0 for token counts) and
 * timestamps are normalized with toEpochSeconds.
 *
 * @param db - Open database handle.
 * @param changedDocIds - Ids of existing documents to remove.
 * @param newDocuments - Document rows to insert.
 * @param newSnippets - Snippet rows to insert.
 */
function replaceSnippetsInternal(
  db: Database.Database,
  changedDocIds: string[],
  newDocuments: DocumentLike[],
  newSnippets: SnippetLike[]
): void {
  const vecStore = new SqliteVecStore(db);

  const documentInsert = db.prepare(
    `INSERT INTO documents
(id, repository_id, version_id, file_path, title, language,
token_count, checksum, indexed_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`
  );
  const snippetInsert = db.prepare(
    `INSERT INTO snippets
(id, document_id, repository_id, version_id, type, title,
content, language, breadcrumb, token_count, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
  );

  const applyReplacement = db.transaction(() => {
    vecStore.deleteEmbeddingsForDocumentIds(changedDocIds);

    if (changedDocIds.length !== 0) {
      const marks = changedDocIds.map(() => '?').join(',');
      db.prepare(`DELETE FROM documents WHERE id IN (${marks})`).run(...changedDocIds);
    }

    newDocuments.forEach((doc) => {
      documentInsert.run(
        doc.id,
        doc.repositoryId,
        doc.versionId ?? null,
        doc.filePath,
        doc.title ?? null,
        doc.language ?? null,
        doc.tokenCount ?? 0,
        doc.checksum,
        toEpochSeconds(doc.indexedAt)
      );
    });

    newSnippets.forEach((snippet) => {
      snippetInsert.run(
        snippet.id,
        snippet.documentId,
        snippet.repositoryId,
        snippet.versionId ?? null,
        snippet.type,
        snippet.title ?? null,
        snippet.content,
        snippet.language ?? null,
        snippet.breadcrumb ?? null,
        snippet.tokenCount ?? 0,
        toEpochSeconds(snippet.createdAt)
      );
    });
  });

  applyReplacement();
}
/**
 * Replace stored documents and snippets using in-process `NewDocument` / `NewSnippet`
 * records. Delegates directly to replaceSnippetsInternal.
 *
 * @param db - Open database handle.
 * @param changedDocIds - Ids of existing documents to remove.
 * @param newDocuments - Documents to insert.
 * @param newSnippets - Snippets to insert.
 */
export function replaceSnippets(
db: Database.Database,
changedDocIds: string[],
newDocuments: NewDocument[],
newSnippets: NewSnippet[]
): void {
replaceSnippetsInternal(db, changedDocIds, newDocuments, newSnippets);
}
/**
 * Replace stored documents and snippets using serialized records (as received in a
 * worker message). Delegates directly to replaceSnippetsInternal.
 *
 * @param db - Open database handle.
 * @param changedDocIds - Ids of existing documents to remove.
 * @param documents - Serialized documents to insert.
 * @param snippets - Serialized snippets to insert.
 */
export function replaceSerializedSnippets(
db: Database.Database,
changedDocIds: string[],
documents: SerializedDocument[],
snippets: SerializedSnippet[]
): void {
replaceSnippetsInternal(db, changedDocIds, documents, snippets);
}
/**
 * Copy the documents, snippets and embeddings of unchanged files from an ancestor
 * version into a target version, all inside one transaction.
 *
 * For every ancestor document whose `file_path` is listed in `unchangedPaths`, a new
 * document row (fresh UUID, `indexed_at` = now) is inserted for the target version. Its
 * snippets are copied with fresh UUIDs and their original `created_at`, and every
 * embedding of those snippets is copied both into `snippet_embeddings` and into the
 * sqlite-vec store. Nothing is written when no path matches.
 *
 * Lookups are issued in bounded chunks so the number of bound parameters per statement
 * stays small regardless of how many files are unchanged, and each INSERT statement is
 * prepared once rather than once per row.
 *
 * @param db - Open database handle.
 * @param request - Ancestor/target version ids, repository id and the unchanged paths.
 */
export function cloneFromAncestor(db: Database.Database, request: CloneFromAncestorRequest): void {
  type AncestorDocRow = {
    id: string;
    repository_id: string;
    file_path: string;
    title: string | null;
    language: string | null;
    token_count: number;
    checksum: string;
    indexed_at: number;
  };
  type AncestorSnippetRow = {
    id: string;
    document_id: string;
    repository_id: string;
    version_id: string | null;
    type: string;
    title: string | null;
    content: string;
    language: string | null;
    breadcrumb: string | null;
    token_count: number;
    created_at: number;
  };
  type AncestorEmbeddingRow = {
    snippet_id: string;
    profile_id: string;
    model: string;
    dimensions: number;
    embedding: Buffer;
    created_at: number;
  };

  // Max `?` placeholders per IN (...) list; kept well below SQLite's
  // SQLITE_MAX_VARIABLE_NUMBER (999 in older builds).
  const MAX_IN_LIST_PARAMS = 500;

  const sqliteVecStore = new SqliteVecStore(db);
  const { ancestorVersionId, targetVersionId, repositoryId, unchangedPaths } = request;

  // Runs `<sqlBeforeIn> IN (?,?,...)` once per chunk of `ids` and concatenates the rows.
  const selectWhereIn = <Row>(
    sqlBeforeIn: string,
    leadingParams: string[],
    ids: string[]
  ): Row[] => {
    const rows: Row[] = [];
    for (let start = 0; start < ids.length; start += MAX_IN_LIST_PARAMS) {
      const chunk = ids.slice(start, start + MAX_IN_LIST_PARAMS);
      const placeholders = chunk.map(() => '?').join(',');
      const chunkRows = db
        .prepare(`${sqlBeforeIn} IN (${placeholders})`)
        .all(...leadingParams, ...chunk) as Row[];
      for (const row of chunkRows) {
        rows.push(row);
      }
    }
    return rows;
  };

  db.transaction(() => {
    // De-duplicate so a path repeated across chunks cannot select the same document twice.
    const pathList = [...new Set(unchangedPaths)];
    if (pathList.length === 0) {
      return;
    }

    const ancestorDocs = selectWhereIn<AncestorDocRow>(
      'SELECT * FROM documents WHERE version_id = ? AND file_path',
      [ancestorVersionId],
      pathList
    );
    if (ancestorDocs.length === 0) return;

    const insertDoc = db.prepare(
      `INSERT INTO documents (id, repository_id, version_id, file_path, title, language, token_count, checksum, indexed_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`
    );
    // old document id -> id of its copy in the target version
    const docIdMap = new Map<string, string>();
    const nowEpoch = Math.floor(Date.now() / 1000);
    for (const doc of ancestorDocs) {
      const newDocId = randomUUID();
      docIdMap.set(doc.id, newDocId);
      insertDoc.run(
        newDocId,
        repositoryId,
        targetVersionId,
        doc.file_path,
        doc.title,
        doc.language,
        doc.token_count,
        doc.checksum,
        nowEpoch
      );
    }

    const ancestorSnippets = selectWhereIn<AncestorSnippetRow>(
      'SELECT * FROM snippets WHERE document_id',
      [],
      [...docIdMap.keys()]
    );
    if (ancestorSnippets.length === 0) {
      return;
    }

    const insertSnippet = db.prepare(
      `INSERT INTO snippets (id, document_id, repository_id, version_id, type, title, content, language, breadcrumb, token_count, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
    );
    // old snippet id -> id of its copy in the target version
    const snippetIdMap = new Map<string, string>();
    for (const snippet of ancestorSnippets) {
      const newSnippetId = randomUUID();
      snippetIdMap.set(snippet.id, newSnippetId);
      // Every selected snippet belongs to a document in docIdMap by construction.
      const newDocId = docIdMap.get(snippet.document_id)!;
      insertSnippet.run(
        newSnippetId,
        newDocId,
        repositoryId,
        targetVersionId,
        snippet.type,
        snippet.title,
        snippet.content,
        snippet.language,
        snippet.breadcrumb,
        snippet.token_count,
        snippet.created_at
      );
    }

    const ancestorEmbeddings = selectWhereIn<AncestorEmbeddingRow>(
      'SELECT * FROM snippet_embeddings WHERE snippet_id',
      [],
      [...snippetIdMap.keys()]
    );
    if (ancestorEmbeddings.length === 0) {
      return;
    }

    const insertEmbedding = db.prepare(
      `INSERT INTO snippet_embeddings (snippet_id, profile_id, model, dimensions, embedding, created_at)
VALUES (?, ?, ?, ?, ?, ?)`
    );
    for (const emb of ancestorEmbeddings) {
      const newSnippetId = snippetIdMap.get(emb.snippet_id)!;
      insertEmbedding.run(
        newSnippetId,
        emb.profile_id,
        emb.model,
        emb.dimensions,
        emb.embedding,
        emb.created_at
      );
      // Keep the sqlite-vec store in step with the relational embedding row.
      sqliteVecStore.upsertEmbeddingBuffer(
        emb.profile_id,
        newSnippetId,
        emb.embedding,
        emb.dimensions
      );
    }
  })();
}
/**
 * Insert or replace a batch of snippet embeddings in one transaction.
 *
 * Each item is written to `snippet_embeddings` (with `created_at` set by the database)
 * and mirrored into the sqlite-vec store. Embeddings supplied as a plain Uint8Array are
 * converted to a Buffer first. An empty batch returns without touching the database.
 *
 * @param db - Open database handle.
 * @param embeddings - Embeddings to persist.
 */
export function upsertEmbeddings(db: Database.Database, embeddings: PersistedEmbedding[]): void {
  if (embeddings.length === 0) return;

  const vecStore = new SqliteVecStore(db);
  const insertRow = db.prepare<[string, string, string, number, Buffer]>(`
INSERT OR REPLACE INTO snippet_embeddings (snippet_id, profile_id, model, dimensions, embedding, created_at)
VALUES (?, ?, ?, ?, ?, unixepoch())
`);

  const asBuffer = (bytes: Buffer | Uint8Array): Buffer =>
    Buffer.isBuffer(bytes) ? bytes : Buffer.from(bytes);

  const writeBatch = db.transaction(() => {
    embeddings.forEach(({ snippetId, profileId, model, dimensions, embedding }) => {
      const payload = asBuffer(embedding);
      insertRow.run(snippetId, profileId, model, dimensions, payload);
      vecStore.upsertEmbeddingBuffer(profileId, snippetId, payload, dimensions);
    });
  });

  writeBatch();
}
/**
 * Persist embeddings received in serialized form by copying the five known fields of
 * each item and handing the batch to upsertEmbeddings.
 *
 * @param db - Open database handle.
 * @param embeddings - Serialized embeddings to persist.
 */
export function upsertSerializedEmbeddings(
  db: Database.Database,
  embeddings: SerializedEmbedding[]
): void {
  const persisted: PersistedEmbedding[] = embeddings.map(
    ({ snippetId, profileId, model, dimensions, embedding }) => ({
      snippetId,
      profileId,
      model,
      dimensions,
      embedding
    })
  );
  upsertEmbeddings(db, persisted);
}
/**
 * Update columns of one `repositories` row.
 *
 * Field names are converted to snake_case column names with toSnake. `updated_at` is
 * always set to the current epoch second, replacing any `updatedAt` supplied by the caller.
 *
 * @param db - Open database handle.
 * @param repositoryId - Id of the repository row to update.
 * @param fields - Field name -> new value.
 */
export function updateRepo(
  db: Database.Database,
  repositoryId: string,
  fields: SerializedFields
): void {
  const stamped: SerializedFields = { ...fields, updatedAt: Math.floor(Date.now() / 1000) };

  const assignments: string[] = [];
  const params: Array<SerializedFields[string]> = [];
  for (const [name, value] of Object.entries(stamped)) {
    assignments.push(`${toSnake(name)} = ?`);
    params.push(value);
  }
  // The row id binds to the trailing `WHERE id = ?`.
  params.push(repositoryId);

  db.prepare(`UPDATE repositories SET ${assignments.join(', ')} WHERE id = ?`).run(...params);
}
/**
 * Update columns of one `indexing_jobs` row.
 *
 * Field names are converted to snake_case column names with toSnake. When `fields` is
 * empty there is nothing to update and the call returns without preparing a statement
 * (an empty SET clause would be invalid SQL).
 *
 * @param db - Open database handle.
 * @param jobId - Id of the job row to update.
 * @param fields - Field name -> new value.
 */
export function updateJob(db: Database.Database, jobId: string, fields: SerializedFields): void {
  const keys = Object.keys(fields);
  if (keys.length === 0) {
    return;
  }
  const sets = keys.map((key) => `${toSnake(key)} = ?`).join(', ');
  const values = [...Object.values(fields), jobId];
  db.prepare(`UPDATE indexing_jobs SET ${sets} WHERE id = ?`).run(...values);
}
/**
 * Update columns of one `repository_versions` row.
 *
 * Field names are converted to snake_case column names with toSnake. When `fields` is
 * empty there is nothing to update and the call returns without preparing a statement
 * (an empty SET clause would be invalid SQL).
 *
 * @param db - Open database handle.
 * @param versionId - Id of the version row to update.
 * @param fields - Field name -> new value.
 */
export function updateVersion(
  db: Database.Database,
  versionId: string,
  fields: SerializedFields
): void {
  const keys = Object.keys(fields);
  if (keys.length === 0) {
    return;
  }
  const sets = keys.map((key) => `${toSnake(key)} = ?`).join(', ');
  const values = [...Object.values(fields), versionId];
  db.prepare(`UPDATE repository_versions SET ${sets} WHERE id = ?`).run(...values);
}

View File

@@ -1,67 +1,21 @@
import { workerData, parentPort } from 'node:worker_threads';
import Database from 'better-sqlite3';
import type {
SerializedDocument,
SerializedSnippet,
WorkerInitData,
WriteWorkerRequest,
WriteWorkerResponse
} from './worker-types.js';
import { applySqlitePragmas } from '$lib/server/db/connection.js';
import { loadSqliteVec } from '$lib/server/db/sqlite-vec.js';
import type { WorkerInitData, WriteWorkerRequest, WriteWorkerResponse } from './worker-types.js';
import {
cloneFromAncestor,
replaceSerializedSnippets,
updateJob,
updateRepo,
updateVersion,
upsertSerializedEmbeddings
} from './write-operations.js';
const { dbPath } = workerData as WorkerInitData;
const db = new Database(dbPath);
db.pragma('journal_mode = WAL');
db.pragma('foreign_keys = ON');
db.pragma('busy_timeout = 5000');
db.pragma('synchronous = NORMAL');
db.pragma('cache_size = -65536');
db.pragma('temp_store = MEMORY');
db.pragma('mmap_size = 268435456');
db.pragma('wal_autocheckpoint = 1000');
const insertDocument = db.prepare(
`INSERT OR REPLACE INTO documents
(id, repository_id, version_id, file_path, title, language, token_count, checksum, indexed_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`
);
const insertSnippet = db.prepare(
`INSERT OR REPLACE INTO snippets
(id, document_id, repository_id, version_id, type, title, content, language, breadcrumb, token_count, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
);
const writeBatch = db.transaction((documents: SerializedDocument[], snippets: SerializedSnippet[]) => {
for (const document of documents) {
insertDocument.run(
document.id,
document.repositoryId,
document.versionId,
document.filePath,
document.title,
document.language,
document.tokenCount,
document.checksum,
document.indexedAt
);
}
for (const snippet of snippets) {
insertSnippet.run(
snippet.id,
snippet.documentId,
snippet.repositoryId,
snippet.versionId,
snippet.type,
snippet.title,
snippet.content,
snippet.language,
snippet.breadcrumb,
snippet.tokenCount,
snippet.createdAt
);
}
});
applySqlitePragmas(db);
loadSqliteVec(db);
parentPort?.postMessage({ type: 'ready' } satisfies WriteWorkerResponse);
@@ -71,23 +25,145 @@ parentPort?.on('message', (msg: WriteWorkerRequest) => {
process.exit(0);
}
if (msg.type !== 'write') {
if (msg.type === 'write_replace') {
try {
replaceSerializedSnippets(db, msg.changedDocIds, msg.documents, msg.snippets);
parentPort?.postMessage({
type: 'write_ack',
jobId: msg.jobId,
documentCount: msg.documents.length,
snippetCount: msg.snippets.length
} satisfies WriteWorkerResponse);
} catch (error) {
parentPort?.postMessage({
type: 'write_error',
jobId: msg.jobId,
error: error instanceof Error ? error.message : String(error)
} satisfies WriteWorkerResponse);
}
return;
}
try {
writeBatch(msg.documents, msg.snippets);
parentPort?.postMessage({
type: 'write_ack',
jobId: msg.jobId,
documentCount: msg.documents.length,
snippetCount: msg.snippets.length
} satisfies WriteWorkerResponse);
} catch (error) {
parentPort?.postMessage({
type: 'write_error',
jobId: msg.jobId,
error: error instanceof Error ? error.message : String(error)
} satisfies WriteWorkerResponse);
if (msg.type === 'write_clone') {
try {
cloneFromAncestor(db, {
ancestorVersionId: msg.ancestorVersionId,
targetVersionId: msg.targetVersionId,
repositoryId: msg.repositoryId,
unchangedPaths: msg.unchangedPaths
});
parentPort?.postMessage({
type: 'write_ack',
jobId: msg.jobId
} satisfies WriteWorkerResponse);
} catch (error) {
parentPort?.postMessage({
type: 'write_error',
jobId: msg.jobId,
error: error instanceof Error ? error.message : String(error)
} satisfies WriteWorkerResponse);
}
return;
}
});
if (msg.type === 'write_embeddings') {
try {
upsertSerializedEmbeddings(db, msg.embeddings);
parentPort?.postMessage({
type: 'write_ack',
jobId: msg.jobId,
embeddingCount: msg.embeddings.length
} satisfies WriteWorkerResponse);
} catch (error) {
parentPort?.postMessage({
type: 'write_error',
jobId: msg.jobId,
error: error instanceof Error ? error.message : String(error)
} satisfies WriteWorkerResponse);
}
return;
}
if (msg.type === 'write_job_update') {
try {
updateJob(db, msg.jobId, msg.fields);
parentPort?.postMessage({
type: 'write_ack',
jobId: msg.jobId
} satisfies WriteWorkerResponse);
} catch (error) {
parentPort?.postMessage({
type: 'write_error',
jobId: msg.jobId,
error: error instanceof Error ? error.message : String(error)
} satisfies WriteWorkerResponse);
}
return;
}
if (msg.type === 'write_repo_update') {
try {
updateRepo(db, msg.repositoryId, msg.fields);
parentPort?.postMessage({
type: 'write_ack',
jobId: msg.jobId
} satisfies WriteWorkerResponse);
} catch (error) {
parentPort?.postMessage({
type: 'write_error',
jobId: msg.jobId,
error: error instanceof Error ? error.message : String(error)
} satisfies WriteWorkerResponse);
}
return;
}
if (msg.type === 'write_version_update') {
try {
updateVersion(db, msg.versionId, msg.fields);
parentPort?.postMessage({
type: 'write_ack',
jobId: msg.jobId
} satisfies WriteWorkerResponse);
} catch (error) {
parentPort?.postMessage({
type: 'write_error',
jobId: msg.jobId,
error: error instanceof Error ? error.message : String(error)
} satisfies WriteWorkerResponse);
}
return;
}
if (msg.type === 'write_repo_config') {
try {
const now = Math.floor(Date.now() / 1000);
if (msg.versionId === null) {
db.prepare(
`DELETE FROM repository_configs WHERE repository_id = ? AND version_id IS NULL`
).run(msg.repositoryId);
} else {
db.prepare(`DELETE FROM repository_configs WHERE repository_id = ? AND version_id = ?`).run(
msg.repositoryId,
msg.versionId
);
}
db.prepare(
`INSERT INTO repository_configs (repository_id, version_id, rules, updated_at)
VALUES (?, ?, ?, ?)`
).run(msg.repositoryId, msg.versionId, JSON.stringify(msg.rules), now);
parentPort?.postMessage({
type: 'write_ack',
jobId: msg.jobId
} satisfies WriteWorkerResponse);
} catch (error) {
parentPort?.postMessage({
type: 'write_error',
jobId: msg.jobId,
error: error instanceof Error ? error.message : String(error)
} satisfies WriteWorkerResponse);
}
}
});