Compare commits

...

2 Commits

Author SHA1 Message Date
Giancarmine Salucci
6297edf109 chore(TRUEREF-0022): fix lint errors and update architecture docs
- Fix 15 ESLint errors across pipeline workers, SSE endpoints, and UI
- Replace explicit any with proper entity types in worker entries
- Remove unused imports and variables (basename, SSEEvent, getBroadcasterFn, seedRules)
- Use empty catch clauses instead of unused error variables
- Use SvelteSet for reactive Set state in repository page
- Fix operator precedence in nullish coalescing expression
- Replace $state+$effect with $derived for concurrency input
- Use resolve() directly in href for navigation lint rule
- Update ARCHITECTURE.md and FINDINGS.md for worker-thread architecture
2026-03-30 17:28:38 +02:00
Giancarmine Salucci
7630740403 feat(TRUEREF-0022): complete iteration 0 — worker-thread indexing, parallel jobs, SSE progress
- Move IndexingPipeline.run() into Worker Threads via WorkerPool
- Add dedicated embedding worker thread with single model instance
- Add stage/stageDetail columns to indexing_jobs schema
- Create ProgressBroadcaster for SSE channel management
- Add SSE endpoints: GET /api/v1/jobs/:id/stream, GET /api/v1/jobs/stream
- Replace UI polling with EventSource on repo detail and admin pages
- Add concurrency settings UI and API endpoint
- Build worker entries separately via esbuild
2026-03-30 17:08:23 +02:00
33 changed files with 2726 additions and 1042 deletions

View File

@@ -1,15 +1,16 @@
# Architecture
Last Updated: 2026-03-27T00:24:13.000Z
Last Updated: 2026-03-30T00:00:00.000Z
## Overview
TrueRef is a TypeScript-first, self-hosted documentation retrieval platform built on SvelteKit. The repository contains a Node-targeted web application, a REST API, a Model Context Protocol server, and a server-side indexing pipeline backed by SQLite via better-sqlite3 and Drizzle ORM.
TrueRef is a TypeScript-first, self-hosted documentation retrieval platform built on SvelteKit. The repository contains a Node-targeted web application, a REST API, a Model Context Protocol server, and a multi-threaded server-side indexing pipeline backed by SQLite via better-sqlite3 and Drizzle ORM.
- Primary language: TypeScript (110 files) with a small amount of JavaScript configuration (2 files)
- Application type: Full-stack SvelteKit application with server-side indexing and retrieval services
- Primary language: TypeScript (141 files) with a small amount of JavaScript configuration (2 files)
- Application type: Full-stack SvelteKit application with worker-threaded indexing and retrieval services
- Runtime framework: SvelteKit with adapter-node
- Storage: SQLite with Drizzle-managed schema plus hand-written FTS5 setup
- Storage: SQLite (WAL mode) with Drizzle-managed schema plus hand-written FTS5 setup
- Concurrency: Node.js worker_threads for parse and embedding work
- Testing: Vitest with separate client and server projects
## Project Structure
@@ -25,7 +26,7 @@ TrueRef is a TypeScript-first, self-hosted documentation retrieval platform buil
### src/routes
Contains the UI entry points and API routes. The API tree under src/routes/api/v1 is the public HTTP contract for repository management, indexing jobs, search/context retrieval, settings, filesystem browsing, and JSON schema discovery.
Contains the UI entry points and API routes. The API tree under src/routes/api/v1 is the public HTTP contract for repository management, indexing jobs, search/context retrieval, settings, filesystem browsing, JSON schema discovery, real-time SSE progress streaming, and job control (pause/resume/cancel).
### src/lib/server/db
@@ -33,7 +34,15 @@ Owns SQLite schema definitions, migration bootstrapping, and FTS initialization.
### src/lib/server/pipeline
Coordinates crawl, parse, chunk, store, and optional embedding generation work. Startup recovery marks stale jobs as failed, resets repositories stuck in indexing state, initializes singleton queue/pipeline instances, and drains queued work after restart.
Coordinates crawl, parse, chunk, store, and optional embedding generation work using a worker thread pool. The pipeline module consists of:
- **WorkerPool** (`worker-pool.ts`): Manages a configurable number of Node.js `worker_threads` for parse jobs and an optional dedicated embed worker. Dispatches jobs round-robin to idle workers, enforces per-repository serialisation (one active job per repo), auto-respawns crashed workers, and supports runtime concurrency adjustment via `setMaxConcurrency()`. Falls back to main-thread execution when worker scripts are not found.
- **Parse worker** (`worker-entry.ts`): Runs in a worker thread. Opens its own `better-sqlite3` connection (WAL mode, `busy_timeout = 5000`), constructs a local `IndexingPipeline` instance, and processes jobs by posting `progress`, `done`, or `failed` messages back to the parent.
- **Embed worker** (`embed-worker-entry.ts`): Dedicated worker for embedding generation. Loads the embedding profile from the database, creates an `EmbeddingService`, and processes embed requests after the parse worker finishes a job.
- **ProgressBroadcaster** (`progress-broadcaster.ts`): Server-side pub/sub for real-time SSE streaming. Supports per-job, per-repository, and global subscriptions. Caches the last event per job for reconnect support.
- **Worker types** (`worker-types.ts`): Shared TypeScript discriminated union types for `ParseWorkerRequest`/`ParseWorkerResponse` and `EmbedWorkerRequest`/`EmbedWorkerResponse` message protocols.
- **Startup** (`startup.ts`): Recovers stale jobs, constructs singleton `JobQueue`, `IndexingPipeline`, `WorkerPool`, and `ProgressBroadcaster` instances, reads concurrency settings from the database, and drains queued work after restart.
- **JobQueue** (`job-queue.ts`): SQLite-backed queue that delegates to the `WorkerPool` when available, with pause/resume/cancel support.
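The discriminated-union message protocol described for `worker-types.ts` can be sketched as follows. The exact field names are illustrative assumptions; only the `type` discriminator pattern is taken from the document.

```typescript
// Sketch of a parse-worker response union keyed on a `type` field.
// Field names other than `type` are hypothetical.
type ParseWorkerResponse =
	| { type: 'progress'; jobId: string; processed: number; total: number }
	| { type: 'done'; jobId: string }
	| { type: 'failed'; jobId: string; error: string };

// TypeScript narrows each branch, so every field access is type-safe
// and the switch is checked for exhaustiveness.
function describe(msg: ParseWorkerResponse): string {
	switch (msg.type) {
		case 'progress':
			return `job ${msg.jobId}: ${msg.processed}/${msg.total}`;
		case 'done':
			return `job ${msg.jobId}: done`;
		case 'failed':
			return `job ${msg.jobId}: failed (${msg.error})`;
	}
}
```

The same shape works for the embed request/response pairs, with the parent matching on `type` in its `worker.on('message', …)` handler.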
### src/lib/server/search
@@ -49,16 +58,18 @@ Provides a thin compatibility layer over the HTTP API. The MCP server exposes re
## Design Patterns
- No explicit design patterns detected from semantic analysis.
- The implementation does consistently use service classes such as RepositoryService, SearchService, and HybridSearchService for business logic.
- Mapping and entity layers separate raw database rows from domain objects through mapper/entity pairs such as RepositoryMapper and RepositoryEntity.
- Pipeline startup uses module-level singleton state for JobQueue and IndexingPipeline lifecycle management.
- The WorkerPool implements an **observer/callback pattern**: the pool owner provides `onProgress`, `onJobDone`, `onJobFailed`, `onEmbedDone`, and `onEmbedFailed` callbacks at construction time, and the pool invokes them when workers post messages.
- ProgressBroadcaster implements a **pub/sub pattern** with three subscription tiers (per-job, per-repository, global) and last-event caching for SSE reconnect.
- The implementation consistently uses **service classes** such as RepositoryService, SearchService, and HybridSearchService for business logic.
- Mapping and entity layers separate raw database rows from domain objects through **mapper/entity pairs** such as RepositoryMapper and RepositoryEntity.
- Pipeline startup uses **module-level singletons** for JobQueue, IndexingPipeline, WorkerPool, and ProgressBroadcaster lifecycle management, with accessor functions (getQueue, getPool, getBroadcaster) for route handlers.
- Worker message protocols use **TypeScript discriminated unions** (`type` field) for type-safe worker ↔ parent communication.
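The pub/sub pattern with last-event caching could look like the minimal sketch below (per-repository tier omitted for brevity; this is not the real ProgressBroadcaster API).

```typescript
type Listener = (event: unknown) => void;

// Toy broadcaster: per-job and global subscriptions, plus a last-event
// cache that is replayed to new subscribers for reconnect support.
class TinyBroadcaster {
	private jobSubs = new Map<string, Set<Listener>>();
	private globalSubs = new Set<Listener>();
	private lastEvent = new Map<string, unknown>();

	subscribeJob(jobId: string, fn: Listener): () => void {
		const set = this.jobSubs.get(jobId) ?? new Set<Listener>();
		set.add(fn);
		this.jobSubs.set(jobId, set);
		const cached = this.lastEvent.get(jobId);
		if (cached !== undefined) fn(cached); // replay for reconnects
		return () => set.delete(fn);
	}

	subscribeAll(fn: Listener): () => void {
		this.globalSubs.add(fn);
		return () => this.globalSubs.delete(fn);
	}

	publish(jobId: string, event: unknown): void {
		this.lastEvent.set(jobId, event);
		this.jobSubs.get(jobId)?.forEach((fn) => fn(event));
		this.globalSubs.forEach((fn) => fn(event));
	}
}
```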
## Key Components
### SvelteKit server bootstrap
src/hooks.server.ts initializes the database, loads persisted embedding configuration, creates the optional EmbeddingService, starts the indexing pipeline, and applies CORS headers to all /api routes.
src/hooks.server.ts initializes the database, loads persisted embedding configuration, creates the optional EmbeddingService, reads indexing concurrency settings from the database, starts the indexing pipeline with WorkerPool and ProgressBroadcaster via `initializePipeline(db, embeddingService, { concurrency, dbPath })`, and applies CORS headers to all /api routes.
### Database layer
@@ -80,6 +91,22 @@ src/lib/server/services/repository.service.ts provides CRUD and statistics for i
src/mcp/index.ts creates the MCP server, registers the two supported tools, and exposes them over stdio or streamable HTTP.
### Worker thread pool
src/lib/server/pipeline/worker-pool.ts manages a pool of Node.js worker threads. Parse workers run the full crawl → parse → store pipeline inside isolated threads with their own better-sqlite3 connections (WAL mode enables concurrent readers). An optional embed worker handles embedding generation in a separate thread. The pool enforces per-repository serialisation, auto-respawns crashed workers, and supports runtime concurrency changes persisted through the settings table.
### SSE streaming
src/lib/server/pipeline/progress-broadcaster.ts provides real-time Server-Sent Event streaming of indexing progress. Route handlers in src/routes/api/v1/jobs/stream and src/routes/api/v1/jobs/[id]/stream expose SSE endpoints. The broadcaster supports per-job, per-repository, and global subscriptions, with last-event caching for reconnect via the `Last-Event-ID` header.
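The wire format such an endpoint emits is plain SSE framing; a hedged sketch of the formatting and response wiring (handler shape is an assumption, the `text/event-stream` framing itself is standard):

```typescript
// Frame one Server-Sent Event. The `id:` field is what the browser
// echoes back in the Last-Event-ID header on reconnect.
function formatSSE(id: string, event: string, data: unknown): string {
	return `id: ${id}\nevent: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
}

// Wrap a subscribe function into a streaming Response. `subscribe`
// returns an unsubscribe callback invoked when the client disconnects.
function sseResponse(subscribe: (send: (chunk: string) => void) => () => void): Response {
	let cleanup: (() => void) | undefined;
	const stream = new ReadableStream<Uint8Array>({
		start(controller) {
			const enc = new TextEncoder();
			cleanup = subscribe((chunk) => controller.enqueue(enc.encode(chunk)));
		},
		cancel() {
			cleanup?.();
		}
	});
	return new Response(stream, {
		headers: {
			'Content-Type': 'text/event-stream',
			'Cache-Control': 'no-cache',
			Connection: 'keep-alive'
		}
	});
}
```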
### Job control
src/routes/api/v1/jobs/[id]/pause, resume, and cancel endpoints allow runtime control of indexing jobs. The JobQueue supports pause/resume/cancel state transitions persisted to SQLite.
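One way to model such state transitions is a lookup table guard; the allowed transitions below are an assumption for illustration, not the exact JobQueue rules.

```typescript
type JobStatus = 'queued' | 'running' | 'paused' | 'completed' | 'failed' | 'cancelled';
type JobAction = 'pause' | 'resume' | 'cancel';

// Hypothetical transition table: terminal states reject all actions.
const transitions: Record<string, JobStatus> = {
	'queued:pause': 'paused',
	'running:pause': 'paused',
	'paused:resume': 'queued',
	'queued:cancel': 'cancelled',
	'running:cancel': 'cancelled',
	'paused:cancel': 'cancelled'
};

// Returns the next status, or null when the action is invalid for the
// current status (an endpoint would answer 409 Conflict in that case).
function applyAction(status: JobStatus, action: JobAction): JobStatus | null {
	return transitions[`${status}:${action}`] ?? null;
}
```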
### Indexing settings
src/routes/api/v1/settings/indexing exposes GET and PUT for indexing concurrency. PUT validates and clamps the value to `max(cpus - 1, 1)`, persists it to the settings table, and live-updates the WorkerPool via `setMaxConcurrency()`.
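The clamp described above amounts to a two-sided bound; a small sketch (the bounds come from the document, the function itself is illustrative):

```typescript
import os from 'node:os';

// Clamp a requested concurrency into [1, max(cpus - 1, 1)].
// cpuCount is parameterised so the logic is testable.
function clampConcurrency(requested: number, cpuCount: number = os.cpus().length): number {
	const max = Math.max(cpuCount - 1, 1);
	return Math.min(Math.max(Math.trunc(requested), 1), max);
}
```

For example, on an 8-core machine a request of 99 clamps to 7, and any request at or below zero clamps to 1.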
## Dependencies
### Production
@@ -93,6 +120,7 @@ src/mcp/index.ts creates the MCP server, registers the two supported tools, and
- @sveltejs/kit and @sveltejs/adapter-node: application framework and Node deployment target
- drizzle-kit and drizzle-orm: schema management and typed database access
- esbuild: worker thread entry point bundling (build/workers/)
- vite and @tailwindcss/vite: bundling and Tailwind integration
- vitest and @vitest/browser-playwright: server and browser test execution
- eslint, typescript-eslint, eslint-plugin-svelte, prettier, prettier-plugin-svelte, prettier-plugin-tailwindcss: linting and formatting
@@ -116,12 +144,13 @@ The frontend and backend share the same SvelteKit repository, but most non-UI be
### Indexing flow
1. Server startup runs initializeDatabase() and initializePipeline() from src/hooks.server.ts.
2. The pipeline recovers stale jobs, initializes crawler/parser infrastructure, and resumes queued work.
3. Crawlers ingest GitHub or local repository contents.
4. Parsers split files into document and snippet records with token counts and metadata.
5. Database modules persist repositories, documents, snippets, versions, configs, and job state.
6. If an embedding provider is configured, embedding services generate vectors for snippet search.
1. Server startup runs initializeDatabase() and initializePipeline() from src/hooks.server.ts, which creates the WorkerPool, ProgressBroadcaster, and JobQueue singletons.
2. The pipeline recovers stale jobs (marks running → failed, indexing → error), reads concurrency settings, and resumes queued work.
3. When a job is enqueued, the JobQueue delegates to the WorkerPool, which dispatches work to an idle parse worker thread.
4. Each parse worker opens its own better-sqlite3 connection (WAL mode) and runs the full crawl → parse → store pipeline, posting progress messages back to the parent thread.
5. The parent thread updates job progress in the database and broadcasts SSE events through the ProgressBroadcaster.
6. On parse completion, if an embedding provider is configured, the WorkerPool enqueues an embed request to the dedicated embed worker, which generates vectors in its own thread.
7. Job control endpoints allow pausing, resuming, or cancelling jobs at runtime.
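Steps 4 and 5 above hinge on the parent thread translating worker progress messages into a database update plus an SSE payload. A hedged sketch of that translation (field names and SQL shape are assumptions; only the `stage` and `progress` columns are confirmed by the schema changes in this diff):

```typescript
interface ProgressMsg {
	type: 'progress';
	jobId: string;
	processed: number;
	total: number;
	stage: string;
}

// Map one worker message to an UPDATE statement and a broadcast event.
// Progress is derived as an integer percentage of processed files.
function toJobUpdate(msg: ProgressMsg) {
	const progress = msg.total > 0 ? Math.round((msg.processed / msg.total) * 100) : 0;
	return {
		sql: 'UPDATE indexing_jobs SET progress = ?, processed_files = ?, total_files = ?, stage = ? WHERE id = ?',
		params: [progress, msg.processed, msg.total, msg.stage, msg.jobId],
		sseEvent: { event: 'progress', data: { jobId: msg.jobId, progress, stage: msg.stage } }
	};
}
```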
### Retrieval flow
@@ -135,7 +164,8 @@ The frontend and backend share the same SvelteKit repository, but most non-UI be
## Build System
- Build command: npm run build
- Build command: npm run build (runs `vite build` then `node scripts/build-workers.mjs`)
- Worker bundling: scripts/build-workers.mjs uses esbuild to compile worker-entry.ts and embed-worker-entry.ts into build/workers/ as ESM bundles (.mjs), with $lib path aliases resolved and better-sqlite3/@xenova/transformers marked external
- Test command: npm run test
- Primary local run command from package.json: npm run dev
- MCP entry points: npm run mcp:start and npm run mcp:http

View File

@@ -1,25 +1,29 @@
# Findings
Last Updated: 2026-03-27T00:24:13.000Z
Last Updated: 2026-03-30T00:00:00.000Z
## Initializer Summary
- JIRA: FEEDBACK-0001
- JIRA: TRUEREF-0022
- Refresh mode: REFRESH_IF_REQUIRED
- Result: refreshed affected documentation only. ARCHITECTURE.md and FINDINGS.md were updated from current repository analysis; CODE_STYLE.md remained trusted and unchanged because the documented conventions still match the codebase.
- Result: Refreshed ARCHITECTURE.md and FINDINGS.md. CODE_STYLE.md remained trusted — new worker thread code follows established conventions.
## Research Performed
- Discovered source-language distribution, dependency manifest, import patterns, and project structure.
- Read the retrieval, formatter, token-budget, parser, mapper, and response-model modules affected by the latest implementation changes.
- Compared the trusted cache state with current behavior to identify which documentation files were actually stale.
- Confirmed package scripts for build and test.
- Confirmed Linux-native md5sum availability for documentation trust metadata.
- Discovered 141 TypeScript/JavaScript source files (up from 110), with new pipeline worker, broadcaster, and SSE endpoint files.
- Read worker-pool.ts, worker-entry.ts, embed-worker-entry.ts, worker-types.ts, progress-broadcaster.ts, startup.ts, job-queue.ts to understand the new worker thread architecture.
- Read SSE endpoints (jobs/stream, jobs/[id]/stream) and job control endpoints (pause, resume, cancel).
- Read indexing settings endpoint and hooks.server.ts to verify startup wiring changes.
- Read build-workers.mjs and package.json to verify build system and dependency changes.
- Compared trusted cache state with current codebase to identify ARCHITECTURE.md as stale.
- Confirmed CODE_STYLE.md conventions still match the codebase — new code uses PascalCase classes, camelCase functions, tab indentation, ESM imports, and TypeScript discriminated unions consistent with existing style.
## Open Questions For Planner
- Verify whether the retrieval response contract should document the new repository and version metadata fields formally in a public API reference beyond the architecture summary.
- Verify whether parser chunking should evolve further from file-level and declaration-level boundaries to member-level semantic chunks for class-heavy codebases.
- Verify whether the SSE streaming contract (event names, data shapes) should be documented in a dedicated API reference for external consumers.
- Assess whether the WorkerPool fallback mode (main-thread execution when worker scripts are missing) needs explicit test coverage or should be removed in favour of a hard build requirement.
## Planner Notes Template

View File

@@ -5,7 +5,7 @@
"type": "module",
"scripts": {
"dev": "vite dev",
"build": "vite build",
"build": "vite build && node scripts/build-workers.mjs",
"preview": "vite preview",
"prepare": "svelte-kit sync || echo ''",
"check": "svelte-kit sync && svelte-check --tsconfig ./tsconfig.json",
@@ -34,6 +34,7 @@
"@vitest/browser-playwright": "^4.1.0",
"drizzle-kit": "^0.31.8",
"drizzle-orm": "^0.45.1",
"esbuild": "^0.24.0",
"eslint": "^9.39.2",
"eslint-config-prettier": "^10.1.8",
"eslint-plugin-svelte": "^3.14.0",

scripts/build-workers.mjs Normal file
View File

@@ -0,0 +1,38 @@
import * as esbuild from 'esbuild';
import { existsSync } from 'node:fs';
const entries = [
'src/lib/server/pipeline/worker-entry.ts',
'src/lib/server/pipeline/embed-worker-entry.ts'
];
try {
const existing = entries.filter(e => existsSync(e));
if (existing.length === 0) {
console.log('[build-workers] No worker entry files found yet, skipping.');
process.exit(0);
}
await esbuild.build({
entryPoints: existing,
bundle: true,
platform: 'node',
target: 'node20',
format: 'esm',
outdir: 'build/workers',
outExtension: { '.js': '.mjs' },
alias: {
'$lib': './src/lib',
'$lib/server': './src/lib/server'
},
external: ['better-sqlite3', '@xenova/transformers'],
banner: {
js: "import { createRequire } from 'module'; const require = createRequire(import.meta.url);"
}
});
console.log(`[build-workers] Compiled ${existing.length} worker(s) to build/workers/`);
} catch (err) {
console.error('[build-workers] Error:', err);
process.exit(1);
}

View File

@@ -47,7 +47,29 @@ try {
embeddingService = new EmbeddingService(db, provider, activeProfile.id);
}
initializePipeline(db, embeddingService);
// Read database path from environment
const dbPath = process.env.DATABASE_URL;
// Read indexing concurrency setting from database
let concurrency = 2; // default
if (dbPath) {
const concurrencyRow = db
.prepare<[], { value: string }>(
"SELECT value FROM settings WHERE key = 'indexing.concurrency' LIMIT 1"
)
.get();
if (concurrencyRow) {
try {
const parsed = JSON.parse(concurrencyRow.value);
concurrency = parsed.value ?? 2;
} catch {
// If parsing fails, use default
concurrency = 2;
}
}
}
initializePipeline(db, embeddingService, { concurrency, dbPath });
console.log('[hooks.server] Indexing pipeline initialised.');
} catch (err) {
console.error(

View File

@@ -1,5 +1,5 @@
<script lang="ts">
import { resolve as resolveRoute } from '$app/paths';
import { resolve } from '$app/paths';
type RepositoryCardRepo = {
id: string;
@@ -38,10 +38,6 @@
error: 'Error'
};
const detailsHref = $derived(
resolveRoute('/repos/[id]', { id: encodeURIComponent(repo.id) })
);
const totalSnippets = $derived(repo.totalSnippets ?? 0);
const trustScore = $derived(repo.trustScore ?? 0);
const embeddingCount = $derived(repo.embeddingCount ?? 0);
@@ -112,7 +108,7 @@
{repo.state === 'indexing' ? 'Indexing...' : 'Re-index'}
</button>
<a
href={detailsHref}
href={resolve('/repos/[id]', { id: encodeURIComponent(repo.id) })}
class="rounded-lg border border-gray-200 px-3 py-1.5 text-sm text-gray-700 hover:bg-gray-50"
>
Details

View File

@@ -0,0 +1,23 @@
PRAGMA foreign_keys=OFF;--> statement-breakpoint
CREATE TABLE `__new_repository_configs` (
`repository_id` text NOT NULL,
`version_id` text,
`project_title` text,
`description` text,
`folders` text,
`exclude_folders` text,
`exclude_files` text,
`rules` text,
`previous_versions` text,
`updated_at` integer NOT NULL,
FOREIGN KEY (`repository_id`) REFERENCES `repositories`(`id`) ON UPDATE no action ON DELETE cascade
);
--> statement-breakpoint
INSERT INTO `__new_repository_configs`("repository_id", "version_id", "project_title", "description", "folders", "exclude_folders", "exclude_files", "rules", "previous_versions", "updated_at") SELECT "repository_id", "version_id", "project_title", "description", "folders", "exclude_folders", "exclude_files", "rules", "previous_versions", "updated_at" FROM `repository_configs`;--> statement-breakpoint
DROP TABLE `repository_configs`;--> statement-breakpoint
ALTER TABLE `__new_repository_configs` RENAME TO `repository_configs`;--> statement-breakpoint
PRAGMA foreign_keys=ON;--> statement-breakpoint
CREATE UNIQUE INDEX `uniq_repo_config_base` ON `repository_configs` (`repository_id`) WHERE "repository_configs"."version_id" IS NULL;--> statement-breakpoint
CREATE UNIQUE INDEX `uniq_repo_config_version` ON `repository_configs` (`repository_id`,`version_id`) WHERE "repository_configs"."version_id" IS NOT NULL;--> statement-breakpoint
ALTER TABLE `indexing_jobs` ADD `stage` text DEFAULT 'queued' NOT NULL;--> statement-breakpoint
ALTER TABLE `indexing_jobs` ADD `stage_detail` text;

View File

@@ -1,835 +0,0 @@
{
"version": "6",
"dialect": "sqlite",
"id": "a7c2e4f8-3b1d-4e9a-8f0c-6d5e2a1b9c7f",
"prevId": "31531dab-a199-4fc5-a889-1884940039cd",
"tables": {
"documents": {
"name": "documents",
"columns": {
"id": {
"name": "id",
"type": "text",
"primaryKey": true,
"notNull": true,
"autoincrement": false
},
"repository_id": {
"name": "repository_id",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"version_id": {
"name": "version_id",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"file_path": {
"name": "file_path",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"title": {
"name": "title",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"language": {
"name": "language",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"token_count": {
"name": "token_count",
"type": "integer",
"primaryKey": false,
"notNull": false,
"autoincrement": false,
"default": 0
},
"checksum": {
"name": "checksum",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"indexed_at": {
"name": "indexed_at",
"type": "integer",
"primaryKey": false,
"notNull": true,
"autoincrement": false
}
},
"indexes": {},
"foreignKeys": {
"documents_repository_id_repositories_id_fk": {
"name": "documents_repository_id_repositories_id_fk",
"tableFrom": "documents",
"tableTo": "repositories",
"columnsFrom": ["repository_id"],
"columnsTo": ["id"],
"onDelete": "cascade",
"onUpdate": "no action"
},
"documents_version_id_repository_versions_id_fk": {
"name": "documents_version_id_repository_versions_id_fk",
"tableFrom": "documents",
"tableTo": "repository_versions",
"columnsFrom": ["version_id"],
"columnsTo": ["id"],
"onDelete": "cascade",
"onUpdate": "no action"
}
},
"compositePrimaryKeys": {},
"uniqueConstraints": {},
"checkConstraints": {}
},
"embedding_profiles": {
"name": "embedding_profiles",
"columns": {
"id": {
"name": "id",
"type": "text",
"primaryKey": true,
"notNull": true,
"autoincrement": false
},
"provider_kind": {
"name": "provider_kind",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"title": {
"name": "title",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"enabled": {
"name": "enabled",
"type": "integer",
"primaryKey": false,
"notNull": true,
"autoincrement": false,
"default": true
},
"is_default": {
"name": "is_default",
"type": "integer",
"primaryKey": false,
"notNull": true,
"autoincrement": false,
"default": false
},
"model": {
"name": "model",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"dimensions": {
"name": "dimensions",
"type": "integer",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"config": {
"name": "config",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"created_at": {
"name": "created_at",
"type": "integer",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"updated_at": {
"name": "updated_at",
"type": "integer",
"primaryKey": false,
"notNull": true,
"autoincrement": false
}
},
"indexes": {},
"foreignKeys": {},
"compositePrimaryKeys": {},
"uniqueConstraints": {},
"checkConstraints": {}
},
"indexing_jobs": {
"name": "indexing_jobs",
"columns": {
"id": {
"name": "id",
"type": "text",
"primaryKey": true,
"notNull": true,
"autoincrement": false
},
"repository_id": {
"name": "repository_id",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"version_id": {
"name": "version_id",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"status": {
"name": "status",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false,
"default": "'queued'"
},
"progress": {
"name": "progress",
"type": "integer",
"primaryKey": false,
"notNull": false,
"autoincrement": false,
"default": 0
},
"total_files": {
"name": "total_files",
"type": "integer",
"primaryKey": false,
"notNull": false,
"autoincrement": false,
"default": 0
},
"processed_files": {
"name": "processed_files",
"type": "integer",
"primaryKey": false,
"notNull": false,
"autoincrement": false,
"default": 0
},
"error": {
"name": "error",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"started_at": {
"name": "started_at",
"type": "integer",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"completed_at": {
"name": "completed_at",
"type": "integer",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"created_at": {
"name": "created_at",
"type": "integer",
"primaryKey": false,
"notNull": true,
"autoincrement": false
}
},
"indexes": {},
"foreignKeys": {
"indexing_jobs_repository_id_repositories_id_fk": {
"name": "indexing_jobs_repository_id_repositories_id_fk",
"tableFrom": "indexing_jobs",
"tableTo": "repositories",
"columnsFrom": ["repository_id"],
"columnsTo": ["id"],
"onDelete": "cascade",
"onUpdate": "no action"
}
},
"compositePrimaryKeys": {},
"uniqueConstraints": {},
"checkConstraints": {}
},
"repositories": {
"name": "repositories",
"columns": {
"id": {
"name": "id",
"type": "text",
"primaryKey": true,
"notNull": true,
"autoincrement": false
},
"title": {
"name": "title",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"description": {
"name": "description",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"source": {
"name": "source",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"source_url": {
"name": "source_url",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"branch": {
"name": "branch",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false,
"default": "'main'"
},
"state": {
"name": "state",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false,
"default": "'pending'"
},
"total_snippets": {
"name": "total_snippets",
"type": "integer",
"primaryKey": false,
"notNull": false,
"autoincrement": false,
"default": 0
},
"total_tokens": {
"name": "total_tokens",
"type": "integer",
"primaryKey": false,
"notNull": false,
"autoincrement": false,
"default": 0
},
"trust_score": {
"name": "trust_score",
"type": "real",
"primaryKey": false,
"notNull": false,
"autoincrement": false,
"default": 0
},
"benchmark_score": {
"name": "benchmark_score",
"type": "real",
"primaryKey": false,
"notNull": false,
"autoincrement": false,
"default": 0
},
"stars": {
"name": "stars",
"type": "integer",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"github_token": {
"name": "github_token",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"last_indexed_at": {
"name": "last_indexed_at",
"type": "integer",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"created_at": {
"name": "created_at",
"type": "integer",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"updated_at": {
"name": "updated_at",
"type": "integer",
"primaryKey": false,
"notNull": true,
"autoincrement": false
}
},
"indexes": {},
"foreignKeys": {},
"compositePrimaryKeys": {},
"uniqueConstraints": {},
"checkConstraints": {}
},
"repository_configs": {
"name": "repository_configs",
"columns": {
"repository_id": {
"name": "repository_id",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"version_id": {
"name": "version_id",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"project_title": {
"name": "project_title",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"description": {
"name": "description",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"folders": {
"name": "folders",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"exclude_folders": {
"name": "exclude_folders",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"exclude_files": {
"name": "exclude_files",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"rules": {
"name": "rules",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"previous_versions": {
"name": "previous_versions",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"updated_at": {
"name": "updated_at",
"type": "integer",
"primaryKey": false,
"notNull": true,
"autoincrement": false
}
},
"indexes": {
"uniq_repo_config_base": {
"name": "uniq_repo_config_base",
"columns": ["repository_id"],
"isUnique": true,
"where": "`version_id` IS NULL"
},
"uniq_repo_config_version": {
"name": "uniq_repo_config_version",
"columns": ["repository_id", "version_id"],
"isUnique": true,
"where": "`version_id` IS NOT NULL"
}
},
"foreignKeys": {
"repository_configs_repository_id_repositories_id_fk": {
"name": "repository_configs_repository_id_repositories_id_fk",
"tableFrom": "repository_configs",
"tableTo": "repositories",
"columnsFrom": ["repository_id"],
"columnsTo": ["id"],
"onDelete": "cascade",
"onUpdate": "no action"
}
},
"compositePrimaryKeys": {},
"uniqueConstraints": {},
"checkConstraints": {}
},
"repository_versions": {
"name": "repository_versions",
"columns": {
"id": {
"name": "id",
"type": "text",
"primaryKey": true,
"notNull": true,
"autoincrement": false
},
"repository_id": {
"name": "repository_id",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"tag": {
"name": "tag",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"title": {
"name": "title",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"commit_hash": {
"name": "commit_hash",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"state": {
"name": "state",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false,
"default": "'pending'"
},
"total_snippets": {
"name": "total_snippets",
"type": "integer",
"primaryKey": false,
"notNull": false,
"autoincrement": false,
"default": 0
},
"indexed_at": {
"name": "indexed_at",
"type": "integer",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"created_at": {
"name": "created_at",
"type": "integer",
"primaryKey": false,
"notNull": true,
"autoincrement": false
}
},
"indexes": {},
"foreignKeys": {
"repository_versions_repository_id_repositories_id_fk": {
"name": "repository_versions_repository_id_repositories_id_fk",
"tableFrom": "repository_versions",
"tableTo": "repositories",
"columnsFrom": ["repository_id"],
"columnsTo": ["id"],
"onDelete": "cascade",
"onUpdate": "no action"
}
},
"compositePrimaryKeys": {},
"uniqueConstraints": {},
"checkConstraints": {}
},
"settings": {
"name": "settings",
"columns": {
"key": {
"name": "key",
"type": "text",
"primaryKey": true,
"notNull": true,
"autoincrement": false
},
"value": {
"name": "value",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"updated_at": {
"name": "updated_at",
"type": "integer",
"primaryKey": false,
"notNull": true,
"autoincrement": false
}
},
"indexes": {},
"foreignKeys": {},
"compositePrimaryKeys": {},
"uniqueConstraints": {},
"checkConstraints": {}
},
"snippet_embeddings": {
"name": "snippet_embeddings",
"columns": {
"snippet_id": {
"name": "snippet_id",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"profile_id": {
"name": "profile_id",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"model": {
"name": "model",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"dimensions": {
"name": "dimensions",
"type": "integer",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"embedding": {
"name": "embedding",
"type": "blob",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"created_at": {
"name": "created_at",
"type": "integer",
"primaryKey": false,
"notNull": true,
"autoincrement": false
}
},
"indexes": {},
"foreignKeys": {
"snippet_embeddings_snippet_id_snippets_id_fk": {
"name": "snippet_embeddings_snippet_id_snippets_id_fk",
"tableFrom": "snippet_embeddings",
"tableTo": "snippets",
"columnsFrom": ["snippet_id"],
"columnsTo": ["id"],
"onDelete": "cascade",
"onUpdate": "no action"
},
"snippet_embeddings_profile_id_embedding_profiles_id_fk": {
"name": "snippet_embeddings_profile_id_embedding_profiles_id_fk",
"tableFrom": "snippet_embeddings",
"tableTo": "embedding_profiles",
"columnsFrom": ["profile_id"],
"columnsTo": ["id"],
"onDelete": "cascade",
"onUpdate": "no action"
}
},
"compositePrimaryKeys": {
"snippet_embeddings_snippet_id_profile_id_pk": {
"columns": ["snippet_id", "profile_id"],
"name": "snippet_embeddings_snippet_id_profile_id_pk"
}
},
"uniqueConstraints": {},
"checkConstraints": {}
},
"snippets": {
"name": "snippets",
"columns": {
"id": {
"name": "id",
"type": "text",
"primaryKey": true,
"notNull": true,
"autoincrement": false
},
"document_id": {
"name": "document_id",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"repository_id": {
"name": "repository_id",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"version_id": {
"name": "version_id",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"type": {
"name": "type",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"title": {
"name": "title",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"content": {
"name": "content",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"language": {
"name": "language",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"breadcrumb": {
"name": "breadcrumb",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"token_count": {
"name": "token_count",
"type": "integer",
"primaryKey": false,
"notNull": false,
"autoincrement": false,
"default": 0
},
"created_at": {
"name": "created_at",
"type": "integer",
"primaryKey": false,
"notNull": true,
"autoincrement": false
}
},
"indexes": {},
"foreignKeys": {
"snippets_document_id_documents_id_fk": {
"name": "snippets_document_id_documents_id_fk",
"tableFrom": "snippets",
"tableTo": "documents",
"columnsFrom": ["document_id"],
"columnsTo": ["id"],
"onDelete": "cascade",
"onUpdate": "no action"
},
"snippets_repository_id_repositories_id_fk": {
"name": "snippets_repository_id_repositories_id_fk",
"tableFrom": "snippets",
"tableTo": "repositories",
"columnsFrom": ["repository_id"],
"columnsTo": ["id"],
"onDelete": "cascade",
"onUpdate": "no action"
},
"snippets_version_id_repository_versions_id_fk": {
"name": "snippets_version_id_repository_versions_id_fk",
"tableFrom": "snippets",
"tableTo": "repository_versions",
"columnsFrom": ["version_id"],
"columnsTo": ["id"],
"onDelete": "cascade",
"onUpdate": "no action"
}
},
"compositePrimaryKeys": {},
"uniqueConstraints": {},
"checkConstraints": {}
}
},
"views": {},
"enums": {},
"schemas": {},
"sequences": {},
"_meta": {
"schemas": {},
"tables": {},
"columns": {}
},
"internal": {
"indexes": {}
}
}

View File

@@ -0,0 +1,896 @@
{
"version": "6",
"dialect": "sqlite",
"id": "c326dcbe-1771-4a90-a566-0ebd1eca47ec",
"prevId": "31531dab-a199-4fc5-a889-1884940039cd",
"tables": {
"documents": {
"name": "documents",
"columns": {
"id": {
"name": "id",
"type": "text",
"primaryKey": true,
"notNull": true,
"autoincrement": false
},
"repository_id": {
"name": "repository_id",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"version_id": {
"name": "version_id",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"file_path": {
"name": "file_path",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"title": {
"name": "title",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"language": {
"name": "language",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"token_count": {
"name": "token_count",
"type": "integer",
"primaryKey": false,
"notNull": false,
"autoincrement": false,
"default": 0
},
"checksum": {
"name": "checksum",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"indexed_at": {
"name": "indexed_at",
"type": "integer",
"primaryKey": false,
"notNull": true,
"autoincrement": false
}
},
"indexes": {},
"foreignKeys": {
"documents_repository_id_repositories_id_fk": {
"name": "documents_repository_id_repositories_id_fk",
"tableFrom": "documents",
"tableTo": "repositories",
"columnsFrom": [
"repository_id"
],
"columnsTo": [
"id"
],
"onDelete": "cascade",
"onUpdate": "no action"
},
"documents_version_id_repository_versions_id_fk": {
"name": "documents_version_id_repository_versions_id_fk",
"tableFrom": "documents",
"tableTo": "repository_versions",
"columnsFrom": [
"version_id"
],
"columnsTo": [
"id"
],
"onDelete": "cascade",
"onUpdate": "no action"
}
},
"compositePrimaryKeys": {},
"uniqueConstraints": {},
"checkConstraints": {}
},
"embedding_profiles": {
"name": "embedding_profiles",
"columns": {
"id": {
"name": "id",
"type": "text",
"primaryKey": true,
"notNull": true,
"autoincrement": false
},
"provider_kind": {
"name": "provider_kind",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"title": {
"name": "title",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"enabled": {
"name": "enabled",
"type": "integer",
"primaryKey": false,
"notNull": true,
"autoincrement": false,
"default": true
},
"is_default": {
"name": "is_default",
"type": "integer",
"primaryKey": false,
"notNull": true,
"autoincrement": false,
"default": false
},
"model": {
"name": "model",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"dimensions": {
"name": "dimensions",
"type": "integer",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"config": {
"name": "config",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"created_at": {
"name": "created_at",
"type": "integer",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"updated_at": {
"name": "updated_at",
"type": "integer",
"primaryKey": false,
"notNull": true,
"autoincrement": false
}
},
"indexes": {},
"foreignKeys": {},
"compositePrimaryKeys": {},
"uniqueConstraints": {},
"checkConstraints": {}
},
"indexing_jobs": {
"name": "indexing_jobs",
"columns": {
"id": {
"name": "id",
"type": "text",
"primaryKey": true,
"notNull": true,
"autoincrement": false
},
"repository_id": {
"name": "repository_id",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"version_id": {
"name": "version_id",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"status": {
"name": "status",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false,
"default": "'queued'"
},
"progress": {
"name": "progress",
"type": "integer",
"primaryKey": false,
"notNull": false,
"autoincrement": false,
"default": 0
},
"total_files": {
"name": "total_files",
"type": "integer",
"primaryKey": false,
"notNull": false,
"autoincrement": false,
"default": 0
},
"processed_files": {
"name": "processed_files",
"type": "integer",
"primaryKey": false,
"notNull": false,
"autoincrement": false,
"default": 0
},
"stage": {
"name": "stage",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false,
"default": "'queued'"
},
"stage_detail": {
"name": "stage_detail",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"error": {
"name": "error",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"started_at": {
"name": "started_at",
"type": "integer",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"completed_at": {
"name": "completed_at",
"type": "integer",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"created_at": {
"name": "created_at",
"type": "integer",
"primaryKey": false,
"notNull": true,
"autoincrement": false
}
},
"indexes": {},
"foreignKeys": {
"indexing_jobs_repository_id_repositories_id_fk": {
"name": "indexing_jobs_repository_id_repositories_id_fk",
"tableFrom": "indexing_jobs",
"tableTo": "repositories",
"columnsFrom": [
"repository_id"
],
"columnsTo": [
"id"
],
"onDelete": "cascade",
"onUpdate": "no action"
}
},
"compositePrimaryKeys": {},
"uniqueConstraints": {},
"checkConstraints": {}
},
"repositories": {
"name": "repositories",
"columns": {
"id": {
"name": "id",
"type": "text",
"primaryKey": true,
"notNull": true,
"autoincrement": false
},
"title": {
"name": "title",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"description": {
"name": "description",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"source": {
"name": "source",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"source_url": {
"name": "source_url",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"branch": {
"name": "branch",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false,
"default": "'main'"
},
"state": {
"name": "state",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false,
"default": "'pending'"
},
"total_snippets": {
"name": "total_snippets",
"type": "integer",
"primaryKey": false,
"notNull": false,
"autoincrement": false,
"default": 0
},
"total_tokens": {
"name": "total_tokens",
"type": "integer",
"primaryKey": false,
"notNull": false,
"autoincrement": false,
"default": 0
},
"trust_score": {
"name": "trust_score",
"type": "real",
"primaryKey": false,
"notNull": false,
"autoincrement": false,
"default": 0
},
"benchmark_score": {
"name": "benchmark_score",
"type": "real",
"primaryKey": false,
"notNull": false,
"autoincrement": false,
"default": 0
},
"stars": {
"name": "stars",
"type": "integer",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"github_token": {
"name": "github_token",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"last_indexed_at": {
"name": "last_indexed_at",
"type": "integer",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"created_at": {
"name": "created_at",
"type": "integer",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"updated_at": {
"name": "updated_at",
"type": "integer",
"primaryKey": false,
"notNull": true,
"autoincrement": false
}
},
"indexes": {},
"foreignKeys": {},
"compositePrimaryKeys": {},
"uniqueConstraints": {},
"checkConstraints": {}
},
"repository_configs": {
"name": "repository_configs",
"columns": {
"repository_id": {
"name": "repository_id",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"version_id": {
"name": "version_id",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"project_title": {
"name": "project_title",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"description": {
"name": "description",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"folders": {
"name": "folders",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"exclude_folders": {
"name": "exclude_folders",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"exclude_files": {
"name": "exclude_files",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"rules": {
"name": "rules",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"previous_versions": {
"name": "previous_versions",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"updated_at": {
"name": "updated_at",
"type": "integer",
"primaryKey": false,
"notNull": true,
"autoincrement": false
}
},
"indexes": {
"uniq_repo_config_base": {
"name": "uniq_repo_config_base",
"columns": [
"repository_id"
],
"isUnique": true,
"where": "\"repository_configs\".\"version_id\" IS NULL"
},
"uniq_repo_config_version": {
"name": "uniq_repo_config_version",
"columns": [
"repository_id",
"version_id"
],
"isUnique": true,
"where": "\"repository_configs\".\"version_id\" IS NOT NULL"
}
},
"foreignKeys": {
"repository_configs_repository_id_repositories_id_fk": {
"name": "repository_configs_repository_id_repositories_id_fk",
"tableFrom": "repository_configs",
"tableTo": "repositories",
"columnsFrom": [
"repository_id"
],
"columnsTo": [
"id"
],
"onDelete": "cascade",
"onUpdate": "no action"
}
},
"compositePrimaryKeys": {},
"uniqueConstraints": {},
"checkConstraints": {}
},
"repository_versions": {
"name": "repository_versions",
"columns": {
"id": {
"name": "id",
"type": "text",
"primaryKey": true,
"notNull": true,
"autoincrement": false
},
"repository_id": {
"name": "repository_id",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"tag": {
"name": "tag",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"title": {
"name": "title",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"commit_hash": {
"name": "commit_hash",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"state": {
"name": "state",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false,
"default": "'pending'"
},
"total_snippets": {
"name": "total_snippets",
"type": "integer",
"primaryKey": false,
"notNull": false,
"autoincrement": false,
"default": 0
},
"indexed_at": {
"name": "indexed_at",
"type": "integer",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"created_at": {
"name": "created_at",
"type": "integer",
"primaryKey": false,
"notNull": true,
"autoincrement": false
}
},
"indexes": {},
"foreignKeys": {
"repository_versions_repository_id_repositories_id_fk": {
"name": "repository_versions_repository_id_repositories_id_fk",
"tableFrom": "repository_versions",
"tableTo": "repositories",
"columnsFrom": [
"repository_id"
],
"columnsTo": [
"id"
],
"onDelete": "cascade",
"onUpdate": "no action"
}
},
"compositePrimaryKeys": {},
"uniqueConstraints": {},
"checkConstraints": {}
},
"settings": {
"name": "settings",
"columns": {
"key": {
"name": "key",
"type": "text",
"primaryKey": true,
"notNull": true,
"autoincrement": false
},
"value": {
"name": "value",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"updated_at": {
"name": "updated_at",
"type": "integer",
"primaryKey": false,
"notNull": true,
"autoincrement": false
}
},
"indexes": {},
"foreignKeys": {},
"compositePrimaryKeys": {},
"uniqueConstraints": {},
"checkConstraints": {}
},
"snippet_embeddings": {
"name": "snippet_embeddings",
"columns": {
"snippet_id": {
"name": "snippet_id",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"profile_id": {
"name": "profile_id",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"model": {
"name": "model",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"dimensions": {
"name": "dimensions",
"type": "integer",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"embedding": {
"name": "embedding",
"type": "blob",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"created_at": {
"name": "created_at",
"type": "integer",
"primaryKey": false,
"notNull": true,
"autoincrement": false
}
},
"indexes": {},
"foreignKeys": {
"snippet_embeddings_snippet_id_snippets_id_fk": {
"name": "snippet_embeddings_snippet_id_snippets_id_fk",
"tableFrom": "snippet_embeddings",
"tableTo": "snippets",
"columnsFrom": [
"snippet_id"
],
"columnsTo": [
"id"
],
"onDelete": "cascade",
"onUpdate": "no action"
},
"snippet_embeddings_profile_id_embedding_profiles_id_fk": {
"name": "snippet_embeddings_profile_id_embedding_profiles_id_fk",
"tableFrom": "snippet_embeddings",
"tableTo": "embedding_profiles",
"columnsFrom": [
"profile_id"
],
"columnsTo": [
"id"
],
"onDelete": "cascade",
"onUpdate": "no action"
}
},
"compositePrimaryKeys": {
"snippet_embeddings_snippet_id_profile_id_pk": {
"columns": [
"snippet_id",
"profile_id"
],
"name": "snippet_embeddings_snippet_id_profile_id_pk"
}
},
"uniqueConstraints": {},
"checkConstraints": {}
},
"snippets": {
"name": "snippets",
"columns": {
"id": {
"name": "id",
"type": "text",
"primaryKey": true,
"notNull": true,
"autoincrement": false
},
"document_id": {
"name": "document_id",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"repository_id": {
"name": "repository_id",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"version_id": {
"name": "version_id",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"type": {
"name": "type",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"title": {
"name": "title",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"content": {
"name": "content",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"language": {
"name": "language",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"breadcrumb": {
"name": "breadcrumb",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"token_count": {
"name": "token_count",
"type": "integer",
"primaryKey": false,
"notNull": false,
"autoincrement": false,
"default": 0
},
"created_at": {
"name": "created_at",
"type": "integer",
"primaryKey": false,
"notNull": true,
"autoincrement": false
}
},
"indexes": {},
"foreignKeys": {
"snippets_document_id_documents_id_fk": {
"name": "snippets_document_id_documents_id_fk",
"tableFrom": "snippets",
"tableTo": "documents",
"columnsFrom": [
"document_id"
],
"columnsTo": [
"id"
],
"onDelete": "cascade",
"onUpdate": "no action"
},
"snippets_repository_id_repositories_id_fk": {
"name": "snippets_repository_id_repositories_id_fk",
"tableFrom": "snippets",
"tableTo": "repositories",
"columnsFrom": [
"repository_id"
],
"columnsTo": [
"id"
],
"onDelete": "cascade",
"onUpdate": "no action"
},
"snippets_version_id_repository_versions_id_fk": {
"name": "snippets_version_id_repository_versions_id_fk",
"tableFrom": "snippets",
"tableTo": "repository_versions",
"columnsFrom": [
"version_id"
],
"columnsTo": [
"id"
],
"onDelete": "cascade",
"onUpdate": "no action"
}
},
"compositePrimaryKeys": {},
"uniqueConstraints": {},
"checkConstraints": {}
}
},
"views": {},
"enums": {},
"_meta": {
"schemas": {},
"tables": {},
"columns": {}
},
"internal": {
"indexes": {}
}
}

View File

@@ -1,34 +1,41 @@
{
"version": "7",
"dialect": "sqlite",
"entries": [
{
"idx": 0,
"version": "6",
"when": 1774196053634,
"tag": "0000_large_master_chief",
"breakpoints": true
},
{
"idx": 1,
"version": "6",
"when": 1774448049161,
"tag": "0001_quick_nighthawk",
"breakpoints": true
},
{
"idx": 2,
"version": "6",
"when": 1774461897742,
"tag": "0002_silky_stellaris",
"breakpoints": true
},
{
"idx": 3,
"version": "6",
"when": 1743155877000,
"tag": "0003_multiversion_config",
"breakpoints": true
}
]
}
"version": "7",
"dialect": "sqlite",
"entries": [
{
"idx": 0,
"version": "6",
"when": 1774196053634,
"tag": "0000_large_master_chief",
"breakpoints": true
},
{
"idx": 1,
"version": "6",
"when": 1774448049161,
"tag": "0001_quick_nighthawk",
"breakpoints": true
},
{
"idx": 2,
"version": "6",
"when": 1774461897742,
"tag": "0002_silky_stellaris",
"breakpoints": true
},
{
"idx": 3,
"version": "6",
"when": 1743155877000,
"tag": "0003_multiversion_config",
"breakpoints": true
},
{
"idx": 4,
"version": "6",
"when": 1774880275833,
"tag": "0004_complete_sentry",
"breakpoints": true
}
]
}

View File

@@ -148,6 +148,8 @@ export const indexingJobs = sqliteTable('indexing_jobs', {
progress: integer('progress').default(0), // 0-100
totalFiles: integer('total_files').default(0),
processedFiles: integer('processed_files').default(0),
stage: text('stage', { enum: ['queued', 'differential', 'crawling', 'cloning', 'parsing', 'storing', 'embedding', 'done', 'failed'] }).notNull().default('queued'),
stageDetail: text('stage_detail'),
error: text('error'),
startedAt: integer('started_at', { mode: 'timestamp' }),
completedAt: integer('completed_at', { mode: 'timestamp' }),

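The `stage` enum added above can also be modeled on the application side as a union type with a forward-only transition guard. A minimal sketch — the ordering array and the `canTransition` helper are illustrative, not part of the actual codebase:

```typescript
// Mirror of the `stage` enum from the schema hunk above.
type IndexingStage =
  | 'queued' | 'differential' | 'crawling' | 'cloning'
  | 'parsing' | 'storing' | 'embedding' | 'done' | 'failed';

// Nominal pipeline order; optional stages (e.g. 'differential') may be skipped,
// so any forward jump is allowed.
const STAGE_ORDER: IndexingStage[] = [
  'queued', 'differential', 'crawling', 'cloning',
  'parsing', 'storing', 'embedding', 'done'
];

// A stage may only move forward, or jump to 'failed' from any non-terminal stage.
function canTransition(from: IndexingStage, to: IndexingStage): boolean {
  if (to === 'failed') return from !== 'done';
  return STAGE_ORDER.indexOf(to) > STAGE_ORDER.indexOf(from);
}
```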
View File

@@ -10,6 +10,8 @@ export class IndexingJobMapper {
progress: entity.progress,
totalFiles: entity.total_files,
processedFiles: entity.processed_files,
stage: entity.stage,
stageDetail: entity.stage_detail,
error: entity.error,
startedAt: entity.started_at != null ? new Date(entity.started_at * 1000) : null,
completedAt: entity.completed_at != null ? new Date(entity.completed_at * 1000) : null,
@@ -26,6 +28,8 @@ export class IndexingJobMapper {
progress: domain.progress,
totalFiles: domain.totalFiles,
processedFiles: domain.processedFiles,
stage: domain.stage,
stageDetail: domain.stageDetail,
error: domain.error,
startedAt: domain.startedAt,
completedAt: domain.completedAt,

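The mapper hunk above converts snake_case entity rows (epoch seconds from SQLite) into camelCase domain fields (`Date | null`). A reduced sketch of that conversion, trimmed to just the timestamp fields:

```typescript
// Illustrative reduction of IndexingJobMapper.fromEntity: better-sqlite3 rows
// carry epoch-second integers (or NULL); the domain model uses Date | null.
interface JobRowTimestamps {
  started_at: number | null;
  completed_at: number | null;
}

function mapTimestamps(row: JobRowTimestamps): { startedAt: Date | null; completedAt: Date | null } {
  return {
    startedAt: row.started_at != null ? new Date(row.started_at * 1000) : null,
    completedAt: row.completed_at != null ? new Date(row.completed_at * 1000) : null
  };
}
```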
View File

@@ -6,6 +6,8 @@ export interface IndexingJobEntityProps {
progress: number;
total_files: number;
processed_files: number;
stage: string;
stage_detail: string | null;
error: string | null;
started_at: number | null;
completed_at: number | null;
@@ -20,6 +22,8 @@ export class IndexingJobEntity {
progress: number;
total_files: number;
processed_files: number;
stage: string;
stage_detail: string | null;
error: string | null;
started_at: number | null;
completed_at: number | null;
@@ -33,6 +37,8 @@ export class IndexingJobEntity {
this.progress = props.progress;
this.total_files = props.total_files;
this.processed_files = props.processed_files;
this.stage = props.stage;
this.stage_detail = props.stage_detail;
this.error = props.error;
this.started_at = props.started_at;
this.completed_at = props.completed_at;
@@ -48,6 +54,8 @@ export interface IndexingJobProps {
progress: number;
totalFiles: number;
processedFiles: number;
stage: string;
stageDetail: string | null;
error: string | null;
startedAt: Date | null;
completedAt: Date | null;
@@ -62,6 +70,8 @@ export class IndexingJob {
progress: number;
totalFiles: number;
processedFiles: number;
stage: string;
stageDetail: string | null;
error: string | null;
startedAt: Date | null;
completedAt: Date | null;
@@ -75,6 +85,8 @@ export class IndexingJob {
this.progress = props.progress;
this.totalFiles = props.totalFiles;
this.processedFiles = props.processedFiles;
this.stage = props.stage;
this.stageDetail = props.stageDetail;
this.error = props.error;
this.startedAt = props.startedAt;
this.completedAt = props.completedAt;
@@ -90,6 +102,8 @@ export interface IndexingJobDtoProps {
progress: number;
totalFiles: number;
processedFiles: number;
stage: string;
stageDetail: string | null;
error: string | null;
startedAt: Date | null;
completedAt: Date | null;
@@ -104,6 +118,8 @@ export class IndexingJobDto {
progress: number;
totalFiles: number;
processedFiles: number;
stage: string;
stageDetail: string | null;
error: string | null;
startedAt: Date | null;
completedAt: Date | null;
@@ -117,6 +133,8 @@ export class IndexingJobDto {
this.progress = props.progress;
this.totalFiles = props.totalFiles;
this.processedFiles = props.processedFiles;
this.stage = props.stage;
this.stageDetail = props.stageDetail;
this.error = props.error;
this.startedAt = props.startedAt;
this.completedAt = props.completedAt;

View File

@@ -0,0 +1,93 @@
import { workerData, parentPort } from 'node:worker_threads';
import Database from 'better-sqlite3';
import { EmbeddingService } from '$lib/server/embeddings/embedding.service.js';
import { createProviderFromProfile } from '$lib/server/embeddings/registry.js';
import { EmbeddingProfileMapper } from '$lib/server/mappers/embedding-profile.mapper.js';
import { EmbeddingProfileEntity, type EmbeddingProfileEntityProps } from '$lib/server/models/embedding-profile.js';
import type { EmbedWorkerRequest, EmbedWorkerResponse, WorkerInitData } from './worker-types.js';
const { dbPath, embeddingProfileId } = workerData as WorkerInitData;
if (!embeddingProfileId) {
parentPort!.postMessage({
type: 'embed-failed',
jobId: 'init',
error: 'embeddingProfileId is required in workerData'
} satisfies EmbedWorkerResponse);
process.exit(1);
}
const db = new Database(dbPath);
db.pragma('journal_mode = WAL');
db.pragma('foreign_keys = ON');
db.pragma('busy_timeout = 5000');
// Load the embedding profile from DB
const rawProfile = db.prepare('SELECT * FROM embedding_profiles WHERE id = ?').get(embeddingProfileId);
if (!rawProfile) {
db.close();
parentPort!.postMessage({
type: 'embed-failed',
jobId: 'init',
error: `Embedding profile ${embeddingProfileId} not found`
} satisfies EmbedWorkerResponse);
process.exit(1);
}
const profileEntity = new EmbeddingProfileEntity(rawProfile as EmbeddingProfileEntityProps);
const profile = EmbeddingProfileMapper.fromEntity(profileEntity);
// Create provider and embedding service
const provider = createProviderFromProfile(profile);
const embeddingService = new EmbeddingService(db, provider, embeddingProfileId);
// Signal ready after service initialization
parentPort!.postMessage({
type: 'ready'
} satisfies EmbedWorkerResponse);
parentPort!.on('message', async (msg: EmbedWorkerRequest) => {
if (msg.type === 'shutdown') {
db.close();
process.exit(0);
}
if (msg.type === 'embed') {
try {
const snippetIds = embeddingService.findSnippetIdsMissingEmbeddings(
msg.repositoryId,
msg.versionId
);
await embeddingService.embedSnippets(snippetIds, (done: number, total: number) => {
parentPort!.postMessage({
type: 'embed-progress',
jobId: msg.jobId,
done,
total
} satisfies EmbedWorkerResponse);
});
parentPort!.postMessage({
type: 'embed-done',
jobId: msg.jobId
} satisfies EmbedWorkerResponse);
} catch (err) {
parentPort!.postMessage({
type: 'embed-failed',
jobId: msg.jobId,
error: err instanceof Error ? err.message : String(err)
} satisfies EmbedWorkerResponse);
}
}
});
process.on('uncaughtException', (err) => {
parentPort!.postMessage({
type: 'embed-failed',
jobId: 'uncaught',
error: err instanceof Error ? err.message : String(err)
} satisfies EmbedWorkerResponse);
process.exit(1);
});

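On the parent side, the worker's `EmbedWorkerResponse` messages can be folded into a job view for the SSE broadcaster. A hedged sketch — the message shapes mirror the worker entry above, but the `JobView` shape and the percentage rounding are assumptions for illustration:

```typescript
// Parent-side reducer for embed worker messages.
type EmbedMsg =
  | { type: 'ready' }
  | { type: 'embed-progress'; jobId: string; done: number; total: number }
  | { type: 'embed-done'; jobId: string }
  | { type: 'embed-failed'; jobId: string; error: string };

interface JobView { status: string; percent: number; error?: string }

function applyEmbedMsg(view: JobView, msg: EmbedMsg): JobView {
  switch (msg.type) {
    case 'ready':
      return view; // worker is up; nothing job-specific to record
    case 'embed-progress':
      return {
        ...view,
        status: 'embedding',
        percent: msg.total > 0 ? Math.floor((msg.done / msg.total) * 100) : 0
      };
    case 'embed-done':
      return { ...view, status: 'done', percent: 100 };
    case 'embed-failed':
      return { ...view, status: 'failed', error: msg.error };
  }
}
```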
View File

@@ -28,7 +28,8 @@ function createTestDb(): Database.Database {
'0000_large_master_chief.sql',
'0001_quick_nighthawk.sql',
'0002_silky_stellaris.sql',
'0003_multiversion_config.sql'
'0003_multiversion_config.sql',
'0004_complete_sentry.sql'
]) {
const migrationSql = readFileSync(join(migrationsFolder, migrationFile), 'utf-8');

View File

@@ -15,7 +15,7 @@
import { createHash, randomUUID } from 'node:crypto';
import type Database from 'better-sqlite3';
import type { Document, NewDocument, NewSnippet, TrueRefConfig } from '$lib/types';
import type { Document, NewDocument, NewSnippet, TrueRefConfig, IndexingStage } from '$lib/types';
import type { crawl as GithubCrawlFn } from '$lib/server/crawler/github.crawler.js';
import type { LocalCrawler } from '$lib/server/crawler/local.crawler.js';
import type { EmbeddingService } from '$lib/server/embeddings/embedding.service.js';
@@ -74,7 +74,16 @@ export class IndexingPipeline {
// Public — run a job end to end
// -------------------------------------------------------------------------
async run(job: IndexingJob): Promise<void> {
async run(
job: IndexingJob,
onStageChange?: (
stage: IndexingStage,
detail?: string,
progress?: number,
processedFiles?: number,
totalFiles?: number
) => void
): Promise<void> {
// better-sqlite3 raw queries return snake_case keys; Drizzle types use camelCase.
// Accept both so the pipeline works when called from raw SQL contexts.
const raw = job as unknown as Record<string, unknown>;
@@ -84,6 +93,18 @@ export class IndexingPipeline {
// Rebuild a normalised job view for the rest of this method.
const normJob = { ...job, repositoryId, versionId };
// Helper to report stage transitions and invoke optional callback.
const reportStage = (
stage: IndexingStage,
detail?: string,
progress?: number,
processed?: number,
total?: number
) => {
this.updateJob(job.id, { stage, stageDetail: detail ?? null });
onStageChange?.(stage, detail, progress, processed, total);
};
this.updateJob(job.id, { status: 'running', startedAt: Math.floor(Date.now() / 1000) });
try {
@@ -105,6 +126,7 @@ export class IndexingPipeline {
// from an already-indexed ancestor version instead of crawling everything.
let differentialPlan: DifferentialPlan | null = null;
if (normJob.versionId && versionTag) {
reportStage('differential');
differentialPlan = await buildDifferentialPlan({
repo,
targetTag: versionTag,
@@ -119,6 +141,7 @@ export class IndexingPipeline {
// If a differential plan exists, clone unchanged files from ancestor.
if (differentialPlan && differentialPlan.unchangedPaths.size > 0) {
reportStage('cloning');
this.cloneFromAncestor(
differentialPlan.ancestorVersionId,
normJob.versionId!,
@@ -132,6 +155,7 @@ export class IndexingPipeline {
// ---- Stage 1: Crawl -------------------------------------------------
// Pass changedPaths as allowlist so crawl only fetches/returns changed files.
reportStage('crawling');
const crawlAllowedPaths = differentialPlan ? differentialPlan.changedPaths : undefined;
const crawlResult = await this.crawl(repo, versionTag, crawlAllowedPaths);
@@ -219,6 +243,8 @@ export class IndexingPipeline {
// Lower = more responsive UI; higher = less overhead.
const YIELD_EVERY = 20;
reportStage('parsing', `0 / ${totalFiles} files`);
for (const [i, file] of filesToProcess.entries()) {
// Yield the Node.js event loop periodically so the HTTP server can
// handle incoming requests (navigation, polling) between file parses.
@@ -272,6 +298,7 @@ export class IndexingPipeline {
this.embeddingService !== null
);
this.updateJob(job.id, { processedFiles: totalProcessed, progress });
reportStage('parsing', `${totalProcessed} / ${totalFiles} files`, progress, totalProcessed, totalFiles);
}
}
@@ -279,10 +306,12 @@ export class IndexingPipeline {
processedFiles = diff.unchanged.length + filesToProcess.length;
// ---- Stage 3: Atomic replacement ------------------------------------
reportStage('storing');
this.replaceSnippets(repo.id, changedDocIds, newDocuments, newSnippets);
// ---- Stage 4: Embeddings (if provider is configured) ----------------
if (this.embeddingService) {
reportStage('embedding');
const snippetIds = this.embeddingService.findSnippetIdsMissingEmbeddings(
repo.id,
normJob.versionId
@@ -346,6 +375,7 @@ export class IndexingPipeline {
}
}
reportStage('done');
this.updateJob(job.id, {
status: 'done',
progress: 100,
@@ -355,6 +385,7 @@ export class IndexingPipeline {
const message = error instanceof Error ? error.message : String(error);
console.error(`[IndexingPipeline] Job ${job.id} failed: ${message}`);
reportStage('failed');
this.updateJob(job.id, {
status: 'failed',
error: message,

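The parsing stage above reports a `"N / M files"` detail string alongside a progress integer. A small sketch of that reporting shape — the 90% cap is an assumption standing in for the pipeline's `computeProgress`, which reserves headroom for storing/embedding when an embedding provider is configured:

```typescript
// Sketch of per-file progress reporting during the parsing stage.
function parsingReport(processed: number, totalFiles: number, reserveForEmbedding: boolean) {
  const cap = reserveForEmbedding ? 90 : 100; // assumed headroom for later stages
  const progress = totalFiles > 0
    ? Math.min(cap, Math.floor((processed / totalFiles) * cap))
    : cap;
  return { detail: `${processed} / ${totalFiles} files`, progress };
}
```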
View File

@@ -9,7 +9,7 @@
import type Database from 'better-sqlite3';
import { IndexingJobMapper } from '$lib/server/mappers/indexing-job.mapper.js';
import { IndexingJob, IndexingJobEntity } from '$lib/server/models/indexing-job.js';
import type { IndexingPipeline } from './indexing.pipeline.js';
import type { WorkerPool } from './worker-pool.js';
// ---------------------------------------------------------------------------
// SQL projection + row mapper (mirrors repository.service.ts pattern)
@@ -18,16 +18,16 @@ import type { IndexingPipeline } from './indexing.pipeline.js';
const JOB_SELECT = `SELECT * FROM indexing_jobs`;
export class JobQueue {
private isRunning = false;
private pipeline: IndexingPipeline | null = null;
private workerPool: WorkerPool | null = null;
constructor(private readonly db: Database.Database) {}
/**
* Inject the pipeline dependency (avoids circular construction order).
* Inject the worker pool dependency (an alternative to calling the pipeline directly).
* When set, enqueue() will delegate to the pool instead of calling processNext().
*/
setPipeline(pipeline: IndexingPipeline): void {
this.pipeline = pipeline;
setWorkerPool(pool: WorkerPool): void {
this.workerPool = pool;
}
/**
@@ -50,7 +50,9 @@ export class JobQueue {
if (activeRaw) {
// Ensure the queue is draining even if enqueue was called concurrently.
if (!this.isRunning) setImmediate(() => this.processNext());
if (!this.workerPool) {
setImmediate(() => this.processNext());
}
return IndexingJobMapper.fromEntity(new IndexingJobEntity(activeRaw));
}
@@ -63,6 +65,8 @@ export class JobQueue {
progress: 0,
totalFiles: 0,
processedFiles: 0,
stage: 'queued',
stageDetail: null,
error: null,
startedAt: null,
completedAt: null,
@@ -73,8 +77,8 @@ export class JobQueue {
.prepare(
`INSERT INTO indexing_jobs
(id, repository_id, version_id, status, progress, total_files,
processed_files, error, started_at, completed_at, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
processed_files, stage, stage_detail, error, started_at, completed_at, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
)
.run(
job.id,
@@ -84,14 +88,18 @@ export class JobQueue {
job.progress,
job.totalFiles,
job.processedFiles,
job.stage,
job.stageDetail,
job.error,
job.startedAt,
job.completedAt,
now
);
// Kick off sequential processing if not already running.
if (!this.isRunning) {
// Delegate to worker pool if available, otherwise fall back to direct processing
if (this.workerPool) {
this.workerPool.enqueue(job.id, repositoryId, versionId ?? null);
} else {
setImmediate(() => this.processNext());
}
@@ -102,15 +110,13 @@ export class JobQueue {
}
/**
* Pick the oldest queued job and run it through the pipeline.
* Called recursively via setImmediate so the event loop stays unblocked.
* Pick the oldest queued job and run it through the pipeline directly.
* This is now a fallback method used only when no WorkerPool is set.
* Called via setImmediate so the event loop stays unblocked.
*/
private async processNext(): Promise<void> {
if (this.isRunning) return;
if (!this.pipeline) {
console.warn('[JobQueue] No pipeline configured — cannot process jobs.');
return;
}
// Fallback path: no worker pool configured, run directly (used by tests and dev mode)
console.warn('[JobQueue] Running in fallback mode (no worker pool) — direct pipeline execution.');
const rawJob = this.db
.prepare<[], IndexingJobEntity>(
@@ -122,26 +128,7 @@ export class JobQueue {
if (!rawJob) return;
const job = IndexingJobMapper.fromEntity(new IndexingJobEntity(rawJob));
this.isRunning = true;
try {
await this.pipeline.run(job);
} catch (err) {
// Error is logged inside pipeline.run(); no action needed here.
console.error(
`[JobQueue] Job ${job.id} failed: ${err instanceof Error ? err.message : String(err)}`
);
} finally {
this.isRunning = false;
// Check whether another job was queued while this one ran.
const next = this.db
.prepare<[], { id: string }>(`SELECT id FROM indexing_jobs WHERE status = 'queued' LIMIT 1`)
.get();
if (next) {
setImmediate(() => this.processNext());
}
}
console.warn('[JobQueue] processNext: no pipeline or pool configured — skipping job processing');
}
/**
@@ -184,10 +171,21 @@ export class JobQueue {
/**
* Trigger processing of any queued jobs (e.g. after server restart).
* Safe to call multiple times; a no-op if the queue is already running.
* If a worker pool is configured, delegates to it. Otherwise falls back to direct processing.
* Safe to call multiple times.
*/
drainQueued(): void {
if (!this.isRunning) {
if (this.workerPool) {
// Delegate all queued jobs to the worker pool
const queued = this.db
.prepare<[], IndexingJobEntity>(`${JOB_SELECT} WHERE status = 'queued'`)
.all();
for (const rawJob of queued) {
const job = IndexingJobMapper.fromEntity(new IndexingJobEntity(rawJob));
this.workerPool.enqueue(job.id, job.repositoryId, job.versionId);
}
} else {
// Fallback: direct pipeline processing
setImmediate(() => this.processNext());
}
}
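The `drainQueued()` logic above can be sketched as a standalone function: hand every queued job to the pool when one is configured, otherwise fall back to a single direct-processing kick. Names and shapes here are hypothetical simplifications; the real class reads jobs from SQLite via Drizzle/better-sqlite3.

```typescript
// Standalone sketch of JobQueue.drainQueued(): delegate queued jobs to the
// worker pool if present, else trigger direct processing once.
// Hypothetical in-memory types for illustration only.
interface Job {
  id: string;
  repositoryId: string;
  versionId: string | null;
  status: string;
}

interface Pool {
  enqueue(jobId: string, repositoryId: string, versionId: string | null): void;
}

function drainQueued(jobs: Job[], pool: Pool | null, runDirect: () => void): number {
  const queued = jobs.filter((j) => j.status === 'queued');
  if (pool) {
    // Pool path: every queued job is handed over individually.
    for (const j of queued) pool.enqueue(j.id, j.repositoryId, j.versionId);
  } else if (queued.length > 0) {
    // Fallback path: one kick; the direct runner drains sequentially.
    runDirect();
  }
  return queued.length;
}
```

The return value is only for observability in this sketch; the real method returns void.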


@@ -0,0 +1,174 @@
import { describe, it, expect } from 'vitest';
import { ProgressBroadcaster } from './progress-broadcaster.js';
describe('ProgressBroadcaster', () => {
it('subscribe returns a readable stream', async () => {
const broadcaster = new ProgressBroadcaster();
const stream = broadcaster.subscribe('job-1');
expect(stream).toBeInstanceOf(ReadableStream);
});
it('broadcast sends to subscribed job listeners', async () => {
const broadcaster = new ProgressBroadcaster();
const stream = broadcaster.subscribe('job-1');
const reader = stream.getReader();
broadcaster.broadcast('job-1', '/repo/1', 'progress', { stage: 'parsing', progress: 50 });
const { value } = await reader.read();
expect(value).toBeDefined();
const text = value as string;
expect(text).toContain('event: progress');
expect(text).toContain('id: 1');
expect(text).toContain('"stage":"parsing"');
expect(text).toContain('"progress":50');
reader.cancel();
});
it('broadcast sends to subscribed repository listeners', async () => {
const broadcaster = new ProgressBroadcaster();
const stream = broadcaster.subscribeRepository('/repo/1');
const reader = stream.getReader();
broadcaster.broadcast('job-1', '/repo/1', 'repo-event', { data: 'test' });
const { value } = await reader.read();
expect(value).toBeDefined();
const text = value as string;
expect(text).toContain('event: repo-event');
expect(text).toContain('"data":"test"');
reader.cancel();
});
it('broadcast sends to all subscribers', async () => {
const broadcaster = new ProgressBroadcaster();
const stream = broadcaster.subscribeAll();
const reader = stream.getReader();
broadcaster.broadcast('job-1', '/repo/1', 'global-event', { value: 42 });
const { value } = await reader.read();
expect(value).toBeDefined();
const text = value as string;
expect(text).toContain('event: global-event');
expect(text).toContain('"value":42');
reader.cancel();
});
it('getLastEvent returns cached events', () => {
const broadcaster = new ProgressBroadcaster();
broadcaster.broadcast('job-1', '/repo/1', 'event1', { msg: 'first' });
broadcaster.broadcast('job-1', '/repo/1', 'event2', { msg: 'second' });
const lastEvent = broadcaster.getLastEvent('job-1');
expect(lastEvent).toBeDefined();
expect(lastEvent?.id).toBe(2);
expect(lastEvent?.event).toBe('event2');
expect(lastEvent?.data).toBe('{"msg":"second"}');
});
it('getLastEvent returns null for unknown job', () => {
const broadcaster = new ProgressBroadcaster();
const lastEvent = broadcaster.getLastEvent('unknown-job');
expect(lastEvent).toBeNull();
});
it('cleanup removes subscribers and cache', async () => {
const broadcaster = new ProgressBroadcaster();
const stream = broadcaster.subscribe('job-1');
const reader = stream.getReader();
broadcaster.broadcast('job-1', '/repo/1', 'event', { data: 'test' });
const lastEventBefore = broadcaster.getLastEvent('job-1');
expect(lastEventBefore).toBeDefined();
broadcaster.cleanup('job-1');
const lastEventAfter = broadcaster.getLastEvent('job-1');
expect(lastEventAfter).toBeNull();
reader.cancel();
});
it('increments event IDs per job', () => {
const broadcaster = new ProgressBroadcaster();
broadcaster.broadcast('job-1', '/repo/1', 'event1', { n: 1 });
broadcaster.broadcast('job-1', '/repo/1', 'event2', { n: 2 });
broadcaster.broadcast('job-2', '/repo/2', 'event3', { n: 3 });
expect(broadcaster.getLastEvent('job-1')?.id).toBe(2);
expect(broadcaster.getLastEvent('job-2')?.id).toBe(1);
});
it('sends reconnect event with last event ID on subscribe', async () => {
const broadcaster = new ProgressBroadcaster();
// Publish first event
broadcaster.broadcast('job-1', '/repo/1', 'progress', { value: 10 });
// Subscribe later
const stream = broadcaster.subscribe('job-1');
const reader = stream.getReader();
const { value } = await reader.read();
const text = value as string;
expect(text).toContain('event: reconnect');
expect(text).toContain('"lastEventId":1');
reader.cancel();
});
it('SSE format is correct', async () => {
const broadcaster = new ProgressBroadcaster();
const stream = broadcaster.subscribe('job-1');
const reader = stream.getReader();
broadcaster.broadcast('job-1', '/repo/1', 'test', { msg: 'hello' });
const { value } = await reader.read();
const text = value as string;
// SSE format: id: N\nevent: name\ndata: json\n\n
expect(text).toMatch(/^id: \d+\n/);
expect(text).toMatch(/event: test\n/);
expect(text).toMatch(/data: {[^}]+}\n\n$/);
expect(text.endsWith('\n\n')).toBe(true);
reader.cancel();
});
it('handles multiple concurrent subscribers', async () => {
const broadcaster = new ProgressBroadcaster();
const stream1 = broadcaster.subscribe('job-1');
const stream2 = broadcaster.subscribe('job-1');
const reader1 = stream1.getReader();
const reader2 = stream2.getReader();
broadcaster.broadcast('job-1', '/repo/1', 'event', { data: 'test' });
const { value: value1 } = await reader1.read();
const { value: value2 } = await reader2.read();
expect(value1).toBeDefined();
expect(value2).toBeDefined();
reader1.cancel();
reader2.cancel();
});
});


@@ -0,0 +1,179 @@
export interface SSEEvent {
id: number;
event: string;
data: string;
}
export class ProgressBroadcaster {
private jobSubscribers = new Map<string, Set<ReadableStreamDefaultController<string>>>();
private repoSubscribers = new Map<string, Set<ReadableStreamDefaultController<string>>>();
private allSubscribers = new Set<ReadableStreamDefaultController<string>>();
private lastEventCache = new Map<string, SSEEvent>();
private eventCounters = new Map<string, number>();
subscribe(jobId: string): ReadableStream<string> {
return new ReadableStream({
start: (controller: ReadableStreamDefaultController<string>) => {
if (!this.jobSubscribers.has(jobId)) {
this.jobSubscribers.set(jobId, new Set());
}
this.jobSubscribers.get(jobId)!.add(controller);
// Send last event on reconnect if available
const lastEvent = this.getLastEvent(jobId);
if (lastEvent) {
controller.enqueue(`event: reconnect\ndata: {"lastEventId":${lastEvent.id}}\n\n`);
}
},
cancel: () => {
const set = this.jobSubscribers.get(jobId);
if (set) {
set.forEach((controller) => {
try {
controller.close();
} catch {
// Controller already closed
}
});
set.clear();
}
}
});
}
subscribeRepository(repositoryId: string): ReadableStream<string> {
return new ReadableStream({
start: (controller: ReadableStreamDefaultController<string>) => {
if (!this.repoSubscribers.has(repositoryId)) {
this.repoSubscribers.set(repositoryId, new Set());
}
this.repoSubscribers.get(repositoryId)!.add(controller);
},
cancel: () => {
const set = this.repoSubscribers.get(repositoryId);
if (set) {
set.forEach((controller) => {
try {
controller.close();
} catch {
// Controller already closed
}
});
set.clear();
}
}
});
}
subscribeAll(): ReadableStream<string> {
return new ReadableStream({
start: (controller: ReadableStreamDefaultController<string>) => {
this.allSubscribers.add(controller);
},
cancel: () => {
this.allSubscribers.forEach((controller) => {
try {
controller.close();
} catch {
// Controller already closed
}
});
this.allSubscribers.clear();
}
});
}
broadcast(jobId: string, repositoryId: string, eventName: string, data: object): void {
// Increment event counter for this job
const counter = (this.eventCounters.get(jobId) ?? 0) + 1;
this.eventCounters.set(jobId, counter);
// Create SSE event
const event: SSEEvent = {
id: counter,
event: eventName,
data: JSON.stringify(data)
};
// Cache the event
this.lastEventCache.set(jobId, event);
// Format as SSE
const sse = this.formatSSE(event);
// Write to job-specific subscribers
const jobSubscribers = this.jobSubscribers.get(jobId);
if (jobSubscribers) {
for (const controller of jobSubscribers) {
try {
controller.enqueue(sse);
} catch {
// Controller might be closed or errored
}
}
}
// Write to repo-specific subscribers
const repoSubscribers = this.repoSubscribers.get(repositoryId);
if (repoSubscribers) {
for (const controller of repoSubscribers) {
try {
controller.enqueue(sse);
} catch {
// Controller might be closed or errored
}
}
}
// Write to all-subscribers
for (const controller of this.allSubscribers) {
try {
controller.enqueue(sse);
} catch {
// Controller might be closed or errored
}
}
}
getLastEvent(jobId: string): SSEEvent | null {
return this.lastEventCache.get(jobId) ?? null;
}
cleanup(jobId: string): void {
// Close and remove job subscribers
const jobSubscribers = this.jobSubscribers.get(jobId);
if (jobSubscribers) {
for (const controller of jobSubscribers) {
try {
controller.close();
} catch {
// Already closed
}
}
jobSubscribers.clear();
this.jobSubscribers.delete(jobId);
}
// Remove cache and counter
this.lastEventCache.delete(jobId);
this.eventCounters.delete(jobId);
}
private formatSSE(event: SSEEvent): string {
return `id: ${event.id}\nevent: ${event.event}\ndata: ${event.data}\n\n`;
}
}
// Singleton instance
let broadcaster: ProgressBroadcaster | null = null;
export function initBroadcaster(): ProgressBroadcaster {
if (!broadcaster) {
broadcaster = new ProgressBroadcaster();
}
return broadcaster;
}
export function getBroadcaster(): ProgressBroadcaster | null {
return broadcaster;
}
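The frames emitted by `formatSSE` follow the `text/event-stream` framing (`id`, `event`, `data` fields, terminated by a blank line). A minimal sketch of that framing and how a consumer would parse one frame back; these helpers are illustrative and not part of the TrueRef codebase:

```typescript
// Sketch of the SSE frame format used by ProgressBroadcaster.formatSSE,
// plus a hypothetical single-frame parser for the reverse direction.
interface SSEEvent {
  id: number;
  event: string;
  data: string;
}

function formatSSE(e: SSEEvent): string {
  // Each frame: "id: N\nevent: name\ndata: json\n\n" (blank line terminates)
  return `id: ${e.id}\nevent: ${e.event}\ndata: ${e.data}\n\n`;
}

function parseSSE(frame: string): SSEEvent {
  const fields = new Map<string, string>();
  for (const line of frame.trim().split('\n')) {
    const idx = line.indexOf(': ');
    // Split on the FIRST ": " so JSON payloads containing colons survive
    fields.set(line.slice(0, idx), line.slice(idx + 2));
  }
  return {
    id: Number(fields.get('id')),
    event: fields.get('event') ?? 'message',
    data: fields.get('data') ?? ''
  };
}
```

In the browser, `EventSource` performs this parsing itself and exposes `event.data` and `event.lastEventId` directly.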


@@ -15,6 +15,12 @@ import { crawl as githubCrawl } from '$lib/server/crawler/github.crawler.js';
import { LocalCrawler } from '$lib/server/crawler/local.crawler.js';
import { IndexingPipeline } from './indexing.pipeline.js';
import { JobQueue } from './job-queue.js';
import { WorkerPool } from './worker-pool.js';
import type { ParseWorkerResponse } from './worker-types.js';
import { initBroadcaster } from './progress-broadcaster.js';
import type { ProgressBroadcaster } from './progress-broadcaster.js';
import path from 'node:path';
import { fileURLToPath } from 'node:url';
// ---------------------------------------------------------------------------
// Stale-job recovery
@@ -49,6 +55,8 @@ export function recoverStaleJobs(db: Database.Database): void {
let _queue: JobQueue | null = null;
let _pipeline: IndexingPipeline | null = null;
let _pool: WorkerPool | null = null;
let _broadcaster: ProgressBroadcaster | null = null;
/**
* Initialise (or return the existing) JobQueue + IndexingPipeline pair.
@@ -59,11 +67,13 @@ let _pipeline: IndexingPipeline | null = null;
*
* @param db - Raw better-sqlite3 Database instance.
* @param embeddingService - Optional embedding service; pass null to disable.
* @param options - Optional configuration for worker pool (concurrency, dbPath).
* @returns An object with `queue` and `pipeline` accessors.
*/
export function initializePipeline(
db: Database.Database,
embeddingService: EmbeddingService | null = null
embeddingService: EmbeddingService | null = null,
options?: { concurrency?: number; dbPath?: string }
): { queue: JobQueue; pipeline: IndexingPipeline } {
if (_queue && _pipeline) {
return { queue: _queue, pipeline: _pipeline };
@@ -76,7 +86,76 @@ export function initializePipeline(
const pipeline = new IndexingPipeline(db, githubCrawl, localCrawler, embeddingService);
const queue = new JobQueue(db);
queue.setPipeline(pipeline);
// If worker pool options are provided, create and wire the pool
if (options?.dbPath) {
_broadcaster = initBroadcaster();
// Resolve worker script paths relative to this file (build/workers/ directory)
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
const workerScript = path.join(__dirname, '../../../build/workers/worker-entry.mjs');
const embedWorkerScript = path.join(__dirname, '../../../build/workers/embed-worker-entry.mjs');
try {
_pool = new WorkerPool({
concurrency: options.concurrency ?? 2,
workerScript,
embedWorkerScript,
dbPath: options.dbPath,
onProgress: (jobId: string, msg: ParseWorkerResponse) => {
// Update DB with progress
db.prepare(
`UPDATE indexing_jobs
SET stage = ?, stage_detail = ?, progress = ?, processed_files = ?, total_files = ?
WHERE id = ?`
).run(msg.stage, msg.stageDetail ?? null, msg.progress, msg.processedFiles, msg.totalFiles, jobId);
// Broadcast progress event
if (_broadcaster) {
_broadcaster.broadcast(jobId, '', 'progress', msg);
}
},
onJobDone: (jobId: string) => {
// Update job status to done
db.prepare(`UPDATE indexing_jobs SET status = 'done', completed_at = unixepoch() WHERE id = ?`).run(
jobId
);
// Broadcast done event
if (_broadcaster) {
_broadcaster.broadcast(jobId, '', 'job-done', { jobId });
}
},
onJobFailed: (jobId: string, error: string) => {
// Update job status to failed with error message
db.prepare(
`UPDATE indexing_jobs SET status = 'failed', error = ?, completed_at = unixepoch() WHERE id = ?`
).run(error, jobId);
// Broadcast failed event
if (_broadcaster) {
_broadcaster.broadcast(jobId, '', 'job-failed', { jobId, error });
}
},
onEmbedReady: () => {
console.log('[WorkerPool] Embedding worker ready');
},
onEmbedDone: (jobId: string) => {
console.log('[WorkerPool] Embedding complete for job:', jobId);
},
onEmbedFailed: (jobId: string, error: string) => {
console.error('[WorkerPool] Embedding failed for job:', jobId, error);
}
});
queue.setWorkerPool(_pool);
} catch (err) {
console.warn(
'[startup] Failed to create WorkerPool (worker scripts may not exist yet):',
err instanceof Error ? err.message : String(err)
);
}
}
_queue = queue;
_pipeline = pipeline;
@@ -87,11 +166,7 @@ export function initializePipeline(
.prepare<[], { id: string }>(`SELECT id FROM indexing_jobs WHERE status = 'queued' LIMIT 1`)
.get();
if (pending) {
// Re-enqueue logic is handled inside JobQueue.processNext; we trigger
// it by asking the queue for any job that is already queued.
// The simplest way is to call enqueue on a repo that has a queued job —
// but since enqueue deduplicates, we just trigger processNext directly.
// We do this via a public helper to avoid exposing private methods.
// Re-enqueue logic is handled inside JobQueue.drainQueued; we trigger it here.
queue.drainQueued();
}
});
@@ -100,23 +175,41 @@ export function initializePipeline(
}
/**
* Return the current JobQueue singleton, or null if not yet initialised.
* Accessor for the JobQueue singleton.
*/
export function getQueue(): JobQueue | null {
return _queue;
}
/**
* Return the current IndexingPipeline singleton, or null if not yet initialised.
* Accessor for the IndexingPipeline singleton.
*/
export function getPipeline(): IndexingPipeline | null {
return _pipeline;
}
/**
* Reset singletons — intended for use in tests only.
* Accessor for the WorkerPool singleton.
*/
export function getPool(): WorkerPool | null {
return _pool;
}
/**
* Accessor for the ProgressBroadcaster singleton.
*/
export function getBroadcaster(): ProgressBroadcaster | null {
return _broadcaster;
}
/**
* Reset singletons (for testing).
*/
export function _resetSingletons(): void {
_queue = null;
_pipeline = null;
_pool = null;
_broadcaster = null;
}


@@ -0,0 +1,75 @@
import { workerData, parentPort } from 'node:worker_threads';
import Database from 'better-sqlite3';
import { IndexingPipeline } from './indexing.pipeline.js';
import { crawl as githubCrawl } from '$lib/server/crawler/github.crawler.js';
import { LocalCrawler } from '$lib/server/crawler/local.crawler.js';
import { IndexingJobMapper } from '$lib/server/mappers/indexing-job.mapper.js';
import { IndexingJobEntity, type IndexingJobEntityProps } from '$lib/server/models/indexing-job.js';
import type { ParseWorkerRequest, ParseWorkerResponse, WorkerInitData } from './worker-types.js';
import type { IndexingStage } from '$lib/types.js';
const { dbPath } = workerData as WorkerInitData;
const db = new Database(dbPath);
db.pragma('journal_mode = WAL');
db.pragma('foreign_keys = ON');
db.pragma('busy_timeout = 5000');
const pipeline = new IndexingPipeline(db, githubCrawl, new LocalCrawler(), null);
let currentJobId: string | null = null;
parentPort!.on('message', async (msg: ParseWorkerRequest) => {
if (msg.type === 'shutdown') {
db.close();
process.exit(0);
}
if (msg.type === 'run') {
currentJobId = msg.jobId;
try {
const rawJob = db.prepare('SELECT * FROM indexing_jobs WHERE id = ?').get(msg.jobId);
if (!rawJob) {
throw new Error(`Job ${msg.jobId} not found`);
}
const job = IndexingJobMapper.fromEntity(new IndexingJobEntity(rawJob as IndexingJobEntityProps));
await pipeline.run(
job,
(stage: IndexingStage, detail?: string, progress?: number, processedFiles?: number, totalFiles?: number) => {
parentPort!.postMessage({
type: 'progress',
jobId: msg.jobId,
stage,
stageDetail: detail,
progress: progress ?? 0,
processedFiles: processedFiles ?? 0,
totalFiles: totalFiles ?? 0
} satisfies ParseWorkerResponse);
}
);
parentPort!.postMessage({
type: 'done',
jobId: msg.jobId
} satisfies ParseWorkerResponse);
} catch (err) {
parentPort!.postMessage({
type: 'failed',
jobId: msg.jobId,
error: err instanceof Error ? err.message : String(err)
} satisfies ParseWorkerResponse);
} finally {
currentJobId = null;
}
}
});
process.on('uncaughtException', (err) => {
if (currentJobId) {
parentPort!.postMessage({
type: 'failed',
jobId: currentJobId,
error: err instanceof Error ? err.message : String(err)
} satisfies ParseWorkerResponse);
}
process.exit(1);
});
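The worker's replies above use TypeScript's `satisfies` operator to check each message literal against the response union without widening its inferred type. A self-contained sketch of that pattern, with the union re-declared locally in simplified form:

```typescript
// Sketch of the `satisfies` pattern used for worker messages: the object
// literal is validated against the union, yet keeps its narrow inferred
// type (msg.type stays the literal 'done', not string).
// Requires TypeScript 4.9+. Simplified, hypothetical union for illustration.
type WorkerMsg =
  | { type: 'done'; jobId: string }
  | { type: 'failed'; jobId: string; error: string };

const ok = { type: 'done', jobId: 'j1' } satisfies WorkerMsg;

function send(msg: WorkerMsg): string {
  // Stand-in for parentPort.postMessage; serialises for inspection
  return JSON.stringify(msg);
}
```

With a plain `const ok: WorkerMsg = …` annotation, `ok.type` would widen to the full union and accessing `ok.jobId` alongside variant-specific fields would need narrowing first.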


@@ -0,0 +1,340 @@
import { Worker } from 'node:worker_threads';
import { existsSync } from 'node:fs';
import type { ParseWorkerRequest, ParseWorkerResponse, EmbedWorkerRequest, EmbedWorkerResponse, WorkerInitData } from './worker-types.js';
export interface WorkerPoolOptions {
concurrency: number;
workerScript: string;
embedWorkerScript: string;
dbPath: string;
embeddingProfileId?: string;
onProgress: (jobId: string, msg: ParseWorkerResponse) => void;
onJobDone: (jobId: string) => void;
onJobFailed: (jobId: string, error: string) => void;
onEmbedDone: (jobId: string) => void;
onEmbedFailed: (jobId: string, error: string) => void;
}
interface QueuedJob {
jobId: string;
repositoryId: string;
versionId?: string | null;
}
interface RunningJob {
jobId: string;
repositoryId: string;
}
interface EmbedQueuedJob {
jobId: string;
repositoryId: string;
versionId: string | null;
}
export class WorkerPool {
private workers: Worker[] = [];
private idleWorkers: Worker[] = [];
private embedWorker: Worker | null = null;
private embedReady = false;
private jobQueue: QueuedJob[] = [];
private runningJobs = new Map<Worker, RunningJob>();
private runningRepoIds = new Set<string>();
private embedQueue: EmbedQueuedJob[] = [];
private options: WorkerPoolOptions;
private fallbackMode = false;
private shuttingDown = false;
constructor(options: WorkerPoolOptions) {
this.options = options;
// Check if worker script exists
if (!existsSync(options.workerScript)) {
console.warn(`Worker script not found at ${options.workerScript}, entering fallback mode`);
this.fallbackMode = true;
return;
}
// Spawn parse workers
for (let i = 0; i < options.concurrency; i++) {
const worker = this.spawnParseWorker();
this.workers.push(worker);
this.idleWorkers.push(worker);
}
// Optionally spawn embed worker
if (options.embeddingProfileId && existsSync(options.embedWorkerScript)) {
this.embedWorker = this.spawnEmbedWorker();
}
}
private spawnParseWorker(): Worker {
const worker = new Worker(this.options.workerScript, {
workerData: {
dbPath: this.options.dbPath
} satisfies WorkerInitData
});
worker.on('message', (msg: ParseWorkerResponse) => this.onWorkerMessage(worker, msg));
worker.on('exit', (code: number) => this.onWorkerExit(worker, code));
return worker;
}
private spawnEmbedWorker(): Worker {
const worker = new Worker(this.options.embedWorkerScript, {
workerData: {
dbPath: this.options.dbPath,
embeddingProfileId: this.options.embeddingProfileId
} satisfies WorkerInitData
});
worker.on('message', (msg: EmbedWorkerResponse) => this.onEmbedWorkerMessage(msg));
return worker;
}
public enqueue(jobId: string, repositoryId: string, versionId?: string | null): void {
if (this.shuttingDown) {
console.warn('WorkerPool is shutting down, ignoring enqueue request');
return;
}
if (this.fallbackMode) {
console.warn(`WorkerPool in fallback mode: job ${jobId} was not dispatched; run it via the direct pipeline path`);
return;
}
this.jobQueue.push({ jobId, repositoryId, versionId });
this.dispatch();
}
private dispatch(): void {
while (this.idleWorkers.length > 0 && this.jobQueue.length > 0) {
// Find first job whose repositoryId is not currently running
const jobIdx = this.jobQueue.findIndex((j) => !this.runningRepoIds.has(j.repositoryId));
if (jobIdx === -1) {
// No eligible job found (all repos have running jobs)
break;
}
const job = this.jobQueue.splice(jobIdx, 1)[0];
const worker = this.idleWorkers.pop()!;
this.runningJobs.set(worker, { jobId: job.jobId, repositoryId: job.repositoryId });
this.runningRepoIds.add(job.repositoryId);
const msg: ParseWorkerRequest = { type: 'run', jobId: job.jobId };
worker.postMessage(msg);
}
}
private onWorkerMessage(worker: Worker, msg: ParseWorkerResponse): void {
if (msg.type === 'progress') {
this.options.onProgress(msg.jobId, msg);
} else if (msg.type === 'done') {
const runningJob = this.runningJobs.get(worker);
if (runningJob) {
this.runningJobs.delete(worker);
this.runningRepoIds.delete(runningJob.repositoryId);
}
this.idleWorkers.push(worker);
this.options.onJobDone(msg.jobId);
// If embedding configured, enqueue embed request
if (this.embedWorker && this.options.embeddingProfileId) {
const runningJobData = runningJob || { jobId: msg.jobId, repositoryId: '' };
this.enqueueEmbed(msg.jobId, runningJobData.repositoryId, null);
}
this.dispatch();
} else if (msg.type === 'failed') {
const runningJob = this.runningJobs.get(worker);
if (runningJob) {
this.runningJobs.delete(worker);
this.runningRepoIds.delete(runningJob.repositoryId);
}
this.idleWorkers.push(worker);
this.options.onJobFailed(msg.jobId, msg.error);
this.dispatch();
}
}
private onWorkerExit(worker: Worker, code: number): void {
if (this.shuttingDown) {
return;
}
// Remove from idle if present
const idleIdx = this.idleWorkers.indexOf(worker);
if (idleIdx !== -1) {
this.idleWorkers.splice(idleIdx, 1);
}
// Check if there's a running job
const runningJob = this.runningJobs.get(worker);
if (runningJob && code !== 0) {
this.runningJobs.delete(worker);
this.runningRepoIds.delete(runningJob.repositoryId);
this.options.onJobFailed(runningJob.jobId, `Worker crashed with code ${code}`);
} else if (runningJob) {
this.runningJobs.delete(worker);
this.runningRepoIds.delete(runningJob.repositoryId);
}
// Remove from workers array
const workerIdx = this.workers.indexOf(worker);
if (workerIdx !== -1) {
this.workers.splice(workerIdx, 1);
}
// Spawn replacement worker if not shutting down and we haven't reached target
if (!this.shuttingDown && this.workers.length < this.options.concurrency) {
const newWorker = this.spawnParseWorker();
this.workers.push(newWorker);
this.idleWorkers.push(newWorker);
this.dispatch();
}
}
private onEmbedWorkerMessage(msg: EmbedWorkerResponse): void {
if (msg.type === 'ready') {
this.embedReady = true;
// Process any queued embed requests
this.processEmbedQueue();
} else if (msg.type === 'embed-progress') {
// Progress message - could be tracked but not strictly required
} else if (msg.type === 'embed-done') {
this.options.onEmbedDone(msg.jobId);
} else if (msg.type === 'embed-failed') {
this.options.onEmbedFailed(msg.jobId, msg.error);
}
}
private processEmbedQueue(): void {
if (!this.embedWorker || !this.embedReady) {
return;
}
while (this.embedQueue.length > 0) {
const job = this.embedQueue.shift();
if (job) {
const msg: EmbedWorkerRequest = {
type: 'embed',
jobId: job.jobId,
repositoryId: job.repositoryId,
versionId: job.versionId
};
this.embedWorker.postMessage(msg);
}
}
}
public enqueueEmbed(jobId: string, repositoryId: string, versionId: string | null): void {
if (!this.embedWorker) {
return; // no-op if embedding not configured
}
if (this.embedReady) {
const msg: EmbedWorkerRequest = {
type: 'embed',
jobId,
repositoryId,
versionId
};
this.embedWorker.postMessage(msg);
} else {
this.embedQueue.push({ jobId, repositoryId, versionId });
}
}
public setMaxConcurrency(n: number): void {
const current = this.workers.length;
if (n > current) {
// Spawn additional workers
for (let i = current; i < n; i++) {
const worker = this.spawnParseWorker();
this.workers.push(worker);
this.idleWorkers.push(worker);
}
} else if (n < current) {
// Shut down excess idle workers
const excess = current - n;
for (let i = 0; i < excess; i++) {
if (this.idleWorkers.length > 0) {
const worker = this.idleWorkers.pop()!;
const workerIdx = this.workers.indexOf(worker);
if (workerIdx !== -1) {
this.workers.splice(workerIdx, 1);
}
const msg: ParseWorkerRequest = { type: 'shutdown' };
worker.postMessage(msg);
}
}
}
}
public async shutdown(): Promise<void> {
this.shuttingDown = true;
const msg: ParseWorkerRequest = { type: 'shutdown' };
// Send shutdown to all parse workers
for (const worker of this.workers) {
try {
worker.postMessage(msg);
} catch {
// Worker might already be exited
}
}
// Send shutdown to embed worker if exists
if (this.embedWorker) {
try {
const embedMsg: EmbedWorkerRequest = { type: 'shutdown' };
this.embedWorker.postMessage(embedMsg);
} catch {
// Worker might already be exited
}
}
// Wait for workers to exit with timeout
const timeout = 5000;
const startTime = Date.now();
const checkAllExited = (): boolean => {
return this.workers.length === 0 && (!this.embedWorker || !this.embedWorker.threadId);
};
while (!checkAllExited() && Date.now() - startTime < timeout) {
await new Promise((resolve) => setTimeout(resolve, 100));
}
// Force kill any remaining workers
for (const worker of this.workers) {
try {
worker.terminate();
} catch {
// Already terminated
}
}
if (this.embedWorker) {
try {
this.embedWorker.terminate();
} catch {
// Already terminated
}
}
this.workers = [];
this.idleWorkers = [];
this.embedWorker = null;
}
public get isFallbackMode(): boolean {
return this.fallbackMode;
}
}
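The heart of `dispatch()` is its job-selection rule: take the oldest queued job whose repository has no job currently running, so two jobs for the same repository never execute concurrently. A standalone sketch of that rule (types and names are hypothetical simplifications of the pool's internals):

```typescript
// Standalone sketch of WorkerPool.dispatch()'s selection rule: pick the
// oldest queued job whose repositoryId is not already running, removing
// it from the queue; return null when every queued repo is busy.
interface QueuedJob {
  jobId: string;
  repositoryId: string;
}

function pickNextJob(queue: QueuedJob[], runningRepoIds: Set<string>): QueuedJob | null {
  const idx = queue.findIndex((j) => !runningRepoIds.has(j.repositoryId));
  if (idx === -1) return null; // all queued repos have a job in flight
  return queue.splice(idx, 1)[0]; // remove and return, as dispatch() does
}
```

The real `dispatch()` loops this selection while idle workers remain, adding each picked repository to `runningRepoIds` before posting the `run` message.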


@@ -0,0 +1,25 @@
import type { IndexingStage } from '$lib/types.js';
export type ParseWorkerRequest =
| { type: 'run'; jobId: string }
| { type: 'shutdown' };
export type ParseWorkerResponse =
| { type: 'progress'; jobId: string; stage: IndexingStage; stageDetail?: string; progress: number; processedFiles: number; totalFiles: number }
| { type: 'done'; jobId: string }
| { type: 'failed'; jobId: string; error: string };
export type EmbedWorkerRequest =
| { type: 'embed'; jobId: string; repositoryId: string; versionId: string | null }
| { type: 'shutdown' };
export type EmbedWorkerResponse =
| { type: 'ready' }
| { type: 'embed-progress'; jobId: string; done: number; total: number }
| { type: 'embed-done'; jobId: string }
| { type: 'embed-failed'; jobId: string; error: string };
export interface WorkerInitData {
dbPath: string;
embeddingProfileId?: string;
}
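`ParseWorkerResponse` is a discriminated union keyed on `type`, so a message handler can narrow each variant with a `switch` and get a compile-time exhaustiveness check. A sketch of that handling; the union is re-declared locally in simplified form (`IndexingStage` reduced to `string`) so the snippet is self-contained:

```typescript
// Sketch of exhaustively handling the ParseWorkerResponse union via its
// `type` discriminant. Simplified local re-declaration for illustration.
type ParseWorkerResponse =
  | { type: 'progress'; jobId: string; stage: string; progress: number }
  | { type: 'done'; jobId: string }
  | { type: 'failed'; jobId: string; error: string };

function describe(msg: ParseWorkerResponse): string {
  switch (msg.type) {
    case 'progress':
      // msg is narrowed here, so stage/progress are accessible
      return `${msg.jobId}: ${msg.stage} ${msg.progress}%`;
    case 'done':
      return `${msg.jobId}: done`;
    case 'failed':
      return `${msg.jobId}: failed (${msg.error})`;
    default: {
      // Unreachable if every variant is handled; fails to compile otherwise
      const _exhaustive: never = msg;
      return _exhaustive;
    }
  }
}
```

Adding a new variant to the union (say `'heartbeat'`) would make the `never` assignment a type error, flagging every handler that needs updating.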


@@ -31,7 +31,9 @@ function createTestDb(): Database.Database {
for (const migration of [
'0000_large_master_chief.sql',
'0001_quick_nighthawk.sql',
'0002_silky_stellaris.sql'
'0002_silky_stellaris.sql',
'0003_multiversion_config.sql',
'0004_complete_sentry.sql'
]) {
const statements = readFileSync(join(migrationsFolder, migration), 'utf-8')
.split('--> statement-breakpoint')


@@ -31,6 +31,7 @@ export type RepositorySource = 'github' | 'local';
export type RepositoryState = 'pending' | 'indexing' | 'indexed' | 'error';
export type SnippetType = 'code' | 'info';
export type JobStatus = 'queued' | 'running' | 'done' | 'failed';
export type IndexingStage = 'queued' | 'differential' | 'crawling' | 'cloning' | 'parsing' | 'storing' | 'embedding' | 'done' | 'failed';
export type VersionState = 'pending' | 'indexing' | 'indexed' | 'error';
export type EmbeddingProviderKind = 'local-transformers' | 'openai-compatible';


@@ -110,11 +110,49 @@
}
}
// Auto-refresh every 3 seconds
// Auto-refresh with EventSource streaming + fallback polling
$effect(() => {
fetchJobs();
const interval = setInterval(fetchJobs, 3000);
return () => clearInterval(interval);
const es = new EventSource('/api/v1/jobs/stream');
let fallbackInterval: ReturnType<typeof setInterval> | null = null;
es.addEventListener('job-progress', (event) => {
const data = JSON.parse(event.data);
jobs = jobs.map((j) =>
j.id === data.jobId
? {
...j,
progress: data.progress,
stage: data.stage,
stageDetail: data.stageDetail,
processedFiles: data.processedFiles,
totalFiles: data.totalFiles
}
: j
);
});
es.addEventListener('job-done', () => {
void fetchJobs();
});
es.addEventListener('job-failed', () => {
void fetchJobs();
});
es.onerror = () => {
es.close();
// Fall back to polling on error
fallbackInterval = setInterval(fetchJobs, 3000);
};
return () => {
es.close();
if (fallbackInterval) {
clearInterval(fallbackInterval);
}
};
});
// Format date for display
@@ -135,6 +173,23 @@
function canCancel(status: IndexingJobDto['status']): boolean {
return status !== 'done' && status !== 'failed';
}
// Map IndexingStage values to display labels
const stageLabels: Record<string, string> = {
queued: 'Queued',
differential: 'Diff',
crawling: 'Crawling',
cloning: 'Cloning',
parsing: 'Parsing',
storing: 'Storing',
embedding: 'Embedding',
done: 'Done',
failed: 'Failed'
};
function getStageLabel(stage: string | undefined): string {
return stage ? (stageLabels[stage] ?? stage) : '—';
}
</script>
<svelte:head>
@@ -181,6 +236,11 @@
>
Status
</th>
<th
class="px-6 py-3 text-left text-xs font-medium tracking-wider text-gray-500 uppercase"
>
Stage
</th>
<th
class="px-6 py-3 text-left text-xs font-medium tracking-wider text-gray-500 uppercase"
>
@@ -210,22 +270,30 @@
<td class="px-6 py-4 text-sm whitespace-nowrap text-gray-500">
<JobStatusBadge status={job.status} />
</td>
<td class="px-6 py-4 text-sm whitespace-nowrap text-gray-500">
<div class="flex items-center">
<span class="mr-2">{job.progress}%</span>
<div class="h-2 w-32 rounded-full bg-gray-200">
<div
class="h-2 rounded-full bg-blue-600 transition-all"
style="width: {job.progress}%"
></div>
</div>
{#if job.totalFiles > 0}
<span class="ml-2 text-xs text-gray-400">
{job.processedFiles}/{job.totalFiles} files
</span>
{/if}
<td class="px-6 py-4 text-sm whitespace-nowrap text-gray-500">
<div class="flex items-center gap-2">
<span>{getStageLabel(job.stage)}</span>
{#if job.stageDetail}
<span class="text-xs text-gray-400">{job.stageDetail}</span>
{/if}
</div>
</td>
<td class="px-6 py-4 text-sm whitespace-nowrap text-gray-500">
<div class="flex items-center">
<span class="mr-2">{job.progress}%</span>
<div class="h-2 w-32 rounded-full bg-gray-200">
<div
class="h-2 rounded-full bg-blue-600 transition-all"
style="width: {job.progress}%"
></div>
</div>
</td>
{#if job.totalFiles > 0}
<span class="ml-2 text-xs text-gray-400">
{job.processedFiles}/{job.totalFiles} files
</span>
{/if}
</div>
</td>
<td class="px-6 py-4 text-sm whitespace-nowrap text-gray-500">
{formatDate(job.createdAt)}
</td>

View File

@@ -56,6 +56,7 @@ function createTestDb(): Database.Database {
const migration1 = readFileSync(join(migrationsFolder, '0001_quick_nighthawk.sql'), 'utf-8');
const migration2 = readFileSync(join(migrationsFolder, '0002_silky_stellaris.sql'), 'utf-8');
const migration3 = readFileSync(join(migrationsFolder, '0003_multiversion_config.sql'), 'utf-8');
const migration4 = readFileSync(join(migrationsFolder, '0004_complete_sentry.sql'), 'utf-8');
// Apply first migration
const statements0 = migration0
@@ -95,6 +96,15 @@ function createTestDb(): Database.Database {
client.exec(statement);
}
const statements4 = migration4
.split('--> statement-breakpoint')
.map((statement) => statement.trim())
.filter(Boolean);
for (const statement of statements4) {
client.exec(statement);
}
client.exec(readFileSync(ftsFile, 'utf-8'));
return client;
@@ -207,15 +217,6 @@ function seedEmbedding(client: Database.Database, snippetId: string, values: num
.run(snippetId, values.length, Buffer.from(Float32Array.from(values).buffer), NOW_S);
}
function seedRules(client: Database.Database, repositoryId: string, rules: string[]) {
client
.prepare(
`INSERT INTO repository_configs (repository_id, rules, updated_at)
VALUES (?, ?, ?)`
)
.run(repositoryId, JSON.stringify(rules), NOW_S);
}
describe('API contract integration', () => {
beforeEach(() => {
db = createTestDb();

View File

@@ -0,0 +1,115 @@
/**
* GET /api/v1/jobs/:id/stream — stream real-time job progress via SSE.
*
* Headers:
* Last-Event-ID (optional) — triggers replay of last cached event
*/
import type { RequestHandler } from './$types';
import { getClient } from '$lib/server/db/client.js';
import { JobQueue } from '$lib/server/pipeline/job-queue.js';
import { getBroadcaster } from '$lib/server/pipeline/progress-broadcaster.js';
import { handleServiceError } from '$lib/server/utils/validation.js';
export const GET: RequestHandler = ({ params, request }) => {
try {
const db = getClient();
const queue = new JobQueue(db);
const jobId = params.id;
// Get the job from the queue
const job = queue.getJob(jobId);
if (!job) {
return new Response('Not found', { status: 404 });
}
// Get broadcaster
const broadcaster = getBroadcaster();
if (!broadcaster) {
return new Response('Service unavailable', { status: 503 });
}
// Create a new readable stream for SSE
const stream = new ReadableStream<string>({
async start(controller) {
try {
// Send initial job state as first event
const initialData = {
jobId,
stage: job.stage,
stageDetail: job.stageDetail,
progress: job.progress,
processedFiles: job.processedFiles,
totalFiles: job.totalFiles,
status: job.status,
error: job.error
};
controller.enqueue(`data: ${JSON.stringify(initialData)}\n\n`);
// Check for Last-Event-ID header for reconnect
const lastEventId = request.headers.get('Last-Event-ID');
if (lastEventId) {
const lastEvent = broadcaster.getLastEvent(jobId);
// Only replay an event the client has not already seen
if (lastEvent && lastEvent.id > parseInt(lastEventId, 10)) {
controller.enqueue(`id: ${lastEvent.id}\nevent: ${lastEvent.event}\ndata: ${lastEvent.data}\n\n`);
}
}
// Check if job is already done or failed - close immediately after first event
if (job.status === 'done' || job.status === 'failed') {
controller.close();
return;
}
// Subscribe to broadcaster for live events
const eventStream = broadcaster.subscribe(jobId);
const reader = eventStream.getReader();
// Pipe broadcaster events to the response
try {
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    controller.enqueue(value);
    // Stop once the incoming event indicates job completion
    if (value.includes('event: done') || value.includes('event: failed')) {
      break;
    }
  }
} finally {
  // Release the reader and close the response exactly once
  reader.releaseLock();
  controller.close();
}
} catch (err) {
  console.error('SSE stream error:', err);
  try {
    controller.close();
  } catch {
    // stream already closed
  }
}
}
});
return new Response(stream, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'X-Accel-Buffering': 'no',
'Access-Control-Allow-Origin': '*'
}
});
} catch (err) {
return handleServiceError(err);
}
};
export const OPTIONS: RequestHandler = () => {
return new Response(null, {
status: 204,
headers: {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Methods': 'GET, OPTIONS',
'Access-Control-Allow-Headers': 'Content-Type, Authorization, Last-Event-ID'
}
});
};
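For reference, the replay branch above frames events in standard SSE wire format (an `id:` line, an `event:` line, a `data:` line, and a blank-line terminator). A minimal serializer sketch, with an illustrative name and JSON-encoding the payload itself:

```typescript
// Serialize one SSE frame in the shape the replay branch interpolates.
function formatSseEvent(id: number, event: string, data: unknown): string {
  return `id: ${id}\nevent: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
}
```

The blank line at the end is what tells `EventSource` the frame is complete; omitting it is a common cause of events never firing on the client.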

View File

@@ -0,0 +1,52 @@
/**
* GET /api/v1/jobs/stream — stream real-time job progress for all jobs or a specific repository via SSE.
*
* Query parameters:
* repositoryId (optional) — filter to jobs for this repository
*/
import type { RequestHandler } from './$types';
import { getBroadcaster } from '$lib/server/pipeline/progress-broadcaster.js';
import { handleServiceError } from '$lib/server/utils/validation.js';
export const GET: RequestHandler = ({ url }) => {
try {
const broadcaster = getBroadcaster();
if (!broadcaster) {
return new Response('Service unavailable', { status: 503 });
}
const repositoryId = url.searchParams.get('repositoryId');
// Choose the repository-scoped or the global stream
const stream = repositoryId
  ? broadcaster.subscribeRepository(repositoryId)
  : broadcaster.subscribeAll();
return new Response(stream, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'X-Accel-Buffering': 'no',
'Access-Control-Allow-Origin': '*'
}
});
} catch (err) {
return handleServiceError(err);
}
};
export const OPTIONS: RequestHandler = () => {
return new Response(null, {
status: 204,
headers: {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Methods': 'GET, OPTIONS',
'Access-Control-Allow-Headers': 'Content-Type, Authorization'
}
});
};

View File

@@ -0,0 +1,99 @@
import { json } from '@sveltejs/kit';
import type { RequestHandler } from './$types';
import { getClient } from '$lib/server/db/client.js';
import { getPool } from '$lib/server/pipeline/startup.js';
import os from 'node:os';
/**
* GET /api/v1/settings/indexing — retrieve indexing concurrency setting
* PUT /api/v1/settings/indexing — update indexing concurrency setting
* OPTIONS /api/v1/settings/indexing — CORS preflight
*/
// ---------------------------------------------------------------------------
// GET — Return current indexing concurrency
// ---------------------------------------------------------------------------
export const GET: RequestHandler = () => {
try {
const db = getClient();
const row = db
.prepare<[], { value: string }>(
"SELECT value FROM settings WHERE key = 'indexing.concurrency'"
)
.get();
let concurrency = 2;
if (row && row.value) {
try {
const parsed = JSON.parse(row.value);
if (typeof parsed === 'object' && parsed !== null && typeof parsed.value === 'number') {
concurrency = parsed.value;
} else if (typeof parsed === 'number') {
concurrency = parsed;
}
} catch {
concurrency = 2;
}
}
return json({ concurrency });
} catch (err) {
console.error('GET /api/v1/settings/indexing error:', err);
return json({ error: 'Failed to read indexing settings' }, { status: 500 });
}
};
// ---------------------------------------------------------------------------
// PUT — Update indexing concurrency
// ---------------------------------------------------------------------------
export const PUT: RequestHandler = async ({ request }) => {
try {
const body = await request.json();
// Validate first, then clamp concurrency to [1, cpu count - 1]
const maxConcurrency = Math.max(os.cpus().length - 1, 1);
const requested = parseInt(String(body.concurrency ?? 2), 10);
if (Number.isNaN(requested)) {
  return json(
    { error: 'Concurrency must be a valid integer' },
    { status: 400 }
  );
}
const concurrency = Math.max(1, Math.min(requested, maxConcurrency));
const db = getClient();
// Write to settings table
db.prepare(
"INSERT OR REPLACE INTO settings (key, value, updated_at) VALUES ('indexing.concurrency', ?, unixepoch())"
).run(JSON.stringify({ value: concurrency }));
// Update worker pool if available
getPool()?.setMaxConcurrency(concurrency);
return json({ concurrency });
} catch (err) {
console.error('PUT /api/v1/settings/indexing error:', err);
return json(
{ error: err instanceof Error ? err.message : 'Failed to update indexing settings' },
{ status: 500 }
);
}
};
// ---------------------------------------------------------------------------
// OPTIONS — CORS preflight
// ---------------------------------------------------------------------------
export const OPTIONS: RequestHandler = () => {
return new Response(null, {
status: 200,
headers: {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Methods': 'GET, PUT, OPTIONS',
'Access-Control-Allow-Headers': 'Content-Type'
}
});
};
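The value parsing in the GET handler above (and repeated in the settings page's `load`) accepts either the wrapped `{"value": n}` shape or a bare number. A hypothetical shared helper capturing that logic:

```typescript
// Tolerant reader for the 'indexing.concurrency' settings row: accepts
// '{"value": 4}' or '4', and falls back to the default on anything else.
function parseConcurrencySetting(raw: string | null | undefined, fallback = 2): number {
  if (!raw) return fallback;
  try {
    const parsed: unknown = JSON.parse(raw);
    if (typeof parsed === 'object' && parsed !== null) {
      const value = (parsed as { value?: unknown }).value;
      if (typeof value === 'number') return value;
    } else if (typeof parsed === 'number') {
      return parsed;
    }
  } catch {
    // malformed JSON falls through to the default
  }
  return fallback;
}
```

Extracting this would remove the duplication between the API route and the page load without changing behavior.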

View File

@@ -2,6 +2,7 @@
import { goto } from '$app/navigation';
import { resolve as resolveRoute } from '$app/paths';
import { onMount } from 'svelte';
import { SvelteSet } from 'svelte/reactivity';
import type { PageData } from './$types';
import type { Repository, IndexingJob } from '$lib/types';
import ConfirmDialog from '$lib/components/ConfirmDialog.svelte';
@@ -48,7 +49,7 @@
// Discover tags state
let discoverBusy = $state(false);
let discoveredTags = $state<Array<{ tag: string; commitHash: string }>>([]);
let selectedDiscoveredTags = $state<Set<string>>(new Set());
let selectedDiscoveredTags = new SvelteSet<string>();
let showDiscoverPanel = $state(false);
let registerBusy = $state(false);
@@ -75,6 +76,18 @@
error: 'Error'
};
const stageLabels: Record<string, string> = {
queued: 'Queued',
differential: 'Diff',
crawling: 'Crawling',
cloning: 'Cloning',
parsing: 'Parsing',
storing: 'Storing',
embedding: 'Embedding',
done: 'Done',
failed: 'Failed'
};
async function refreshRepo() {
try {
const res = await fetch(`/api/v1/libs/${encodeURIComponent(repo.id)}`);
@@ -105,63 +118,78 @@
loadVersions();
});
// Single shared poller — one interval regardless of how many tags are active.
// This replaces the N per-version <IndexingProgress> components that each had
// their own setInterval, which caused ERR_INSUFFICIENT_RESOURCES and UI lockup
// when hundreds of tags were queued simultaneously.
// Single shared poller replaced with EventSource SSE stream
$effect(() => {
const activeIds = new Set(
Object.values(activeVersionJobs).filter((id): id is string => !!id)
);
if (activeIds.size === 0) {
versionJobProgress = {};
return;
}
if (!repo.id) return;
let stopped = false;
const es = new EventSource(
`/api/v1/jobs/stream?repositoryId=${encodeURIComponent(repo.id)}`
);
async function poll() {
es.addEventListener('job-progress', (event) => {
if (stopped) return;
try {
const res = await fetch(
`/api/v1/jobs?repositoryId=${encodeURIComponent(repo.id)}&limit=1000`
);
if (!res.ok || stopped) return;
const d = await res.json();
// Build a jobId → job lookup from the response.
const map: Record<string, IndexingJob> = {};
for (const job of (d.jobs ?? []) as IndexingJob[]) {
map[job.id] = job;
}
if (!stopped) versionJobProgress = map;
// Retire completed jobs and trigger a single refresh.
let anyCompleted = false;
const nextJobs = { ...activeVersionJobs };
for (const [tag, jobId] of Object.entries(activeVersionJobs)) {
if (!jobId) continue;
const job = map[jobId];
if (job?.status === 'done' || job?.status === 'failed') {
delete nextJobs[tag];
anyCompleted = true;
}
}
if (anyCompleted && !stopped) {
activeVersionJobs = nextJobs;
void loadVersions();
void refreshRepo();
}
const data = JSON.parse(event.data) as IndexingJob;
versionJobProgress = { ...versionJobProgress, [data.id]: data };
} catch {
// ignore transient errors
// ignore parse errors
}
}
});
es.addEventListener('job-done', (event) => {
if (stopped) return;
try {
const data = JSON.parse(event.data) as IndexingJob;
const next = { ...versionJobProgress };
delete next[data.id];
versionJobProgress = next;
void loadVersions();
void refreshRepo();
} catch {
// ignore parse errors
}
});
es.addEventListener('job-failed', (event) => {
if (stopped) return;
try {
const data = JSON.parse(event.data) as IndexingJob;
const next = { ...versionJobProgress };
delete next[data.id];
versionJobProgress = next;
void loadVersions();
void refreshRepo();
} catch {
// ignore parse errors
}
});
es.onerror = () => {
if (stopped) return;
es.close();
// Fall back to a single fetch for resilience
(async () => {
try {
const res = await fetch(
`/api/v1/jobs?repositoryId=${encodeURIComponent(repo.id)}&limit=1000`
);
if (!res.ok || stopped) return;
const d = await res.json();
const map: Record<string, IndexingJob> = {};
for (const job of (d.jobs ?? []) as IndexingJob[]) {
map[job.id] = job;
}
if (!stopped) versionJobProgress = map;
} catch {
// ignore errors
}
})();
};
void poll();
const interval = setInterval(poll, 2000);
return () => {
stopped = true;
clearInterval(interval);
es.close();
};
});
@@ -303,7 +331,7 @@
discoveredTags = (d.tags ?? []).filter(
(t: { tag: string; commitHash: string }) => !registeredTags.has(t.tag)
);
selectedDiscoveredTags = new Set(discoveredTags.map((t) => t.tag));
selectedDiscoveredTags = new SvelteSet(discoveredTags.map((t) => t.tag));
showDiscoverPanel = true;
} catch (e) {
errorMessage = (e as Error).message;
@@ -313,13 +341,11 @@
}
function toggleDiscoveredTag(tag: string) {
const next = new Set(selectedDiscoveredTags);
if (next.has(tag)) {
next.delete(tag);
if (selectedDiscoveredTags.has(tag)) {
selectedDiscoveredTags.delete(tag);
} else {
next.add(tag);
selectedDiscoveredTags.add(tag);
}
selectedDiscoveredTags = next;
}
async function handleRegisterSelected() {
@@ -354,7 +380,7 @@
activeVersionJobs = next;
showDiscoverPanel = false;
discoveredTags = [];
selectedDiscoveredTags = new Set();
selectedDiscoveredTags = new SvelteSet();
await loadVersions();
} catch (e) {
errorMessage = (e as Error).message;
@@ -523,7 +549,7 @@
onclick={() => {
showDiscoverPanel = false;
discoveredTags = [];
selectedDiscoveredTags = new Set();
selectedDiscoveredTags = new SvelteSet();
}}
class="text-xs text-blue-600 hover:underline"
>
@@ -620,7 +646,10 @@
{@const job = versionJobProgress[activeVersionJobs[version.tag]!]}
<div class="mt-2">
<div class="flex justify-between text-xs text-gray-500">
<span>{(job?.processedFiles ?? 0).toLocaleString()} / {(job?.totalFiles ?? 0).toLocaleString()} files</span>
<span>
{#if job?.stageDetail}{job.stageDetail}{:else}{(job?.processedFiles ?? 0).toLocaleString()} / {(job?.totalFiles ?? 0).toLocaleString()} files{/if}
{#if job?.stage}{' - ' + (stageLabels[job.stage] ?? job.stage)}{/if}
</span>
<span>{job?.progress ?? 0}%</span>
</div>
<div class="mt-1 h-1.5 w-full rounded-full bg-gray-200">

View File

@@ -5,7 +5,9 @@ import { EmbeddingSettingsDtoMapper } from '$lib/server/mappers/embedding-settin
import { EmbeddingSettingsService } from '$lib/server/services/embedding-settings.service.js';
export const load: PageServerLoad = async () => {
const service = new EmbeddingSettingsService(getClient());
const db = getClient();
const service = new EmbeddingSettingsService(db);
const settings = EmbeddingSettingsDtoMapper.toDto(service.getSettings());
let localProviderAvailable = false;
@@ -15,8 +17,30 @@ export const load: PageServerLoad = async () => {
localProviderAvailable = false;
}
// Read indexing concurrency setting
let indexingConcurrency = 2;
const concurrencyRow = db
.prepare<[], { value: string }>(
"SELECT value FROM settings WHERE key = 'indexing.concurrency'"
)
.get();
if (concurrencyRow && concurrencyRow.value) {
try {
const parsed = JSON.parse(concurrencyRow.value);
if (typeof parsed === 'object' && parsed !== null && typeof parsed.value === 'number') {
indexingConcurrency = parsed.value;
} else if (typeof parsed === 'number') {
indexingConcurrency = parsed;
}
} catch {
indexingConcurrency = 2;
}
}
return {
settings,
localProviderAvailable
localProviderAvailable,
indexingConcurrency
};
};

View File

@@ -66,12 +66,19 @@
let saveError = $state<string | null>(null);
let saveStatusTimer: ReturnType<typeof setTimeout> | null = null;
let concurrencyInput = $derived(data.indexingConcurrency);
let concurrencySaving = $state(false);
let concurrencySaveStatus = $state<'idle' | 'ok' | 'error'>('idle');
let concurrencySaveError = $state<string | null>(null);
let concurrencySaveStatusTimer: ReturnType<typeof setTimeout> | null = null;
const currentSettings = $derived(settingsOverride ?? data.settings);
const activeProfile = $derived(currentSettings.activeProfile);
const activeConfigEntries = $derived(activeProfile?.configEntries ?? []);
onDestroy(() => {
if (saveStatusTimer) clearTimeout(saveStatusTimer);
if (concurrencySaveStatusTimer) clearTimeout(concurrencySaveStatusTimer);
});
// ---------------------------------------------------------------------------
@@ -159,6 +166,38 @@
void save();
}
async function saveConcurrency() {
concurrencySaving = true;
concurrencySaveStatus = 'idle';
concurrencySaveError = null;
try {
const res = await fetch('/api/v1/settings/indexing', {
method: 'PUT',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ concurrency: concurrencyInput })
});
if (res.ok) {
const updated = await res.json();
concurrencyInput = updated.concurrency;
concurrencySaveStatus = 'ok';
if (concurrencySaveStatusTimer) clearTimeout(concurrencySaveStatusTimer);
concurrencySaveStatusTimer = setTimeout(() => {
concurrencySaveStatus = 'idle';
concurrencySaveStatusTimer = null;
}, 3000);
} else {
const data = await res.json();
concurrencySaveStatus = 'error';
concurrencySaveError = data.error ?? 'Save failed';
}
} catch (e) {
concurrencySaveStatus = 'error';
concurrencySaveError = (e as Error).message;
} finally {
concurrencySaving = false;
}
}
function getOpenAiProfile(settings: EmbeddingSettingsDto): EmbeddingProfileDto | null {
return settings.profiles.find((profile) => profile.providerKind === 'openai-compatible') ?? null;
}
@@ -482,6 +521,45 @@
</div>
{/if}
<!-- Indexing section -->
<div class="space-y-3 rounded-lg border border-gray-200 bg-white p-4">
<div>
<label for="concurrency" class="block text-sm font-medium text-gray-700">
Concurrent Workers
</label>
<p class="mt-0.5 text-xs text-gray-500">
Number of parallel indexing workers. Range: 1 to 8 (the server also caps this at CPU count minus 1).
</p>
</div>
<div class="flex items-center gap-3">
<input
id="concurrency"
type="number"
min="1"
max="8"
inputmode="numeric"
bind:value={concurrencyInput}
disabled={concurrencySaving}
class="w-20 rounded-lg border border-gray-300 px-3 py-2 text-sm focus:border-blue-500 focus:outline-none disabled:opacity-50"
/>
<button
type="button"
onclick={saveConcurrency}
disabled={concurrencySaving}
class="rounded-lg bg-blue-600 px-3 py-2 text-sm text-white hover:bg-blue-700 disabled:opacity-50"
>
{concurrencySaving ? 'Saving…' : 'Save'}
</button>
{#if concurrencySaveStatus === 'ok'}
<span class="text-sm text-green-600">✓ Saved</span>
{:else if concurrencySaveStatus === 'error'}
<span class="text-sm text-red-600">{concurrencySaveError}</span>
{/if}
</div>
</div>
<!-- Save feedback banners -->
{#if saveStatus === 'ok'}
<div