TRUEREF-0022 — Worker-Thread Indexing, Parallel Job Execution, and Real-Time Progress Streaming
Priority: P1 Status: Draft Depends On: TRUEREF-0009, TRUEREF-0014, TRUEREF-0017, TRUEREF-0021 Blocks: —
Overview
The indexing pipeline currently runs on the same Node.js event loop as the HTTP and MCP servers. Because better-sqlite3 is synchronous and parseFile is CPU-bound, a single indexing job can starve the event loop for seconds at a time, making the web UI completely unresponsive during indexing. With hundreds of version tags queued simultaneously, the problem compounds: the UI cannot navigate, poll for progress, or serve MCP requests while work is in flight.
This feature fixes the root cause by moving all indexing work into a dedicated Node.js Worker Thread, enabling controlled parallel execution of multiple jobs, and replacing the polling-based progress model with a real-time push mechanism — either Server-Sent Events (SSE) or a lightweight WebSocket channel.
Problem Statement
1. Event Loop Starvation
IndexingPipeline.run() spends the bulk of its time in two blocking operations:
- parseFile(file, ...) — CPU-bound text/AST parsing, fully synchronous
- better-sqlite3 writes — synchronous I/O by design
Neither yields to the event loop. A repository with 2 000 files produces ~2 000 consecutive blocking micro-tasks with no opportunity for the HTTP server to process any incoming request in between. The current mitigation (yielding every 20 files via setImmediate) reduces the freeze to sub-second intervals but does not eliminate it — it is a band-aid, not a structural fix.
2. Sequential-only Queue
JobQueue serialises all jobs one at a time to avoid SQLite write contention. This is correct for a single-writer model, but it means:
- Indexing /my-lib/v3.0.0 blocks /other-lib from starting, even though they write to entirely disjoint rows.
- With hundreds of version tags registered from Discover Tags, a user must wait for every previous tag to finish before the next one starts — typically hours for a large monorepo.
3. Polling Overhead and Lag
The UI currently polls GET /api/v1/jobs?repositoryId=... every 2 seconds. This means:
- Progress updates are always up to 2 seconds stale.
- Each poll is a full DB read regardless of whether anything changed.
- The polling interval itself adds load during the highest-contention window.
Goals
- Move IndexingPipeline.run() into a Node.js Worker Thread so the HTTP event loop is never blocked by indexing work.
- Support configurable parallel job execution (default: 2 concurrent workers, max: N where N is the number of CPU cores minus 1).
- Replace polling with Server-Sent Events (SSE) for real-time per-job progress streaming.
- Keep a single SQLite file as the persistence layer — no external message broker.
- Detailed progress: expose current stage name (crawl / diff / parse / store / embed), not just a percentage.
- Remain backward-compatible: the existing GET /api/v1/jobs/{id} REST endpoint continues to work unchanged.
Non-Goals
- Moving to a multi-process (fork) architecture.
- External queue systems (Redis, BullMQ, etc.).
- Distributed or cluster execution across multiple machines.
- Resumable indexing (pause mid-parse and continue after restart).
- Changing the SQLite storage backend.
Architecture
Worker Thread Model
┌─────────────────────────────────────────────────────────────────┐
│ Main Thread (SvelteKit / HTTP / MCP) │
│ │
│ WorkerPool JobQueue (SQLite) │
│ ┌────────────┐ ┌──────────────────────────────────────┐ │
│ │ Worker 0 │◄────►│ indexing_jobs (queued/running/done) │ │
│ │ Worker 1 │ └──────────────────────────────────────┘ │
│ │ Worker N │ │
│ └────────────┘ ProgressBroadcaster │
│ │ ┌──────────────────────────────────────┐ │
│ └─────────────►│ SSE channels (Map<jobId, Response>) │ │
│ postMessage └──────────────────────────────────────┘ │
│ { type: 'progress', jobId, stage, ... } │
└─────────────────────────────────────────────────────────────────┘
Worker Thread lifecycle
Each worker is a long-lived node:worker_threads Worker instance that:
- Opens its own better-sqlite3 connection to the same database file.
- Listens for { type: 'run', jobId } messages from the main thread.
- Runs IndexingPipeline.run(job), emitting postMessage progress events at each stage boundary and every N files.
- Posts { type: 'done', jobId } or { type: 'failed', jobId, error } when finished.
- Is reused for subsequent jobs (no spawn-per-job overhead).
WorkerPool (main thread)
Manages a pool of workers, sized by the concurrency option.
interface WorkerPoolOptions {
  concurrency: number;  // default: 2; max: Math.max(1, os.cpus().length - 1)
  workerScript: string; // absolute path to the compiled worker entry
}

class WorkerPool {
  private workers: Worker[];
  private idle: Worker[];

  enqueue(jobId: string): void;
  private dispatch(worker: Worker, jobId: string): void;
  private onWorkerMessage(msg: WorkerMessage): void;
  private onWorkerExit(worker: Worker, code: number): void;
}
Workers are kept alive across jobs. If a worker crashes (non-zero exit), the pool spawns a replacement and marks any in-flight job as failed.
Parallelism and write contention
With WAL mode enabled (already the case), SQLite supports:
- One concurrent writer (the transaction lock)
- Many concurrent readers
The replaceSnippets transactions for different repositories touch entirely disjoint rows, so they never conflict logically. SQLite still permits only one writer at a time even in WAL mode, but each write transaction is short, so concurrent jobs interleave at transaction boundaries rather than blocking for long. The cloneFromAncestor operation writes to the same tables but different version_id values, so its transactions interleave the same way.
Two jobs on the same repository (e.g. /my-lib/v1.0.0 and /my-lib/v2.0.0) can run in parallel because:
- Differential indexing (TRUEREF-0021) ensures v2.0.0 reads from v1.0.0's already-committed rows.
- The write transactions for each version touch disjoint version_id partitions.
If write contention still occurs under parallel load, busy_timeout = 5000 (already set) absorbs transient waits.
Concurrency limit per repository
To prevent a user from queuing 500 tags and overwhelming the worker pool, the pool enforces:
- Max 1 running job per repository for the default branch (re-index).
- Max concurrency total running jobs across all repositories.
- Version jobs for the same repository are serialised within the pool (the queue picks the oldest queued version job for a given repo only when no other version job for that repo is running).
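The pick rule above can be sketched as a pure function over the queue state. `QueuedJob`, `pickNextJob`, and the field names are illustrative for this sketch, not the actual JobQueue schema:

```typescript
interface QueuedJob {
  id: string;
  repositoryId: string;
  queuedAt: number; // epoch millis
}

// Hypothetical helper mirroring the serialisation rule described above:
// pick the oldest queued job whose repository has no job already running,
// and never exceed the pool-wide concurrency limit.
function pickNextJob(
  queued: QueuedJob[],
  running: QueuedJob[],
  concurrency: number
): QueuedJob | null {
  if (running.length >= concurrency) return null; // pool is saturated
  const busyRepos = new Set(running.map((j) => j.repositoryId));
  const eligible = queued
    .filter((j) => !busyRepos.has(j.repositoryId))
    .sort((a, b) => a.queuedAt - b.queuedAt);
  return eligible[0] ?? null;
}
```

The same predicate covers both rules: a repo with a running job contributes to `busyRepos`, so its queued version jobs wait, while jobs for other repositories proceed up to the concurrency cap.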
Progress Model
Stage Enumeration
Replace the opaque integer progress with a structured stage model:
type IndexingStage =
| 'queued'
| 'differential' // computing ancestor diff
| 'crawling' // fetching files from GitHub or local FS
| 'cloning' // cloning unchanged files from ancestor (differential only)
| 'parsing' // parsing files into snippets
| 'storing' // writing documents + snippets to DB
| 'embedding' // generating vector embeddings
| 'done'
| 'failed';
Extended Job Schema
ALTER TABLE indexing_jobs ADD COLUMN stage TEXT NOT NULL DEFAULT 'queued';
ALTER TABLE indexing_jobs ADD COLUMN stage_detail TEXT; -- e.g. "42 / 200 files"
The progress column (0–100) is retained for backward compatibility and overall bar rendering.
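One way to keep the legacy progress value consistent with the new stage model is to derive it from the stage plus file counts. The per-stage weights in this sketch are assumptions for illustration only, not values defined by this spec:

```typescript
type Stage = 'queued' | 'differential' | 'crawling' | 'cloning'
  | 'parsing' | 'storing' | 'embedding' | 'done' | 'failed';

// Assumed weights: each stage owns a base offset on the 0-100 bar, and the
// file-loop stages interpolate within their span. Tune to real stage costs.
const STAGE_BASE: Record<Stage, number> = {
  queued: 0, differential: 5, crawling: 10, cloning: 20,
  parsing: 25, storing: 70, embedding: 85, done: 100, failed: 100,
};
const STAGE_SPAN: Partial<Record<Stage, number>> = {
  parsing: 45, storing: 15, embedding: 15,
};

function overallProgress(stage: Stage, processed: number, total: number): number {
  const base = STAGE_BASE[stage];
  const span = STAGE_SPAN[stage] ?? 0;
  if (span === 0 || total === 0) return base; // stage without per-file progress
  return Math.min(100, Math.round(base + (processed / total) * span));
}
```

Deriving the percentage this way means the worker only has to report (stage, processedFiles, totalFiles) and the legacy bar stays monotonic across stage transitions.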
Worker → Main thread progress message
interface ProgressMessage {
type: 'progress';
jobId: string;
stage: IndexingStage;
stageDetail?: string; // human-readable detail for the current stage
progress: number; // 0–100 overall
processedFiles: number;
totalFiles: number;
}
Workers emit this message:
- On every stage transition (crawl start, parse start, store start, embed start).
- Every PROGRESS_EMIT_EVERY = 10 files during the parse loop.
- On job completion or failure.
The main thread receives these messages and does two things:
- Writes the update to indexing_jobs in SQLite (batched — one write per message, not per file).
- Pushes the payload to any open SSE channels for that jobId.
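A minimal sketch of the broadcast half of this flow, assuming a callback-based channel registry with a last-event cache (the real ProgressBroadcaster API may differ):

```typescript
type Listener = (payload: string) => void;

// In-memory SSE channel registry: response writers subscribe per jobId, and
// the most recent payload per job is cached so a late subscriber (or a
// Last-Event-ID reconnect) gets the current state immediately.
class ProgressBroadcaster {
  private channels = new Map<string, Set<Listener>>();
  private lastEvent = new Map<string, string>();

  subscribe(jobId: string, listener: Listener): () => void {
    let set = this.channels.get(jobId);
    if (!set) { set = new Set(); this.channels.set(jobId, set); }
    set.add(listener);
    const cached = this.lastEvent.get(jobId);
    if (cached !== undefined) listener(cached); // replay current state
    return () => { set!.delete(listener); };    // unsubscribe handle
  }

  publish(jobId: string, payload: string): void {
    this.lastEvent.set(jobId, payload);
    for (const l of this.channels.get(jobId) ?? []) l(payload);
  }

  close(jobId: string): void {
    this.channels.delete(jobId);
    this.lastEvent.delete(jobId);
  }
}
```

The worker-message handler would call `publish` after its SQLite write, so a dropped SSE connection never loses state — the DB row is always at least as fresh as the stream.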
Server-Sent Events API
GET /api/v1/jobs/:id/stream
Opens an SSE connection for a specific job. The server:
- Sends the current job state as the first event immediately (no initial lag).
- Pushes ProgressMessage events as the worker emits them.
- Sends a final event: done or event: failed event, then closes the connection.
- Accepts the Last-Event-ID header for reconnect support — replays the last known state.
GET /api/v1/jobs/abc-123/stream HTTP/1.1
Accept: text/event-stream
HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache
X-Accel-Buffering: no
id: 1
event: progress
data: {"stage":"crawling","progress":0,"processedFiles":0,"totalFiles":0}
id: 2
event: progress
data: {"stage":"parsing","progress":12,"processedFiles":240,"totalFiles":2000}
id: 47
event: done
data: {"stage":"done","progress":100,"processedFiles":2000,"totalFiles":2000}
The connection is automatically closed by the server after event: done or event: failed. If the client disconnects and reconnects with Last-Event-ID: 47, the server replays the last cached event (only the most recent event per job is cached in memory).
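The frames in the exchange above follow the standard SSE wire format (id, event, and data fields, each frame terminated by a blank line). A small serialisation helper might look like this; `formatSseEvent` is a hypothetical name, not an existing function in the codebase:

```typescript
// Serialise one SSE frame. The id field feeds the client's Last-Event-ID on
// reconnect; data is JSON-encoded to match the ProgressMessage payloads above.
function formatSseEvent(id: number, event: string, data: unknown): string {
  return `id: ${id}\nevent: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
}
```

The SSE endpoint would write the cached last event through this helper as its first frame, then stream live frames from the broadcaster until done or failed.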
GET /api/v1/jobs/stream (batch)
A second endpoint streams progress for all active jobs of a repository:
GET /api/v1/jobs/stream?repositoryId=/my-lib HTTP/1.1
Accept: text/event-stream
Events are multiplexed in the same stream:
event: job-progress
data: {"jobId":"abc-123","stage":"parsing","progress":34,...}
event: job-progress
data: {"jobId":"def-456","stage":"crawling","progress":5,...}
This replaces the current single-interval GET /api/v1/jobs?repositoryId=... poll on the repository detail page entirely.
UI Changes
Repository detail page (/repos/[id])
- Replace the $effect poll for version jobs with a single EventSource connection to GET /api/v1/jobs/stream?repositoryId=....
- Replace the inline progress bar markup with a refined component that shows stage name + file count + percentage.
- Show a compact "N jobs in queue" badge when jobs are queued but not yet running.
Admin jobs page (/admin/jobs)
- Replace the setInterval(fetchJobs, 3000) poll with an EventSource on GET /api/v1/jobs/stream (all jobs, no repositoryId filter).
Progress component
┌──────────────────────────────────────────────────────────────┐
│ v2.1.0 Parsing 42 / 200 21% │
│ ████████░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░ │
└──────────────────────────────────────────────────────────────┘
Stage label transitions: Queued → Diff → Crawling → Cloning → Parsing → Storing → Embedding → Done
Configuration
Expose via the settings table (key indexing.concurrency):
interface IndexingSettings {
concurrency: number; // 1–max(cpus-1, 1); default 2
}
Surfaced in the settings UI (/settings) alongside the embedding provider config.
Schema Migration
-- Migration: add stage columns to indexing_jobs
ALTER TABLE indexing_jobs ADD COLUMN stage TEXT NOT NULL DEFAULT 'queued';
ALTER TABLE indexing_jobs ADD COLUMN stage_detail TEXT;
The progress, processedFiles, and totalFiles columns are retained. The status column (queued / running / paused / cancelled / done / failed) is also retained. stage provides sub-status granularity within running.
Acceptance Criteria
Worker Thread
- IndexingPipeline.run() executes entirely inside a Worker Thread — zero parseFile / replaceSnippets calls on the main thread during indexing
- Worker crashes are detected: the pool spawns a replacement and marks the in-flight job as failed
- Worker pool concurrency is configurable via settings (min 1, max cpus - 1, default 2)
- Restarting the server cleans up stale running jobs and re-queues them (existing behaviour preserved)
Parallel Execution
- Two jobs for two different repositories run concurrently when concurrency ≥ 2
- Two version jobs for the same repository are serialised (at most one per repo at a time)
- Main-branch re-index job and version jobs for the same repo are serialised
- Admin jobs page shows parallel running jobs simultaneously
Progress Streaming
- GET /api/v1/jobs/:id/stream returns text/event-stream with stage + progress events
- GET /api/v1/jobs/stream?repositoryId=... multiplexes all active jobs for a repo
- First event is sent immediately (no wait for the first stage transition)
- SSE connection closes automatically after done / failed
- Last-Event-ID reconnect replays the last cached event
- Existing GET /api/v1/jobs/:id REST endpoint still works (no breaking change)
Stage Detail
- stage column in indexing_jobs reflects the current pipeline stage
- UI shows stage label next to the progress bar
- Stage transitions: queued → differential → crawling → cloning → parsing → storing → embedding → done
UI Responsiveness
- Navigating between pages while indexing is in progress has < 200 ms response time
- MCP query-docs calls resolve correctly while indexing is running in parallel
- No SQLITE_BUSY errors under concurrent indexing + read load
Implementation Order
- Schema migration — add stage and stage_detail columns (non-breaking, backward-compatible defaults)
- Worker entry point — src/lib/server/pipeline/worker.ts — thin wrapper that receives run messages and calls IndexingPipeline.run()
- WorkerPool — src/lib/server/pipeline/worker-pool.ts — pool management, message routing, crash recovery
- ProgressBroadcaster — src/lib/server/pipeline/progress-broadcaster.ts — in-memory SSE channel registry, last-event cache
- SSE endpoints — src/routes/api/v1/jobs/[id]/stream/+server.ts and src/routes/api/v1/jobs/stream/+server.ts
- JobQueue update — replace processNext's direct pipeline.run() call with workerPool.enqueue(jobId); enforce per-repo serialisation
- Pipeline stage reporting — add this.reportStage(stage, detail) calls at each stage boundary in IndexingPipeline.run()
- UI: SSE client on repository page — replace $effect poll with EventSource
- UI: SSE client on admin/jobs page — replace setInterval with EventSource
- Settings UI — add concurrency slider to /settings
- Integration tests — parallel execution, crash recovery, SSE event sequence
Dedicated Embedding Worker
The embedding stage must not run inside the same Worker Thread as the crawl/parse/store pipeline. The reasons are structural:
Why a dedicated embedding worker
| Concern | Per-parse-worker model | Dedicated embedding worker |
|---|---|---|
| Memory | N × ~100 MB (model weights + WASM heap) per worker | 1 × ~100 MB regardless of concurrency |
| Model warm-up | Paid once per worker spawn; cold starts slow | Paid once at server startup |
| Batch size | Each worker batches only its own job's snippets | All in-flight jobs queue to one worker → larger batches → higher WASM throughput |
| Provider migration | Must update every worker | Update one file |
| API rate limiting | N parallel streams to the same API → N×rate-limit hits | One serial stream, naturally throttled |
With Xenova/all-MiniLM-L6-v2, the WASM model and weight files occupy ~90–120 MB of heap. Running three parse workers with embedded model loading costs ~300–360 MB of resident memory that can never be freed while the server is alive. A dedicated worker keeps that cost fixed at one instance.
Batch efficiency matters too: embedSnippets already uses BATCH_SIZE = 50. A single embedding worker receiving snippet batches from multiple concurrently completing parse jobs can saturate its WASM batch budget (50 texts) far more consistently than individual workers whose parse jobs complete asynchronously.
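The cross-job batching idea reduces to a shared FIFO buffer drained in BATCH_SIZE chunks. This sketch is illustrative of the pattern only and is not the embedSnippets implementation:

```typescript
const BATCH_SIZE = 50;

// Snippet ids from any number of completed parse jobs share one FIFO buffer;
// the embedding worker drains up to BATCH_SIZE at a time, so batches stay
// full even when individual jobs finish with small remainders.
class BatchDrain {
  private buffer: string[] = [];

  push(ids: string[]): void {
    this.buffer.push(...ids);
  }

  // Take up to BATCH_SIZE ids off the front of the queue (FIFO order).
  drain(): string[] {
    return this.buffer.splice(0, BATCH_SIZE);
  }

  get pending(): number {
    return this.buffer.length;
  }
}
```

Two jobs finishing with 30 and 40 leftover snippets yield one full batch of 50 plus a 20-snippet remainder, instead of two undersized batches in the per-worker model.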
Architecture
The embedding worker is a separate, long-lived worker thread distinct from the parse worker pool:
┌─────────────────────────────────────────────────────────────┐
│ Main Thread │
│ │
│ WorkerPool (parse workers, concurrency N) │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Worker 0 │ │ Worker 1 │ │ Worker N │ │
│ │ crawl │ │ crawl │ │ crawl │ │
│ │ diff │ │ diff │ │ diff │ │
│ │ parse │ │ parse │ │ parse │ │
│ │ store │ │ store │ │ store │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ │ notify │ notify │ notify │
│ └──────────────┴─────────────┘ │
│ │ (via main thread broadcast) │
│ ┌──────▼───────┐ │
│ │ Embed Worker │ ← single instance │
│ │ loads model once │
│ │ drains snippet_embeddings deficit │
│ │ writes embeddings to DB │
│ └──────────────┘ │
└─────────────────────────────────────────────────────────────┘
Communication protocol
Parse workers do not send snippet content to the embedding worker over IPC — that would serialise potentially megabytes of text per job and negate the bandwidth savings of the deficit-drain pattern.
Instead, the existing findSnippetIdsMissingEmbeddings query is the handshake:
- Parse worker completes stage storing and posts { type: 'snippets-ready', repositoryId, versionId } to the main thread.
- Main thread forwards this to the embedding worker.
- Embedding worker calls findSnippetIdsMissingEmbeddings(repositoryId, versionId) on its own DB connection, then runs embedSnippets() as it does today.
- Embedding worker posts { type: 'embed-progress', jobId, done, total } back to the main thread at each batch boundary.
- Main thread routes this to the SSE broadcaster → UI updates the embedding progress slice.
This means:
- The embedding worker reads snippet text from the DB itself (no IPC serialisation of content).
- The model is loaded once, stays warm, and processes batches from all repositories in FIFO order.
- Parse workers are never blocked waiting for embeddings — they finish their job stages and return to the idle pool immediately.
Embedding worker message contract
// Main → Embedding worker
type EmbedRequest =
| { type: 'embed'; jobId: string; repositoryId: string; versionId: string | null }
| { type: 'shutdown' };
// Embedding worker → Main
type EmbedResponse =
| { type: 'embed-progress'; jobId: string; done: number; total: number }
| { type: 'embed-done'; jobId: string }
| { type: 'embed-failed'; jobId: string; error: string }
| { type: 'ready' }; // emitted once after model warm-up completes
The ready message allows the server startup sequence to defer routing any embed requests until the model is loaded, preventing a race on first-run.
Open Questions
- Should cloneFromAncestor (TRUEREF-0021) remain synchronous on the parse worker, or be split into its own cloning stage with explicit SSE events and a progress count? Given that cloning is a bulk DB operation rather than a per-file loop, a single stage-transition event (stage: 'cloning', no per-row progress) is sufficient.
- Does the busy_timeout = 5000 setting need to increase under high-concurrency parallel writes, or is 5 s sufficient? Empirically test with concurrency = 4 plus the embedding worker all writing simultaneously before increasing it.
- Should the embedding worker support priority queueing — e.g. embedding the most recently completed parse job first — or is strict FIFO sufficient? FIFO is simpler and correct for the current use case.