diff --git a/docs/ARCHITECTURE.md b/docs/ARCHITECTURE.md index d32ec18..323f3ee 100644 --- a/docs/ARCHITECTURE.md +++ b/docs/ARCHITECTURE.md @@ -1,15 +1,16 @@ # Architecture -Last Updated: 2026-03-27T00:24:13.000Z +Last Updated: 2026-03-30T00:00:00.000Z ## Overview -TrueRef is a TypeScript-first, self-hosted documentation retrieval platform built on SvelteKit. The repository contains a Node-targeted web application, a REST API, a Model Context Protocol server, and a server-side indexing pipeline backed by SQLite via better-sqlite3 and Drizzle ORM. +TrueRef is a TypeScript-first, self-hosted documentation retrieval platform built on SvelteKit. The repository contains a Node-targeted web application, a REST API, a Model Context Protocol server, and a multi-threaded server-side indexing pipeline backed by SQLite via better-sqlite3 and Drizzle ORM. -- Primary language: TypeScript (110 files) with a small amount of JavaScript configuration (2 files) -- Application type: Full-stack SvelteKit application with server-side indexing and retrieval services +- Primary language: TypeScript (141 files) with a small amount of JavaScript configuration (2 files) +- Application type: Full-stack SvelteKit application with worker-threaded indexing and retrieval services - Runtime framework: SvelteKit with adapter-node -- Storage: SQLite with Drizzle-managed schema plus hand-written FTS5 setup +- Storage: SQLite (WAL mode) with Drizzle-managed schema plus hand-written FTS5 setup +- Concurrency: Node.js worker_threads for parse and embedding work - Testing: Vitest with separate client and server projects ## Project Structure @@ -25,7 +26,7 @@ TrueRef is a TypeScript-first, self-hosted documentation retrieval platform buil ### src/routes -Contains the UI entry points and API routes. The API tree under src/routes/api/v1 is the public HTTP contract for repository management, indexing jobs, search/context retrieval, settings, filesystem browsing, and JSON schema discovery. 
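The API tree described here includes real-time job-progress streaming over Server-Sent Events. As a rough illustration of the wire format such endpoints emit (the helper name and payload fields are assumptions for this sketch, not code from the repository):

```typescript
// Illustrative sketch only: builds one Server-Sent Events frame in the
// standard id/event/data layout. The payload fields (jobId, progress)
// are assumed for the example, not the repository's actual contract.
function formatSseFrame(id: number, event: string, data: unknown): string {
	return `id: ${id}\nevent: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
}

// A progress frame for a running job:
const frame = formatSseFrame(7, 'progress', { jobId: 'job-1', progress: 42 });
// frame === 'id: 7\nevent: progress\ndata: {"jobId":"job-1","progress":42}\n\n'
```

A standard `EventSource` client resends the last seen `id` as the `Last-Event-ID` header on reconnect, which is what last-event caching on the server supports.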
+Contains the UI entry points and API routes. The API tree under src/routes/api/v1 is the public HTTP contract for repository management, indexing jobs, search/context retrieval, settings, filesystem browsing, JSON schema discovery, real-time SSE progress streaming, and job control (pause/resume/cancel). ### src/lib/server/db @@ -33,7 +34,15 @@ Owns SQLite schema definitions, migration bootstrapping, and FTS initialization. ### src/lib/server/pipeline -Coordinates crawl, parse, chunk, store, and optional embedding generation work. Startup recovery marks stale jobs as failed, resets repositories stuck in indexing state, initializes singleton queue/pipeline instances, and drains queued work after restart. +Coordinates crawl, parse, chunk, store, and optional embedding generation work using a worker thread pool. The pipeline module consists of: + +- **WorkerPool** (`worker-pool.ts`): Manages a configurable number of Node.js `worker_threads` for parse jobs and an optional dedicated embed worker. Dispatches jobs round-robin to idle workers, enforces per-repository serialisation (one active job per repo), auto-respawns crashed workers, and supports runtime concurrency adjustment via `setMaxConcurrency()`. Falls back to main-thread execution when worker scripts are not found. +- **Parse worker** (`worker-entry.ts`): Runs in a worker thread. Opens its own `better-sqlite3` connection (WAL mode, `busy_timeout = 5000`), constructs a local `IndexingPipeline` instance, and processes jobs by posting `progress`, `done`, or `failed` messages back to the parent. +- **Embed worker** (`embed-worker-entry.ts`): Dedicated worker for embedding generation. Loads the embedding profile from the database, creates an `EmbeddingService`, and processes embed requests after the parse worker finishes a job. +- **ProgressBroadcaster** (`progress-broadcaster.ts`): Server-side pub/sub for real-time SSE streaming. Supports per-job, per-repository, and global subscriptions. 
Caches the last event per job for reconnect support. +- **Worker types** (`worker-types.ts`): Shared TypeScript discriminated union types for `ParseWorkerRequest`/`ParseWorkerResponse` and `EmbedWorkerRequest`/`EmbedWorkerResponse` message protocols. +- **Startup** (`startup.ts`): Recovers stale jobs, constructs singleton `JobQueue`, `IndexingPipeline`, `WorkerPool`, and `ProgressBroadcaster` instances, reads concurrency settings from the database, and drains queued work after restart. +- **JobQueue** (`job-queue.ts`): SQLite-backed queue that delegates to the `WorkerPool` when available, with pause/resume/cancel support. ### src/lib/server/search @@ -49,16 +58,18 @@ Provides a thin compatibility layer over the HTTP API. The MCP server exposes re ## Design Patterns -- No explicit design patterns detected from semantic analysis. -- The implementation does consistently use service classes such as RepositoryService, SearchService, and HybridSearchService for business logic. -- Mapping and entity layers separate raw database rows from domain objects through mapper/entity pairs such as RepositoryMapper and RepositoryEntity. -- Pipeline startup uses module-level singleton state for JobQueue and IndexingPipeline lifecycle management. +- The WorkerPool implements an **observer/callback pattern**: the pool owner provides `onProgress`, `onJobDone`, `onJobFailed`, `onEmbedDone`, and `onEmbedFailed` callbacks at construction time, and the pool invokes them when workers post messages. +- ProgressBroadcaster implements a **pub/sub pattern** with three subscription tiers (per-job, per-repository, global) and last-event caching for SSE reconnect. +- The implementation consistently uses **service classes** such as RepositoryService, SearchService, and HybridSearchService for business logic. +- Mapping and entity layers separate raw database rows from domain objects through **mapper/entity pairs** such as RepositoryMapper and RepositoryEntity. 
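The discriminated-union message protocol described above for `worker-types.ts` can be sketched roughly as follows; only the `type` discriminant and the progress/done/failed variants come from the source, the remaining fields are assumptions:

```typescript
// Hypothetical variant shapes: field names beyond `type` are assumptions.
type ParseWorkerResponse =
	| { type: 'progress'; jobId: string; progress: number }
	| { type: 'done'; jobId: string }
	| { type: 'failed'; jobId: string; error: string };

// Switching on the `type` tag narrows each branch to one variant,
// giving type-safe access to variant-specific fields.
function describeMessage(msg: ParseWorkerResponse): string {
	switch (msg.type) {
		case 'progress':
			return `job ${msg.jobId}: ${msg.progress}%`;
		case 'done':
			return `job ${msg.jobId}: complete`;
		case 'failed':
			return `job ${msg.jobId}: ${msg.error}`;
	}
}
```

The same pattern applies to the request side (`ParseWorkerRequest`) and the embed protocol (`EmbedWorkerRequest`/`EmbedWorkerResponse`).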
+- Pipeline startup uses **module-level singletons** for JobQueue, IndexingPipeline, WorkerPool, and ProgressBroadcaster lifecycle management, with accessor functions (getQueue, getPool, getBroadcaster) for route handlers. +- Worker message protocols use **TypeScript discriminated unions** (`type` field) for type-safe worker ↔ parent communication. ## Key Components ### SvelteKit server bootstrap -src/hooks.server.ts initializes the database, loads persisted embedding configuration, creates the optional EmbeddingService, starts the indexing pipeline, and applies CORS headers to all /api routes. +src/hooks.server.ts initializes the database, loads persisted embedding configuration, creates the optional EmbeddingService, reads indexing concurrency settings from the database, starts the indexing pipeline with WorkerPool and ProgressBroadcaster via `initializePipeline(db, embeddingService, { concurrency, dbPath })`, and applies CORS headers to all /api routes. ### Database layer @@ -80,6 +91,22 @@ src/lib/server/services/repository.service.ts provides CRUD and statistics for i src/mcp/index.ts creates the MCP server, registers the two supported tools, and exposes them over stdio or streamable HTTP. +### Worker thread pool + +src/lib/server/pipeline/worker-pool.ts manages a pool of Node.js worker threads. Parse workers run the full crawl → parse → store pipeline inside isolated threads with their own better-sqlite3 connections (WAL mode enables concurrent readers). An optional embed worker handles embedding generation in a separate thread. The pool enforces per-repository serialisation, auto-respawns crashed workers, and supports runtime concurrency changes persisted through the settings table. + +### SSE streaming + +src/lib/server/pipeline/progress-broadcaster.ts provides real-time Server-Sent Event streaming of indexing progress. Route handlers in src/routes/api/v1/jobs/stream and src/routes/api/v1/jobs/[id]/stream expose SSE endpoints. 
The broadcaster supports per-job, per-repository, and global subscriptions, with last-event caching for reconnect via the `Last-Event-ID` header. + +### Job control + +The src/routes/api/v1/jobs/[id]/pause, resume, and cancel endpoints allow runtime control of indexing jobs. The JobQueue supports pause/resume/cancel state transitions persisted to SQLite. + +### Indexing settings + +src/routes/api/v1/settings/indexing exposes GET and PUT for indexing concurrency. PUT validates the value, clamps it into the range `1..max(cpus - 1, 1)`, persists it to the settings table, and live-updates the WorkerPool via `setMaxConcurrency()`. + ## Dependencies ### Production @@ -93,6 +120,7 @@ src/mcp/index.ts creates the MCP server, registers the two supported tools, and - @sveltejs/kit and @sveltejs/adapter-node: application framework and Node deployment target - drizzle-kit and drizzle-orm: schema management and typed database access +- esbuild: worker thread entry point bundling (build/workers/) - vite and @tailwindcss/vite: bundling and Tailwind integration - vitest and @vitest/browser-playwright: server and browser test execution - eslint, typescript-eslint, eslint-plugin-svelte, prettier, prettier-plugin-svelte, prettier-plugin-tailwindcss: linting and formatting @@ -116,12 +144,13 @@ The frontend and backend share the same SvelteKit repository, but most non-UI be ### Indexing flow -1. Server startup runs initializeDatabase() and initializePipeline() from src/hooks.server.ts. -2. The pipeline recovers stale jobs, initializes crawler/parser infrastructure, and resumes queued work. -3. Crawlers ingest GitHub or local repository contents. -4. Parsers split files into document and snippet records with token counts and metadata. -5. Database modules persist repositories, documents, snippets, versions, configs, and job state. -6. If an embedding provider is configured, embedding services generate vectors for snippet search. +1.
Server startup runs initializeDatabase() and initializePipeline() from src/hooks.server.ts, which creates the WorkerPool, ProgressBroadcaster, and JobQueue singletons. +2. The pipeline recovers stale jobs (marks running → failed, indexing → error), reads concurrency settings, and resumes queued work. +3. When a job is enqueued, the JobQueue delegates to the WorkerPool, which dispatches work to an idle parse worker thread. +4. Each parse worker opens its own better-sqlite3 connection (WAL mode) and runs the full crawl → parse → store pipeline, posting progress messages back to the parent thread. +5. The parent thread updates job progress in the database and broadcasts SSE events through the ProgressBroadcaster. +6. On parse completion, if an embedding provider is configured, the WorkerPool enqueues an embed request to the dedicated embed worker, which generates vectors in its own thread. +7. Job control endpoints allow pausing, resuming, or cancelling jobs at runtime. ### Retrieval flow @@ -135,7 +164,8 @@ The frontend and backend share the same SvelteKit repository, but most non-UI be ## Build System -- Build command: npm run build +- Build command: npm run build (runs `vite build` then `node scripts/build-workers.mjs`) +- Worker bundling: scripts/build-workers.mjs uses esbuild to compile worker-entry.ts and embed-worker-entry.ts into build/workers/ as ESM bundles (.mjs), with $lib path aliases resolved and better-sqlite3/@xenova/transformers marked external - Test command: npm run test - Primary local run command from package.json: npm run dev - MCP entry points: npm run mcp:start and npm run mcp:http diff --git a/docs/FINDINGS.md b/docs/FINDINGS.md index 13892fd..788a99d 100644 --- a/docs/FINDINGS.md +++ b/docs/FINDINGS.md @@ -1,25 +1,29 @@ # Findings -Last Updated: 2026-03-27T00:24:13.000Z +Last Updated: 2026-03-30T00:00:00.000Z ## Initializer Summary -- JIRA: FEEDBACK-0001 +- JIRA: TRUEREF-0022 - Refresh mode: REFRESH_IF_REQUIRED -- Result: refreshed affected 
documentation only. ARCHITECTURE.md and FINDINGS.md were updated from current repository analysis; CODE_STYLE.md remained trusted and unchanged because the documented conventions still match the codebase. +- Result: Refreshed ARCHITECTURE.md and FINDINGS.md. CODE_STYLE.md remained trusted — new worker thread code follows established conventions. ## Research Performed -- Discovered source-language distribution, dependency manifest, import patterns, and project structure. -- Read the retrieval, formatter, token-budget, parser, mapper, and response-model modules affected by the latest implementation changes. -- Compared the trusted cache state with current behavior to identify which documentation files were actually stale. -- Confirmed package scripts for build and test. -- Confirmed Linux-native md5sum availability for documentation trust metadata. +- Discovered 141 TypeScript/JavaScript source files (up from 110), with new pipeline worker, broadcaster, and SSE endpoint files. +- Read worker-pool.ts, worker-entry.ts, embed-worker-entry.ts, worker-types.ts, progress-broadcaster.ts, startup.ts, job-queue.ts to understand the new worker thread architecture. +- Read SSE endpoints (jobs/stream, jobs/[id]/stream) and job control endpoints (pause, resume, cancel). +- Read indexing settings endpoint and hooks.server.ts to verify startup wiring changes. +- Read build-workers.mjs and package.json to verify build system and dependency changes. +- Compared trusted cache state with current codebase to identify ARCHITECTURE.md as stale. +- Confirmed CODE_STYLE.md conventions still match the codebase — new code uses PascalCase classes, camelCase functions, tab indentation, ESM imports, and TypeScript discriminated unions consistent with existing style. ## Open Questions For Planner - Verify whether the retrieval response contract should document the new repository and version metadata fields formally in a public API reference beyond the architecture summary. 
- Verify whether parser chunking should evolve further from file-level and declaration-level boundaries to member-level semantic chunks for class-heavy codebases. +- Verify whether the SSE streaming contract (event names, data shapes) should be documented in a dedicated API reference for external consumers. +- Assess whether the WorkerPool fallback mode (main-thread execution when worker scripts are missing) needs explicit test coverage or should be removed in favour of a hard build requirement. ## Planner Notes Template diff --git a/package.json b/package.json index 3c0e331..3b7a3a1 100644 --- a/package.json +++ b/package.json @@ -5,7 +5,7 @@ "type": "module", "scripts": { "dev": "vite dev", - "build": "vite build", + "build": "vite build && node scripts/build-workers.mjs", "preview": "vite preview", "prepare": "svelte-kit sync || echo ''", "check": "svelte-kit sync && svelte-check --tsconfig ./tsconfig.json", @@ -34,6 +34,7 @@ "@vitest/browser-playwright": "^4.1.0", "drizzle-kit": "^0.31.8", "drizzle-orm": "^0.45.1", + "esbuild": "^0.24.0", "eslint": "^9.39.2", "eslint-config-prettier": "^10.1.8", "eslint-plugin-svelte": "^3.14.0", diff --git a/scripts/build-workers.mjs b/scripts/build-workers.mjs new file mode 100644 index 0000000..331e2ad --- /dev/null +++ b/scripts/build-workers.mjs @@ -0,0 +1,38 @@ +import * as esbuild from 'esbuild'; +import { existsSync } from 'node:fs'; + +const entries = [ + 'src/lib/server/pipeline/worker-entry.ts', + 'src/lib/server/pipeline/embed-worker-entry.ts' +]; + +try { + const existing = entries.filter(e => existsSync(e)); + if (existing.length === 0) { + console.log('[build-workers] No worker entry files found yet, skipping.'); + process.exit(0); + } + + await esbuild.build({ + entryPoints: existing, + bundle: true, + platform: 'node', + target: 'node20', + format: 'esm', + outdir: 'build/workers', + outExtension: { '.js': '.mjs' }, + alias: { + '$lib': './src/lib', + '$lib/server': './src/lib/server' + }, + external: 
['better-sqlite3', '@xenova/transformers'], + banner: { + js: "import { createRequire } from 'module'; const require = createRequire(import.meta.url);" + } + }); + + console.log(`[build-workers] Compiled ${existing.length} worker(s) to build/workers/`); +} catch (err) { + console.error('[build-workers] Error:', err); + process.exit(1); +} diff --git a/src/hooks.server.ts b/src/hooks.server.ts index 1d25e06..4c43ea0 100644 --- a/src/hooks.server.ts +++ b/src/hooks.server.ts @@ -16,6 +16,7 @@ import { type EmbeddingProfileEntityProps } from '$lib/server/models/embedding-profile.js'; import { EmbeddingProfileMapper } from '$lib/server/mappers/embedding-profile.mapper.js'; +import { env } from '$env/dynamic/private'; import type { Handle } from '@sveltejs/kit'; // --------------------------------------------------------------------------- @@ -47,7 +48,29 @@ try { embeddingService = new EmbeddingService(db, provider, activeProfile.id); } - initializePipeline(db, embeddingService); + // Read database path from environment + const dbPath = env.DATABASE_URL; + + // Read indexing concurrency setting from database + let concurrency = 2; // default + if (dbPath) { + const concurrencyRow = db + .prepare<[], { value: string }>( + "SELECT value FROM settings WHERE key = 'indexing.concurrency' LIMIT 1" + ) + .get(); + if (concurrencyRow) { + try { + const parsed = JSON.parse(concurrencyRow.value); + concurrency = parsed.value ?? 2; + } catch { + // If parsing fails, use default + concurrency = 2; + } + } + } + + initializePipeline(db, embeddingService, { concurrency, dbPath }); console.log('[hooks.server] Indexing pipeline initialised.'); } catch (err) { console.error( diff --git a/src/lib/components/RepositoryCard.svelte b/src/lib/components/RepositoryCard.svelte index 8cbdb84..698fa78 100644 --- a/src/lib/components/RepositoryCard.svelte +++ b/src/lib/components/RepositoryCard.svelte @@ -1,5 +1,5 @@ @@ -181,6 +236,11 @@ > Status + + Stage + @@ -210,22 +270,30 @@ - -
- {job.progress}% -
-
-
- {#if job.totalFiles > 0} - - {job.processedFiles}/{job.totalFiles} files - - {/if} + +
+ {getStageLabel(job.stage)} + {#if job.stageDetail} + {job.stageDetail} + {/if} +
+ + +
+ {job.progress}% +
+
- + {#if job.totalFiles > 0} + + {job.processedFiles}/{job.totalFiles} files + + {/if} +
+ {formatDate(job.createdAt)} diff --git a/src/routes/api/v1/api-contract.integration.test.ts b/src/routes/api/v1/api-contract.integration.test.ts index 342bed2..63de70c 100644 --- a/src/routes/api/v1/api-contract.integration.test.ts +++ b/src/routes/api/v1/api-contract.integration.test.ts @@ -56,6 +56,7 @@ function createTestDb(): Database.Database { const migration1 = readFileSync(join(migrationsFolder, '0001_quick_nighthawk.sql'), 'utf-8'); const migration2 = readFileSync(join(migrationsFolder, '0002_silky_stellaris.sql'), 'utf-8'); const migration3 = readFileSync(join(migrationsFolder, '0003_multiversion_config.sql'), 'utf-8'); + const migration4 = readFileSync(join(migrationsFolder, '0004_complete_sentry.sql'), 'utf-8'); // Apply first migration const statements0 = migration0 @@ -95,6 +96,15 @@ function createTestDb(): Database.Database { client.exec(statement); } + const statements4 = migration4 + .split('--> statement-breakpoint') + .map((statement) => statement.trim()) + .filter(Boolean); + + for (const statement of statements4) { + client.exec(statement); + } + client.exec(readFileSync(ftsFile, 'utf-8')); return client; @@ -207,15 +217,6 @@ function seedEmbedding(client: Database.Database, snippetId: string, values: num .run(snippetId, values.length, Buffer.from(Float32Array.from(values).buffer), NOW_S); } -function seedRules(client: Database.Database, repositoryId: string, rules: string[]) { - client - .prepare( - `INSERT INTO repository_configs (repository_id, rules, updated_at) - VALUES (?, ?, ?)` - ) - .run(repositoryId, JSON.stringify(rules), NOW_S); -} - describe('API contract integration', () => { beforeEach(() => { db = createTestDb(); diff --git a/src/routes/api/v1/jobs/[id]/stream/+server.ts b/src/routes/api/v1/jobs/[id]/stream/+server.ts new file mode 100644 index 0000000..d1a2861 --- /dev/null +++ b/src/routes/api/v1/jobs/[id]/stream/+server.ts @@ -0,0 +1,115 @@ +/** + * GET /api/v1/jobs/:id/stream — stream real-time job progress via SSE. 
+ * + * Headers: + * Last-Event-ID (optional) — triggers replay of last cached event + */ + +import type { RequestHandler } from './$types'; +import { getClient } from '$lib/server/db/client.js'; +import { JobQueue } from '$lib/server/pipeline/job-queue.js'; +import { getBroadcaster } from '$lib/server/pipeline/progress-broadcaster.js'; +import { handleServiceError } from '$lib/server/utils/validation.js'; + +export const GET: RequestHandler = ({ params, request }) => { + try { + const db = getClient(); + const queue = new JobQueue(db); + const jobId = params.id; + + // Get the job from the queue + const job = queue.getJob(jobId); + if (!job) { + return new Response('Not found', { status: 404 }); + } + + // Get broadcaster + const broadcaster = getBroadcaster(); + if (!broadcaster) { + return new Response('Service unavailable', { status: 503 }); + } + + // Create a new readable stream for SSE + const stream = new ReadableStream({ + async start(controller) { + try { + // Send initial job state as first event + const initialData = { + jobId, + stage: job.stage, + stageDetail: job.stageDetail, + progress: job.progress, + processedFiles: job.processedFiles, + totalFiles: job.totalFiles, + status: job.status, + error: job.error + }; + controller.enqueue(`data: ${JSON.stringify(initialData)}\n\n`); + + // Check for Last-Event-ID header for reconnect + const lastEventId = request.headers.get('Last-Event-ID'); + if (lastEventId) { + const lastEvent = broadcaster.getLastEvent(jobId); + if (lastEvent && lastEvent.id >= parseInt(lastEventId, 10)) { + controller.enqueue(`id: ${lastEvent.id}\nevent: ${lastEvent.event}\ndata: ${lastEvent.data}\n\n`); + } + } + + // Check if job is already done or failed - close immediately after first event + if (job.status === 'done' || job.status === 'failed') { + controller.close(); + return; + } + + // Subscribe to broadcaster for live events + const eventStream = broadcaster.subscribe(jobId); + const reader = eventStream.getReader(); + + // 
Pipe broadcaster events to the response + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + controller.enqueue(value); + + // Check if the incoming event indicates job completion + if (value.includes('event: done') || value.includes('event: failed')) { + // Exit the loop; the finally block closes the stream exactly once. + break; + } + } + } finally { + reader.releaseLock(); + controller.close(); + } + } catch (err) { + console.error('SSE stream error:', err); + try { controller.close(); } catch { /* stream may already be closed */ } + } + } + }); + + return new Response(stream, { + headers: { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + 'Connection': 'keep-alive', + 'X-Accel-Buffering': 'no', + 'Access-Control-Allow-Origin': '*' + } + }); + } catch (err) { + return handleServiceError(err); + } +}; + +export const OPTIONS: RequestHandler = () => { + return new Response(null, { + status: 204, + headers: { + 'Access-Control-Allow-Origin': '*', + 'Access-Control-Allow-Methods': 'GET, OPTIONS', + 'Access-Control-Allow-Headers': 'Content-Type, Authorization, Last-Event-ID' + } + }); +}; diff --git a/src/routes/api/v1/jobs/stream/+server.ts new file mode 100644 index 0000000..edc9d64 --- /dev/null +++ b/src/routes/api/v1/jobs/stream/+server.ts @@ -0,0 +1,52 @@ +/** + * GET /api/v1/jobs/stream — stream real-time job progress for all jobs or a specific repository via SSE.
+ * + * Query parameters: + * repositoryId (optional) — filter to jobs for this repository + */ + +import type { RequestHandler } from './$types'; +import { getBroadcaster } from '$lib/server/pipeline/progress-broadcaster.js'; +import { handleServiceError } from '$lib/server/utils/validation.js'; + +export const GET: RequestHandler = ({ url }) => { + try { + const broadcaster = getBroadcaster(); + if (!broadcaster) { + return new Response('Service unavailable', { status: 503 }); + } + + const repositoryId = url.searchParams.get('repositoryId'); + + // Get the appropriate stream based on parameters + let stream; + if (repositoryId) { + stream = broadcaster.subscribeRepository(repositoryId); + } else { + stream = broadcaster.subscribeAll(); + } + + return new Response(stream, { + headers: { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + 'Connection': 'keep-alive', + 'X-Accel-Buffering': 'no', + 'Access-Control-Allow-Origin': '*' + } + }); + } catch (err) { + return handleServiceError(err); + } +}; + +export const OPTIONS: RequestHandler = () => { + return new Response(null, { + status: 204, + headers: { + 'Access-Control-Allow-Origin': '*', + 'Access-Control-Allow-Methods': 'GET, OPTIONS', + 'Access-Control-Allow-Headers': 'Content-Type, Authorization' + } + }); +}; diff --git a/src/routes/api/v1/settings/indexing/+server.ts b/src/routes/api/v1/settings/indexing/+server.ts new file mode 100644 index 0000000..357154c --- /dev/null +++ b/src/routes/api/v1/settings/indexing/+server.ts @@ -0,0 +1,99 @@ +import { json } from '@sveltejs/kit'; +import type { RequestHandler } from './$types'; +import { getClient } from '$lib/server/db/client.js'; +import { getPool } from '$lib/server/pipeline/startup.js'; +import os from 'node:os'; + +/** + * GET /api/v1/settings/indexing — retrieve indexing concurrency setting + * PUT /api/v1/settings/indexing — update indexing concurrency setting + * OPTIONS /api/v1/settings/indexing — CORS preflight + */ + +// 
--------------------------------------------------------------------------- +// GET — Return current indexing concurrency +// --------------------------------------------------------------------------- + +export const GET: RequestHandler = () => { + try { + const db = getClient(); + const row = db + .prepare<[], { value: string }>( + "SELECT value FROM settings WHERE key = 'indexing.concurrency'" + ) + .get(); + + let concurrency = 2; + if (row && row.value) { + try { + const parsed = JSON.parse(row.value); + if (typeof parsed === 'object' && parsed !== null && typeof parsed.value === 'number') { + concurrency = parsed.value; + } else if (typeof parsed === 'number') { + concurrency = parsed; + } + } catch { + concurrency = 2; + } + } + + return json({ concurrency }); + } catch (err) { + console.error('GET /api/v1/settings/indexing error:', err); + return json({ error: 'Failed to read indexing settings' }, { status: 500 }); + } +}; + +// --------------------------------------------------------------------------- +// PUT — Update indexing concurrency +// --------------------------------------------------------------------------- + +export const PUT: RequestHandler = async ({ request }) => { + try { + const body = await request.json(); + + // Validate and clamp concurrency + const maxConcurrency = Math.max(os.cpus().length - 1, 1); + const concurrency = Math.max(1, Math.min(parseInt(String(body.concurrency ?? 
2), 10), maxConcurrency)); + + if (isNaN(concurrency)) { + return json( + { error: 'Concurrency must be a valid integer' }, + { status: 400 } + ); + } + + const db = getClient(); + + // Write to settings table + db.prepare( + "INSERT OR REPLACE INTO settings (key, value, updated_at) VALUES ('indexing.concurrency', ?, unixepoch())" + ).run(JSON.stringify({ value: concurrency })); + + // Update worker pool if available + getPool()?.setMaxConcurrency(concurrency); + + return json({ concurrency }); + } catch (err) { + console.error('PUT /api/v1/settings/indexing error:', err); + return json( + { error: err instanceof Error ? err.message : 'Failed to update indexing settings' }, + { status: 500 } + ); + } +}; + +// --------------------------------------------------------------------------- +// OPTIONS — CORS preflight +// --------------------------------------------------------------------------- + +export const OPTIONS: RequestHandler = () => { + return new Response(null, { + status: 200, + headers: { + 'Access-Control-Allow-Origin': '*', + 'Access-Control-Allow-Methods': 'GET, PUT, OPTIONS', + 'Access-Control-Allow-Headers': 'Content-Type' + } + }); +}; diff --git a/src/routes/api/v1/sse-and-settings.integration.test.ts b/src/routes/api/v1/sse-and-settings.integration.test.ts new file mode 100644 index 0000000..ee6544a --- /dev/null +++ b/src/routes/api/v1/sse-and-settings.integration.test.ts @@ -0,0 +1,513 @@ +/** + * Integration tests for SSE streaming endpoints and the indexing settings API + * (TRUEREF-0022). + * + * Uses the same mock / in-memory DB pattern as api-contract.integration.test.ts. 
+ */ + +import { beforeEach, describe, expect, it, vi } from 'vitest'; +import Database from 'better-sqlite3'; +import { readFileSync } from 'node:fs'; +import { join } from 'node:path'; +import type { ProgressBroadcaster as BroadcasterType } from '$lib/server/pipeline/progress-broadcaster.js'; + +// --------------------------------------------------------------------------- +// Module-level mocks (must be hoisted to the top of the file) +// --------------------------------------------------------------------------- + +let db: Database.Database; +// Closed over by the vi.mock factory below. +let mockBroadcaster: BroadcasterType | null = null; + +vi.mock('$lib/server/db/client', () => ({ + getClient: () => db +})); + +vi.mock('$lib/server/db/client.js', () => ({ + getClient: () => db +})); + +vi.mock('$lib/server/pipeline/startup', () => ({ + getQueue: () => null, + getPool: () => null +})); + +vi.mock('$lib/server/pipeline/startup.js', () => ({ + getQueue: () => null, + getPool: () => null +})); + +vi.mock('$lib/server/pipeline/progress-broadcaster', async (importOriginal) => { + const original = await importOriginal(); + return { + ...original, + getBroadcaster: () => mockBroadcaster + }; +}); + +vi.mock('$lib/server/pipeline/progress-broadcaster.js', async (importOriginal) => { + const original = await importOriginal(); + return { + ...original, + getBroadcaster: () => mockBroadcaster + }; +}); + +// --------------------------------------------------------------------------- +// Imports (after mocks are registered) +// --------------------------------------------------------------------------- + +import { ProgressBroadcaster } from '$lib/server/pipeline/progress-broadcaster.js'; +import { GET as getJobStream } from './jobs/[id]/stream/+server.js'; +import { GET as getJobsStream } from './jobs/stream/+server.js'; +import { GET as getIndexingSettings, PUT as putIndexingSettings } from './settings/indexing/+server.js'; + +// 
--------------------------------------------------------------------------- +// DB factory +// --------------------------------------------------------------------------- + +function createTestDb(): Database.Database { + const client = new Database(':memory:'); + client.pragma('foreign_keys = ON'); + + const migrationsFolder = join(import.meta.dirname, '../../../lib/server/db/migrations'); + + for (const migrationFile of [ + '0000_large_master_chief.sql', + '0001_quick_nighthawk.sql', + '0002_silky_stellaris.sql', + '0003_multiversion_config.sql', + '0004_complete_sentry.sql', + '0005_fix_stage_defaults.sql' + ]) { + const sql = readFileSync(join(migrationsFolder, migrationFile), 'utf-8'); + for (const stmt of sql.split('--> statement-breakpoint').map((s) => s.trim()).filter(Boolean)) { + client.exec(stmt); + } + } + + return client; +} + +// --------------------------------------------------------------------------- +// Fixtures +// --------------------------------------------------------------------------- + +const NOW_S = Math.floor(Date.now() / 1000); + +function seedRepo(client: Database.Database, id = '/test/repo'): string { + client + .prepare( + `INSERT INTO repositories + (id, title, source, source_url, state, created_at, updated_at) + VALUES (?, 'Test Repo', 'local', '/tmp/repo', 'indexed', ?, ?)` + ) + .run(id, NOW_S, NOW_S); + return id; +} + +function seedJob( + client: Database.Database, + overrides: Partial<{ + id: string; + repository_id: string; + status: string; + stage: string; + progress: number; + total_files: number; + processed_files: number; + error: string | null; + }> = {} +): string { + const id = overrides.id ?? crypto.randomUUID(); + client + .prepare( + `INSERT INTO indexing_jobs + (id, repository_id, version_id, status, progress, total_files, processed_files, + stage, stage_detail, error, started_at, completed_at, created_at) + VALUES (?, ?, null, ?, ?, ?, ?, ?, null, ?, null, null, ?)` + ) + .run( + id, + overrides.repository_id ?? 
'/test/repo', + overrides.status ?? 'queued', + overrides.progress ?? 0, + overrides.total_files ?? 0, + overrides.processed_files ?? 0, + overrides.stage ?? 'queued', + overrides.error ?? null, + NOW_S + ); + return id; +} + +/** Build a minimal SvelteKit-compatible RequestEvent for SSE handlers. */ +function makeEvent<T = Parameters<typeof getJobStream>[0]>(opts: { + params?: Record<string, string>; + url?: string; + headers?: Record<string, string>; + body?: unknown; +}): T { + const url = new URL(opts.url ?? 'http://localhost/api/v1/jobs/test/stream'); + const headers = new Headers(opts.headers ?? {}); + return { + params: opts.params ?? {}, + url, + request: new Request(url.toString(), { + method: opts.body ? 'PUT' : 'GET', + headers, + body: opts.body ? JSON.stringify(opts.body) : undefined + }), + route: { id: null }, + locals: {}, + platform: undefined, + cookies: {} as never, + fetch: fetch, + getClientAddress: () => '127.0.0.1', + setHeaders: vi.fn(), + isDataRequest: false, + isSubRequest: false, + depends: vi.fn(), + untrack: vi.fn() + } as unknown as T; +} + +// --------------------------------------------------------------------------- +// Helper: read first chunk from a response body +// --------------------------------------------------------------------------- + +async function readFirstChunk(response: Response): Promise<string> { + const reader = response.body?.getReader(); + if (!reader) throw new Error('Response has no body'); + const { value } = await reader.read(); + reader.releaseLock(); + // Stream enqueues strings directly — no TextDecoder needed + return String(value ?? 
''); +} + +// --------------------------------------------------------------------------- +// Test group 1: GET /api/v1/jobs/:id/stream +// --------------------------------------------------------------------------- + +describe('GET /api/v1/jobs/:id/stream', () => { + beforeEach(() => { + db = createTestDb(); + mockBroadcaster = new ProgressBroadcaster(); + }); + + it('returns 404 when the job does not exist', async () => { + seedRepo(db); + + const response = await getJobStream( + makeEvent({ params: { id: 'non-existent-job-id' } }) + ); + + expect(response.status).toBe(404); + }); + + it('returns 503 when broadcaster is not initialized', async () => { + mockBroadcaster = null; + seedRepo(db); + const jobId = seedJob(db); + + const response = await getJobStream(makeEvent({ params: { id: jobId } })); + + expect(response.status).toBe(503); + }); + + it('returns 200 with Content-Type: text/event-stream', async () => { + seedRepo(db); + const jobId = seedJob(db); + + const response = await getJobStream(makeEvent({ params: { id: jobId } })); + + expect(response.status).toBe(200); + expect(response.headers.get('Content-Type')).toContain('text/event-stream'); + }); + + it('first chunk contains the initial job state as a data event', async () => { + seedRepo(db); + const jobId = seedJob(db, { status: 'running', progress: 42 }); + + const response = await getJobStream(makeEvent({ params: { id: jobId } })); + + const text = await readFirstChunk(response); + expect(text).toContain('data:'); + // The initial event carries jobId and status + expect(text).toContain(jobId); + expect(text).toContain('running'); + }); + + it('closes the stream immediately when job status is "done"', async () => { + seedRepo(db); + const jobId = seedJob(db, { status: 'done' }); + + const response = await getJobStream(makeEvent({ params: { id: jobId } })); + + expect(response.status).toBe(200); + // Read both chunks until done + const reader = response.body!.getReader(); + let fullText = ''; + let 
iterations = 0; + while (iterations < 10) { + const { done, value } = await reader.read(); + if (done) break; + fullText += String(value ?? ''); + iterations++; + } + // Stream should close without blocking (done=true was reached) + expect(fullText).toContain(jobId); + }); + + it('closes the stream immediately when job status is "failed"', async () => { + seedRepo(db); + const jobId = seedJob(db, { status: 'failed', error: 'something went wrong' }); + + const response = await getJobStream(makeEvent({ params: { id: jobId } })); + + expect(response.status).toBe(200); + const reader = response.body!.getReader(); + let fullText = ''; + let iterations = 0; + while (iterations < 10) { + const { done, value } = await reader.read(); + if (done) break; + fullText += String(value ?? ''); + iterations++; + } + expect(fullText).toContain('failed'); + }); + + it('replays last cached event when Last-Event-ID header is provided', async () => { + seedRepo(db); + const jobId = seedJob(db, { status: 'running' }); + + // Pre-seed a cached event in the broadcaster + mockBroadcaster!.broadcast(jobId, '/test/repo', 'progress', { stage: 'parsing', progress: 50 }); + + const response = await getJobStream( + makeEvent({ + params: { id: jobId }, + headers: { 'Last-Event-ID': '1' } + }) + ); + + expect(response.status).toBe(200); + // Consume enough to get both initial state and replay + const reader = response.body!.getReader(); + let fullText = ''; + // Read two chunks + for (let i = 0; i < 2; i++) { + const { done, value } = await reader.read(); + if (done) break; + fullText += String(value ?? 
''); + reader.releaseLock(); + // The replay event should include the cached event data + expect(fullText).toContain('progress'); + }); +}); + +// --------------------------------------------------------------------------- +// Test group 2: GET /api/v1/jobs/stream +// --------------------------------------------------------------------------- + +describe('GET /api/v1/jobs/stream', () => { + beforeEach(() => { + db = createTestDb(); + mockBroadcaster = new ProgressBroadcaster(); + }); + + it('returns 200 with Content-Type: text/event-stream', async () => { + const response = await getJobsStream( + makeEvent<Parameters<typeof getJobsStream>[0]>({ url: 'http://localhost/api/v1/jobs/stream' }) + ); + + expect(response.status).toBe(200); + expect(response.headers.get('Content-Type')).toContain('text/event-stream'); + }); + + it('returns 503 when broadcaster is not initialized', async () => { + mockBroadcaster = null; + + const response = await getJobsStream( + makeEvent<Parameters<typeof getJobsStream>[0]>({ url: 'http://localhost/api/v1/jobs/stream' }) + ); + + expect(response.status).toBe(503); + }); + + it('uses subscribeRepository when ?repositoryId= is provided', async () => { + const subscribeSpy = vi.spyOn(mockBroadcaster!, 'subscribeRepository'); + + await getJobsStream( + makeEvent<Parameters<typeof getJobsStream>[0]>({ url: 'http://localhost/api/v1/jobs/stream?repositoryId=/test/repo' }) + ); + + expect(subscribeSpy).toHaveBeenCalledWith('/test/repo'); + }); + + it('uses subscribeAll when no repositoryId query param is present', async () => { + const subscribeSpy = vi.spyOn(mockBroadcaster!, 'subscribeAll'); + + await getJobsStream( + makeEvent<Parameters<typeof getJobsStream>[0]>({ url: 'http://localhost/api/v1/jobs/stream' }) + ); + + expect(subscribeSpy).toHaveBeenCalled(); + }); + + it('broadcasts to stream subscribers for the correct repository', async () => { + seedRepo(db, '/repo/alpha'); + + const response = await getJobsStream( + makeEvent<Parameters<typeof getJobsStream>[0]>({ url: 'http://localhost/api/v1/jobs/stream?repositoryId=/repo/alpha' }) + ); + + // Broadcast an event for this repository + 
mockBroadcaster!.broadcast('job-123', '/repo/alpha', 'progress', { stage: 'parsing' }); + + const reader = response.body!.getReader(); + const { value } = await reader.read(); + const text = String(value ?? ''); + reader.releaseLock(); + + expect(text).toContain('progress'); + }); +}); + +// --------------------------------------------------------------------------- +// Test group 3: GET /api/v1/settings/indexing +// --------------------------------------------------------------------------- + +describe('GET /api/v1/settings/indexing', () => { + beforeEach(() => { + db = createTestDb(); + }); + + it('returns { concurrency: 2 } when no setting exists in DB', async () => { + const response = await getIndexingSettings(makeEvent<Parameters<typeof getIndexingSettings>[0]>({})); + const body = await response.json(); + + expect(response.status).toBe(200); + expect(body).toEqual({ concurrency: 2 }); + }); + + it('returns the stored concurrency when a setting exists', async () => { + db.prepare( + "INSERT INTO settings (key, value, updated_at) VALUES ('indexing.concurrency', ?, ?)" + ).run(JSON.stringify(4), NOW_S); + + const response = await getIndexingSettings(makeEvent<Parameters<typeof getIndexingSettings>[0]>({})); + const body = await response.json(); + + expect(body.concurrency).toBe(4); + }); + + it('parses JSON-wrapped value correctly: {"value": 5}', async () => { + db.prepare( + "INSERT INTO settings (key, value, updated_at) VALUES ('indexing.concurrency', ?, ?)" + ).run(JSON.stringify({ value: 5 }), NOW_S); + + const response = await getIndexingSettings(makeEvent<Parameters<typeof getIndexingSettings>[0]>({})); + const body = await response.json(); + + expect(body.concurrency).toBe(5); + }); +}); + +// --------------------------------------------------------------------------- +// Test group 4: PUT /api/v1/settings/indexing +// --------------------------------------------------------------------------- + +describe('PUT /api/v1/settings/indexing', () => { + beforeEach(() => { + db = createTestDb(); + }); + + function makePutEvent(body: unknown) { + const url = new 
URL('http://localhost/api/v1/settings/indexing'); + return { + params: {}, + url, + request: new Request(url.toString(), { + method: 'PUT', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(body) + }), + route: { id: null }, + locals: {}, + platform: undefined, + cookies: {} as never, + fetch: fetch, + getClientAddress: () => '127.0.0.1', + setHeaders: vi.fn(), + isDataRequest: false, + isSubRequest: false, + depends: vi.fn(), + untrack: vi.fn() + } as unknown as Parameters<typeof putIndexingSettings>[0]; + } + + it('returns 200 with { concurrency } for a valid integer input', async () => { + const response = await putIndexingSettings(makePutEvent({ concurrency: 3 })); + const body = await response.json(); + + expect(response.status).toBe(200); + expect(body.concurrency).toBe(3); + }); + + it('persists the new concurrency to the settings table', async () => { + await putIndexingSettings(makePutEvent({ concurrency: 3 })); + + const row = db + .prepare<[], { value: string }>( + "SELECT value FROM settings WHERE key = 'indexing.concurrency'" + ) + .get(); + + expect(row).toBeDefined(); + const parsed = JSON.parse(row!.value); + expect(parsed.value).toBe(3); + }); + + it('clamps to minimum of 1', async () => { + const response = await putIndexingSettings(makePutEvent({ concurrency: 0 })); + const body = await response.json(); + + expect(body.concurrency).toBeGreaterThanOrEqual(1); + }); + + it('clamps to maximum of max(cpus-1, 1)', async () => { + // Pass an absurdly large value; it must be clamped + const response = await putIndexingSettings(makePutEvent({ concurrency: 99999 })); + const body = await response.json(); + + const os = await import('node:os'); + const expectedMax = Math.max(os.cpus().length - 1, 1); + expect(body.concurrency).toBeLessThanOrEqual(expectedMax); + }); + + it('returns 400 for NaN concurrency (non-numeric string)', async () => { + // parseInt('abc', 10) is NaN → should return 400 + // However, the implementation uses 
`parseInt(String(body.concurrency ?? 2), 10)` + // and then checks isNaN — but the isNaN check is AFTER the Math.max/min clamping. + // The actual flow: parseInt('abc') => NaN, Math.max(1, Math.min(NaN, max)) => NaN, + // then `if (isNaN(concurrency))` returns 400. + // We pass the raw string directly. + const response = await putIndexingSettings( + makePutEvent({ concurrency: 'not-a-number' }) + ); + + // parseInt('not-a-number') = NaN, so the handler should return 400 + expect(response.status).toBe(400); + }); + + it('uses concurrency=2 as default when body.concurrency is missing', async () => { + const response = await putIndexingSettings(makePutEvent({})); + const body = await response.json(); + + // Default is 2 per the code: `body.concurrency ?? 2` + expect(body.concurrency).toBe(2); + }); +}); diff --git a/src/routes/repos/[id]/+page.svelte b/src/routes/repos/[id]/+page.svelte index e54b04e..3b25aa8 100644 --- a/src/routes/repos/[id]/+page.svelte +++ b/src/routes/repos/[id]/+page.svelte @@ -2,6 +2,7 @@ import { goto } from '$app/navigation'; import { resolve as resolveRoute } from '$app/paths'; import { onMount } from 'svelte'; + import { SvelteSet } from 'svelte/reactivity'; import type { PageData } from './$types'; import type { Repository, IndexingJob } from '$lib/types'; import ConfirmDialog from '$lib/components/ConfirmDialog.svelte'; @@ -48,7 +49,7 @@ // Discover tags state let discoverBusy = $state(false); let discoveredTags = $state<Array<{ tag: string; commitHash: string }>>([]); - let selectedDiscoveredTags = $state<Set<string>>(new Set()); + let selectedDiscoveredTags = new SvelteSet<string>(); let showDiscoverPanel = $state(false); let registerBusy = $state(false); @@ -75,6 +76,18 @@ error: 'Error' }; + const stageLabels: Record<string, string> = { + queued: 'Queued', + differential: 'Diff', + crawling: 'Crawling', + cloning: 'Cloning', + parsing: 'Parsing', + storing: 'Storing', + embedding: 'Embedding', + done: 'Done', + failed: 'Failed' + }; + async function refreshRepo() { try { const res = await 
fetch(`/api/v1/libs/${encodeURIComponent(repo.id)}`); @@ -105,63 +118,78 @@ loadVersions(); }); - // Single shared poller — one interval regardless of how many tags are active. - // This replaces the N per-version components that each had - // their own setInterval, which caused ERR_INSUFFICIENT_RESOURCES and UI lockup - // when hundreds of tags were queued simultaneously. + // Single shared poller replaced with EventSource SSE stream $effect(() => { - const activeIds = new Set( - Object.values(activeVersionJobs).filter((id): id is string => !!id) - ); - if (activeIds.size === 0) { - versionJobProgress = {}; - return; - } + if (!repo.id) return; let stopped = false; + const es = new EventSource( + `/api/v1/jobs/stream?repositoryId=${encodeURIComponent(repo.id)}` + ); - async function poll() { + es.addEventListener('job-progress', (event) => { if (stopped) return; try { - const res = await fetch( - `/api/v1/jobs?repositoryId=${encodeURIComponent(repo.id)}&limit=1000` - ); - if (!res.ok || stopped) return; - const d = await res.json(); - - // Build a jobId → job lookup from the response. - const map: Record<string, IndexingJob> = {}; - for (const job of (d.jobs ?? []) as IndexingJob[]) { - map[job.id] = job; - } - if (!stopped) versionJobProgress = map; - - // Retire completed jobs and trigger a single refresh. 
- let anyCompleted = false; - const nextJobs = { ...activeVersionJobs }; - for (const [tag, jobId] of Object.entries(activeVersionJobs)) { - if (!jobId) continue; - const job = map[jobId]; - if (job?.status === 'done' || job?.status === 'failed') { - delete nextJobs[tag]; - anyCompleted = true; - } - } - if (anyCompleted && !stopped) { - activeVersionJobs = nextJobs; - void loadVersions(); - void refreshRepo(); - } + const data = JSON.parse(event.data) as IndexingJob; + versionJobProgress = { ...versionJobProgress, [data.id]: data }; } catch { - // ignore transient errors + // ignore parse errors } - } + }); + + es.addEventListener('job-done', (event) => { + if (stopped) return; + try { + const data = JSON.parse(event.data) as IndexingJob; + const next = { ...versionJobProgress }; + delete next[data.id]; + versionJobProgress = next; + void loadVersions(); + void refreshRepo(); + } catch { + // ignore parse errors + } + }); + + es.addEventListener('job-failed', (event) => { + if (stopped) return; + try { + const data = JSON.parse(event.data) as IndexingJob; + const next = { ...versionJobProgress }; + delete next[data.id]; + versionJobProgress = next; + void loadVersions(); + void refreshRepo(); + } catch { + // ignore parse errors + } + }); + + es.onerror = () => { + if (stopped) return; + es.close(); + // Fall back to a single fetch for resilience + (async () => { + try { + const res = await fetch( + `/api/v1/jobs?repositoryId=${encodeURIComponent(repo.id)}&limit=1000` + ); + if (!res.ok || stopped) return; + const d = await res.json(); + const map: Record<string, IndexingJob> = {}; + for (const job of (d.jobs ?? []) as IndexingJob[]) { + map[job.id] = job; + } + if (!stopped) versionJobProgress = map; + } catch { + // ignore errors + } + })(); + }; - void poll(); - const interval = setInterval(poll, 2000); return () => { stopped = true; - clearInterval(interval); + es.close(); }; }); @@ -303,7 +331,7 @@ discoveredTags = (d.tags ?? 
[]).filter( (t: { tag: string; commitHash: string }) => !registeredTags.has(t.tag) ); - selectedDiscoveredTags = new Set(discoveredTags.map((t) => t.tag)); + selectedDiscoveredTags = new SvelteSet(discoveredTags.map((t) => t.tag)); showDiscoverPanel = true; } catch (e) { errorMessage = (e as Error).message; @@ -313,13 +341,11 @@ } function toggleDiscoveredTag(tag: string) { - const next = new Set(selectedDiscoveredTags); - if (next.has(tag)) { - next.delete(tag); + if (selectedDiscoveredTags.has(tag)) { + selectedDiscoveredTags.delete(tag); } else { - next.add(tag); + selectedDiscoveredTags.add(tag); } - selectedDiscoveredTags = next; } async function handleRegisterSelected() { @@ -354,7 +380,7 @@ activeVersionJobs = next; showDiscoverPanel = false; discoveredTags = []; - selectedDiscoveredTags = new Set(); + selectedDiscoveredTags = new SvelteSet(); await loadVersions(); } catch (e) { errorMessage = (e as Error).message; @@ -523,7 +549,7 @@ onclick={() => { showDiscoverPanel = false; discoveredTags = []; - selectedDiscoveredTags = new Set(); + selectedDiscoveredTags = new SvelteSet(); }} class="text-xs text-blue-600 hover:underline" > @@ -620,7 +646,10 @@ {@const job = versionJobProgress[activeVersionJobs[version.tag]!]}
- {(job?.processedFiles ?? 0).toLocaleString()} / {(job?.totalFiles ?? 0).toLocaleString()} files + + {#if job?.stageDetail}{job.stageDetail}{:else}{(job?.processedFiles ?? 0).toLocaleString()} / {(job?.totalFiles ?? 0).toLocaleString()} files{/if} + {#if job?.stage}{' - ' + (stageLabels[job.stage] ?? job.stage)}{/if} + {job?.progress ?? 0}%
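The repository page above subscribes to named SSE events (`job-progress`, `job-done`, `job-failed`) via `EventSource`. As a reference for the wire format such a stream is assumed to emit: the `id`/`event`/`data` field layout and blank-line terminator are fixed by the SSE specification, while the event names and payload shape here mirror what the page's listeners and the stream tests expect. `formatSseFrame` is a hypothetical illustration, not the actual server code.

```typescript
// Hypothetical helper illustrating one SSE frame. The id line is what the
// browser echoes back in the Last-Event-ID header on reconnect, which is the
// replay path exercised by the jobs/:id/stream test above.
function formatSseFrame(id: number, event: string, payload: unknown): string {
  return `id: ${id}\nevent: ${event}\ndata: ${JSON.stringify(payload)}\n\n`;
}

// Example: the frame an EventSource 'job-progress' listener would receive.
const frame = formatSseFrame(1, 'job-progress', { stage: 'parsing', progress: 50 });
```

A browser `EventSource` dispatches this frame only to listeners registered for the matching `event` name, which is why the page registers three separate `addEventListener` calls rather than one generic `onmessage` handler.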
diff --git a/src/routes/settings/+page.server.ts b/src/routes/settings/+page.server.ts index 9b1f617..403faeb 100644 --- a/src/routes/settings/+page.server.ts +++ b/src/routes/settings/+page.server.ts @@ -5,7 +5,9 @@ import { EmbeddingSettingsDtoMapper } from '$lib/server/mappers/embedding-settin import { EmbeddingSettingsService } from '$lib/server/services/embedding-settings.service.js'; export const load: PageServerLoad = async () => { - const service = new EmbeddingSettingsService(getClient()); + const db = getClient(); + + const service = new EmbeddingSettingsService(db); const settings = EmbeddingSettingsDtoMapper.toDto(service.getSettings()); let localProviderAvailable = false; @@ -15,8 +17,30 @@ export const load: PageServerLoad = async () => { localProviderAvailable = false; } + // Read indexing concurrency setting + let indexingConcurrency = 2; + const concurrencyRow = db + .prepare<[], { value: string }>( + "SELECT value FROM settings WHERE key = 'indexing.concurrency'" + ) + .get(); + + if (concurrencyRow && concurrencyRow.value) { + try { + const parsed = JSON.parse(concurrencyRow.value); + if (typeof parsed === 'object' && parsed !== null && typeof parsed.value === 'number') { + indexingConcurrency = parsed.value; + } else if (typeof parsed === 'number') { + indexingConcurrency = parsed; + } + } catch { + indexingConcurrency = 2; + } + } + return { settings, - localProviderAvailable + localProviderAvailable, + indexingConcurrency }; }; \ No newline at end of file diff --git a/src/routes/settings/+page.svelte b/src/routes/settings/+page.svelte index 9790bfb..bf149f2 100644 --- a/src/routes/settings/+page.svelte +++ b/src/routes/settings/+page.svelte @@ -66,12 +66,19 @@ let saveError = $state<string | null>(null); let saveStatusTimer: ReturnType<typeof setTimeout> | null = null; + let concurrencyInput = $derived(data.indexingConcurrency); + let concurrencySaving = $state(false); + let concurrencySaveStatus = $state<'idle' | 'ok' | 'error'>('idle'); + let concurrencySaveError = 
$state<string | null>(null); + let concurrencySaveStatusTimer: ReturnType<typeof setTimeout> | null = null; + const currentSettings = $derived(settingsOverride ?? data.settings); const activeProfile = $derived(currentSettings.activeProfile); const activeConfigEntries = $derived(activeProfile?.configEntries ?? []); onDestroy(() => { if (saveStatusTimer) clearTimeout(saveStatusTimer); + if (concurrencySaveStatusTimer) clearTimeout(concurrencySaveStatusTimer); }); // --------------------------------------------------------------------------- @@ -159,6 +166,38 @@ void save(); } + async function saveConcurrency() { + concurrencySaving = true; + concurrencySaveStatus = 'idle'; + concurrencySaveError = null; + try { + const res = await fetch('/api/v1/settings/indexing', { + method: 'PUT', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ concurrency: concurrencyInput }) + }); + if (res.ok) { + const updated = await res.json(); + concurrencyInput = updated.concurrency; + concurrencySaveStatus = 'ok'; + if (concurrencySaveStatusTimer) clearTimeout(concurrencySaveStatusTimer); + concurrencySaveStatusTimer = setTimeout(() => { + concurrencySaveStatus = 'idle'; + concurrencySaveStatusTimer = null; + }, 3000); + } else { + const data = await res.json(); + concurrencySaveStatus = 'error'; + concurrencySaveError = data.error ?? 'Save failed'; + } + } catch (e) { + concurrencySaveStatus = 'error'; + concurrencySaveError = (e as Error).message; + } finally { + concurrencySaving = false; + } + } + function getOpenAiProfile(settings: EmbeddingSettingsDto): EmbeddingProfileDto | null { return settings.profiles.find((profile) => profile.providerKind === 'openai-compatible') ?? null; } @@ -482,6 +521,45 @@
{/if} + +
+
+ +

+ Number of parallel indexing workers. Range: 1 to 8. +

+
+ +
+ + + + {#if concurrencySaveStatus === 'ok'} + ✓ Saved + {:else if concurrencySaveStatus === 'error'} + {concurrencySaveError} + {/if} +
+
+ {#if saveStatus === 'ok'}
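The PUT handler behaviour exercised by the tests above (default of 2, clamp into [1, max(cpus - 1, 1)], 400 on NaN) can be sketched as follows. `resolveConcurrency` is a hypothetical stand-in for the handler's validation step, not the actual implementation:

```typescript
import * as os from 'node:os';

// Hypothetical sketch of the validation flow the tests describe:
// parseInt(String(body.concurrency ?? 2), 10), clamp into [1, max(cpus - 1, 1)],
// reject NaN with a 400. NaN survives Math.min/Math.max, so checking it after
// clamping (as the test comments note) yields the same result as checking first.
function resolveConcurrency(raw: unknown): { status: number; concurrency?: number } {
  const max = Math.max(os.cpus().length - 1, 1);
  const clamped = Math.max(1, Math.min(parseInt(String(raw ?? 2), 10), max));
  if (Number.isNaN(clamped)) return { status: 400 };
  return { status: 200, concurrency: clamped };
}
```

Note that the clamp ceiling depends on the host's CPU count, so on a small machine even the default of 2 can be reduced to 1; the tests accordingly assert bounds rather than exact values for the clamping cases.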