trueref/docs/features/TRUEREF-0009.md

TRUEREF-0009 — Indexing Pipeline & Job Queue

Priority: P0 · Status: Pending · Depends On: TRUEREF-0003, TRUEREF-0004, TRUEREF-0005, TRUEREF-0001 · Blocks: TRUEREF-0010, TRUEREF-0015


Overview

Implement the end-to-end indexing pipeline that orchestrates crawling, parsing, storage, and embedding generation for a repository. The pipeline uses a SQLite-backed job queue with sequential processing. Each indexing run is atomic — new data replaces old data only upon successful completion.


Acceptance Criteria

  • IndexingPipeline class that orchestrates the full indexing flow
  • SQLite-backed JobQueue (no external message broker)
  • Atomic snippet replacement: old snippets deleted only after new ones are stored successfully
  • Progress tracked in indexing_jobs table (processedFiles, totalFiles, progress 0-100)
  • GET /api/v1/jobs/:id — get job status and progress
  • GET /api/v1/jobs — list recent jobs (with filtering by repositoryId)
  • Jobs run sequentially (one at a time) to avoid SQLite write contention
  • Graceful error handling: job marked as failed with error message, existing data preserved
  • Server startup: detect jobs interrupted mid-run and mark them failed (existing data preserved)
  • Unit tests for pipeline stages and job queue

Pipeline Stages

1. Create IndexingJob (status: queued)
2. Dequeue job (status: running, startedAt: now)
3. Fetch trueref.json config (if exists)
4. Crawl repository (GitHub or local)
   → Update job.totalFiles
5. For each file:
   a. Check checksum against existing document
   b. If unchanged, skip (reuse existing snippets)
   c. If changed/new: parse into snippets
   d. Buffer new snippets in memory
   e. Update job.processedFiles + job.progress
6. Transaction: Delete old snippets/documents → Insert new ones
7. Generate embeddings for new snippets (if provider configured)
8. Update repository stats (totalSnippets, totalTokens, trustScore, lastIndexedAt)
9. Mark job as done (status: done, completedAt: now)
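Steps 5a-b hinge on a cheap content comparison: a file is re-parsed only when its checksum differs from the stored document's. A minimal sketch of that check, assuming SHA-256 checksums keyed by file path (`StoredDoc` and `shouldReindex` are illustrative names, not the actual schema):

```typescript
import { createHash } from 'node:crypto';

// Hypothetical shape for stored document metadata (field names are
// illustrative, not the actual documents table schema).
interface StoredDoc {
  path: string;
  checksum: string;
}

/** Hex-encoded SHA-256 checksum of a file's contents. */
export function checksumOf(content: string): string {
  return createHash('sha256').update(content).digest('hex');
}

/**
 * Steps 5a-b: a file is skipped (existing snippets reused) only when a
 * document with the same path exists and its checksum matches.
 */
export function shouldReindex(
  existing: Map<string, StoredDoc>,
  path: string,
  content: string,
): boolean {
  const prev = existing.get(path);
  return !prev || prev.checksum !== checksumOf(content);
}
```

Hashing file contents rather than comparing mtimes keeps the diff stable across crawls from different sources (GitHub API vs. local filesystem).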

Job Queue Implementation

// src/lib/server/pipeline/job-queue.ts

export class JobQueue {
  private isRunning = false;

  constructor(
    private db: BetterSQLite3.Database,
    private pipeline: IndexingPipeline,
  ) {}

  enqueue(repositoryId: string, versionId?: string): IndexingJob {
    const job: NewIndexingJob = {
      id: crypto.randomUUID(),
      repositoryId,
      versionId: versionId ?? null,
      status: 'queued',
      progress: 0,
      totalFiles: 0,
      processedFiles: 0,
      error: null,
      startedAt: null,
      completedAt: null,
      createdAt: new Date(),
    };

    // Name the columns explicitly: relying on Object.values() key order is
    // fragile, and better-sqlite3 cannot bind Date objects directly, so
    // createdAt is stored as epoch seconds (matching unixepoch() elsewhere).
    this.db.prepare(`
      INSERT INTO indexing_jobs
        (id, repository_id, version_id, status, progress, total_files,
         processed_files, error, started_at, completed_at, created_at)
      VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    `).run(
      job.id, job.repositoryId, job.versionId, job.status, job.progress,
      job.totalFiles, job.processedFiles, job.error, null, null,
      Math.floor(job.createdAt.getTime() / 1000),
    );

    // Kick off processing if not already running
    if (!this.isRunning) {
      setImmediate(() => this.processNext());
    }

    return job;
  }

  private async processNext(): Promise<void> {
    if (this.isRunning) return;

    const job = this.db.prepare(`
      SELECT * FROM indexing_jobs
      WHERE status = 'queued'
      ORDER BY created_at ASC
      LIMIT 1
    `).get() as IndexingJob | undefined;

    if (!job) return;

    this.isRunning = true;
    try {
      await this.pipeline.run(job);
    } finally {
      this.isRunning = false;
      // Check for next queued job
      const nextJob = this.db.prepare(
        `SELECT id FROM indexing_jobs WHERE status = 'queued' LIMIT 1`
      ).get();
      if (nextJob) setImmediate(() => this.processNext());
    }
  }

  getJob(id: string): IndexingJob | null {
    const row = this.db.prepare(
      `SELECT * FROM indexing_jobs WHERE id = ?`
    ).get(id) as IndexingJob | undefined;
    return row ?? null; // .get() yields undefined, not null, on a miss
  }

  listJobs(repositoryId?: string, limit = 20): IndexingJob[] {
    const query = repositoryId
      ? `SELECT * FROM indexing_jobs WHERE repository_id = ? ORDER BY created_at DESC LIMIT ?`
      : `SELECT * FROM indexing_jobs ORDER BY created_at DESC LIMIT ?`;
    const params = repositoryId ? [repositoryId, limit] : [limit];
    return this.db.prepare(query).all(...params) as IndexingJob[];
  }
}

Indexing Pipeline

// src/lib/server/pipeline/indexing.pipeline.ts

export class IndexingPipeline {
  constructor(
    private db: BetterSQLite3.Database,
    private githubCrawler: GitHubCrawler,
    private localCrawler: LocalCrawler,
    private embeddingService: EmbeddingService | null,
  ) {}

  async run(job: IndexingJob): Promise<void> {
    this.updateJob(job.id, { status: 'running', startedAt: new Date() });

    try {
      const repo = this.getRepository(job.repositoryId);
      if (!repo) throw new Error(`Repository ${job.repositoryId} not found`);

      // Update repo state
      this.updateRepo(repo.id, { state: 'indexing' });

      // Step 1: Crawl
      const crawlResult = await this.crawl(repo, job);

      // Step 2: Parse and diff
      const { newSnippets, changedDocIds, newDocuments } =
        await this.parseAndDiff(crawlResult, repo, job);

      // Step 3: Atomic replacement
      this.replaceSnippets(repo.id, changedDocIds, newDocuments, newSnippets);

      // Step 4: Embeddings (awaited, so the job completes only after
      // the embedding phase — its progress counts toward job.progress)
      if (this.embeddingService && newSnippets.length > 0) {
        await this.embeddingService.embedSnippets(
          newSnippets.map(s => s.id),
          (done, total) => {
            // Update job progress for embedding phase
          }
        );
      }

      // Step 5: Update repo stats
      const stats = this.computeStats(repo.id);
      this.updateRepo(repo.id, {
        state: 'indexed',
        totalSnippets: stats.totalSnippets,
        totalTokens: stats.totalTokens,
        trustScore: computeTrustScore({ ...repo, ...stats }),
        lastIndexedAt: new Date(),
      });

      this.updateJob(job.id, {
        status: 'done',
        progress: 100,
        completedAt: new Date(),
      });

    } catch (error) {
      this.updateJob(job.id, {
        status: 'failed',
        error: (error as Error).message,
        completedAt: new Date(),
      });
      this.updateRepo(job.repositoryId, { state: 'error' });
      throw error;
    }
  }

  private replaceSnippets(
    repositoryId: string,
    changedDocIds: string[],
    newDocuments: NewDocument[],
    newSnippets: NewSnippet[]
  ): void {
    // Single transaction: delete old → insert new
    this.db.transaction(() => {
      if (changedDocIds.length > 0) {
        // Cascade deletes snippets via FK constraint
        this.db.prepare(
          `DELETE FROM documents WHERE id IN (${changedDocIds.map(() => '?').join(',')})`
        ).run(...changedDocIds);
      }

      for (const doc of newDocuments) {
        this.insertDocument(doc);
      }

      for (const snippet of newSnippets) {
        this.insertSnippet(snippet);
      }
    })();
  }
}

Progress Calculation

function calculateProgress(
  processedFiles: number,
  totalFiles: number,
  embeddingsDone: number,
  embeddingsTotal: number,
  hasEmbeddings: boolean
): number {
  if (totalFiles === 0) return 0;

  if (!hasEmbeddings) {
    // Crawl + parse = 100%
    return Math.round((processedFiles / totalFiles) * 100);
  }

  // Crawl+parse = 80%, embeddings = 20%
  const parseProgress = (processedFiles / totalFiles) * 80;
  const embedProgress = embeddingsTotal > 0
    ? (embeddingsDone / embeddingsTotal) * 20
    : 0;

  return Math.round(parseProgress + embedProgress);
}
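A quick sanity check on the weighting (the function is repeated here so the snippet runs standalone): 162 of 342 files with no embedding provider yields 47, which matches the example job response below, assuming that run had no provider configured.

```typescript
// Same weighting as calculateProgress above, repeated so this
// snippet is self-contained.
function calculateProgress(
  processedFiles: number,
  totalFiles: number,
  embeddingsDone: number,
  embeddingsTotal: number,
  hasEmbeddings: boolean,
): number {
  if (totalFiles === 0) return 0;
  if (!hasEmbeddings) return Math.round((processedFiles / totalFiles) * 100);
  const parseProgress = (processedFiles / totalFiles) * 80;
  const embedProgress =
    embeddingsTotal > 0 ? (embeddingsDone / embeddingsTotal) * 20 : 0;
  return Math.round(parseProgress + embedProgress);
}

// No embedding provider: crawl + parse alone reaches 100%.
console.log(calculateProgress(162, 342, 0, 0, false)); // 47

// With embeddings enabled, parsing alone caps at 80%.
console.log(calculateProgress(342, 342, 0, 10, true)); // 80
console.log(calculateProgress(342, 342, 10, 10, true)); // 100
```

The 80/20 split keeps the bar from stalling at a fixed value during the (usually shorter) embedding phase, while `totalFiles === 0` guards against a divide-by-zero on empty repositories.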

API Endpoints

GET /api/v1/jobs/:id

Response 200:

{
  "job": {
    "id": "uuid",
    "repositoryId": "/facebook/react",
    "status": "running",
    "progress": 47,
    "totalFiles": 342,
    "processedFiles": 162,
    "error": null,
    "startedAt": "2026-03-22T10:00:00Z",
    "completedAt": null,
    "createdAt": "2026-03-22T09:59:55Z"
  }
}

GET /api/v1/jobs

Query params: repositoryId (optional), status (optional), limit (default 20).

Response 200:

{
  "jobs": [...],
  "total": 5
}
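The list endpoint's filtering reduces to building one parameterized statement from the optional query params. A sketch of that logic as a pure helper (`buildJobsQuery` is a hypothetical name; the route handler would pass it the parsed query params and run the result through the shared db handle):

```typescript
// Hypothetical helper for GET /api/v1/jobs: translates optional filters
// into a parameterized SQL statement (names are illustrative).
export function buildJobsQuery(opts: {
  repositoryId?: string;
  status?: string;
  limit?: number;
}): { sql: string; params: (string | number)[] } {
  const where: string[] = [];
  const params: (string | number)[] = [];

  if (opts.repositoryId) {
    where.push('repository_id = ?');
    params.push(opts.repositoryId);
  }
  if (opts.status) {
    where.push('status = ?');
    params.push(opts.status);
  }

  const whereClause = where.length ? ` WHERE ${where.join(' AND ')}` : '';
  params.push(opts.limit ?? 20); // default limit per the spec
  return {
    sql: `SELECT * FROM indexing_jobs${whereClause} ORDER BY created_at DESC LIMIT ?`,
    params,
  };
}
```

Keeping values in the bound-parameter list (never interpolated into the SQL string) rules out injection even though both filters come straight from the URL.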

Server Startup Recovery

On application start, mark any jobs in running state as failed (they were interrupted by a process crash):

function recoverStaleJobs(db: BetterSQLite3.Database): void {
  db.prepare(`
    UPDATE indexing_jobs
    SET status = 'failed',
        error = 'Server restarted while job was running',
        completed_at = unixepoch()
    WHERE status = 'running'
  `).run();

  // Also reset any repositories stuck in 'indexing' state
  db.prepare(`
    UPDATE repositories
    SET state = 'error'
    WHERE state = 'indexing'
  `).run();
}

Files to Create

  • src/lib/server/pipeline/indexing.pipeline.ts
  • src/lib/server/pipeline/job-queue.ts
  • src/lib/server/pipeline/startup.ts — recovery + queue initialization
  • src/routes/api/v1/jobs/+server.ts — GET list
  • src/routes/api/v1/jobs/[id]/+server.ts — GET single
  • src/hooks.server.ts — call startup on server init
  • src/lib/server/pipeline/indexing.pipeline.test.ts