chore(FEEDBACK-0001): linting
This commit is contained in:
@@ -56,75 +56,83 @@ Implement the end-to-end indexing pipeline that orchestrates crawling, parsing,
|
||||
// src/lib/server/pipeline/job-queue.ts
|
||||
|
||||
export class JobQueue {
|
||||
private isRunning = false;
|
||||
private isRunning = false;
|
||||
|
||||
constructor(private db: BetterSQLite3.Database) {}
|
||||
constructor(private db: BetterSQLite3.Database) {}
|
||||
|
||||
enqueue(repositoryId: string, versionId?: string): IndexingJob {
|
||||
const job: NewIndexingJob = {
|
||||
id: crypto.randomUUID(),
|
||||
repositoryId,
|
||||
versionId: versionId ?? null,
|
||||
status: 'queued',
|
||||
progress: 0,
|
||||
totalFiles: 0,
|
||||
processedFiles: 0,
|
||||
error: null,
|
||||
startedAt: null,
|
||||
completedAt: null,
|
||||
createdAt: new Date(),
|
||||
};
|
||||
enqueue(repositoryId: string, versionId?: string): IndexingJob {
|
||||
const job: NewIndexingJob = {
|
||||
id: crypto.randomUUID(),
|
||||
repositoryId,
|
||||
versionId: versionId ?? null,
|
||||
status: 'queued',
|
||||
progress: 0,
|
||||
totalFiles: 0,
|
||||
processedFiles: 0,
|
||||
error: null,
|
||||
startedAt: null,
|
||||
completedAt: null,
|
||||
createdAt: new Date()
|
||||
};
|
||||
|
||||
this.db.prepare(`
|
||||
this.db
|
||||
.prepare(
|
||||
`
|
||||
INSERT INTO indexing_jobs VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
`).run(...Object.values(job));
|
||||
`
|
||||
)
|
||||
.run(...Object.values(job));
|
||||
|
||||
// Kick off processing if not already running
|
||||
if (!this.isRunning) {
|
||||
setImmediate(() => this.processNext());
|
||||
}
|
||||
// Kick off processing if not already running
|
||||
if (!this.isRunning) {
|
||||
setImmediate(() => this.processNext());
|
||||
}
|
||||
|
||||
return job;
|
||||
}
|
||||
return job;
|
||||
}
|
||||
|
||||
private async processNext(): Promise<void> {
|
||||
if (this.isRunning) return;
|
||||
private async processNext(): Promise<void> {
|
||||
if (this.isRunning) return;
|
||||
|
||||
const job = this.db.prepare(`
|
||||
const job = this.db
|
||||
.prepare(
|
||||
`
|
||||
SELECT * FROM indexing_jobs
|
||||
WHERE status = 'queued'
|
||||
ORDER BY created_at ASC
|
||||
LIMIT 1
|
||||
`).get() as IndexingJob | undefined;
|
||||
`
|
||||
)
|
||||
.get() as IndexingJob | undefined;
|
||||
|
||||
if (!job) return;
|
||||
if (!job) return;
|
||||
|
||||
this.isRunning = true;
|
||||
try {
|
||||
await this.pipeline.run(job);
|
||||
} finally {
|
||||
this.isRunning = false;
|
||||
// Check for next queued job
|
||||
const nextJob = this.db.prepare(
|
||||
`SELECT id FROM indexing_jobs WHERE status = 'queued' LIMIT 1`
|
||||
).get();
|
||||
if (nextJob) setImmediate(() => this.processNext());
|
||||
}
|
||||
}
|
||||
this.isRunning = true;
|
||||
try {
|
||||
await this.pipeline.run(job);
|
||||
} finally {
|
||||
this.isRunning = false;
|
||||
// Check for next queued job
|
||||
const nextJob = this.db
|
||||
.prepare(`SELECT id FROM indexing_jobs WHERE status = 'queued' LIMIT 1`)
|
||||
.get();
|
||||
if (nextJob) setImmediate(() => this.processNext());
|
||||
}
|
||||
}
|
||||
|
||||
getJob(id: string): IndexingJob | null {
|
||||
return this.db.prepare(
|
||||
`SELECT * FROM indexing_jobs WHERE id = ?`
|
||||
).get(id) as IndexingJob | null;
|
||||
}
|
||||
getJob(id: string): IndexingJob | null {
|
||||
return this.db
|
||||
.prepare(`SELECT * FROM indexing_jobs WHERE id = ?`)
|
||||
.get(id) as IndexingJob | null;
|
||||
}
|
||||
|
||||
listJobs(repositoryId?: string, limit = 20): IndexingJob[] {
|
||||
const query = repositoryId
|
||||
? `SELECT * FROM indexing_jobs WHERE repository_id = ? ORDER BY created_at DESC LIMIT ?`
|
||||
: `SELECT * FROM indexing_jobs ORDER BY created_at DESC LIMIT ?`;
|
||||
const params = repositoryId ? [repositoryId, limit] : [limit];
|
||||
return this.db.prepare(query).all(...params) as IndexingJob[];
|
||||
}
|
||||
listJobs(repositoryId?: string, limit = 20): IndexingJob[] {
|
||||
const query = repositoryId
|
||||
? `SELECT * FROM indexing_jobs WHERE repository_id = ? ORDER BY created_at DESC LIMIT ?`
|
||||
: `SELECT * FROM indexing_jobs ORDER BY created_at DESC LIMIT ?`;
|
||||
const params = repositoryId ? [repositoryId, limit] : [limit];
|
||||
return this.db.prepare(query).all(...params) as IndexingJob[];
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
@@ -136,94 +144,96 @@ export class JobQueue {
|
||||
// src/lib/server/pipeline/indexing.pipeline.ts
|
||||
|
||||
export class IndexingPipeline {
|
||||
constructor(
|
||||
private db: BetterSQLite3.Database,
|
||||
private githubCrawler: GitHubCrawler,
|
||||
private localCrawler: LocalCrawler,
|
||||
private embeddingService: EmbeddingService | null,
|
||||
) {}
|
||||
constructor(
|
||||
private db: BetterSQLite3.Database,
|
||||
private githubCrawler: GitHubCrawler,
|
||||
private localCrawler: LocalCrawler,
|
||||
private embeddingService: EmbeddingService | null
|
||||
) {}
|
||||
|
||||
async run(job: IndexingJob): Promise<void> {
|
||||
this.updateJob(job.id, { status: 'running', startedAt: new Date() });
|
||||
async run(job: IndexingJob): Promise<void> {
|
||||
this.updateJob(job.id, { status: 'running', startedAt: new Date() });
|
||||
|
||||
try {
|
||||
const repo = this.getRepository(job.repositoryId);
|
||||
if (!repo) throw new Error(`Repository ${job.repositoryId} not found`);
|
||||
try {
|
||||
const repo = this.getRepository(job.repositoryId);
|
||||
if (!repo) throw new Error(`Repository ${job.repositoryId} not found`);
|
||||
|
||||
// Update repo state
|
||||
this.updateRepo(repo.id, { state: 'indexing' });
|
||||
// Update repo state
|
||||
this.updateRepo(repo.id, { state: 'indexing' });
|
||||
|
||||
// Step 1: Crawl
|
||||
const crawlResult = await this.crawl(repo, job);
|
||||
// Step 1: Crawl
|
||||
const crawlResult = await this.crawl(repo, job);
|
||||
|
||||
// Step 2: Parse and diff
|
||||
const { newSnippets, changedDocIds, newDocuments } =
|
||||
await this.parseAndDiff(crawlResult, repo, job);
|
||||
// Step 2: Parse and diff
|
||||
const { newSnippets, changedDocIds, newDocuments } = await this.parseAndDiff(
|
||||
crawlResult,
|
||||
repo,
|
||||
job
|
||||
);
|
||||
|
||||
// Step 3: Atomic replacement
|
||||
this.replaceSnippets(repo.id, changedDocIds, newDocuments, newSnippets);
|
||||
// Step 3: Atomic replacement
|
||||
this.replaceSnippets(repo.id, changedDocIds, newDocuments, newSnippets);
|
||||
|
||||
// Step 4: Embeddings (async, non-blocking for job completion)
|
||||
if (this.embeddingService && newSnippets.length > 0) {
|
||||
await this.embeddingService.embedSnippets(
|
||||
newSnippets.map(s => s.id),
|
||||
(done, total) => {
|
||||
// Update job progress for embedding phase
|
||||
}
|
||||
);
|
||||
}
|
||||
// Step 4: Embeddings (async, non-blocking for job completion)
|
||||
if (this.embeddingService && newSnippets.length > 0) {
|
||||
await this.embeddingService.embedSnippets(
|
||||
newSnippets.map((s) => s.id),
|
||||
(done, total) => {
|
||||
// Update job progress for embedding phase
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
// Step 5: Update repo stats
|
||||
const stats = this.computeStats(repo.id);
|
||||
this.updateRepo(repo.id, {
|
||||
state: 'indexed',
|
||||
totalSnippets: stats.totalSnippets,
|
||||
totalTokens: stats.totalTokens,
|
||||
trustScore: computeTrustScore({ ...repo, ...stats }),
|
||||
lastIndexedAt: new Date(),
|
||||
});
|
||||
// Step 5: Update repo stats
|
||||
const stats = this.computeStats(repo.id);
|
||||
this.updateRepo(repo.id, {
|
||||
state: 'indexed',
|
||||
totalSnippets: stats.totalSnippets,
|
||||
totalTokens: stats.totalTokens,
|
||||
trustScore: computeTrustScore({ ...repo, ...stats }),
|
||||
lastIndexedAt: new Date()
|
||||
});
|
||||
|
||||
this.updateJob(job.id, {
|
||||
status: 'done',
|
||||
progress: 100,
|
||||
completedAt: new Date(),
|
||||
});
|
||||
this.updateJob(job.id, {
|
||||
status: 'done',
|
||||
progress: 100,
|
||||
completedAt: new Date()
|
||||
});
|
||||
} catch (error) {
|
||||
this.updateJob(job.id, {
|
||||
status: 'failed',
|
||||
error: (error as Error).message,
|
||||
completedAt: new Date()
|
||||
});
|
||||
this.updateRepo(job.repositoryId, { state: 'error' });
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
} catch (error) {
|
||||
this.updateJob(job.id, {
|
||||
status: 'failed',
|
||||
error: (error as Error).message,
|
||||
completedAt: new Date(),
|
||||
});
|
||||
this.updateRepo(job.repositoryId, { state: 'error' });
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
private replaceSnippets(
|
||||
repositoryId: string,
|
||||
changedDocIds: string[],
|
||||
newDocuments: NewDocument[],
|
||||
newSnippets: NewSnippet[]
|
||||
): void {
|
||||
// Single transaction: delete old → insert new
|
||||
this.db.transaction(() => {
|
||||
if (changedDocIds.length > 0) {
|
||||
// Cascade deletes snippets via FK constraint
|
||||
this.db
|
||||
.prepare(`DELETE FROM documents WHERE id IN (${changedDocIds.map(() => '?').join(',')})`)
|
||||
.run(...changedDocIds);
|
||||
}
|
||||
|
||||
private replaceSnippets(
|
||||
repositoryId: string,
|
||||
changedDocIds: string[],
|
||||
newDocuments: NewDocument[],
|
||||
newSnippets: NewSnippet[]
|
||||
): void {
|
||||
// Single transaction: delete old → insert new
|
||||
this.db.transaction(() => {
|
||||
if (changedDocIds.length > 0) {
|
||||
// Cascade deletes snippets via FK constraint
|
||||
this.db.prepare(
|
||||
`DELETE FROM documents WHERE id IN (${changedDocIds.map(() => '?').join(',')})`
|
||||
).run(...changedDocIds);
|
||||
}
|
||||
for (const doc of newDocuments) {
|
||||
this.insertDocument(doc);
|
||||
}
|
||||
|
||||
for (const doc of newDocuments) {
|
||||
this.insertDocument(doc);
|
||||
}
|
||||
|
||||
for (const snippet of newSnippets) {
|
||||
this.insertSnippet(snippet);
|
||||
}
|
||||
})();
|
||||
}
|
||||
for (const snippet of newSnippets) {
|
||||
this.insertSnippet(snippet);
|
||||
}
|
||||
})();
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
@@ -233,26 +243,24 @@ export class IndexingPipeline {
|
||||
|
||||
```typescript
|
||||
function calculateProgress(
|
||||
processedFiles: number,
|
||||
totalFiles: number,
|
||||
embeddingsDone: number,
|
||||
embeddingsTotal: number,
|
||||
hasEmbeddings: boolean
|
||||
processedFiles: number,
|
||||
totalFiles: number,
|
||||
embeddingsDone: number,
|
||||
embeddingsTotal: number,
|
||||
hasEmbeddings: boolean
|
||||
): number {
|
||||
if (totalFiles === 0) return 0;
|
||||
if (totalFiles === 0) return 0;
|
||||
|
||||
if (!hasEmbeddings) {
|
||||
// Crawl + parse = 100%
|
||||
return Math.round((processedFiles / totalFiles) * 100);
|
||||
}
|
||||
if (!hasEmbeddings) {
|
||||
// Crawl + parse = 100%
|
||||
return Math.round((processedFiles / totalFiles) * 100);
|
||||
}
|
||||
|
||||
// Crawl+parse = 80%, embeddings = 20%
|
||||
const parseProgress = (processedFiles / totalFiles) * 80;
|
||||
const embedProgress = embeddingsTotal > 0
|
||||
? (embeddingsDone / embeddingsTotal) * 20
|
||||
: 0;
|
||||
// Crawl+parse = 80%, embeddings = 20%
|
||||
const parseProgress = (processedFiles / totalFiles) * 80;
|
||||
const embedProgress = embeddingsTotal > 0 ? (embeddingsDone / embeddingsTotal) * 20 : 0;
|
||||
|
||||
return Math.round(parseProgress + embedProgress);
|
||||
return Math.round(parseProgress + embedProgress);
|
||||
}
|
||||
```
|
||||
|
||||
@@ -263,20 +271,21 @@ function calculateProgress(
|
||||
### `GET /api/v1/jobs/:id`
|
||||
|
||||
Response `200`:
|
||||
|
||||
```json
|
||||
{
|
||||
"job": {
|
||||
"id": "uuid",
|
||||
"repositoryId": "/facebook/react",
|
||||
"status": "running",
|
||||
"progress": 47,
|
||||
"totalFiles": 342,
|
||||
"processedFiles": 162,
|
||||
"error": null,
|
||||
"startedAt": "2026-03-22T10:00:00Z",
|
||||
"completedAt": null,
|
||||
"createdAt": "2026-03-22T09:59:55Z"
|
||||
}
|
||||
"job": {
|
||||
"id": "uuid",
|
||||
"repositoryId": "/facebook/react",
|
||||
"status": "running",
|
||||
"progress": 47,
|
||||
"totalFiles": 342,
|
||||
"processedFiles": 162,
|
||||
"error": null,
|
||||
"startedAt": "2026-03-22T10:00:00Z",
|
||||
"completedAt": null,
|
||||
"createdAt": "2026-03-22T09:59:55Z"
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
@@ -285,6 +294,7 @@ Response `200`:
|
||||
Query params: `repositoryId` (optional), `status` (optional), `limit` (default 20).
|
||||
|
||||
Response `200`:
|
||||
|
||||
```json
|
||||
{
|
||||
"jobs": [...],
|
||||
@@ -300,20 +310,24 @@ On application start, mark any jobs in `running` state as `failed` (they were in
|
||||
|
||||
```typescript
|
||||
function recoverStaleJobs(db: BetterSQLite3.Database): void {
|
||||
db.prepare(`
|
||||
db.prepare(
|
||||
`
|
||||
UPDATE indexing_jobs
|
||||
SET status = 'failed',
|
||||
error = 'Server restarted while job was running',
|
||||
completed_at = unixepoch()
|
||||
WHERE status = 'running'
|
||||
`).run();
|
||||
`
|
||||
).run();
|
||||
|
||||
// Also reset any repositories stuck in 'indexing' state
|
||||
db.prepare(`
|
||||
// Also reset any repositories stuck in 'indexing' state
|
||||
db.prepare(
|
||||
`
|
||||
UPDATE repositories
|
||||
SET state = 'error'
|
||||
WHERE state = 'indexing'
|
||||
`).run();
|
||||
`
|
||||
).run();
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
Reference in New Issue
Block a user