From 763074040376fed74795ce49f1126ad45a7a247c Mon Sep 17 00:00:00 2001 From: Giancarmine Salucci Date: Mon, 30 Mar 2026 17:08:23 +0200 Subject: [PATCH] =?UTF-8?q?feat(TRUEREF-0022):=20complete=20iteration=200?= =?UTF-8?q?=20=E2=80=94=20worker-thread=20indexing,=20parallel=20jobs,=20S?= =?UTF-8?q?SE=20progress?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Move IndexingPipeline.run() into Worker Threads via WorkerPool - Add dedicated embedding worker thread with single model instance - Add stage/stageDetail columns to indexing_jobs schema - Create ProgressBroadcaster for SSE channel management - Add SSE endpoints: GET /api/v1/jobs/:id/stream, GET /api/v1/jobs/stream - Replace UI polling with EventSource on repo detail and admin pages - Add concurrency settings UI and API endpoint - Build worker entries separately via esbuild --- package.json | 3 +- scripts/build-workers.mjs | 38 + src/hooks.server.ts | 24 +- .../db/migrations/0004_complete_sentry.sql | 23 + .../db/migrations/meta/0003_snapshot.json | 835 ---------------- .../db/migrations/meta/0004_snapshot.json | 896 ++++++++++++++++++ .../server/db/migrations/meta/_journal.json | 73 +- src/lib/server/db/schema.ts | 2 + src/lib/server/mappers/indexing-job.mapper.ts | 4 + src/lib/server/models/indexing-job.ts | 18 + src/lib/server/pipeline/embed-worker-entry.ts | 93 ++ .../server/pipeline/indexing.pipeline.test.ts | 3 +- src/lib/server/pipeline/indexing.pipeline.ts | 35 +- src/lib/server/pipeline/job-queue.ts | 78 +- .../pipeline/progress-broadcaster.test.ts | 174 ++++ .../server/pipeline/progress-broadcaster.ts | 179 ++++ src/lib/server/pipeline/startup.ts | 112 ++- src/lib/server/pipeline/worker-entry.ts | 75 ++ src/lib/server/pipeline/worker-pool.ts | 341 +++++++ src/lib/server/pipeline/worker-types.ts | 25 + .../services/repository.service.test.ts | 4 +- src/lib/types.ts | 1 + src/routes/admin/jobs/+page.svelte | 104 +- .../api/v1/api-contract.integration.test.ts | 
10 + src/routes/api/v1/jobs/[id]/stream/+server.ts | 115 +++ src/routes/api/v1/jobs/stream/+server.ts | 52 + .../api/v1/settings/indexing/+server.ts | 99 ++ src/routes/repos/[id]/+page.svelte | 124 ++- src/routes/settings/+page.server.ts | 28 +- src/routes/settings/+page.svelte | 82 ++ 30 files changed, 2659 insertions(+), 991 deletions(-) create mode 100644 scripts/build-workers.mjs create mode 100644 src/lib/server/db/migrations/0004_complete_sentry.sql delete mode 100644 src/lib/server/db/migrations/meta/0003_snapshot.json create mode 100644 src/lib/server/db/migrations/meta/0004_snapshot.json create mode 100644 src/lib/server/pipeline/embed-worker-entry.ts create mode 100644 src/lib/server/pipeline/progress-broadcaster.test.ts create mode 100644 src/lib/server/pipeline/progress-broadcaster.ts create mode 100644 src/lib/server/pipeline/worker-entry.ts create mode 100644 src/lib/server/pipeline/worker-pool.ts create mode 100644 src/lib/server/pipeline/worker-types.ts create mode 100644 src/routes/api/v1/jobs/[id]/stream/+server.ts create mode 100644 src/routes/api/v1/jobs/stream/+server.ts create mode 100644 src/routes/api/v1/settings/indexing/+server.ts diff --git a/package.json b/package.json index 3c0e331..3b7a3a1 100644 --- a/package.json +++ b/package.json @@ -5,7 +5,7 @@ "type": "module", "scripts": { "dev": "vite dev", - "build": "vite build", + "build": "vite build && node scripts/build-workers.mjs", "preview": "vite preview", "prepare": "svelte-kit sync || echo ''", "check": "svelte-kit sync && svelte-check --tsconfig ./tsconfig.json", @@ -34,6 +34,7 @@ "@vitest/browser-playwright": "^4.1.0", "drizzle-kit": "^0.31.8", "drizzle-orm": "^0.45.1", + "esbuild": "^0.24.0", "eslint": "^9.39.2", "eslint-config-prettier": "^10.1.8", "eslint-plugin-svelte": "^3.14.0", diff --git a/scripts/build-workers.mjs b/scripts/build-workers.mjs new file mode 100644 index 0000000..331e2ad --- /dev/null +++ b/scripts/build-workers.mjs @@ -0,0 +1,38 @@ +import * as esbuild from 
'esbuild'; +import { existsSync } from 'node:fs'; + +const entries = [ + 'src/lib/server/pipeline/worker-entry.ts', + 'src/lib/server/pipeline/embed-worker-entry.ts' +]; + +try { + const existing = entries.filter(e => existsSync(e)); + if (existing.length === 0) { + console.log('[build-workers] No worker entry files found yet, skipping.'); + process.exit(0); + } + + await esbuild.build({ + entryPoints: existing, + bundle: true, + platform: 'node', + target: 'node20', + format: 'esm', + outdir: 'build/workers', + outExtension: { '.js': '.mjs' }, + alias: { + '$lib': './src/lib', + '$lib/server': './src/lib/server' + }, + external: ['better-sqlite3', '@xenova/transformers'], + banner: { + js: "import { createRequire } from 'module'; const require = createRequire(import.meta.url);" + } + }); + + console.log(`[build-workers] Compiled ${existing.length} worker(s) to build/workers/`); +} catch (err) { + console.error('[build-workers] Error:', err); + process.exit(1); +} diff --git a/src/hooks.server.ts b/src/hooks.server.ts index 1d25e06..412b8d6 100644 --- a/src/hooks.server.ts +++ b/src/hooks.server.ts @@ -47,7 +47,29 @@ try { embeddingService = new EmbeddingService(db, provider, activeProfile.id); } - initializePipeline(db, embeddingService); + // Read database path from environment + const dbPath = process.env.DATABASE_URL; + + // Read indexing concurrency setting from database + let concurrency = 2; // default + if (dbPath) { + const concurrencyRow = db + .prepare<[], { value: string }>( + "SELECT value FROM settings WHERE key = 'indexing.concurrency' LIMIT 1" + ) + .get(); + if (concurrencyRow) { + try { + const parsed = JSON.parse(concurrencyRow.value); + concurrency = parsed.value ?? 
2; + } catch { + // If parsing fails, use default + concurrency = 2; + } + } + } + + initializePipeline(db, embeddingService, { concurrency, dbPath }); console.log('[hooks.server] Indexing pipeline initialised.'); } catch (err) { console.error( diff --git a/src/lib/server/db/migrations/0004_complete_sentry.sql b/src/lib/server/db/migrations/0004_complete_sentry.sql new file mode 100644 index 0000000..7bef282 --- /dev/null +++ b/src/lib/server/db/migrations/0004_complete_sentry.sql @@ -0,0 +1,23 @@ +PRAGMA foreign_keys=OFF;--> statement-breakpoint +CREATE TABLE `__new_repository_configs` ( + `repository_id` text NOT NULL, + `version_id` text, + `project_title` text, + `description` text, + `folders` text, + `exclude_folders` text, + `exclude_files` text, + `rules` text, + `previous_versions` text, + `updated_at` integer NOT NULL, + FOREIGN KEY (`repository_id`) REFERENCES `repositories`(`id`) ON UPDATE no action ON DELETE cascade +); +--> statement-breakpoint +INSERT INTO `__new_repository_configs`("repository_id", "version_id", "project_title", "description", "folders", "exclude_folders", "exclude_files", "rules", "previous_versions", "updated_at") SELECT "repository_id", "version_id", "project_title", "description", "folders", "exclude_folders", "exclude_files", "rules", "previous_versions", "updated_at" FROM `repository_configs`;--> statement-breakpoint +DROP TABLE `repository_configs`;--> statement-breakpoint +ALTER TABLE `__new_repository_configs` RENAME TO `repository_configs`;--> statement-breakpoint +PRAGMA foreign_keys=ON;--> statement-breakpoint +CREATE UNIQUE INDEX `uniq_repo_config_base` ON `repository_configs` (`repository_id`) WHERE "repository_configs"."version_id" IS NULL;--> statement-breakpoint +CREATE UNIQUE INDEX `uniq_repo_config_version` ON `repository_configs` (`repository_id`,`version_id`) WHERE "repository_configs"."version_id" IS NOT NULL;--> statement-breakpoint +ALTER TABLE `indexing_jobs` ADD `stage` text DEFAULT 'queued' NOT NULL;--> 
statement-breakpoint +ALTER TABLE `indexing_jobs` ADD `stage_detail` text; \ No newline at end of file diff --git a/src/lib/server/db/migrations/meta/0003_snapshot.json b/src/lib/server/db/migrations/meta/0003_snapshot.json deleted file mode 100644 index 2ee1ce6..0000000 --- a/src/lib/server/db/migrations/meta/0003_snapshot.json +++ /dev/null @@ -1,835 +0,0 @@ -{ - "version": "6", - "dialect": "sqlite", - "id": "a7c2e4f8-3b1d-4e9a-8f0c-6d5e2a1b9c7f", - "prevId": "31531dab-a199-4fc5-a889-1884940039cd", - "tables": { - "documents": { - "name": "documents", - "columns": { - "id": { - "name": "id", - "type": "text", - "primaryKey": true, - "notNull": true, - "autoincrement": false - }, - "repository_id": { - "name": "repository_id", - "type": "text", - "primaryKey": false, - "notNull": true, - "autoincrement": false - }, - "version_id": { - "name": "version_id", - "type": "text", - "primaryKey": false, - "notNull": false, - "autoincrement": false - }, - "file_path": { - "name": "file_path", - "type": "text", - "primaryKey": false, - "notNull": true, - "autoincrement": false - }, - "title": { - "name": "title", - "type": "text", - "primaryKey": false, - "notNull": false, - "autoincrement": false - }, - "language": { - "name": "language", - "type": "text", - "primaryKey": false, - "notNull": false, - "autoincrement": false - }, - "token_count": { - "name": "token_count", - "type": "integer", - "primaryKey": false, - "notNull": false, - "autoincrement": false, - "default": 0 - }, - "checksum": { - "name": "checksum", - "type": "text", - "primaryKey": false, - "notNull": true, - "autoincrement": false - }, - "indexed_at": { - "name": "indexed_at", - "type": "integer", - "primaryKey": false, - "notNull": true, - "autoincrement": false - } - }, - "indexes": {}, - "foreignKeys": { - "documents_repository_id_repositories_id_fk": { - "name": "documents_repository_id_repositories_id_fk", - "tableFrom": "documents", - "tableTo": "repositories", - "columnsFrom": ["repository_id"], 
- "columnsTo": ["id"], - "onDelete": "cascade", - "onUpdate": "no action" - }, - "documents_version_id_repository_versions_id_fk": { - "name": "documents_version_id_repository_versions_id_fk", - "tableFrom": "documents", - "tableTo": "repository_versions", - "columnsFrom": ["version_id"], - "columnsTo": ["id"], - "onDelete": "cascade", - "onUpdate": "no action" - } - }, - "compositePrimaryKeys": {}, - "uniqueConstraints": {}, - "checkConstraints": {} - }, - "embedding_profiles": { - "name": "embedding_profiles", - "columns": { - "id": { - "name": "id", - "type": "text", - "primaryKey": true, - "notNull": true, - "autoincrement": false - }, - "provider_kind": { - "name": "provider_kind", - "type": "text", - "primaryKey": false, - "notNull": true, - "autoincrement": false - }, - "title": { - "name": "title", - "type": "text", - "primaryKey": false, - "notNull": true, - "autoincrement": false - }, - "enabled": { - "name": "enabled", - "type": "integer", - "primaryKey": false, - "notNull": true, - "autoincrement": false, - "default": true - }, - "is_default": { - "name": "is_default", - "type": "integer", - "primaryKey": false, - "notNull": true, - "autoincrement": false, - "default": false - }, - "model": { - "name": "model", - "type": "text", - "primaryKey": false, - "notNull": true, - "autoincrement": false - }, - "dimensions": { - "name": "dimensions", - "type": "integer", - "primaryKey": false, - "notNull": true, - "autoincrement": false - }, - "config": { - "name": "config", - "type": "text", - "primaryKey": false, - "notNull": true, - "autoincrement": false - }, - "created_at": { - "name": "created_at", - "type": "integer", - "primaryKey": false, - "notNull": true, - "autoincrement": false - }, - "updated_at": { - "name": "updated_at", - "type": "integer", - "primaryKey": false, - "notNull": true, - "autoincrement": false - } - }, - "indexes": {}, - "foreignKeys": {}, - "compositePrimaryKeys": {}, - "uniqueConstraints": {}, - "checkConstraints": {} - }, - 
"indexing_jobs": { - "name": "indexing_jobs", - "columns": { - "id": { - "name": "id", - "type": "text", - "primaryKey": true, - "notNull": true, - "autoincrement": false - }, - "repository_id": { - "name": "repository_id", - "type": "text", - "primaryKey": false, - "notNull": true, - "autoincrement": false - }, - "version_id": { - "name": "version_id", - "type": "text", - "primaryKey": false, - "notNull": false, - "autoincrement": false - }, - "status": { - "name": "status", - "type": "text", - "primaryKey": false, - "notNull": true, - "autoincrement": false, - "default": "'queued'" - }, - "progress": { - "name": "progress", - "type": "integer", - "primaryKey": false, - "notNull": false, - "autoincrement": false, - "default": 0 - }, - "total_files": { - "name": "total_files", - "type": "integer", - "primaryKey": false, - "notNull": false, - "autoincrement": false, - "default": 0 - }, - "processed_files": { - "name": "processed_files", - "type": "integer", - "primaryKey": false, - "notNull": false, - "autoincrement": false, - "default": 0 - }, - "error": { - "name": "error", - "type": "text", - "primaryKey": false, - "notNull": false, - "autoincrement": false - }, - "started_at": { - "name": "started_at", - "type": "integer", - "primaryKey": false, - "notNull": false, - "autoincrement": false - }, - "completed_at": { - "name": "completed_at", - "type": "integer", - "primaryKey": false, - "notNull": false, - "autoincrement": false - }, - "created_at": { - "name": "created_at", - "type": "integer", - "primaryKey": false, - "notNull": true, - "autoincrement": false - } - }, - "indexes": {}, - "foreignKeys": { - "indexing_jobs_repository_id_repositories_id_fk": { - "name": "indexing_jobs_repository_id_repositories_id_fk", - "tableFrom": "indexing_jobs", - "tableTo": "repositories", - "columnsFrom": ["repository_id"], - "columnsTo": ["id"], - "onDelete": "cascade", - "onUpdate": "no action" - } - }, - "compositePrimaryKeys": {}, - "uniqueConstraints": {}, - 
"checkConstraints": {} - }, - "repositories": { - "name": "repositories", - "columns": { - "id": { - "name": "id", - "type": "text", - "primaryKey": true, - "notNull": true, - "autoincrement": false - }, - "title": { - "name": "title", - "type": "text", - "primaryKey": false, - "notNull": true, - "autoincrement": false - }, - "description": { - "name": "description", - "type": "text", - "primaryKey": false, - "notNull": false, - "autoincrement": false - }, - "source": { - "name": "source", - "type": "text", - "primaryKey": false, - "notNull": true, - "autoincrement": false - }, - "source_url": { - "name": "source_url", - "type": "text", - "primaryKey": false, - "notNull": true, - "autoincrement": false - }, - "branch": { - "name": "branch", - "type": "text", - "primaryKey": false, - "notNull": false, - "autoincrement": false, - "default": "'main'" - }, - "state": { - "name": "state", - "type": "text", - "primaryKey": false, - "notNull": true, - "autoincrement": false, - "default": "'pending'" - }, - "total_snippets": { - "name": "total_snippets", - "type": "integer", - "primaryKey": false, - "notNull": false, - "autoincrement": false, - "default": 0 - }, - "total_tokens": { - "name": "total_tokens", - "type": "integer", - "primaryKey": false, - "notNull": false, - "autoincrement": false, - "default": 0 - }, - "trust_score": { - "name": "trust_score", - "type": "real", - "primaryKey": false, - "notNull": false, - "autoincrement": false, - "default": 0 - }, - "benchmark_score": { - "name": "benchmark_score", - "type": "real", - "primaryKey": false, - "notNull": false, - "autoincrement": false, - "default": 0 - }, - "stars": { - "name": "stars", - "type": "integer", - "primaryKey": false, - "notNull": false, - "autoincrement": false - }, - "github_token": { - "name": "github_token", - "type": "text", - "primaryKey": false, - "notNull": false, - "autoincrement": false - }, - "last_indexed_at": { - "name": "last_indexed_at", - "type": "integer", - "primaryKey": false, - 
"notNull": false, - "autoincrement": false - }, - "created_at": { - "name": "created_at", - "type": "integer", - "primaryKey": false, - "notNull": true, - "autoincrement": false - }, - "updated_at": { - "name": "updated_at", - "type": "integer", - "primaryKey": false, - "notNull": true, - "autoincrement": false - } - }, - "indexes": {}, - "foreignKeys": {}, - "compositePrimaryKeys": {}, - "uniqueConstraints": {}, - "checkConstraints": {} - }, - "repository_configs": { - "name": "repository_configs", - "columns": { - "repository_id": { - "name": "repository_id", - "type": "text", - "primaryKey": false, - "notNull": true, - "autoincrement": false - }, - "version_id": { - "name": "version_id", - "type": "text", - "primaryKey": false, - "notNull": false, - "autoincrement": false - }, - "project_title": { - "name": "project_title", - "type": "text", - "primaryKey": false, - "notNull": false, - "autoincrement": false - }, - "description": { - "name": "description", - "type": "text", - "primaryKey": false, - "notNull": false, - "autoincrement": false - }, - "folders": { - "name": "folders", - "type": "text", - "primaryKey": false, - "notNull": false, - "autoincrement": false - }, - "exclude_folders": { - "name": "exclude_folders", - "type": "text", - "primaryKey": false, - "notNull": false, - "autoincrement": false - }, - "exclude_files": { - "name": "exclude_files", - "type": "text", - "primaryKey": false, - "notNull": false, - "autoincrement": false - }, - "rules": { - "name": "rules", - "type": "text", - "primaryKey": false, - "notNull": false, - "autoincrement": false - }, - "previous_versions": { - "name": "previous_versions", - "type": "text", - "primaryKey": false, - "notNull": false, - "autoincrement": false - }, - "updated_at": { - "name": "updated_at", - "type": "integer", - "primaryKey": false, - "notNull": true, - "autoincrement": false - } - }, - "indexes": { - "uniq_repo_config_base": { - "name": "uniq_repo_config_base", - "columns": ["repository_id"], - 
"isUnique": true, - "where": "`version_id` IS NULL" - }, - "uniq_repo_config_version": { - "name": "uniq_repo_config_version", - "columns": ["repository_id", "version_id"], - "isUnique": true, - "where": "`version_id` IS NOT NULL" - } - }, - "foreignKeys": { - "repository_configs_repository_id_repositories_id_fk": { - "name": "repository_configs_repository_id_repositories_id_fk", - "tableFrom": "repository_configs", - "tableTo": "repositories", - "columnsFrom": ["repository_id"], - "columnsTo": ["id"], - "onDelete": "cascade", - "onUpdate": "no action" - } - }, - "compositePrimaryKeys": {}, - "uniqueConstraints": {}, - "checkConstraints": {} - }, - "repository_versions": { - "name": "repository_versions", - "columns": { - "id": { - "name": "id", - "type": "text", - "primaryKey": true, - "notNull": true, - "autoincrement": false - }, - "repository_id": { - "name": "repository_id", - "type": "text", - "primaryKey": false, - "notNull": true, - "autoincrement": false - }, - "tag": { - "name": "tag", - "type": "text", - "primaryKey": false, - "notNull": true, - "autoincrement": false - }, - "title": { - "name": "title", - "type": "text", - "primaryKey": false, - "notNull": false, - "autoincrement": false - }, - "commit_hash": { - "name": "commit_hash", - "type": "text", - "primaryKey": false, - "notNull": false, - "autoincrement": false - }, - "state": { - "name": "state", - "type": "text", - "primaryKey": false, - "notNull": true, - "autoincrement": false, - "default": "'pending'" - }, - "total_snippets": { - "name": "total_snippets", - "type": "integer", - "primaryKey": false, - "notNull": false, - "autoincrement": false, - "default": 0 - }, - "indexed_at": { - "name": "indexed_at", - "type": "integer", - "primaryKey": false, - "notNull": false, - "autoincrement": false - }, - "created_at": { - "name": "created_at", - "type": "integer", - "primaryKey": false, - "notNull": true, - "autoincrement": false - } - }, - "indexes": {}, - "foreignKeys": { - 
"repository_versions_repository_id_repositories_id_fk": { - "name": "repository_versions_repository_id_repositories_id_fk", - "tableFrom": "repository_versions", - "tableTo": "repositories", - "columnsFrom": ["repository_id"], - "columnsTo": ["id"], - "onDelete": "cascade", - "onUpdate": "no action" - } - }, - "compositePrimaryKeys": {}, - "uniqueConstraints": {}, - "checkConstraints": {} - }, - "settings": { - "name": "settings", - "columns": { - "key": { - "name": "key", - "type": "text", - "primaryKey": true, - "notNull": true, - "autoincrement": false - }, - "value": { - "name": "value", - "type": "text", - "primaryKey": false, - "notNull": false, - "autoincrement": false - }, - "updated_at": { - "name": "updated_at", - "type": "integer", - "primaryKey": false, - "notNull": true, - "autoincrement": false - } - }, - "indexes": {}, - "foreignKeys": {}, - "compositePrimaryKeys": {}, - "uniqueConstraints": {}, - "checkConstraints": {} - }, - "snippet_embeddings": { - "name": "snippet_embeddings", - "columns": { - "snippet_id": { - "name": "snippet_id", - "type": "text", - "primaryKey": false, - "notNull": true, - "autoincrement": false - }, - "profile_id": { - "name": "profile_id", - "type": "text", - "primaryKey": false, - "notNull": true, - "autoincrement": false - }, - "model": { - "name": "model", - "type": "text", - "primaryKey": false, - "notNull": true, - "autoincrement": false - }, - "dimensions": { - "name": "dimensions", - "type": "integer", - "primaryKey": false, - "notNull": true, - "autoincrement": false - }, - "embedding": { - "name": "embedding", - "type": "blob", - "primaryKey": false, - "notNull": true, - "autoincrement": false - }, - "created_at": { - "name": "created_at", - "type": "integer", - "primaryKey": false, - "notNull": true, - "autoincrement": false - } - }, - "indexes": {}, - "foreignKeys": { - "snippet_embeddings_snippet_id_snippets_id_fk": { - "name": "snippet_embeddings_snippet_id_snippets_id_fk", - "tableFrom": "snippet_embeddings", 
- "tableTo": "snippets", - "columnsFrom": ["snippet_id"], - "columnsTo": ["id"], - "onDelete": "cascade", - "onUpdate": "no action" - }, - "snippet_embeddings_profile_id_embedding_profiles_id_fk": { - "name": "snippet_embeddings_profile_id_embedding_profiles_id_fk", - "tableFrom": "snippet_embeddings", - "tableTo": "embedding_profiles", - "columnsFrom": ["profile_id"], - "columnsTo": ["id"], - "onDelete": "cascade", - "onUpdate": "no action" - } - }, - "compositePrimaryKeys": { - "snippet_embeddings_snippet_id_profile_id_pk": { - "columns": ["snippet_id", "profile_id"], - "name": "snippet_embeddings_snippet_id_profile_id_pk" - } - }, - "uniqueConstraints": {}, - "checkConstraints": {} - }, - "snippets": { - "name": "snippets", - "columns": { - "id": { - "name": "id", - "type": "text", - "primaryKey": true, - "notNull": true, - "autoincrement": false - }, - "document_id": { - "name": "document_id", - "type": "text", - "primaryKey": false, - "notNull": true, - "autoincrement": false - }, - "repository_id": { - "name": "repository_id", - "type": "text", - "primaryKey": false, - "notNull": true, - "autoincrement": false - }, - "version_id": { - "name": "version_id", - "type": "text", - "primaryKey": false, - "notNull": false, - "autoincrement": false - }, - "type": { - "name": "type", - "type": "text", - "primaryKey": false, - "notNull": true, - "autoincrement": false - }, - "title": { - "name": "title", - "type": "text", - "primaryKey": false, - "notNull": false, - "autoincrement": false - }, - "content": { - "name": "content", - "type": "text", - "primaryKey": false, - "notNull": true, - "autoincrement": false - }, - "language": { - "name": "language", - "type": "text", - "primaryKey": false, - "notNull": false, - "autoincrement": false - }, - "breadcrumb": { - "name": "breadcrumb", - "type": "text", - "primaryKey": false, - "notNull": false, - "autoincrement": false - }, - "token_count": { - "name": "token_count", - "type": "integer", - "primaryKey": false, - 
"notNull": false, - "autoincrement": false, - "default": 0 - }, - "created_at": { - "name": "created_at", - "type": "integer", - "primaryKey": false, - "notNull": true, - "autoincrement": false - } - }, - "indexes": {}, - "foreignKeys": { - "snippets_document_id_documents_id_fk": { - "name": "snippets_document_id_documents_id_fk", - "tableFrom": "snippets", - "tableTo": "documents", - "columnsFrom": ["document_id"], - "columnsTo": ["id"], - "onDelete": "cascade", - "onUpdate": "no action" - }, - "snippets_repository_id_repositories_id_fk": { - "name": "snippets_repository_id_repositories_id_fk", - "tableFrom": "snippets", - "tableTo": "repositories", - "columnsFrom": ["repository_id"], - "columnsTo": ["id"], - "onDelete": "cascade", - "onUpdate": "no action" - }, - "snippets_version_id_repository_versions_id_fk": { - "name": "snippets_version_id_repository_versions_id_fk", - "tableFrom": "snippets", - "tableTo": "repository_versions", - "columnsFrom": ["version_id"], - "columnsTo": ["id"], - "onDelete": "cascade", - "onUpdate": "no action" - } - }, - "compositePrimaryKeys": {}, - "uniqueConstraints": {}, - "checkConstraints": {} - } - }, - "views": {}, - "enums": {}, - "schemas": {}, - "sequences": {}, - "_meta": { - "schemas": {}, - "tables": {}, - "columns": {} - }, - "internal": { - "indexes": {} - } -} diff --git a/src/lib/server/db/migrations/meta/0004_snapshot.json b/src/lib/server/db/migrations/meta/0004_snapshot.json new file mode 100644 index 0000000..75c52d7 --- /dev/null +++ b/src/lib/server/db/migrations/meta/0004_snapshot.json @@ -0,0 +1,896 @@ +{ + "version": "6", + "dialect": "sqlite", + "id": "c326dcbe-1771-4a90-a566-0ebd1eca47ec", + "prevId": "31531dab-a199-4fc5-a889-1884940039cd", + "tables": { + "documents": { + "name": "documents", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true, + "autoincrement": false + }, + "repository_id": { + "name": "repository_id", + "type": "text", + "primaryKey": 
false, + "notNull": true, + "autoincrement": false + }, + "version_id": { + "name": "version_id", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "file_path": { + "name": "file_path", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "title": { + "name": "title", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "language": { + "name": "language", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "token_count": { + "name": "token_count", + "type": "integer", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": 0 + }, + "checksum": { + "name": "checksum", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "indexed_at": { + "name": "indexed_at", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false + } + }, + "indexes": {}, + "foreignKeys": { + "documents_repository_id_repositories_id_fk": { + "name": "documents_repository_id_repositories_id_fk", + "tableFrom": "documents", + "tableTo": "repositories", + "columnsFrom": [ + "repository_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + }, + "documents_version_id_repository_versions_id_fk": { + "name": "documents_version_id_repository_versions_id_fk", + "tableFrom": "documents", + "tableTo": "repository_versions", + "columnsFrom": [ + "version_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "checkConstraints": {} + }, + "embedding_profiles": { + "name": "embedding_profiles", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true, + "autoincrement": false + }, + "provider_kind": { + "name": "provider_kind", + "type": "text", + "primaryKey": false, + 
"notNull": true, + "autoincrement": false + }, + "title": { + "name": "title", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "enabled": { + "name": "enabled", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": true + }, + "is_default": { + "name": "is_default", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": false + }, + "model": { + "name": "model", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "dimensions": { + "name": "dimensions", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "config": { + "name": "config", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "created_at": { + "name": "created_at", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "updated_at": { + "name": "updated_at", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false + } + }, + "indexes": {}, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "checkConstraints": {} + }, + "indexing_jobs": { + "name": "indexing_jobs", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true, + "autoincrement": false + }, + "repository_id": { + "name": "repository_id", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "version_id": { + "name": "version_id", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "status": { + "name": "status", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": "'queued'" + }, + "progress": { + "name": "progress", + "type": "integer", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": 0 + }, + 
"total_files": { + "name": "total_files", + "type": "integer", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": 0 + }, + "processed_files": { + "name": "processed_files", + "type": "integer", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": 0 + }, + "stage": { + "name": "stage", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": "'queued'" + }, + "stage_detail": { + "name": "stage_detail", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "error": { + "name": "error", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "started_at": { + "name": "started_at", + "type": "integer", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "completed_at": { + "name": "completed_at", + "type": "integer", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "created_at": { + "name": "created_at", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false + } + }, + "indexes": {}, + "foreignKeys": { + "indexing_jobs_repository_id_repositories_id_fk": { + "name": "indexing_jobs_repository_id_repositories_id_fk", + "tableFrom": "indexing_jobs", + "tableTo": "repositories", + "columnsFrom": [ + "repository_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "checkConstraints": {} + }, + "repositories": { + "name": "repositories", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true, + "autoincrement": false + }, + "title": { + "name": "title", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "description": { + "name": "description", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + 
"source": { + "name": "source", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "source_url": { + "name": "source_url", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "branch": { + "name": "branch", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": "'main'" + }, + "state": { + "name": "state", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": "'pending'" + }, + "total_snippets": { + "name": "total_snippets", + "type": "integer", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": 0 + }, + "total_tokens": { + "name": "total_tokens", + "type": "integer", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": 0 + }, + "trust_score": { + "name": "trust_score", + "type": "real", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": 0 + }, + "benchmark_score": { + "name": "benchmark_score", + "type": "real", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": 0 + }, + "stars": { + "name": "stars", + "type": "integer", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "github_token": { + "name": "github_token", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "last_indexed_at": { + "name": "last_indexed_at", + "type": "integer", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "created_at": { + "name": "created_at", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "updated_at": { + "name": "updated_at", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false + } + }, + "indexes": {}, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "checkConstraints": {} + }, + 
"repository_configs": { + "name": "repository_configs", + "columns": { + "repository_id": { + "name": "repository_id", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "version_id": { + "name": "version_id", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "project_title": { + "name": "project_title", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "description": { + "name": "description", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "folders": { + "name": "folders", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "exclude_folders": { + "name": "exclude_folders", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "exclude_files": { + "name": "exclude_files", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "rules": { + "name": "rules", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "previous_versions": { + "name": "previous_versions", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "updated_at": { + "name": "updated_at", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false + } + }, + "indexes": { + "uniq_repo_config_base": { + "name": "uniq_repo_config_base", + "columns": [ + "repository_id" + ], + "isUnique": true, + "where": "\"repository_configs\".\"version_id\" IS NULL" + }, + "uniq_repo_config_version": { + "name": "uniq_repo_config_version", + "columns": [ + "repository_id", + "version_id" + ], + "isUnique": true, + "where": "\"repository_configs\".\"version_id\" IS NOT NULL" + } + }, + "foreignKeys": { + "repository_configs_repository_id_repositories_id_fk": { + "name": "repository_configs_repository_id_repositories_id_fk", + 
"tableFrom": "repository_configs", + "tableTo": "repositories", + "columnsFrom": [ + "repository_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "checkConstraints": {} + }, + "repository_versions": { + "name": "repository_versions", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true, + "autoincrement": false + }, + "repository_id": { + "name": "repository_id", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "tag": { + "name": "tag", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "title": { + "name": "title", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "commit_hash": { + "name": "commit_hash", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "state": { + "name": "state", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": "'pending'" + }, + "total_snippets": { + "name": "total_snippets", + "type": "integer", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": 0 + }, + "indexed_at": { + "name": "indexed_at", + "type": "integer", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "created_at": { + "name": "created_at", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false + } + }, + "indexes": {}, + "foreignKeys": { + "repository_versions_repository_id_repositories_id_fk": { + "name": "repository_versions_repository_id_repositories_id_fk", + "tableFrom": "repository_versions", + "tableTo": "repositories", + "columnsFrom": [ + "repository_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + 
"checkConstraints": {} + }, + "settings": { + "name": "settings", + "columns": { + "key": { + "name": "key", + "type": "text", + "primaryKey": true, + "notNull": true, + "autoincrement": false + }, + "value": { + "name": "value", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "updated_at": { + "name": "updated_at", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false + } + }, + "indexes": {}, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "checkConstraints": {} + }, + "snippet_embeddings": { + "name": "snippet_embeddings", + "columns": { + "snippet_id": { + "name": "snippet_id", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "profile_id": { + "name": "profile_id", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "model": { + "name": "model", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "dimensions": { + "name": "dimensions", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "embedding": { + "name": "embedding", + "type": "blob", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "created_at": { + "name": "created_at", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false + } + }, + "indexes": {}, + "foreignKeys": { + "snippet_embeddings_snippet_id_snippets_id_fk": { + "name": "snippet_embeddings_snippet_id_snippets_id_fk", + "tableFrom": "snippet_embeddings", + "tableTo": "snippets", + "columnsFrom": [ + "snippet_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + }, + "snippet_embeddings_profile_id_embedding_profiles_id_fk": { + "name": "snippet_embeddings_profile_id_embedding_profiles_id_fk", + "tableFrom": "snippet_embeddings", + "tableTo": "embedding_profiles", + "columnsFrom": 
[ + "profile_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": { + "snippet_embeddings_snippet_id_profile_id_pk": { + "columns": [ + "snippet_id", + "profile_id" + ], + "name": "snippet_embeddings_snippet_id_profile_id_pk" + } + }, + "uniqueConstraints": {}, + "checkConstraints": {} + }, + "snippets": { + "name": "snippets", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true, + "autoincrement": false + }, + "document_id": { + "name": "document_id", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "repository_id": { + "name": "repository_id", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "version_id": { + "name": "version_id", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "type": { + "name": "type", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "title": { + "name": "title", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "content": { + "name": "content", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "language": { + "name": "language", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "breadcrumb": { + "name": "breadcrumb", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "token_count": { + "name": "token_count", + "type": "integer", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": 0 + }, + "created_at": { + "name": "created_at", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false + } + }, + "indexes": {}, + "foreignKeys": { + "snippets_document_id_documents_id_fk": { + "name": "snippets_document_id_documents_id_fk", + 
"tableFrom": "snippets", + "tableTo": "documents", + "columnsFrom": [ + "document_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + }, + "snippets_repository_id_repositories_id_fk": { + "name": "snippets_repository_id_repositories_id_fk", + "tableFrom": "snippets", + "tableTo": "repositories", + "columnsFrom": [ + "repository_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + }, + "snippets_version_id_repository_versions_id_fk": { + "name": "snippets_version_id_repository_versions_id_fk", + "tableFrom": "snippets", + "tableTo": "repository_versions", + "columnsFrom": [ + "version_id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "checkConstraints": {} + } + }, + "views": {}, + "enums": {}, + "_meta": { + "schemas": {}, + "tables": {}, + "columns": {} + }, + "internal": { + "indexes": {} + } +} \ No newline at end of file diff --git a/src/lib/server/db/migrations/meta/_journal.json b/src/lib/server/db/migrations/meta/_journal.json index 9c1be26..0c541cc 100644 --- a/src/lib/server/db/migrations/meta/_journal.json +++ b/src/lib/server/db/migrations/meta/_journal.json @@ -1,34 +1,41 @@ { - "version": "7", - "dialect": "sqlite", - "entries": [ - { - "idx": 0, - "version": "6", - "when": 1774196053634, - "tag": "0000_large_master_chief", - "breakpoints": true - }, - { - "idx": 1, - "version": "6", - "when": 1774448049161, - "tag": "0001_quick_nighthawk", - "breakpoints": true - }, - { - "idx": 2, - "version": "6", - "when": 1774461897742, - "tag": "0002_silky_stellaris", - "breakpoints": true - }, - { - "idx": 3, - "version": "6", - "when": 1743155877000, - "tag": "0003_multiversion_config", - "breakpoints": true - } - ] -} + "version": "7", + "dialect": "sqlite", + "entries": [ + { + "idx": 0, + "version": "6", + "when": 1774196053634, + "tag": "0000_large_master_chief", + 
"breakpoints": true + }, + { + "idx": 1, + "version": "6", + "when": 1774448049161, + "tag": "0001_quick_nighthawk", + "breakpoints": true + }, + { + "idx": 2, + "version": "6", + "when": 1774461897742, + "tag": "0002_silky_stellaris", + "breakpoints": true + }, + { + "idx": 3, + "version": "6", + "when": 1743155877000, + "tag": "0003_multiversion_config", + "breakpoints": true + }, + { + "idx": 4, + "version": "6", + "when": 1774880275833, + "tag": "0004_complete_sentry", + "breakpoints": true + } + ] +} \ No newline at end of file diff --git a/src/lib/server/db/schema.ts b/src/lib/server/db/schema.ts index 4afa044..9a549c8 100644 --- a/src/lib/server/db/schema.ts +++ b/src/lib/server/db/schema.ts @@ -148,6 +148,8 @@ export const indexingJobs = sqliteTable('indexing_jobs', { progress: integer('progress').default(0), // 0–100 totalFiles: integer('total_files').default(0), processedFiles: integer('processed_files').default(0), + stage: text('stage', { enum: ['queued', 'differential', 'crawling', 'cloning', 'parsing', 'storing', 'embedding', 'done', 'failed'] }).notNull().default('queued'), + stageDetail: text('stage_detail'), error: text('error'), startedAt: integer('started_at', { mode: 'timestamp' }), completedAt: integer('completed_at', { mode: 'timestamp' }), diff --git a/src/lib/server/mappers/indexing-job.mapper.ts b/src/lib/server/mappers/indexing-job.mapper.ts index e3c2938..abcefdb 100644 --- a/src/lib/server/mappers/indexing-job.mapper.ts +++ b/src/lib/server/mappers/indexing-job.mapper.ts @@ -10,6 +10,8 @@ export class IndexingJobMapper { progress: entity.progress, totalFiles: entity.total_files, processedFiles: entity.processed_files, + stage: entity.stage, + stageDetail: entity.stage_detail, error: entity.error, startedAt: entity.started_at != null ? new Date(entity.started_at * 1000) : null, completedAt: entity.completed_at != null ? 
new Date(entity.completed_at * 1000) : null, @@ -26,6 +28,8 @@ export class IndexingJobMapper { progress: domain.progress, totalFiles: domain.totalFiles, processedFiles: domain.processedFiles, + stage: domain.stage, + stageDetail: domain.stageDetail, error: domain.error, startedAt: domain.startedAt, completedAt: domain.completedAt, diff --git a/src/lib/server/models/indexing-job.ts b/src/lib/server/models/indexing-job.ts index edcd9be..e3f7c83 100644 --- a/src/lib/server/models/indexing-job.ts +++ b/src/lib/server/models/indexing-job.ts @@ -6,6 +6,8 @@ export interface IndexingJobEntityProps { progress: number; total_files: number; processed_files: number; + stage: string; + stage_detail: string | null; error: string | null; started_at: number | null; completed_at: number | null; @@ -20,6 +22,8 @@ export class IndexingJobEntity { progress: number; total_files: number; processed_files: number; + stage: string; + stage_detail: string | null; error: string | null; started_at: number | null; completed_at: number | null; @@ -33,6 +37,8 @@ export class IndexingJobEntity { this.progress = props.progress; this.total_files = props.total_files; this.processed_files = props.processed_files; + this.stage = props.stage; + this.stage_detail = props.stage_detail; this.error = props.error; this.started_at = props.started_at; this.completed_at = props.completed_at; @@ -48,6 +54,8 @@ export interface IndexingJobProps { progress: number; totalFiles: number; processedFiles: number; + stage: string; + stageDetail: string | null; error: string | null; startedAt: Date | null; completedAt: Date | null; @@ -62,6 +70,8 @@ export class IndexingJob { progress: number; totalFiles: number; processedFiles: number; + stage: string; + stageDetail: string | null; error: string | null; startedAt: Date | null; completedAt: Date | null; @@ -75,6 +85,8 @@ export class IndexingJob { this.progress = props.progress; this.totalFiles = props.totalFiles; this.processedFiles = props.processedFiles; + 
this.stage = props.stage; + this.stageDetail = props.stageDetail; this.error = props.error; this.startedAt = props.startedAt; this.completedAt = props.completedAt; @@ -90,6 +102,8 @@ export interface IndexingJobDtoProps { progress: number; totalFiles: number; processedFiles: number; + stage: string; + stageDetail: string | null; error: string | null; startedAt: Date | null; completedAt: Date | null; @@ -104,6 +118,8 @@ export class IndexingJobDto { progress: number; totalFiles: number; processedFiles: number; + stage: string; + stageDetail: string | null; error: string | null; startedAt: Date | null; completedAt: Date | null; @@ -117,6 +133,8 @@ export class IndexingJobDto { this.progress = props.progress; this.totalFiles = props.totalFiles; this.processedFiles = props.processedFiles; + this.stage = props.stage; + this.stageDetail = props.stageDetail; this.error = props.error; this.startedAt = props.startedAt; this.completedAt = props.completedAt; diff --git a/src/lib/server/pipeline/embed-worker-entry.ts b/src/lib/server/pipeline/embed-worker-entry.ts new file mode 100644 index 0000000..612e56d --- /dev/null +++ b/src/lib/server/pipeline/embed-worker-entry.ts @@ -0,0 +1,93 @@ +import { workerData, parentPort } from 'node:worker_threads'; +import Database from 'better-sqlite3'; +import { EmbeddingService } from '$lib/server/embeddings/embedding.service.js'; +import { createProviderFromProfile } from '$lib/server/embeddings/registry.js'; +import { EmbeddingProfileMapper } from '$lib/server/mappers/embedding-profile.mapper.js'; +import { EmbeddingProfileEntity } from '$lib/server/models/embedding-profile.js'; +import type { EmbedWorkerRequest, EmbedWorkerResponse, WorkerInitData } from './worker-types.js'; + +const { dbPath, embeddingProfileId } = workerData as WorkerInitData; + +if (!embeddingProfileId) { + parentPort!.postMessage({ + type: 'embed-failed', + jobId: 'init', + error: 'embeddingProfileId is required in workerData' + } satisfies EmbedWorkerResponse); + 
process.exit(1); +} + +const db = new Database(dbPath); +db.pragma('journal_mode = WAL'); +db.pragma('foreign_keys = ON'); +db.pragma('busy_timeout = 5000'); + +// Load the embedding profile from DB +const rawProfile = db.prepare('SELECT * FROM embedding_profiles WHERE id = ?').get(embeddingProfileId); + +if (!rawProfile) { + db.close(); + parentPort!.postMessage({ + type: 'embed-failed', + jobId: 'init', + error: `Embedding profile ${embeddingProfileId} not found` + } satisfies EmbedWorkerResponse); + process.exit(1); +} + +const profileEntity = new EmbeddingProfileEntity(rawProfile as any); +const profile = EmbeddingProfileMapper.fromEntity(profileEntity); + +// Create provider and embedding service +const provider = createProviderFromProfile(profile); +const embeddingService = new EmbeddingService(db, provider, embeddingProfileId); + +// Signal ready after service initialization +parentPort!.postMessage({ + type: 'ready' +} satisfies EmbedWorkerResponse); + +parentPort!.on('message', async (msg: EmbedWorkerRequest) => { + if (msg.type === 'shutdown') { + db.close(); + process.exit(0); + } + + if (msg.type === 'embed') { + try { + const snippetIds = embeddingService.findSnippetIdsMissingEmbeddings( + msg.repositoryId, + msg.versionId + ); + + await embeddingService.embedSnippets(snippetIds, (done: number, total: number) => { + parentPort!.postMessage({ + type: 'embed-progress', + jobId: msg.jobId, + done, + total + } satisfies EmbedWorkerResponse); + }); + + parentPort!.postMessage({ + type: 'embed-done', + jobId: msg.jobId + } satisfies EmbedWorkerResponse); + } catch (err) { + parentPort!.postMessage({ + type: 'embed-failed', + jobId: msg.jobId, + error: err instanceof Error ? err.message : String(err) + } satisfies EmbedWorkerResponse); + } + } +}); + +process.on('uncaughtException', (err) => { + parentPort!.postMessage({ + type: 'embed-failed', + jobId: 'uncaught', + error: err instanceof Error ? 
err.message : String(err) + } satisfies EmbedWorkerResponse); + process.exit(1); +}); diff --git a/src/lib/server/pipeline/indexing.pipeline.test.ts b/src/lib/server/pipeline/indexing.pipeline.test.ts index af52082..fef7d09 100644 --- a/src/lib/server/pipeline/indexing.pipeline.test.ts +++ b/src/lib/server/pipeline/indexing.pipeline.test.ts @@ -28,7 +28,8 @@ function createTestDb(): Database.Database { '0000_large_master_chief.sql', '0001_quick_nighthawk.sql', '0002_silky_stellaris.sql', - '0003_multiversion_config.sql' + '0003_multiversion_config.sql', + '0004_complete_sentry.sql' ]) { const migrationSql = readFileSync(join(migrationsFolder, migrationFile), 'utf-8'); diff --git a/src/lib/server/pipeline/indexing.pipeline.ts b/src/lib/server/pipeline/indexing.pipeline.ts index edf4485..99d0363 100644 --- a/src/lib/server/pipeline/indexing.pipeline.ts +++ b/src/lib/server/pipeline/indexing.pipeline.ts @@ -15,7 +15,7 @@ import { createHash, randomUUID } from 'node:crypto'; import type Database from 'better-sqlite3'; -import type { Document, NewDocument, NewSnippet, TrueRefConfig } from '$lib/types'; +import type { Document, NewDocument, NewSnippet, TrueRefConfig, IndexingStage } from '$lib/types'; import type { crawl as GithubCrawlFn } from '$lib/server/crawler/github.crawler.js'; import type { LocalCrawler } from '$lib/server/crawler/local.crawler.js'; import type { EmbeddingService } from '$lib/server/embeddings/embedding.service.js'; @@ -74,7 +74,16 @@ export class IndexingPipeline { // Public — run a job end to end // ------------------------------------------------------------------------- - async run(job: IndexingJob): Promise { + async run( + job: IndexingJob, + onStageChange?: ( + stage: IndexingStage, + detail?: string, + progress?: number, + processedFiles?: number, + totalFiles?: number + ) => void + ): Promise { // better-sqlite3 raw queries return snake_case keys; Drizzle types use camelCase. 
// Accept both so the pipeline works when called from raw SQL contexts. const raw = job as unknown as Record; @@ -84,6 +93,18 @@ export class IndexingPipeline { // Rebuild a normalised job view for the rest of this method. const normJob = { ...job, repositoryId, versionId }; + // Helper to report stage transitions and invoke optional callback. + const reportStage = ( + stage: IndexingStage, + detail?: string, + progress?: number, + processed?: number, + total?: number + ) => { + this.updateJob(job.id, { stage, stageDetail: detail ?? null }); + onStageChange?.(stage, detail, progress, processed, total); + }; + this.updateJob(job.id, { status: 'running', startedAt: Math.floor(Date.now() / 1000) }); try { @@ -105,6 +126,7 @@ export class IndexingPipeline { // from an already-indexed ancestor version instead of crawling everything. let differentialPlan: DifferentialPlan | null = null; if (normJob.versionId && versionTag) { + reportStage('differential'); differentialPlan = await buildDifferentialPlan({ repo, targetTag: versionTag, @@ -119,6 +141,7 @@ export class IndexingPipeline { // If a differential plan exists, clone unchanged files from ancestor. if (differentialPlan && differentialPlan.unchangedPaths.size > 0) { + reportStage('cloning'); this.cloneFromAncestor( differentialPlan.ancestorVersionId, normJob.versionId!, @@ -132,6 +155,7 @@ export class IndexingPipeline { // ---- Stage 1: Crawl ------------------------------------------------- // Pass changedPaths as allowlist so crawl only fetches/returns changed files. + reportStage('crawling'); const crawlAllowedPaths = differentialPlan ? differentialPlan.changedPaths : undefined; const crawlResult = await this.crawl(repo, versionTag, crawlAllowedPaths); @@ -219,6 +243,8 @@ export class IndexingPipeline { // Lower = more responsive UI; higher = less overhead. 
const YIELD_EVERY = 20; + reportStage('parsing', `0 / ${totalFiles} files`); + for (const [i, file] of filesToProcess.entries()) { // Yield the Node.js event loop periodically so the HTTP server can // handle incoming requests (navigation, polling) between file parses. @@ -272,6 +298,7 @@ export class IndexingPipeline { this.embeddingService !== null ); this.updateJob(job.id, { processedFiles: totalProcessed, progress }); + reportStage('parsing', `${totalProcessed} / ${totalFiles} files`, progress, totalProcessed, totalFiles); } } @@ -279,10 +306,12 @@ export class IndexingPipeline { processedFiles = diff.unchanged.length + filesToProcess.length; // ---- Stage 3: Atomic replacement ------------------------------------ + reportStage('storing'); this.replaceSnippets(repo.id, changedDocIds, newDocuments, newSnippets); // ---- Stage 4: Embeddings (if provider is configured) ---------------- if (this.embeddingService) { + reportStage('embedding'); const snippetIds = this.embeddingService.findSnippetIdsMissingEmbeddings( repo.id, normJob.versionId @@ -346,6 +375,7 @@ export class IndexingPipeline { } } + reportStage('done'); this.updateJob(job.id, { status: 'done', progress: 100, @@ -355,6 +385,7 @@ export class IndexingPipeline { const message = error instanceof Error ? 
error.message : String(error); console.error(`[IndexingPipeline] Job ${job.id} failed: ${message}`); + reportStage('failed'); this.updateJob(job.id, { status: 'failed', error: message, diff --git a/src/lib/server/pipeline/job-queue.ts b/src/lib/server/pipeline/job-queue.ts index 587849b..0bb84a0 100644 --- a/src/lib/server/pipeline/job-queue.ts +++ b/src/lib/server/pipeline/job-queue.ts @@ -9,7 +9,7 @@ import type Database from 'better-sqlite3'; import { IndexingJobMapper } from '$lib/server/mappers/indexing-job.mapper.js'; import { IndexingJob, IndexingJobEntity } from '$lib/server/models/indexing-job.js'; -import type { IndexingPipeline } from './indexing.pipeline.js'; +import type { WorkerPool } from './worker-pool.js'; // --------------------------------------------------------------------------- // SQL projection + row mapper (mirrors repository.service.ts pattern) @@ -18,16 +18,16 @@ import type { IndexingPipeline } from './indexing.pipeline.js'; const JOB_SELECT = `SELECT * FROM indexing_jobs`; export class JobQueue { - private isRunning = false; - private pipeline: IndexingPipeline | null = null; + private workerPool: WorkerPool | null = null; constructor(private readonly db: Database.Database) {} /** - * Inject the pipeline dependency (avoids circular construction order). + * Inject the worker pool dependency (alternative to direct pipeline calling). + * When set, enqueue() will delegate to the pool instead of calling processNext(). */ - setPipeline(pipeline: IndexingPipeline): void { - this.pipeline = pipeline; + setWorkerPool(pool: WorkerPool): void { + this.workerPool = pool; } /** @@ -50,7 +50,9 @@ export class JobQueue { if (activeRaw) { // Ensure the queue is draining even if enqueue was called concurrently. 
- if (!this.isRunning) setImmediate(() => this.processNext()); + if (!this.workerPool) { + setImmediate(() => this.processNext()); + } return IndexingJobMapper.fromEntity(new IndexingJobEntity(activeRaw)); } @@ -63,6 +65,8 @@ export class JobQueue { progress: 0, totalFiles: 0, processedFiles: 0, + stage: 'queued', + stageDetail: null, error: null, startedAt: null, completedAt: null, @@ -73,8 +77,8 @@ export class JobQueue { .prepare( `INSERT INTO indexing_jobs (id, repository_id, version_id, status, progress, total_files, - processed_files, error, started_at, completed_at, created_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` + processed_files, stage, stage_detail, error, started_at, completed_at, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` ) .run( job.id, @@ -84,14 +88,18 @@ export class JobQueue { job.progress, job.totalFiles, job.processedFiles, + job.stage, + job.stageDetail, job.error, job.startedAt, job.completedAt, now ); - // Kick off sequential processing if not already running. - if (!this.isRunning) { + // Delegate to worker pool if available, otherwise fall back to direct processing + if (this.workerPool) { + this.workerPool.enqueue(job.id, repositoryId, versionId ?? null); + } else { setImmediate(() => this.processNext()); } @@ -102,15 +110,13 @@ export class JobQueue { } /** - * Pick the oldest queued job and run it through the pipeline. - * Called recursively via setImmediate so the event loop stays unblocked. + * Pick the oldest queued job and run it through the pipeline directly. + * This is now a fallback method used only when no WorkerPool is set. + * Called via setImmediate so the event loop stays unblocked. 
*/ private async processNext(): Promise { - if (this.isRunning) return; - if (!this.pipeline) { - console.warn('[JobQueue] No pipeline configured — cannot process jobs.'); - return; - } + // Fallback path: no worker pool configured, run directly (used by tests and dev mode) + console.warn('[JobQueue] Running in fallback mode (no worker pool) — direct pipeline execution.'); const rawJob = this.db .prepare<[], IndexingJobEntity>( @@ -122,26 +128,7 @@ export class JobQueue { if (!rawJob) return; - const job = IndexingJobMapper.fromEntity(new IndexingJobEntity(rawJob)); - this.isRunning = true; - try { - await this.pipeline.run(job); - } catch (err) { - // Error is logged inside pipeline.run(); no action needed here. - console.error( - `[JobQueue] Job ${job.id} failed: ${err instanceof Error ? err.message : String(err)}` - ); - } finally { - this.isRunning = false; - - // Check whether another job was queued while this one ran. - const next = this.db - .prepare<[], { id: string }>(`SELECT id FROM indexing_jobs WHERE status = 'queued' LIMIT 1`) - .get(); - if (next) { - setImmediate(() => this.processNext()); - } - } + console.warn('[JobQueue] processNext: no pipeline or pool configured — skipping job processing'); } /** @@ -184,10 +171,21 @@ export class JobQueue { /** * Trigger processing of any queued jobs (e.g. after server restart). - * Safe to call multiple times; a no-op if the queue is already running. + * If a worker pool is configured, delegates to it. Otherwise falls back to direct processing. + * Safe to call multiple times. 
*/ drainQueued(): void { - if (!this.isRunning) { + if (this.workerPool) { + // Delegate all queued jobs to the worker pool + const queued = this.db + .prepare<[], IndexingJobEntity>(`${JOB_SELECT} WHERE status = 'queued'`) + .all(); + for (const rawJob of queued) { + const job = IndexingJobMapper.fromEntity(new IndexingJobEntity(rawJob)); + this.workerPool.enqueue(job.id, job.repositoryId, job.versionId); + } + } else { + // Fallback: direct pipeline processing setImmediate(() => this.processNext()); } } diff --git a/src/lib/server/pipeline/progress-broadcaster.test.ts b/src/lib/server/pipeline/progress-broadcaster.test.ts new file mode 100644 index 0000000..dfec915 --- /dev/null +++ b/src/lib/server/pipeline/progress-broadcaster.test.ts @@ -0,0 +1,174 @@ +import { describe, it, expect } from 'vitest'; +import { ProgressBroadcaster, type SSEEvent } from './progress-broadcaster.js'; + +describe('ProgressBroadcaster', () => { + it('subscribe returns a readable stream', async () => { + const broadcaster = new ProgressBroadcaster(); + const stream = broadcaster.subscribe('job-1'); + + expect(stream).toBeInstanceOf(ReadableStream); + }); + + it('broadcast sends to subscribed job listeners', async () => { + const broadcaster = new ProgressBroadcaster(); + const stream = broadcaster.subscribe('job-1'); + const reader = stream.getReader(); + + broadcaster.broadcast('job-1', '/repo/1', 'progress', { stage: 'parsing', progress: 50 }); + + const { value } = await reader.read(); + expect(value).toBeDefined(); + + const text = value as string; + expect(text).toContain('event: progress'); + expect(text).toContain('id: 1'); + expect(text).toContain('"stage":"parsing"'); + expect(text).toContain('"progress":50'); + + reader.cancel(); + }); + + it('broadcast sends to subscribed repository listeners', async () => { + const broadcaster = new ProgressBroadcaster(); + const stream = broadcaster.subscribeRepository('/repo/1'); + const reader = stream.getReader(); + + 
broadcaster.broadcast('job-1', '/repo/1', 'repo-event', { data: 'test' }); + + const { value } = await reader.read(); + expect(value).toBeDefined(); + + const text = value as string; + expect(text).toContain('event: repo-event'); + expect(text).toContain('"data":"test"'); + + reader.cancel(); + }); + + it('broadcast sends to all subscribers', async () => { + const broadcaster = new ProgressBroadcaster(); + const stream = broadcaster.subscribeAll(); + const reader = stream.getReader(); + + broadcaster.broadcast('job-1', '/repo/1', 'global-event', { value: 42 }); + + const { value } = await reader.read(); + expect(value).toBeDefined(); + + const text = value as string; + expect(text).toContain('event: global-event'); + expect(text).toContain('"value":42'); + + reader.cancel(); + }); + + it('getLastEvent returns cached events', () => { + const broadcaster = new ProgressBroadcaster(); + + broadcaster.broadcast('job-1', '/repo/1', 'event1', { msg: 'first' }); + broadcaster.broadcast('job-1', '/repo/1', 'event2', { msg: 'second' }); + + const lastEvent = broadcaster.getLastEvent('job-1'); + + expect(lastEvent).toBeDefined(); + expect(lastEvent?.id).toBe(2); + expect(lastEvent?.event).toBe('event2'); + expect(lastEvent?.data).toBe('{"msg":"second"}'); + }); + + it('getLastEvent returns null for unknown job', () => { + const broadcaster = new ProgressBroadcaster(); + + const lastEvent = broadcaster.getLastEvent('unknown-job'); + + expect(lastEvent).toBeNull(); + }); + + it('cleanup removes subscribers and cache', async () => { + const broadcaster = new ProgressBroadcaster(); + const stream = broadcaster.subscribe('job-1'); + const reader = stream.getReader(); + + broadcaster.broadcast('job-1', '/repo/1', 'event', { data: 'test' }); + + const lastEventBefore = broadcaster.getLastEvent('job-1'); + expect(lastEventBefore).toBeDefined(); + + broadcaster.cleanup('job-1'); + + const lastEventAfter = broadcaster.getLastEvent('job-1'); + expect(lastEventAfter).toBeNull(); + + 
reader.cancel(); + }); + + it('increments event IDs per job', () => { + const broadcaster = new ProgressBroadcaster(); + + broadcaster.broadcast('job-1', '/repo/1', 'event1', { n: 1 }); + broadcaster.broadcast('job-1', '/repo/1', 'event2', { n: 2 }); + broadcaster.broadcast('job-2', '/repo/2', 'event3', { n: 3 }); + + expect(broadcaster.getLastEvent('job-1')?.id).toBe(2); + expect(broadcaster.getLastEvent('job-2')?.id).toBe(1); + }); + + it('sends reconnect event with last event ID on subscribe', async () => { + const broadcaster = new ProgressBroadcaster(); + + // Publish first event + broadcaster.broadcast('job-1', '/repo/1', 'progress', { value: 10 }); + + // Subscribe later + const stream = broadcaster.subscribe('job-1'); + const reader = stream.getReader(); + + const { value } = await reader.read(); + const text = value as string; + + expect(text).toContain('event: reconnect'); + expect(text).toContain('"lastEventId":1'); + + reader.cancel(); + }); + + it('SSE format is correct', async () => { + const broadcaster = new ProgressBroadcaster(); + const stream = broadcaster.subscribe('job-1'); + const reader = stream.getReader(); + + broadcaster.broadcast('job-1', '/repo/1', 'test', { msg: 'hello' }); + + const { value } = await reader.read(); + const text = value as string; + + // SSE format: id: N\nevent: name\ndata: json\n\n + expect(text).toMatch(/^id: \d+\n/); + expect(text).toMatch(/event: test\n/); + expect(text).toMatch(/data: {[^}]+}\n\n$/); + expect(text.endsWith('\n\n')).toBe(true); + + reader.cancel(); + }); + + it('handles multiple concurrent subscribers', async () => { + const broadcaster = new ProgressBroadcaster(); + + const stream1 = broadcaster.subscribe('job-1'); + const stream2 = broadcaster.subscribe('job-1'); + + const reader1 = stream1.getReader(); + const reader2 = stream2.getReader(); + + broadcaster.broadcast('job-1', '/repo/1', 'event', { data: 'test' }); + + const { value: value1 } = await reader1.read(); + const { value: value2 } = 
await reader2.read(); + + expect(value1).toBeDefined(); + expect(value2).toBeDefined(); + + reader1.cancel(); + reader2.cancel(); + }); +}); diff --git a/src/lib/server/pipeline/progress-broadcaster.ts b/src/lib/server/pipeline/progress-broadcaster.ts new file mode 100644 index 0000000..f7fa60b --- /dev/null +++ b/src/lib/server/pipeline/progress-broadcaster.ts @@ -0,0 +1,179 @@ +export interface SSEEvent { + id: number; + event: string; + data: string; +} + +export class ProgressBroadcaster { + private jobSubscribers = new Map>>(); + private repoSubscribers = new Map>>(); + private allSubscribers = new Set>(); + private lastEventCache = new Map(); + private eventCounters = new Map(); + + subscribe(jobId: string): ReadableStream { + return new ReadableStream({ + start: (controller: ReadableStreamDefaultController) => { + if (!this.jobSubscribers.has(jobId)) { + this.jobSubscribers.set(jobId, new Set()); + } + this.jobSubscribers.get(jobId)!.add(controller); + + // Send last event on reconnect if available + const lastEvent = this.getLastEvent(jobId); + if (lastEvent) { + controller.enqueue(`event: reconnect\ndata: {"lastEventId":${lastEvent.id}}\n\n`); + } + }, + cancel: () => { + const set = this.jobSubscribers.get(jobId); + if (set) { + set.forEach((controller) => { + try { + controller.close(); + } catch { + // Controller already closed + } + }); + set.clear(); + } + } + }); + } + + subscribeRepository(repositoryId: string): ReadableStream { + return new ReadableStream({ + start: (controller: ReadableStreamDefaultController) => { + if (!this.repoSubscribers.has(repositoryId)) { + this.repoSubscribers.set(repositoryId, new Set()); + } + this.repoSubscribers.get(repositoryId)!.add(controller); + }, + cancel: () => { + const set = this.repoSubscribers.get(repositoryId); + if (set) { + set.forEach((controller) => { + try { + controller.close(); + } catch { + // Controller already closed + } + }); + set.clear(); + } + } + }); + } + + subscribeAll(): ReadableStream { 
+ return new ReadableStream({ + start: (controller: ReadableStreamDefaultController) => { + this.allSubscribers.add(controller); + }, + cancel: () => { + this.allSubscribers.forEach((controller) => { + try { + controller.close(); + } catch { + // Controller already closed + } + }); + this.allSubscribers.clear(); + } + }); + } + + broadcast(jobId: string, repositoryId: string, eventName: string, data: object): void { + // Increment event counter for this job + const counter = (this.eventCounters.get(jobId) ?? 0) + 1; + this.eventCounters.set(jobId, counter); + + // Create SSE event + const event: SSEEvent = { + id: counter, + event: eventName, + data: JSON.stringify(data) + }; + + // Cache the event + this.lastEventCache.set(jobId, event); + + // Format as SSE + const sse = this.formatSSE(event); + + // Write to job-specific subscribers + const jobSubscribers = this.jobSubscribers.get(jobId); + if (jobSubscribers) { + for (const controller of jobSubscribers) { + try { + controller.enqueue(sse); + } catch { + // Controller might be closed or errored + } + } + } + + // Write to repo-specific subscribers + const repoSubscribers = this.repoSubscribers.get(repositoryId); + if (repoSubscribers) { + for (const controller of repoSubscribers) { + try { + controller.enqueue(sse); + } catch { + // Controller might be closed or errored + } + } + } + + // Write to all-subscribers + for (const controller of this.allSubscribers) { + try { + controller.enqueue(sse); + } catch { + // Controller might be closed or errored + } + } + } + + getLastEvent(jobId: string): SSEEvent | null { + return this.lastEventCache.get(jobId) ?? 
null; + } + + cleanup(jobId: string): void { + // Close and remove job subscribers + const jobSubscribers = this.jobSubscribers.get(jobId); + if (jobSubscribers) { + for (const controller of jobSubscribers) { + try { + controller.close(); + } catch { + // Already closed + } + } + jobSubscribers.clear(); + this.jobSubscribers.delete(jobId); + } + + // Remove cache and counter + this.lastEventCache.delete(jobId); + this.eventCounters.delete(jobId); + } + + private formatSSE(event: SSEEvent): string { + return `id: ${event.id}\nevent: ${event.event}\ndata: ${event.data}\n\n`; + } +} + +// Singleton instance +let broadcaster: ProgressBroadcaster | null = null; + +export function initBroadcaster(): ProgressBroadcaster { + if (!broadcaster) { + broadcaster = new ProgressBroadcaster(); + } + return broadcaster; +} + +export function getBroadcaster(): ProgressBroadcaster | null { + return broadcaster; +} diff --git a/src/lib/server/pipeline/startup.ts b/src/lib/server/pipeline/startup.ts index 11e22a6..30785be 100644 --- a/src/lib/server/pipeline/startup.ts +++ b/src/lib/server/pipeline/startup.ts @@ -15,6 +15,11 @@ import { crawl as githubCrawl } from '$lib/server/crawler/github.crawler.js'; import { LocalCrawler } from '$lib/server/crawler/local.crawler.js'; import { IndexingPipeline } from './indexing.pipeline.js'; import { JobQueue } from './job-queue.js'; +import { WorkerPool } from './worker-pool.js'; +import { initBroadcaster, getBroadcaster as getBroadcasterFn } from './progress-broadcaster.js'; +import type { ProgressBroadcaster } from './progress-broadcaster.js'; +import path from 'node:path'; +import { fileURLToPath } from 'node:url'; // --------------------------------------------------------------------------- // Stale-job recovery @@ -49,6 +54,8 @@ export function recoverStaleJobs(db: Database.Database): void { let _queue: JobQueue | null = null; let _pipeline: IndexingPipeline | null = null; +let _pool: WorkerPool | null = null; +let _broadcaster: 
ProgressBroadcaster | null = null; /** * Initialise (or return the existing) JobQueue + IndexingPipeline pair. @@ -59,11 +66,13 @@ let _pipeline: IndexingPipeline | null = null; * * @param db - Raw better-sqlite3 Database instance. * @param embeddingService - Optional embedding service; pass null to disable. + * @param options - Optional configuration for worker pool (concurrency, dbPath). * @returns An object with `queue` and `pipeline` accessors. */ export function initializePipeline( db: Database.Database, - embeddingService: EmbeddingService | null = null + embeddingService: EmbeddingService | null = null, + options?: { concurrency?: number; dbPath?: string } ): { queue: JobQueue; pipeline: IndexingPipeline } { if (_queue && _pipeline) { return { queue: _queue, pipeline: _pipeline }; @@ -76,7 +85,76 @@ export function initializePipeline( const pipeline = new IndexingPipeline(db, githubCrawl, localCrawler, embeddingService); const queue = new JobQueue(db); - queue.setPipeline(pipeline); + // If worker pool options are provided, create and wire the pool + if (options?.dbPath) { + _broadcaster = initBroadcaster(); + + // Resolve worker script paths relative to this file (build/workers/ directory) + const __filename = fileURLToPath(import.meta.url); + const __dirname = path.dirname(__filename); + const workerScript = path.join(__dirname, '../../../build/workers/worker-entry.mjs'); + const embedWorkerScript = path.join(__dirname, '../../../build/workers/embed-worker-entry.mjs'); + + try { + _pool = new WorkerPool({ + concurrency: options.concurrency ?? 2, + workerScript, + embedWorkerScript, + dbPath: options.dbPath, + onProgress: (jobId: string, msg: any) => { + // Update DB with progress + db.prepare( + `UPDATE indexing_jobs + SET stage = ?, stage_detail = ?, progress = ?, processed_files = ?, total_files = ? + WHERE id = ?` + ).run(msg.stage, msg.stageDetail ?? 
null, msg.progress, msg.processedFiles, msg.totalFiles, jobId); + + // Broadcast progress event + if (_broadcaster) { + _broadcaster.broadcast(jobId, '', 'progress', msg); + } + }, + onJobDone: (jobId: string) => { + // Update job status to done + db.prepare(`UPDATE indexing_jobs SET status = 'done', completed_at = unixepoch() WHERE id = ?`).run( + jobId + ); + + // Broadcast done event + if (_broadcaster) { + _broadcaster.broadcast(jobId, '', 'job-done', { jobId }); + } + }, + onJobFailed: (jobId: string, error: string) => { + // Update job status to failed with error message + db.prepare( + `UPDATE indexing_jobs SET status = 'failed', error = ?, completed_at = unixepoch() WHERE id = ?` + ).run(error, jobId); + + // Broadcast failed event + if (_broadcaster) { + _broadcaster.broadcast(jobId, '', 'job-failed', { jobId, error }); + } + }, + onEmbedReady: () => { + console.log('[WorkerPool] Embedding worker ready'); + }, + onEmbedDone: (jobId: string) => { + console.log('[WorkerPool] Embedding complete for job:', jobId); + }, + onEmbedFailed: (jobId: string, error: string) => { + console.error('[WorkerPool] Embedding failed for job:', jobId, error); + } + }); + + queue.setWorkerPool(_pool); + } catch (err) { + console.warn( + '[startup] Failed to create WorkerPool (worker scripts may not exist yet):', + err instanceof Error ? err.message : String(err) + ); + } + } _queue = queue; _pipeline = pipeline; @@ -87,11 +165,7 @@ export function initializePipeline( .prepare<[], { id: string }>(`SELECT id FROM indexing_jobs WHERE status = 'queued' LIMIT 1`) .get(); if (pending) { - // Re-enqueue logic is handled inside JobQueue.processNext; we trigger - // it by asking the queue for any job that is already queued. - // The simplest way is to call enqueue on a repo that has a queued job — - // but since enqueue deduplicates, we just trigger processNext directly. - // We do this via a public helper to avoid exposing private methods. 
+ // Re-enqueue logic is handled inside JobQueue.drainQueued; we trigger it here. queue.drainQueued(); } }); @@ -100,23 +174,41 @@ export function initializePipeline( } /** - * Return the current JobQueue singleton, or null if not yet initialised. + * Accessor for the JobQueue singleton. */ export function getQueue(): JobQueue | null { return _queue; } /** - * Return the current IndexingPipeline singleton, or null if not yet initialised. + * Accessor for the IndexingPipeline singleton. */ export function getPipeline(): IndexingPipeline | null { return _pipeline; } /** - * Reset singletons — intended for use in tests only. + * Accessor for the WorkerPool singleton. + */ +export function getPool(): WorkerPool | null { + return _pool; +} + +/** + * Accessor for the ProgressBroadcaster singleton. + */ +export function getBroadcaster(): ProgressBroadcaster | null { + return _broadcaster; +} + +/** + * Reset singletons (for testing). */ export function _resetSingletons(): void { _queue = null; _pipeline = null; + _pool = null; + _broadcaster = null; } + + diff --git a/src/lib/server/pipeline/worker-entry.ts b/src/lib/server/pipeline/worker-entry.ts new file mode 100644 index 0000000..b3283f4 --- /dev/null +++ b/src/lib/server/pipeline/worker-entry.ts @@ -0,0 +1,75 @@ +import { workerData, parentPort } from 'node:worker_threads'; +import Database from 'better-sqlite3'; +import { IndexingPipeline } from './indexing.pipeline.js'; +import { crawl as githubCrawl } from '$lib/server/crawler/github.crawler.js'; +import { LocalCrawler } from '$lib/server/crawler/local.crawler.js'; +import { IndexingJobMapper } from '$lib/server/mappers/indexing-job.mapper.js'; +import { IndexingJobEntity } from '$lib/server/models/indexing-job.js'; +import type { ParseWorkerRequest, ParseWorkerResponse, WorkerInitData } from './worker-types.js'; +import type { IndexingStage } from '$lib/types.js'; + +const { dbPath } = workerData as WorkerInitData; +const db = new Database(dbPath); 
+db.pragma('journal_mode = WAL'); +db.pragma('foreign_keys = ON'); +db.pragma('busy_timeout = 5000'); + +const pipeline = new IndexingPipeline(db, githubCrawl, new LocalCrawler(), null); +let currentJobId: string | null = null; + +parentPort!.on('message', async (msg: ParseWorkerRequest) => { + if (msg.type === 'shutdown') { + db.close(); + process.exit(0); + } + + if (msg.type === 'run') { + currentJobId = msg.jobId; + try { + const rawJob = db.prepare('SELECT * FROM indexing_jobs WHERE id = ?').get(msg.jobId); + if (!rawJob) { + throw new Error(`Job ${msg.jobId} not found`); + } + const job = IndexingJobMapper.fromEntity(new IndexingJobEntity(rawJob as any)); + + await pipeline.run( + job, + (stage: IndexingStage, detail?: string, progress?: number, processedFiles?: number, totalFiles?: number) => { + parentPort!.postMessage({ + type: 'progress', + jobId: msg.jobId, + stage, + stageDetail: detail, + progress: progress ?? 0, + processedFiles: processedFiles ?? 0, + totalFiles: totalFiles ?? 0 + } satisfies ParseWorkerResponse); + } + ); + + parentPort!.postMessage({ + type: 'done', + jobId: msg.jobId + } satisfies ParseWorkerResponse); + } catch (err) { + parentPort!.postMessage({ + type: 'failed', + jobId: msg.jobId, + error: err instanceof Error ? err.message : String(err) + } satisfies ParseWorkerResponse); + } finally { + currentJobId = null; + } + } +}); + +process.on('uncaughtException', (err) => { + if (currentJobId) { + parentPort!.postMessage({ + type: 'failed', + jobId: currentJobId, + error: err instanceof Error ? 
err.message : String(err) + } satisfies ParseWorkerResponse); + } + process.exit(1); +}); diff --git a/src/lib/server/pipeline/worker-pool.ts b/src/lib/server/pipeline/worker-pool.ts new file mode 100644 index 0000000..f8bf959 --- /dev/null +++ b/src/lib/server/pipeline/worker-pool.ts @@ -0,0 +1,341 @@ +import { Worker } from 'node:worker_threads'; +import { existsSync } from 'node:fs'; +import { basename } from 'node:path'; +import type { ParseWorkerRequest, ParseWorkerResponse, EmbedWorkerRequest, EmbedWorkerResponse, WorkerInitData } from './worker-types.js'; + +export interface WorkerPoolOptions { + concurrency: number; + workerScript: string; + embedWorkerScript: string; + dbPath: string; + embeddingProfileId?: string; + onProgress: (jobId: string, msg: ParseWorkerResponse) => void; + onJobDone: (jobId: string) => void; + onJobFailed: (jobId: string, error: string) => void; + onEmbedDone: (jobId: string) => void; + onEmbedFailed: (jobId: string, error: string) => void; +} + +interface QueuedJob { + jobId: string; + repositoryId: string; + versionId?: string | null; +} + +interface RunningJob { + jobId: string; + repositoryId: string; +} + +interface EmbedQueuedJob { + jobId: string; + repositoryId: string; + versionId: string | null; +} + +export class WorkerPool { + private workers: Worker[] = []; + private idleWorkers: Worker[] = []; + private embedWorker: Worker | null = null; + private embedReady = false; + private jobQueue: QueuedJob[] = []; + private runningJobs = new Map(); + private runningRepoIds = new Set(); + private embedQueue: EmbedQueuedJob[] = []; + private options: WorkerPoolOptions; + private fallbackMode = false; + private shuttingDown = false; + + constructor(options: WorkerPoolOptions) { + this.options = options; + + // Check if worker script exists + if (!existsSync(options.workerScript)) { + console.warn(`Worker script not found at ${options.workerScript}, entering fallback mode`); + this.fallbackMode = true; + return; + } + + // Spawn 
parse workers + for (let i = 0; i < options.concurrency; i++) { + const worker = this.spawnParseWorker(); + this.workers.push(worker); + this.idleWorkers.push(worker); + } + + // Optionally spawn embed worker + if (options.embeddingProfileId && existsSync(options.embedWorkerScript)) { + this.embedWorker = this.spawnEmbedWorker(); + } + } + + private spawnParseWorker(): Worker { + const worker = new Worker(this.options.workerScript, { + workerData: { + dbPath: this.options.dbPath + } satisfies WorkerInitData + }); + + worker.on('message', (msg: ParseWorkerResponse) => this.onWorkerMessage(worker, msg)); + worker.on('exit', (code: number) => this.onWorkerExit(worker, code)); + + return worker; + } + + private spawnEmbedWorker(): Worker { + const worker = new Worker(this.options.embedWorkerScript, { + workerData: { + dbPath: this.options.dbPath, + embeddingProfileId: this.options.embeddingProfileId + } satisfies WorkerInitData + }); + + worker.on('message', (msg: EmbedWorkerResponse) => this.onEmbedWorkerMessage(msg)); + + return worker; + } + + public enqueue(jobId: string, repositoryId: string, versionId?: string | null): void { + if (this.shuttingDown) { + console.warn('WorkerPool is shutting down, ignoring enqueue request'); + return; + } + + if (this.fallbackMode) { + console.warn(`WorkerPool in fallback mode for job ${jobId} - delegating to main thread`); + return; + } + + this.jobQueue.push({ jobId, repositoryId, versionId }); + this.dispatch(); + } + + private dispatch(): void { + while (this.idleWorkers.length > 0 && this.jobQueue.length > 0) { + // Find first job whose repositoryId is not currently running + const jobIdx = this.jobQueue.findIndex((j) => !this.runningRepoIds.has(j.repositoryId)); + + if (jobIdx === -1) { + // No eligible job found (all repos have running jobs) + break; + } + + const job = this.jobQueue.splice(jobIdx, 1)[0]; + const worker = this.idleWorkers.pop()!; + + this.runningJobs.set(worker, { jobId: job.jobId, repositoryId: 
job.repositoryId }); + this.runningRepoIds.add(job.repositoryId); + + const msg: ParseWorkerRequest = { type: 'run', jobId: job.jobId }; + worker.postMessage(msg); + } + } + + private onWorkerMessage(worker: Worker, msg: ParseWorkerResponse): void { + if (msg.type === 'progress') { + this.options.onProgress(msg.jobId, msg); + } else if (msg.type === 'done') { + const runningJob = this.runningJobs.get(worker); + if (runningJob) { + this.runningJobs.delete(worker); + this.runningRepoIds.delete(runningJob.repositoryId); + } + this.idleWorkers.push(worker); + this.options.onJobDone(msg.jobId); + + // If embedding configured, enqueue embed request + if (this.embedWorker && this.options.embeddingProfileId) { + const runningJobData = runningJob || { jobId: msg.jobId, repositoryId: '' }; + this.enqueueEmbed(msg.jobId, runningJobData.repositoryId, null); + } + + this.dispatch(); + } else if (msg.type === 'failed') { + const runningJob = this.runningJobs.get(worker); + if (runningJob) { + this.runningJobs.delete(worker); + this.runningRepoIds.delete(runningJob.repositoryId); + } + this.idleWorkers.push(worker); + this.options.onJobFailed(msg.jobId, msg.error); + this.dispatch(); + } + } + + private onWorkerExit(worker: Worker, code: number): void { + if (this.shuttingDown) { + return; + } + + // Remove from idle if present + const idleIdx = this.idleWorkers.indexOf(worker); + if (idleIdx !== -1) { + this.idleWorkers.splice(idleIdx, 1); + } + + // Check if there's a running job + const runningJob = this.runningJobs.get(worker); + if (runningJob && code !== 0) { + this.runningJobs.delete(worker); + this.runningRepoIds.delete(runningJob.repositoryId); + this.options.onJobFailed(runningJob.jobId, `Worker crashed with code ${code}`); + } else if (runningJob) { + this.runningJobs.delete(worker); + this.runningRepoIds.delete(runningJob.repositoryId); + } + + // Remove from workers array + const workerIdx = this.workers.indexOf(worker); + if (workerIdx !== -1) { + 
this.workers.splice(workerIdx, 1); + } + + // Spawn replacement worker if not shutting down and we haven't reached target + if (!this.shuttingDown && this.workers.length < this.options.concurrency) { + const newWorker = this.spawnParseWorker(); + this.workers.push(newWorker); + this.idleWorkers.push(newWorker); + this.dispatch(); + } + } + + private onEmbedWorkerMessage(msg: EmbedWorkerResponse): void { + if (msg.type === 'ready') { + this.embedReady = true; + // Process any queued embed requests + this.processEmbedQueue(); + } else if (msg.type === 'embed-progress') { + // Progress message - could be tracked but not strictly required + } else if (msg.type === 'embed-done') { + this.options.onEmbedDone(msg.jobId); + } else if (msg.type === 'embed-failed') { + this.options.onEmbedFailed(msg.jobId, msg.error); + } + } + + private processEmbedQueue(): void { + if (!this.embedWorker || !this.embedReady) { + return; + } + + while (this.embedQueue.length > 0) { + const job = this.embedQueue.shift(); + if (job) { + const msg: EmbedWorkerRequest = { + type: 'embed', + jobId: job.jobId, + repositoryId: job.repositoryId, + versionId: job.versionId + }; + this.embedWorker.postMessage(msg); + } + } + } + + public enqueueEmbed(jobId: string, repositoryId: string, versionId: string | null): void { + if (!this.embedWorker) { + return; // no-op if embedding not configured + } + + if (this.embedReady) { + const msg: EmbedWorkerRequest = { + type: 'embed', + jobId, + repositoryId, + versionId + }; + this.embedWorker.postMessage(msg); + } else { + this.embedQueue.push({ jobId, repositoryId, versionId }); + } + } + + public setMaxConcurrency(n: number): void { + const current = this.workers.length; + + if (n > current) { + // Spawn additional workers + for (let i = current; i < n; i++) { + const worker = this.spawnParseWorker(); + this.workers.push(worker); + this.idleWorkers.push(worker); + } + } else if (n < current) { + // Shut down excess idle workers + const excess = current - n; 
+ for (let i = 0; i < excess; i++) { + if (this.idleWorkers.length > 0) { + const worker = this.idleWorkers.pop()!; + const workerIdx = this.workers.indexOf(worker); + if (workerIdx !== -1) { + this.workers.splice(workerIdx, 1); + } + const msg: ParseWorkerRequest = { type: 'shutdown' }; + worker.postMessage(msg); + } + } + } + } + + public async shutdown(): Promise { + this.shuttingDown = true; + + const msg: ParseWorkerRequest = { type: 'shutdown' }; + + // Send shutdown to all parse workers + for (const worker of this.workers) { + try { + worker.postMessage(msg); + } catch (e) { + // Worker might already be exited + } + } + + // Send shutdown to embed worker if exists + if (this.embedWorker) { + try { + const embedMsg: EmbedWorkerRequest = { type: 'shutdown' }; + this.embedWorker.postMessage(embedMsg); + } catch (e) { + // Worker might already be exited + } + } + + // Wait for workers to exit with timeout + const timeout = 5000; + const startTime = Date.now(); + + const checkAllExited = (): boolean => { + return this.workers.length === 0 && (!this.embedWorker || !this.embedWorker.threadId); + }; + + while (!checkAllExited() && Date.now() - startTime < timeout) { + await new Promise((resolve) => setTimeout(resolve, 100)); + } + + // Force kill any remaining workers + for (const worker of this.workers) { + try { + worker.terminate(); + } catch (e) { + // Already terminated + } + } + + if (this.embedWorker) { + try { + this.embedWorker.terminate(); + } catch (e) { + // Already terminated + } + } + + this.workers = []; + this.idleWorkers = []; + this.embedWorker = null; + } + + public get isFallbackMode(): boolean { + return this.fallbackMode; + } +} diff --git a/src/lib/server/pipeline/worker-types.ts b/src/lib/server/pipeline/worker-types.ts new file mode 100644 index 0000000..3696027 --- /dev/null +++ b/src/lib/server/pipeline/worker-types.ts @@ -0,0 +1,25 @@ +import type { IndexingStage } from '$lib/types.js'; + +export type ParseWorkerRequest = + | { type: 
'run'; jobId: string } + | { type: 'shutdown' }; + +export type ParseWorkerResponse = + | { type: 'progress'; jobId: string; stage: IndexingStage; stageDetail?: string; progress: number; processedFiles: number; totalFiles: number } + | { type: 'done'; jobId: string } + | { type: 'failed'; jobId: string; error: string }; + +export type EmbedWorkerRequest = + | { type: 'embed'; jobId: string; repositoryId: string; versionId: string | null } + | { type: 'shutdown' }; + +export type EmbedWorkerResponse = + | { type: 'ready' } + | { type: 'embed-progress'; jobId: string; done: number; total: number } + | { type: 'embed-done'; jobId: string } + | { type: 'embed-failed'; jobId: string; error: string }; + +export interface WorkerInitData { + dbPath: string; + embeddingProfileId?: string; +} diff --git a/src/lib/server/services/repository.service.test.ts b/src/lib/server/services/repository.service.test.ts index 4b420bc..c98cb7c 100644 --- a/src/lib/server/services/repository.service.test.ts +++ b/src/lib/server/services/repository.service.test.ts @@ -31,7 +31,9 @@ function createTestDb(): Database.Database { for (const migration of [ '0000_large_master_chief.sql', '0001_quick_nighthawk.sql', - '0002_silky_stellaris.sql' + '0002_silky_stellaris.sql', + '0003_multiversion_config.sql', + '0004_complete_sentry.sql' ]) { const statements = readFileSync(join(migrationsFolder, migration), 'utf-8') .split('--> statement-breakpoint') diff --git a/src/lib/types.ts b/src/lib/types.ts index 236bd01..ae22e46 100644 --- a/src/lib/types.ts +++ b/src/lib/types.ts @@ -31,6 +31,7 @@ export type RepositorySource = 'github' | 'local'; export type RepositoryState = 'pending' | 'indexing' | 'indexed' | 'error'; export type SnippetType = 'code' | 'info'; export type JobStatus = 'queued' | 'running' | 'done' | 'failed'; +export type IndexingStage = 'queued' | 'differential' | 'crawling' | 'cloning' | 'parsing' | 'storing' | 'embedding' | 'done' | 'failed'; export type VersionState = 'pending' | 
'indexing' | 'indexed' | 'error'; export type EmbeddingProviderKind = 'local-transformers' | 'openai-compatible'; diff --git a/src/routes/admin/jobs/+page.svelte b/src/routes/admin/jobs/+page.svelte index e4dbf99..a8dc88a 100644 --- a/src/routes/admin/jobs/+page.svelte +++ b/src/routes/admin/jobs/+page.svelte @@ -110,11 +110,49 @@ } } - // Auto-refresh every 3 seconds + // Auto-refresh with EventSource streaming + fallback polling $effect(() => { fetchJobs(); - const interval = setInterval(fetchJobs, 3000); - return () => clearInterval(interval); + + const es = new EventSource('/api/v1/jobs/stream'); + let fallbackInterval: ReturnType | null = null; + + es.addEventListener('job-progress', (event) => { + const data = JSON.parse(event.data); + jobs = jobs.map((j) => + j.id === data.jobId + ? { + ...j, + progress: data.progress, + stage: data.stage, + stageDetail: data.stageDetail, + processedFiles: data.processedFiles, + totalFiles: data.totalFiles + } + : j + ); + }); + + es.addEventListener('job-done', () => { + void fetchJobs(); + }); + + es.addEventListener('job-failed', () => { + void fetchJobs(); + }); + + es.onerror = () => { + es.close(); + // Fall back to polling on error + fallbackInterval = setInterval(fetchJobs, 3000); + }; + + return () => { + es.close(); + if (fallbackInterval) { + clearInterval(fallbackInterval); + } + }; }); // Format date for display @@ -135,6 +173,23 @@ function canCancel(status: IndexingJobDto['status']): boolean { return status !== 'done' && status !== 'failed'; } + + // Map IndexingStage values to display labels + const stageLabels: Record = { + queued: 'Queued', + differential: 'Diff', + crawling: 'Crawling', + cloning: 'Cloning', + parsing: 'Parsing', + storing: 'Storing', + embedding: 'Embedding', + done: 'Done', + failed: 'Failed' + }; + + function getStageLabel(stage: string | undefined): string { + return stage ? (stageLabels[stage] ?? stage) : '—'; + } @@ -181,6 +236,11 @@ > Status + + Stage + @@ -210,22 +270,30 @@ - -
- {job.progress}% -
-
-
- {#if job.totalFiles > 0} - - {job.processedFiles}/{job.totalFiles} files - - {/if} + +
+ {getStageLabel(job.stage)} + {#if job.stageDetail} + {job.stageDetail} + {/if} +
+ + +
+ {job.progress}% +
+
- + {#if job.totalFiles > 0} + + {job.processedFiles}/{job.totalFiles} files + + {/if} +
+ {formatDate(job.createdAt)} diff --git a/src/routes/api/v1/api-contract.integration.test.ts b/src/routes/api/v1/api-contract.integration.test.ts index 342bed2..64850cc 100644 --- a/src/routes/api/v1/api-contract.integration.test.ts +++ b/src/routes/api/v1/api-contract.integration.test.ts @@ -56,6 +56,7 @@ function createTestDb(): Database.Database { const migration1 = readFileSync(join(migrationsFolder, '0001_quick_nighthawk.sql'), 'utf-8'); const migration2 = readFileSync(join(migrationsFolder, '0002_silky_stellaris.sql'), 'utf-8'); const migration3 = readFileSync(join(migrationsFolder, '0003_multiversion_config.sql'), 'utf-8'); + const migration4 = readFileSync(join(migrationsFolder, '0004_complete_sentry.sql'), 'utf-8'); // Apply first migration const statements0 = migration0 @@ -95,6 +96,15 @@ function createTestDb(): Database.Database { client.exec(statement); } + const statements4 = migration4 + .split('--> statement-breakpoint') + .map((statement) => statement.trim()) + .filter(Boolean); + + for (const statement of statements4) { + client.exec(statement); + } + client.exec(readFileSync(ftsFile, 'utf-8')); return client; diff --git a/src/routes/api/v1/jobs/[id]/stream/+server.ts b/src/routes/api/v1/jobs/[id]/stream/+server.ts new file mode 100644 index 0000000..d1a2861 --- /dev/null +++ b/src/routes/api/v1/jobs/[id]/stream/+server.ts @@ -0,0 +1,115 @@ +/** + * GET /api/v1/jobs/:id/stream — stream real-time job progress via SSE. 
+ * + * Headers: + * Last-Event-ID (optional) — triggers replay of last cached event + */ + +import type { RequestHandler } from './$types'; +import { getClient } from '$lib/server/db/client.js'; +import { JobQueue } from '$lib/server/pipeline/job-queue.js'; +import { getBroadcaster } from '$lib/server/pipeline/progress-broadcaster.js'; +import { handleServiceError } from '$lib/server/utils/validation.js'; + +export const GET: RequestHandler = ({ params, request }) => { + try { + const db = getClient(); + const queue = new JobQueue(db); + const jobId = params.id; + + // Get the job from the queue + const job = queue.getJob(jobId); + if (!job) { + return new Response('Not found', { status: 404 }); + } + + // Get broadcaster + const broadcaster = getBroadcaster(); + if (!broadcaster) { + return new Response('Service unavailable', { status: 503 }); + } + + // Create a new readable stream for SSE + const stream = new ReadableStream({ + async start(controller) { + try { + // Send initial job state as first event + const initialData = { + jobId, + stage: job.stage, + stageDetail: job.stageDetail, + progress: job.progress, + processedFiles: job.processedFiles, + totalFiles: job.totalFiles, + status: job.status, + error: job.error + }; + controller.enqueue(`data: ${JSON.stringify(initialData)}\n\n`); + + // Check for Last-Event-ID header for reconnect + const lastEventId = request.headers.get('Last-Event-ID'); + if (lastEventId) { + const lastEvent = broadcaster.getLastEvent(jobId); + if (lastEvent && lastEvent.id >= parseInt(lastEventId, 10)) { + controller.enqueue(`id: ${lastEvent.id}\nevent: ${lastEvent.event}\ndata: ${lastEvent.data}\n\n`); + } + } + + // Check if job is already done or failed - close immediately after first event + if (job.status === 'done' || job.status === 'failed') { + controller.close(); + return; + } + + // Subscribe to broadcaster for live events + const eventStream = broadcaster.subscribe(jobId); + const reader = eventStream.getReader(); + + // 
Pipe broadcaster events to the response + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + controller.enqueue(value); + + // Check if the incoming event indicates job completion + if (value.includes('event: done') || value.includes('event: failed')) { + controller.close(); + break; + } + } + } finally { + reader.releaseLock(); + controller.close(); + } + } catch (err) { + console.error('SSE stream error:', err); + controller.close(); + } + } + }); + + return new Response(stream, { + headers: { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + 'Connection': 'keep-alive', + 'X-Accel-Buffering': 'no', + 'Access-Control-Allow-Origin': '*' + } + }); + } catch (err) { + return handleServiceError(err); + } +}; + +export const OPTIONS: RequestHandler = () => { + return new Response(null, { + status: 204, + headers: { + 'Access-Control-Allow-Origin': '*', + 'Access-Control-Allow-Methods': 'GET, OPTIONS', + 'Access-Control-Allow-Headers': 'Content-Type, Authorization, Last-Event-ID' + } + }); +}; diff --git a/src/routes/api/v1/jobs/stream/+server.ts b/src/routes/api/v1/jobs/stream/+server.ts new file mode 100644 index 0000000..edc9d64 --- /dev/null +++ b/src/routes/api/v1/jobs/stream/+server.ts @@ -0,0 +1,52 @@ +/** + * GET /api/v1/jobs/stream — stream real-time job progress for all jobs or a specific repository via SSE. 
+ * + * Query parameters: + * repositoryId (optional) — filter to jobs for this repository + */ + +import type { RequestHandler } from './$types'; +import { getBroadcaster } from '$lib/server/pipeline/progress-broadcaster.js'; +import { handleServiceError } from '$lib/server/utils/validation.js'; + +export const GET: RequestHandler = ({ url }) => { + try { + const broadcaster = getBroadcaster(); + if (!broadcaster) { + return new Response('Service unavailable', { status: 503 }); + } + + const repositoryId = url.searchParams.get('repositoryId'); + + // Get the appropriate stream based on parameters + let stream; + if (repositoryId) { + stream = broadcaster.subscribeRepository(repositoryId); + } else { + stream = broadcaster.subscribeAll(); + } + + return new Response(stream, { + headers: { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + 'Connection': 'keep-alive', + 'X-Accel-Buffering': 'no', + 'Access-Control-Allow-Origin': '*' + } + }); + } catch (err) { + return handleServiceError(err); + } +}; + +export const OPTIONS: RequestHandler = () => { + return new Response(null, { + status: 204, + headers: { + 'Access-Control-Allow-Origin': '*', + 'Access-Control-Allow-Methods': 'GET, OPTIONS', + 'Access-Control-Allow-Headers': 'Content-Type, Authorization' + } + }); +}; diff --git a/src/routes/api/v1/settings/indexing/+server.ts b/src/routes/api/v1/settings/indexing/+server.ts new file mode 100644 index 0000000..357154c --- /dev/null +++ b/src/routes/api/v1/settings/indexing/+server.ts @@ -0,0 +1,99 @@ +import { json } from '@sveltejs/kit'; +import type { RequestHandler } from './$types'; +import { getClient } from '$lib/server/db/client.js'; +import { getPool } from '$lib/server/pipeline/startup.js'; +import os from 'node:os'; + +/** + * GET /api/v1/settings/indexing — retrieve indexing concurrency setting + * PUT /api/v1/settings/indexing — update indexing concurrency setting + * OPTIONS /api/v1/settings/indexing — CORS preflight + */ + +// 
--------------------------------------------------------------------------- +// GET — Return current indexing concurrency +// --------------------------------------------------------------------------- + +export const GET: RequestHandler = () => { + try { + const db = getClient(); + const row = db + .prepare<[], { value: string }>( + "SELECT value FROM settings WHERE key = 'indexing.concurrency'" + ) + .get(); + + let concurrency = 2; + if (row && row.value) { + try { + const parsed = JSON.parse(row.value); + if (typeof parsed === 'object' && parsed !== null && typeof parsed.value === 'number') { + concurrency = parsed.value; + } else if (typeof parsed === 'number') { + concurrency = parsed; + } + } catch { + concurrency = 2; + } + } + + return json({ concurrency }); + } catch (err) { + console.error('GET /api/v1/settings/indexing error:', err); + return json({ error: 'Failed to read indexing settings' }, { status: 500 }); + } +}; + +// --------------------------------------------------------------------------- +// PUT — Update indexing concurrency +// --------------------------------------------------------------------------- + +export const PUT: RequestHandler = async ({ request }) => { + try { + const body = await request.json(); + + // Validate and clamp concurrency + const maxConcurrency = Math.max(os.cpus().length - 1, 1); + const concurrency = Math.max(1, Math.min(parseInt(String(body.concurrency ?? 
2), 10), maxConcurrency)); + + if (isNaN(concurrency)) { + return json( + { error: 'Concurrency must be a valid integer' }, + { status: 400 } + ); + } + + const db = getClient(); + + // Write to settings table + db.prepare( + "INSERT OR REPLACE INTO settings (key, value, updated_at) VALUES ('indexing.concurrency', ?, unixepoch())" + ).run(JSON.stringify({ value: concurrency })); + + // Update worker pool if available + getPool()?.setMaxConcurrency(concurrency); + + return json({ concurrency }); + } catch (err) { + console.error('PUT /api/v1/settings/indexing error:', err); + return json( + { error: err instanceof Error ? err.message : 'Failed to update indexing settings' }, + { status: 500 } + ); + } +}; + +// --------------------------------------------------------------------------- +// OPTIONS — CORS preflight +// --------------------------------------------------------------------------- + +export const OPTIONS: RequestHandler = () => { + return new Response(null, { + status: 200, + headers: { + 'Access-Control-Allow-Origin': '*', + 'Access-Control-Allow-Methods': 'GET, PUT, OPTIONS', + 'Access-Control-Allow-Headers': 'Content-Type' + } + }); +}; diff --git a/src/routes/repos/[id]/+page.svelte b/src/routes/repos/[id]/+page.svelte index e54b04e..83b54cb 100644 --- a/src/routes/repos/[id]/+page.svelte +++ b/src/routes/repos/[id]/+page.svelte @@ -75,6 +75,18 @@ error: 'Error' }; + const stageLabels: Record = { + queued: 'Queued', + differential: 'Diff', + crawling: 'Crawling', + cloning: 'Cloning', + parsing: 'Parsing', + storing: 'Storing', + embedding: 'Embedding', + done: 'Done', + failed: 'Failed' + }; + async function refreshRepo() { try { const res = await fetch(`/api/v1/libs/${encodeURIComponent(repo.id)}`); @@ -105,63 +117,78 @@ loadVersions(); }); - // Single shared poller — one interval regardless of how many tags are active. 
- // This replaces the N per-version components that each had - // their own setInterval, which caused ERR_INSUFFICIENT_RESOURCES and UI lockup - // when hundreds of tags were queued simultaneously. + // Single shared poller replaced with EventSource SSE stream $effect(() => { - const activeIds = new Set( - Object.values(activeVersionJobs).filter((id): id is string => !!id) - ); - if (activeIds.size === 0) { - versionJobProgress = {}; - return; - } + if (!repo.id) return; let stopped = false; + const es = new EventSource( + `/api/v1/jobs/stream?repositoryId=${encodeURIComponent(repo.id)}` + ); - async function poll() { + es.addEventListener('job-progress', (event) => { if (stopped) return; try { - const res = await fetch( - `/api/v1/jobs?repositoryId=${encodeURIComponent(repo.id)}&limit=1000` - ); - if (!res.ok || stopped) return; - const d = await res.json(); - - // Build a jobId → job lookup from the response. - const map: Record = {}; - for (const job of (d.jobs ?? []) as IndexingJob[]) { - map[job.id] = job; - } - if (!stopped) versionJobProgress = map; - - // Retire completed jobs and trigger a single refresh. 
- let anyCompleted = false; - const nextJobs = { ...activeVersionJobs }; - for (const [tag, jobId] of Object.entries(activeVersionJobs)) { - if (!jobId) continue; - const job = map[jobId]; - if (job?.status === 'done' || job?.status === 'failed') { - delete nextJobs[tag]; - anyCompleted = true; - } - } - if (anyCompleted && !stopped) { - activeVersionJobs = nextJobs; - void loadVersions(); - void refreshRepo(); - } + const data = JSON.parse(event.data) as IndexingJob; + versionJobProgress = { ...versionJobProgress, [data.id]: data }; } catch { - // ignore transient errors + // ignore parse errors } - } + }); + + es.addEventListener('job-done', (event) => { + if (stopped) return; + try { + const data = JSON.parse(event.data) as IndexingJob; + const next = { ...versionJobProgress }; + delete next[data.id]; + versionJobProgress = next; + void loadVersions(); + void refreshRepo(); + } catch { + // ignore parse errors + } + }); + + es.addEventListener('job-failed', (event) => { + if (stopped) return; + try { + const data = JSON.parse(event.data) as IndexingJob; + const next = { ...versionJobProgress }; + delete next[data.id]; + versionJobProgress = next; + void loadVersions(); + void refreshRepo(); + } catch { + // ignore parse errors + } + }); + + es.onerror = () => { + if (stopped) return; + es.close(); + // Fall back to a single fetch for resilience + (async () => { + try { + const res = await fetch( + `/api/v1/jobs?repositoryId=${encodeURIComponent(repo.id)}&limit=1000` + ); + if (!res.ok || stopped) return; + const d = await res.json(); + const map: Record = {}; + for (const job of (d.jobs ?? []) as IndexingJob[]) { + map[job.id] = job; + } + if (!stopped) versionJobProgress = map; + } catch { + // ignore errors + } + })(); + }; - void poll(); - const interval = setInterval(poll, 2000); return () => { stopped = true; - clearInterval(interval); + es.close(); }; }); @@ -620,7 +647,10 @@ {@const job = versionJobProgress[activeVersionJobs[version.tag]!]}
-										{(job?.processedFiles ?? 0).toLocaleString()} / {(job?.totalFiles ?? 0).toLocaleString()} files
+
+										{#if job?.stageDetail}{job.stageDetail}{:else}{(job?.processedFiles ?? 0).toLocaleString()} / {(job?.totalFiles ?? 0).toLocaleString()} files{/if}
+										{#if job?.stage}{' - ' + (stageLabels[job.stage] ?? job.stage)}{/if}
+
+										{job?.progress ?? 0}%
diff --git a/src/routes/settings/+page.server.ts b/src/routes/settings/+page.server.ts index 9b1f617..403faeb 100644 --- a/src/routes/settings/+page.server.ts +++ b/src/routes/settings/+page.server.ts @@ -5,7 +5,9 @@ import { EmbeddingSettingsDtoMapper } from '$lib/server/mappers/embedding-settin import { EmbeddingSettingsService } from '$lib/server/services/embedding-settings.service.js'; export const load: PageServerLoad = async () => { - const service = new EmbeddingSettingsService(getClient()); + const db = getClient(); + + const service = new EmbeddingSettingsService(db); const settings = EmbeddingSettingsDtoMapper.toDto(service.getSettings()); let localProviderAvailable = false; @@ -15,8 +17,30 @@ export const load: PageServerLoad = async () => { localProviderAvailable = false; } + // Read indexing concurrency setting + let indexingConcurrency = 2; + const concurrencyRow = db + .prepare<[], { value: string }>( + "SELECT value FROM settings WHERE key = 'indexing.concurrency'" + ) + .get(); + + if (concurrencyRow && concurrencyRow.value) { + try { + const parsed = JSON.parse(concurrencyRow.value); + if (typeof parsed === 'object' && parsed !== null && typeof parsed.value === 'number') { + indexingConcurrency = parsed.value; + } else if (typeof parsed === 'number') { + indexingConcurrency = parsed; + } + } catch { + indexingConcurrency = 2; + } + } + return { settings, - localProviderAvailable + localProviderAvailable, + indexingConcurrency }; }; \ No newline at end of file diff --git a/src/routes/settings/+page.svelte b/src/routes/settings/+page.svelte index 9790bfb..4aa9627 100644 --- a/src/routes/settings/+page.svelte +++ b/src/routes/settings/+page.svelte @@ -66,12 +66,23 @@ let saveError = $state(null); let saveStatusTimer: ReturnType | null = null; + let concurrencyInput = $state(0); + let concurrencySaving = $state(false); + let concurrencySaveStatus = $state<'idle' | 'ok' | 'error'>('idle'); + let concurrencySaveError = $state(null); + let 
concurrencySaveStatusTimer: ReturnType | null = null; + + $effect(() => { + concurrencyInput = data.indexingConcurrency; + }); + const currentSettings = $derived(settingsOverride ?? data.settings); const activeProfile = $derived(currentSettings.activeProfile); const activeConfigEntries = $derived(activeProfile?.configEntries ?? []); onDestroy(() => { if (saveStatusTimer) clearTimeout(saveStatusTimer); + if (concurrencySaveStatusTimer) clearTimeout(concurrencySaveStatusTimer); }); // --------------------------------------------------------------------------- @@ -159,6 +170,38 @@ void save(); } + async function saveConcurrency() { + concurrencySaving = true; + concurrencySaveStatus = 'idle'; + concurrencySaveError = null; + try { + const res = await fetch('/api/v1/settings/indexing', { + method: 'PUT', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ concurrency: concurrencyInput }) + }); + if (res.ok) { + const updated = await res.json(); + concurrencyInput = updated.concurrency; + concurrencySaveStatus = 'ok'; + if (concurrencySaveStatusTimer) clearTimeout(concurrencySaveStatusTimer); + concurrencySaveStatusTimer = setTimeout(() => { + concurrencySaveStatus = 'idle'; + concurrencySaveStatusTimer = null; + }, 3000); + } else { + const data = await res.json(); + concurrencySaveStatus = 'error'; + concurrencySaveError = data.error ?? 'Save failed'; + } + } catch (e) { + concurrencySaveStatus = 'error'; + concurrencySaveError = (e as Error).message; + } finally { + concurrencySaving = false; + } + } + function getOpenAiProfile(settings: EmbeddingSettingsDto): EmbeddingProfileDto | null { return settings.profiles.find((profile) => profile.providerKind === 'openai-compatible') ?? null; } @@ -482,6 +525,45 @@
{/if} + +
+
+ +

+							Number of parallel indexing workers. The server clamps the value between 1 and the machine's CPU core count minus one.

+
+ +
+ + + + {#if concurrencySaveStatus === 'ok'} + ✓ Saved + {:else if concurrencySaveStatus === 'error'} + {concurrencySaveError} + {/if} +
+
+ {#if saveStatus === 'ok'}