chore: initial project scaffold
This commit is contained in:
330
docs/features/TRUEREF-0009.md
Normal file
330
docs/features/TRUEREF-0009.md
Normal file
@@ -0,0 +1,330 @@
|
||||
# TRUEREF-0009 — Indexing Pipeline & Job Queue
|
||||
|
||||
**Priority:** P0
|
||||
**Status:** Pending
|
||||
**Depends On:** TRUEREF-0003, TRUEREF-0004, TRUEREF-0005, TRUEREF-0001
|
||||
**Blocks:** TRUEREF-0010, TRUEREF-0015
|
||||
|
||||
---
|
||||
|
||||
## Overview
|
||||
|
||||
Implement the end-to-end indexing pipeline that orchestrates crawling, parsing, storage, and embedding generation for a repository. Uses a SQLite-backed job queue with sequential processing. Each indexing run is atomic — new data replaces old data only upon successful completion.
|
||||
|
||||
---
|
||||
|
||||
## Acceptance Criteria
|
||||
|
||||
- [ ] `IndexingPipeline` class that orchestrates the full indexing flow
|
||||
- [ ] SQLite-backed `JobQueue` (no external message broker)
|
||||
- [ ] Atomic snippet replacement: old snippets deleted only after new ones are stored successfully
|
||||
- [ ] Progress tracked in `indexing_jobs` table (processedFiles, totalFiles, progress 0-100)
|
||||
- [ ] `GET /api/v1/jobs/:id` — get job status and progress
|
||||
- [ ] `GET /api/v1/jobs` — list recent jobs (with filtering by repositoryId)
|
||||
- [ ] Jobs run sequentially (one at a time) to avoid SQLite write contention
|
||||
- [ ] Graceful error handling: job marked as failed with error message, existing data preserved
|
||||
- [ ] Server startup: resume any `running` jobs that were interrupted
|
||||
- [ ] Unit tests for pipeline stages and job queue
|
||||
|
||||
---
|
||||
|
||||
## Pipeline Stages
|
||||
|
||||
```
|
||||
1. Create IndexingJob (status: queued)
|
||||
2. Dequeue job (status: running, startedAt: now)
|
||||
3. Fetch trueref.json config (if exists)
|
||||
4. Crawl repository (GitHub or local)
|
||||
→ Update job.totalFiles
|
||||
5. For each file:
|
||||
a. Check checksum against existing document
|
||||
b. If unchanged, skip (reuse existing snippets)
|
||||
c. If changed/new: parse into snippets
|
||||
d. Buffer new snippets in memory
|
||||
e. Update job.processedFiles + job.progress
|
||||
6. Transaction: Delete old snippets/documents → Insert new ones
|
||||
7. Generate embeddings for new snippets (if provider configured)
|
||||
8. Update repository stats (totalSnippets, totalTokens, trustScore, lastIndexedAt)
|
||||
9. Mark job as done (status: done, completedAt: now)
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Job Queue Implementation
|
||||
|
||||
```typescript
|
||||
// src/lib/server/pipeline/job-queue.ts
|
||||
|
||||
export class JobQueue {
|
||||
private isRunning = false;
|
||||
|
||||
constructor(private db: BetterSQLite3.Database) {}
|
||||
|
||||
enqueue(repositoryId: string, versionId?: string): IndexingJob {
|
||||
const job: NewIndexingJob = {
|
||||
id: crypto.randomUUID(),
|
||||
repositoryId,
|
||||
versionId: versionId ?? null,
|
||||
status: 'queued',
|
||||
progress: 0,
|
||||
totalFiles: 0,
|
||||
processedFiles: 0,
|
||||
error: null,
|
||||
startedAt: null,
|
||||
completedAt: null,
|
||||
createdAt: new Date(),
|
||||
};
|
||||
|
||||
this.db.prepare(`
|
||||
INSERT INTO indexing_jobs VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
`).run(...Object.values(job));
|
||||
|
||||
// Kick off processing if not already running
|
||||
if (!this.isRunning) {
|
||||
setImmediate(() => this.processNext());
|
||||
}
|
||||
|
||||
return job;
|
||||
}
|
||||
|
||||
private async processNext(): Promise<void> {
|
||||
if (this.isRunning) return;
|
||||
|
||||
const job = this.db.prepare(`
|
||||
SELECT * FROM indexing_jobs
|
||||
WHERE status = 'queued'
|
||||
ORDER BY created_at ASC
|
||||
LIMIT 1
|
||||
`).get() as IndexingJob | undefined;
|
||||
|
||||
if (!job) return;
|
||||
|
||||
this.isRunning = true;
|
||||
try {
|
||||
await this.pipeline.run(job);
|
||||
} finally {
|
||||
this.isRunning = false;
|
||||
// Check for next queued job
|
||||
const nextJob = this.db.prepare(
|
||||
`SELECT id FROM indexing_jobs WHERE status = 'queued' LIMIT 1`
|
||||
).get();
|
||||
if (nextJob) setImmediate(() => this.processNext());
|
||||
}
|
||||
}
|
||||
|
||||
getJob(id: string): IndexingJob | null {
|
||||
return this.db.prepare(
|
||||
`SELECT * FROM indexing_jobs WHERE id = ?`
|
||||
).get(id) as IndexingJob | null;
|
||||
}
|
||||
|
||||
listJobs(repositoryId?: string, limit = 20): IndexingJob[] {
|
||||
const query = repositoryId
|
||||
? `SELECT * FROM indexing_jobs WHERE repository_id = ? ORDER BY created_at DESC LIMIT ?`
|
||||
: `SELECT * FROM indexing_jobs ORDER BY created_at DESC LIMIT ?`;
|
||||
const params = repositoryId ? [repositoryId, limit] : [limit];
|
||||
return this.db.prepare(query).all(...params) as IndexingJob[];
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Indexing Pipeline
|
||||
|
||||
```typescript
|
||||
// src/lib/server/pipeline/indexing.pipeline.ts
|
||||
|
||||
export class IndexingPipeline {
|
||||
constructor(
|
||||
private db: BetterSQLite3.Database,
|
||||
private githubCrawler: GitHubCrawler,
|
||||
private localCrawler: LocalCrawler,
|
||||
private embeddingService: EmbeddingService | null,
|
||||
) {}
|
||||
|
||||
async run(job: IndexingJob): Promise<void> {
|
||||
this.updateJob(job.id, { status: 'running', startedAt: new Date() });
|
||||
|
||||
try {
|
||||
const repo = this.getRepository(job.repositoryId);
|
||||
if (!repo) throw new Error(`Repository ${job.repositoryId} not found`);
|
||||
|
||||
// Update repo state
|
||||
this.updateRepo(repo.id, { state: 'indexing' });
|
||||
|
||||
// Step 1: Crawl
|
||||
const crawlResult = await this.crawl(repo, job);
|
||||
|
||||
// Step 2: Parse and diff
|
||||
const { newSnippets, changedDocIds, newDocuments } =
|
||||
await this.parseAndDiff(crawlResult, repo, job);
|
||||
|
||||
// Step 3: Atomic replacement
|
||||
this.replaceSnippets(repo.id, changedDocIds, newDocuments, newSnippets);
|
||||
|
||||
// Step 4: Embeddings (async, non-blocking for job completion)
|
||||
if (this.embeddingService && newSnippets.length > 0) {
|
||||
await this.embeddingService.embedSnippets(
|
||||
newSnippets.map(s => s.id),
|
||||
(done, total) => {
|
||||
// Update job progress for embedding phase
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
// Step 5: Update repo stats
|
||||
const stats = this.computeStats(repo.id);
|
||||
this.updateRepo(repo.id, {
|
||||
state: 'indexed',
|
||||
totalSnippets: stats.totalSnippets,
|
||||
totalTokens: stats.totalTokens,
|
||||
trustScore: computeTrustScore({ ...repo, ...stats }),
|
||||
lastIndexedAt: new Date(),
|
||||
});
|
||||
|
||||
this.updateJob(job.id, {
|
||||
status: 'done',
|
||||
progress: 100,
|
||||
completedAt: new Date(),
|
||||
});
|
||||
|
||||
} catch (error) {
|
||||
this.updateJob(job.id, {
|
||||
status: 'failed',
|
||||
error: (error as Error).message,
|
||||
completedAt: new Date(),
|
||||
});
|
||||
this.updateRepo(job.repositoryId, { state: 'error' });
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
private replaceSnippets(
|
||||
repositoryId: string,
|
||||
changedDocIds: string[],
|
||||
newDocuments: NewDocument[],
|
||||
newSnippets: NewSnippet[]
|
||||
): void {
|
||||
// Single transaction: delete old → insert new
|
||||
this.db.transaction(() => {
|
||||
if (changedDocIds.length > 0) {
|
||||
// Cascade deletes snippets via FK constraint
|
||||
this.db.prepare(
|
||||
`DELETE FROM documents WHERE id IN (${changedDocIds.map(() => '?').join(',')})`
|
||||
).run(...changedDocIds);
|
||||
}
|
||||
|
||||
for (const doc of newDocuments) {
|
||||
this.insertDocument(doc);
|
||||
}
|
||||
|
||||
for (const snippet of newSnippets) {
|
||||
this.insertSnippet(snippet);
|
||||
}
|
||||
})();
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Progress Calculation
|
||||
|
||||
```typescript
|
||||
function calculateProgress(
|
||||
processedFiles: number,
|
||||
totalFiles: number,
|
||||
embeddingsDone: number,
|
||||
embeddingsTotal: number,
|
||||
hasEmbeddings: boolean
|
||||
): number {
|
||||
if (totalFiles === 0) return 0;
|
||||
|
||||
if (!hasEmbeddings) {
|
||||
// Crawl + parse = 100%
|
||||
return Math.round((processedFiles / totalFiles) * 100);
|
||||
}
|
||||
|
||||
// Crawl+parse = 80%, embeddings = 20%
|
||||
const parseProgress = (processedFiles / totalFiles) * 80;
|
||||
const embedProgress = embeddingsTotal > 0
|
||||
? (embeddingsDone / embeddingsTotal) * 20
|
||||
: 0;
|
||||
|
||||
return Math.round(parseProgress + embedProgress);
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## API Endpoints
|
||||
|
||||
### `GET /api/v1/jobs/:id`
|
||||
|
||||
Response `200`:
|
||||
```json
|
||||
{
|
||||
"job": {
|
||||
"id": "uuid",
|
||||
"repositoryId": "/facebook/react",
|
||||
"status": "running",
|
||||
"progress": 47,
|
||||
"totalFiles": 342,
|
||||
"processedFiles": 162,
|
||||
"error": null,
|
||||
"startedAt": "2026-03-22T10:00:00Z",
|
||||
"completedAt": null,
|
||||
"createdAt": "2026-03-22T09:59:55Z"
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### `GET /api/v1/jobs`
|
||||
|
||||
Query params: `repositoryId` (optional), `status` (optional), `limit` (default 20).
|
||||
|
||||
Response `200`:
|
||||
```json
|
||||
{
|
||||
"jobs": [...],
|
||||
"total": 5
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Server Startup Recovery
|
||||
|
||||
On application start, mark any jobs in `running` state as `failed` (they were interrupted by a process crash):
|
||||
|
||||
```typescript
|
||||
function recoverStaleJobs(db: BetterSQLite3.Database): void {
|
||||
db.prepare(`
|
||||
UPDATE indexing_jobs
|
||||
SET status = 'failed',
|
||||
error = 'Server restarted while job was running',
|
||||
completed_at = unixepoch()
|
||||
WHERE status = 'running'
|
||||
`).run();
|
||||
|
||||
// Also reset any repositories stuck in 'indexing' state
|
||||
db.prepare(`
|
||||
UPDATE repositories
|
||||
SET state = 'error'
|
||||
WHERE state = 'indexing'
|
||||
`).run();
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Files to Create
|
||||
|
||||
- `src/lib/server/pipeline/indexing.pipeline.ts`
|
||||
- `src/lib/server/pipeline/job-queue.ts`
|
||||
- `src/lib/server/pipeline/startup.ts` — recovery + queue initialization
|
||||
- `src/routes/api/v1/jobs/+server.ts` — GET list
|
||||
- `src/routes/api/v1/jobs/[id]/+server.ts` — GET single
|
||||
- `src/hooks.server.ts` — call startup on server init
|
||||
- `src/lib/server/pipeline/indexing.pipeline.test.ts`
|
||||
Reference in New Issue
Block a user