trueref/docs/features/TRUEREF-0009.md
2026-03-22 17:08:15 +01:00


# TRUEREF-0009 — Indexing Pipeline & Job Queue
**Priority:** P0
**Status:** Pending
**Depends On:** TRUEREF-0003, TRUEREF-0004, TRUEREF-0005, TRUEREF-0001
**Blocks:** TRUEREF-0010, TRUEREF-0015
---
## Overview
Implement the end-to-end indexing pipeline that orchestrates crawling, parsing, storage, and embedding generation for a repository. It uses a SQLite-backed job queue with strictly sequential processing. Each indexing run is atomic: new data replaces old data only upon successful completion.
---
## Acceptance Criteria
- [ ] `IndexingPipeline` class that orchestrates the full indexing flow
- [ ] SQLite-backed `JobQueue` (no external message broker)
- [ ] Atomic snippet replacement: old snippets deleted only after new ones are stored successfully
- [ ] Progress tracked in `indexing_jobs` table (processedFiles, totalFiles, progress 0-100)
- [ ] `GET /api/v1/jobs/:id` — get job status and progress
- [ ] `GET /api/v1/jobs` — list recent jobs (with filtering by repositoryId)
- [ ] Jobs run sequentially (one at a time) to avoid SQLite write contention
- [ ] Graceful error handling: job marked as failed with error message, existing data preserved
- [ ] Server startup recovery: mark any `running` jobs that were interrupted by a restart as `failed` (see Server Startup Recovery below)
- [ ] Unit tests for pipeline stages and job queue
---
## Pipeline Stages
```
1. Create IndexingJob (status: queued)
2. Dequeue job (status: running, startedAt: now)
3. Fetch trueref.json config (if exists)
4. Crawl repository (GitHub or local)
   → Update job.totalFiles
5. For each file:
   a. Check checksum against existing document
   b. If unchanged, skip (reuse existing snippets)
   c. If changed/new: parse into snippets
   d. Buffer new snippets in memory
   e. Update job.processedFiles + job.progress
6. Transaction: Delete old snippets/documents → Insert new ones
7. Generate embeddings for new snippets (if provider configured)
8. Update repository stats (totalSnippets, totalTokens, trustScore, lastIndexedAt)
9. Mark job as done (status: done, completedAt: now)
```
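Steps 5a–5b hinge on a stable content checksum per file. A minimal sketch of what that comparison could look like — the `checksum` and `needsReparse` helpers here are illustrative, not part of the spec:

```typescript
import { createHash } from 'node:crypto';

// Stable checksum over a crawled file's contents (step 5a). A document is
// considered unchanged when its stored checksum matches this value.
export function checksum(content: string): string {
  return createHash('sha256').update(content, 'utf8').digest('hex');
}

// Steps 5a-5b: a file needs re-parsing when it is new (no existing document)
// or its contents have changed since the last indexing run.
export function needsReparse(
  newContent: string,
  existingChecksum: string | undefined,
): boolean {
  return existingChecksum === undefined || checksum(newContent) !== existingChecksum;
}
```

Unchanged files keep their existing snippet rows, which is what makes incremental re-indexing cheap for large repositories.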
---
## Job Queue Implementation
```typescript
// src/lib/server/pipeline/job-queue.ts
export class JobQueue {
  private isRunning = false;

  constructor(
    private db: BetterSQLite3.Database,
    private pipeline: IndexingPipeline,
  ) {}

  enqueue(repositoryId: string, versionId?: string): IndexingJob {
    const job: NewIndexingJob = {
      id: crypto.randomUUID(),
      repositoryId,
      versionId: versionId ?? null,
      status: 'queued',
      progress: 0,
      totalFiles: 0,
      processedFiles: 0,
      error: null,
      startedAt: null,
      completedAt: null,
      createdAt: new Date(),
    };
    // Name the columns explicitly instead of relying on object key order,
    // and bind the timestamp as a number (SQLite cannot bind Date objects).
    this.db.prepare(`
      INSERT INTO indexing_jobs
        (id, repository_id, version_id, status, progress, total_files,
         processed_files, error, started_at, completed_at, created_at)
      VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    `).run(
      job.id, job.repositoryId, job.versionId, job.status, job.progress,
      job.totalFiles, job.processedFiles, job.error, job.startedAt,
      job.completedAt, job.createdAt.getTime(),
    );
    // Kick off processing if not already running
    if (!this.isRunning) {
      setImmediate(() => this.processNext());
    }
    return job;
  }

  private async processNext(): Promise<void> {
    if (this.isRunning) return;
    const job = this.db.prepare(`
      SELECT * FROM indexing_jobs
      WHERE status = 'queued'
      ORDER BY created_at ASC
      LIMIT 1
    `).get() as IndexingJob | undefined;
    if (!job) return;
    this.isRunning = true;
    try {
      await this.pipeline.run(job);
    } finally {
      this.isRunning = false;
      // Check for next queued job
      const nextJob = this.db.prepare(
        `SELECT id FROM indexing_jobs WHERE status = 'queued' LIMIT 1`
      ).get();
      if (nextJob) setImmediate(() => this.processNext());
    }
  }

  getJob(id: string): IndexingJob | null {
    const row = this.db.prepare(
      `SELECT * FROM indexing_jobs WHERE id = ?`
    ).get(id) as IndexingJob | undefined;
    return row ?? null;
  }

  listJobs(repositoryId?: string, limit = 20): IndexingJob[] {
    const query = repositoryId
      ? `SELECT * FROM indexing_jobs WHERE repository_id = ? ORDER BY created_at DESC LIMIT ?`
      : `SELECT * FROM indexing_jobs ORDER BY created_at DESC LIMIT ?`;
    const params = repositoryId ? [repositoryId, limit] : [limit];
    return this.db.prepare(query).all(...params) as IndexingJob[];
  }
}
```
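The queue reads and writes the `indexing_jobs` table. A hypothetical DDL sketch — the exact schema lives in TRUEREF-0003's migrations; column names here are assumed to be the snake_case mirrors of the camelCase job fields, with timestamps stored as integers:

```typescript
// Assumed shape of the indexing_jobs table the JobQueue queries above run
// against; not the canonical migration.
export const INDEXING_JOBS_DDL = `
  CREATE TABLE IF NOT EXISTS indexing_jobs (
    id              TEXT PRIMARY KEY,
    repository_id   TEXT NOT NULL,
    version_id      TEXT,
    status          TEXT NOT NULL DEFAULT 'queued', -- queued | running | done | failed
    progress        INTEGER NOT NULL DEFAULT 0,     -- 0-100
    total_files     INTEGER NOT NULL DEFAULT 0,
    processed_files INTEGER NOT NULL DEFAULT 0,
    error           TEXT,
    started_at      INTEGER,                        -- epoch, NULL until dequeued
    completed_at    INTEGER,                        -- epoch, NULL until done/failed
    created_at      INTEGER NOT NULL                -- epoch, used for FIFO ordering
  )
`;
```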
---
## Indexing Pipeline
```typescript
// src/lib/server/pipeline/indexing.pipeline.ts
export class IndexingPipeline {
  constructor(
    private db: BetterSQLite3.Database,
    private githubCrawler: GitHubCrawler,
    private localCrawler: LocalCrawler,
    private embeddingService: EmbeddingService | null,
  ) {}

  async run(job: IndexingJob): Promise<void> {
    this.updateJob(job.id, { status: 'running', startedAt: new Date() });
    try {
      const repo = this.getRepository(job.repositoryId);
      if (!repo) throw new Error(`Repository ${job.repositoryId} not found`);
      // Update repo state
      this.updateRepo(repo.id, { state: 'indexing' });

      // Step 1: Crawl
      const crawlResult = await this.crawl(repo, job);

      // Step 2: Parse and diff
      const { newSnippets, changedDocIds, newDocuments } =
        await this.parseAndDiff(crawlResult, repo, job);

      // Step 3: Atomic replacement
      this.replaceSnippets(repo.id, changedDocIds, newDocuments, newSnippets);

      // Step 4: Embeddings (awaited, so the job only completes once
      // embeddings for the new snippets exist)
      if (this.embeddingService && newSnippets.length > 0) {
        await this.embeddingService.embedSnippets(
          newSnippets.map(s => s.id),
          (done, total) => {
            // Update job progress for embedding phase
          }
        );
      }

      // Step 5: Update repo stats
      const stats = this.computeStats(repo.id);
      this.updateRepo(repo.id, {
        state: 'indexed',
        totalSnippets: stats.totalSnippets,
        totalTokens: stats.totalTokens,
        trustScore: computeTrustScore({ ...repo, ...stats }),
        lastIndexedAt: new Date(),
      });

      this.updateJob(job.id, {
        status: 'done',
        progress: 100,
        completedAt: new Date(),
      });
    } catch (error) {
      this.updateJob(job.id, {
        status: 'failed',
        error: (error as Error).message,
        completedAt: new Date(),
      });
      this.updateRepo(job.repositoryId, { state: 'error' });
      throw error;
    }
  }

  private replaceSnippets(
    repositoryId: string,
    changedDocIds: string[],
    newDocuments: NewDocument[],
    newSnippets: NewSnippet[]
  ): void {
    // Single transaction: delete old → insert new
    this.db.transaction(() => {
      if (changedDocIds.length > 0) {
        // Cascade deletes snippets via FK constraint
        this.db.prepare(
          `DELETE FROM documents WHERE id IN (${changedDocIds.map(() => '?').join(',')})`
        ).run(...changedDocIds);
      }
      for (const doc of newDocuments) {
        this.insertDocument(doc);
      }
      for (const snippet of newSnippets) {
        this.insertSnippet(snippet);
      }
    })();
  }
}
```
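The pipeline leans on an `updateJob` helper that is referenced above but not shown. One way it could build its parameterized UPDATE statements — `buildJobUpdate` and the camelCase-to-snake_case mapping are assumptions for illustration, not spec:

```typescript
// Hypothetical helper behind this.updateJob(id, patch).
type JobPatch = Partial<{
  status: string;
  progress: number;
  totalFiles: number;
  processedFiles: number;
  error: string | null;
  startedAt: Date | null;
  completedAt: Date | null;
}>;

// Assumed column naming: camelCase field → snake_case column.
function toSnake(key: string): string {
  return key.replace(/[A-Z]/g, (c) => `_${c.toLowerCase()}`);
}

export function buildJobUpdate(
  id: string,
  patch: JobPatch,
): { sql: string; params: unknown[] } {
  const entries = Object.entries(patch);
  if (entries.length === 0) throw new Error('empty patch');
  const setClause = entries.map(([k]) => `${toSnake(k)} = ?`).join(', ');
  // SQLite cannot bind Date objects directly; store epoch milliseconds.
  const params = entries.map(([, v]) => (v instanceof Date ? v.getTime() : v));
  return {
    sql: `UPDATE indexing_jobs SET ${setClause} WHERE id = ?`,
    params: [...params, id],
  };
}
```

Keeping the statement builder pure makes it trivially unit-testable without a database, which fits the testing criterion above.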
---
## Progress Calculation
```typescript
function calculateProgress(
  processedFiles: number,
  totalFiles: number,
  embeddingsDone: number,
  embeddingsTotal: number,
  hasEmbeddings: boolean
): number {
  if (totalFiles === 0) return 0;
  if (!hasEmbeddings) {
    // Crawl + parse = 100%
    return Math.round((processedFiles / totalFiles) * 100);
  }
  // Crawl+parse = 80%, embeddings = 20%
  const parseProgress = (processedFiles / totalFiles) * 80;
  const embedProgress = embeddingsTotal > 0
    ? (embeddingsDone / embeddingsTotal) * 20
    : 0;
  return Math.round(parseProgress + embedProgress);
}
```
---
## API Endpoints
### `GET /api/v1/jobs/:id`
Response `200`:
```json
{
  "job": {
    "id": "uuid",
    "repositoryId": "/facebook/react",
    "status": "running",
    "progress": 47,
    "totalFiles": 342,
    "processedFiles": 162,
    "error": null,
    "startedAt": "2026-03-22T10:00:00Z",
    "completedAt": null,
    "createdAt": "2026-03-22T09:59:55Z"
  }
}
```
### `GET /api/v1/jobs`
Query params: `repositoryId` (optional), `status` (optional), `limit` (default 20).
Response `200`:
```json
{
  "jobs": [...],
  "total": 5
}
```
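A sketch of how the single-job route (`src/routes/api/v1/jobs/[id]/+server.ts`) could shape its responses. It is written against the standard `Response` API rather than SvelteKit's `json` helper so the example stays self-contained; the real route would look the job up via `JobQueue.getJob`:

```typescript
// Minimal response shaping for GET /api/v1/jobs/:id. IndexingJobLike is a
// stand-in for the full IndexingJob row returned by the queue.
interface IndexingJobLike {
  id: string;
  status: string;
  progress: number;
}

export function jobResponse(job: IndexingJobLike | null): Response {
  if (!job) {
    // Unknown id → 404 with a machine-readable error body.
    return new Response(JSON.stringify({ error: 'Job not found' }), {
      status: 404,
      headers: { 'content-type': 'application/json' },
    });
  }
  return new Response(JSON.stringify({ job }), {
    status: 200,
    headers: { 'content-type': 'application/json' },
  });
}
```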
---
## Server Startup Recovery
On application start, mark any jobs in `running` state as `failed` (they were interrupted by a process crash):
```typescript
function recoverStaleJobs(db: BetterSQLite3.Database): void {
  db.prepare(`
    UPDATE indexing_jobs
    SET status = 'failed',
        error = 'Server restarted while job was running',
        completed_at = unixepoch()
    WHERE status = 'running'
  `).run();
  // Also reset any repositories stuck in 'indexing' state
  db.prepare(`
    UPDATE repositories
    SET state = 'error'
    WHERE state = 'indexing'
  `).run();
}
```
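Ordering matters here: recovery should run before the queue starts draining, so no new job races against a row that still claims to be `running`. A simplified sketch of the `startup.ts` wiring — the `Db` shape and `startQueue` callback are stand-ins, and the UPDATE is abbreviated from the full statement above:

```typescript
// Db stands in for BetterSQLite3.Database, narrowed to what this sketch uses.
type Db = { prepare(sql: string): { run(): void } };

export function startupSequence(db: Db, startQueue: () => void): string[] {
  const order: string[] = [];
  // 1. Fail stale 'running' jobs first (abbreviated; see recoverStaleJobs).
  db.prepare(
    `UPDATE indexing_jobs SET status = 'failed' WHERE status = 'running'`,
  ).run();
  order.push('recover');
  // 2. Only then begin draining queued jobs.
  startQueue();
  order.push('start-queue');
  return order;
}
```

In `src/hooks.server.ts` this would run once at module load, before the first request is served.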
---
## Files to Create
- `src/lib/server/pipeline/indexing.pipeline.ts`
- `src/lib/server/pipeline/job-queue.ts`
- `src/lib/server/pipeline/startup.ts` — recovery + queue initialization
- `src/routes/api/v1/jobs/+server.ts` — GET list
- `src/routes/api/v1/jobs/[id]/+server.ts` — GET single
- `src/hooks.server.ts` — call startup on server init
- `src/lib/server/pipeline/indexing.pipeline.test.ts`