feat(TRUEREF-0022): complete iteration 0 — worker-thread indexing, parallel jobs, SSE progress

- Move IndexingPipeline.run() into Worker Threads via WorkerPool
- Add dedicated embedding worker thread with single model instance
- Add stage/stageDetail columns to indexing_jobs schema
- Create ProgressBroadcaster for SSE channel management
- Add SSE endpoints: GET /api/v1/jobs/:id/stream, GET /api/v1/jobs/stream
- Replace UI polling with EventSource on repo detail and admin pages
- Add concurrency settings UI and API endpoint
- Build worker entries separately via esbuild
This commit is contained in:
Giancarmine Salucci
2026-03-30 17:08:23 +02:00
parent 6f3f4db19b
commit 7630740403
30 changed files with 2659 additions and 991 deletions

View File

@@ -110,11 +110,49 @@
}
}
// Auto-refresh every 3 seconds
// Auto-refresh with EventSource streaming + fallback polling
$effect(() => {
fetchJobs();
const interval = setInterval(fetchJobs, 3000);
return () => clearInterval(interval);
const es = new EventSource('/api/v1/jobs/stream');
let fallbackInterval: ReturnType<typeof setInterval> | null = null;
es.addEventListener('job-progress', (event) => {
const data = JSON.parse(event.data);
jobs = jobs.map((j) =>
j.id === data.jobId
? {
...j,
progress: data.progress,
stage: data.stage,
stageDetail: data.stageDetail,
processedFiles: data.processedFiles,
totalFiles: data.totalFiles
}
: j
);
});
es.addEventListener('job-done', () => {
void fetchJobs();
});
es.addEventListener('job-failed', () => {
void fetchJobs();
});
es.onerror = () => {
es.close();
// Fall back to polling on error
fallbackInterval = setInterval(fetchJobs, 3000);
};
return () => {
es.close();
if (fallbackInterval) {
clearInterval(fallbackInterval);
}
};
});
// Format date for display
@@ -135,6 +173,23 @@
function canCancel(status: IndexingJobDto['status']): boolean {
return status !== 'done' && status !== 'failed';
}
// Map IndexingStage values to display labels
const stageLabels: Record<string, string> = {
queued: 'Queued',
differential: 'Diff',
crawling: 'Crawling',
cloning: 'Cloning',
parsing: 'Parsing',
storing: 'Storing',
embedding: 'Embedding',
done: 'Done',
failed: 'Failed'
};
function getStageLabel(stage: string | undefined): string {
return stage ? (stageLabels[stage] ?? stage) : '—';
}
</script>
<svelte:head>
@@ -181,6 +236,11 @@
>
Status
</th>
<th
class="px-6 py-3 text-left text-xs font-medium tracking-wider text-gray-500 uppercase"
>
Stage
</th>
<th
class="px-6 py-3 text-left text-xs font-medium tracking-wider text-gray-500 uppercase"
>
@@ -210,22 +270,30 @@
<td class="px-6 py-4 text-sm whitespace-nowrap text-gray-500">
<JobStatusBadge status={job.status} />
</td>
<td class="px-6 py-4 text-sm whitespace-nowrap text-gray-500">
<div class="flex items-center">
<span class="mr-2">{job.progress}%</span>
<div class="h-2 w-32 rounded-full bg-gray-200">
<div
class="h-2 rounded-full bg-blue-600 transition-all"
style="width: {job.progress}%"
></div>
</div>
{#if job.totalFiles > 0}
<span class="ml-2 text-xs text-gray-400">
{job.processedFiles}/{job.totalFiles} files
</span>
{/if}
<td class="px-6 py-4 text-sm whitespace-nowrap text-gray-500">
<div class="flex items-center gap-2">
<span>{getStageLabel(job.stage)}</span>
{#if job.stageDetail}
<span class="text-xs text-gray-400">{job.stageDetail}</span>
{/if}
</div>
</td>
<td class="px-6 py-4 text-sm whitespace-nowrap text-gray-500">
<div class="flex items-center">
<span class="mr-2">{job.progress}%</span>
<div class="h-2 w-32 rounded-full bg-gray-200">
<div
class="h-2 rounded-full bg-blue-600 transition-all"
style="width: {job.progress}%"
></div>
</div>
</td>
{#if job.totalFiles > 0}
<span class="ml-2 text-xs text-gray-400">
{job.processedFiles}/{job.totalFiles} files
</span>
{/if}
</div>
</td>
<td class="px-6 py-4 text-sm whitespace-nowrap text-gray-500">
{formatDate(job.createdAt)}
</td>

View File

@@ -56,6 +56,7 @@ function createTestDb(): Database.Database {
const migration1 = readFileSync(join(migrationsFolder, '0001_quick_nighthawk.sql'), 'utf-8');
const migration2 = readFileSync(join(migrationsFolder, '0002_silky_stellaris.sql'), 'utf-8');
const migration3 = readFileSync(join(migrationsFolder, '0003_multiversion_config.sql'), 'utf-8');
const migration4 = readFileSync(join(migrationsFolder, '0004_complete_sentry.sql'), 'utf-8');
// Apply first migration
const statements0 = migration0
@@ -95,6 +96,15 @@ function createTestDb(): Database.Database {
client.exec(statement);
}
const statements4 = migration4
.split('--> statement-breakpoint')
.map((statement) => statement.trim())
.filter(Boolean);
for (const statement of statements4) {
client.exec(statement);
}
client.exec(readFileSync(ftsFile, 'utf-8'));
return client;

View File

@@ -0,0 +1,115 @@
/**
* GET /api/v1/jobs/:id/stream — stream real-time job progress via SSE.
*
* Headers:
* Last-Event-ID (optional) — triggers replay of last cached event
*/
import type { RequestHandler } from './$types';
import { getClient } from '$lib/server/db/client.js';
import { JobQueue } from '$lib/server/pipeline/job-queue.js';
import { getBroadcaster } from '$lib/server/pipeline/progress-broadcaster.js';
import { handleServiceError } from '$lib/server/utils/validation.js';
export const GET: RequestHandler = ({ params, request }) => {
try {
const db = getClient();
const queue = new JobQueue(db);
const jobId = params.id;
// Get the job from the queue
const job = queue.getJob(jobId);
if (!job) {
return new Response('Not found', { status: 404 });
}
// Get broadcaster
const broadcaster = getBroadcaster();
if (!broadcaster) {
return new Response('Service unavailable', { status: 503 });
}
// Create a new readable stream for SSE
const stream = new ReadableStream<string>({
async start(controller) {
try {
// Send initial job state as first event
const initialData = {
jobId,
stage: job.stage,
stageDetail: job.stageDetail,
progress: job.progress,
processedFiles: job.processedFiles,
totalFiles: job.totalFiles,
status: job.status,
error: job.error
};
controller.enqueue(`data: ${JSON.stringify(initialData)}\n\n`);
// Check for Last-Event-ID header for reconnect
const lastEventId = request.headers.get('Last-Event-ID');
if (lastEventId) {
const lastEvent = broadcaster.getLastEvent(jobId);
if (lastEvent && lastEvent.id >= parseInt(lastEventId, 10)) {
controller.enqueue(`id: ${lastEvent.id}\nevent: ${lastEvent.event}\ndata: ${lastEvent.data}\n\n`);
}
}
// Check if job is already done or failed - close immediately after first event
if (job.status === 'done' || job.status === 'failed') {
controller.close();
return;
}
// Subscribe to broadcaster for live events
const eventStream = broadcaster.subscribe(jobId);
const reader = eventStream.getReader();
// Pipe broadcaster events to the response
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
controller.enqueue(value);
// Check if the incoming event indicates job completion
if (value.includes('event: done') || value.includes('event: failed')) {
controller.close();
break;
}
}
} finally {
reader.releaseLock();
controller.close();
}
} catch (err) {
console.error('SSE stream error:', err);
controller.close();
}
}
});
return new Response(stream, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'X-Accel-Buffering': 'no',
'Access-Control-Allow-Origin': '*'
}
});
} catch (err) {
return handleServiceError(err);
}
};
export const OPTIONS: RequestHandler = () => {
return new Response(null, {
status: 204,
headers: {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Methods': 'GET, OPTIONS',
'Access-Control-Allow-Headers': 'Content-Type, Authorization, Last-Event-ID'
}
});
};

View File

@@ -0,0 +1,52 @@
/**
* GET /api/v1/jobs/stream — stream real-time job progress for all jobs or a specific repository via SSE.
*
* Query parameters:
* repositoryId (optional) — filter to jobs for this repository
*/
import type { RequestHandler } from './$types';
import { getBroadcaster } from '$lib/server/pipeline/progress-broadcaster.js';
import { handleServiceError } from '$lib/server/utils/validation.js';
export const GET: RequestHandler = ({ url }) => {
try {
const broadcaster = getBroadcaster();
if (!broadcaster) {
return new Response('Service unavailable', { status: 503 });
}
const repositoryId = url.searchParams.get('repositoryId');
// Get the appropriate stream based on parameters
let stream;
if (repositoryId) {
stream = broadcaster.subscribeRepository(repositoryId);
} else {
stream = broadcaster.subscribeAll();
}
return new Response(stream, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'X-Accel-Buffering': 'no',
'Access-Control-Allow-Origin': '*'
}
});
} catch (err) {
return handleServiceError(err);
}
};
export const OPTIONS: RequestHandler = () => {
return new Response(null, {
status: 204,
headers: {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Methods': 'GET, OPTIONS',
'Access-Control-Allow-Headers': 'Content-Type, Authorization'
}
});
};

View File

@@ -0,0 +1,99 @@
import { json } from '@sveltejs/kit';
import type { RequestHandler } from './$types';
import { getClient } from '$lib/server/db/client.js';
import { getPool } from '$lib/server/pipeline/startup.js';
import os from 'node:os';
/**
* GET /api/v1/settings/indexing — retrieve indexing concurrency setting
* PUT /api/v1/settings/indexing — update indexing concurrency setting
* OPTIONS /api/v1/settings/indexing — CORS preflight
*/
// ---------------------------------------------------------------------------
// GET — Return current indexing concurrency
// ---------------------------------------------------------------------------
export const GET: RequestHandler = () => {
try {
const db = getClient();
const row = db
.prepare<[], { value: string }>(
"SELECT value FROM settings WHERE key = 'indexing.concurrency'"
)
.get();
let concurrency = 2;
if (row && row.value) {
try {
const parsed = JSON.parse(row.value);
if (typeof parsed === 'object' && parsed !== null && typeof parsed.value === 'number') {
concurrency = parsed.value;
} else if (typeof parsed === 'number') {
concurrency = parsed;
}
} catch {
concurrency = 2;
}
}
return json({ concurrency });
} catch (err) {
console.error('GET /api/v1/settings/indexing error:', err);
return json({ error: 'Failed to read indexing settings' }, { status: 500 });
}
};
// ---------------------------------------------------------------------------
// PUT — Update indexing concurrency
// ---------------------------------------------------------------------------
export const PUT: RequestHandler = async ({ request }) => {
try {
const body = await request.json();
// Validate and clamp concurrency
const maxConcurrency = Math.max(os.cpus().length - 1, 1);
const concurrency = Math.max(1, Math.min(parseInt(String(body.concurrency ?? 2), 10), maxConcurrency));
if (isNaN(concurrency)) {
return json(
{ error: 'Concurrency must be a valid integer' },
{ status: 400 }
);
}
const db = getClient();
// Write to settings table
db.prepare(
"INSERT OR REPLACE INTO settings (key, value, updated_at) VALUES ('indexing.concurrency', ?, unixepoch())"
).run(JSON.stringify({ value: concurrency }));
// Update worker pool if available
getPool()?.setMaxConcurrency(concurrency);
return json({ concurrency });
} catch (err) {
console.error('PUT /api/v1/settings/indexing error:', err);
return json(
{ error: err instanceof Error ? err.message : 'Failed to update indexing settings' },
{ status: 500 }
);
}
};
// ---------------------------------------------------------------------------
// OPTIONS — CORS preflight
// ---------------------------------------------------------------------------
export const OPTIONS: RequestHandler = () => {
return new Response(null, {
status: 200,
headers: {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Methods': 'GET, PUT, OPTIONS',
'Access-Control-Allow-Headers': 'Content-Type'
}
});
};

View File

@@ -75,6 +75,18 @@
error: 'Error'
};
const stageLabels: Record<string, string> = {
queued: 'Queued',
differential: 'Diff',
crawling: 'Crawling',
cloning: 'Cloning',
parsing: 'Parsing',
storing: 'Storing',
embedding: 'Embedding',
done: 'Done',
failed: 'Failed'
};
async function refreshRepo() {
try {
const res = await fetch(`/api/v1/libs/${encodeURIComponent(repo.id)}`);
@@ -105,63 +117,78 @@
loadVersions();
});
// Single shared poller — one interval regardless of how many tags are active.
// This replaces the N per-version <IndexingProgress> components that each had
// their own setInterval, which caused ERR_INSUFFICIENT_RESOURCES and UI lockup
// when hundreds of tags were queued simultaneously.
// Single shared poller replaced with EventSource SSE stream
$effect(() => {
const activeIds = new Set(
Object.values(activeVersionJobs).filter((id): id is string => !!id)
);
if (activeIds.size === 0) {
versionJobProgress = {};
return;
}
if (!repo.id) return;
let stopped = false;
const es = new EventSource(
`/api/v1/jobs/stream?repositoryId=${encodeURIComponent(repo.id)}`
);
async function poll() {
es.addEventListener('job-progress', (event) => {
if (stopped) return;
try {
const res = await fetch(
`/api/v1/jobs?repositoryId=${encodeURIComponent(repo.id)}&limit=1000`
);
if (!res.ok || stopped) return;
const d = await res.json();
// Build a jobId → job lookup from the response.
const map: Record<string, IndexingJob> = {};
for (const job of (d.jobs ?? []) as IndexingJob[]) {
map[job.id] = job;
}
if (!stopped) versionJobProgress = map;
// Retire completed jobs and trigger a single refresh.
let anyCompleted = false;
const nextJobs = { ...activeVersionJobs };
for (const [tag, jobId] of Object.entries(activeVersionJobs)) {
if (!jobId) continue;
const job = map[jobId];
if (job?.status === 'done' || job?.status === 'failed') {
delete nextJobs[tag];
anyCompleted = true;
}
}
if (anyCompleted && !stopped) {
activeVersionJobs = nextJobs;
void loadVersions();
void refreshRepo();
}
const data = JSON.parse(event.data) as IndexingJob;
versionJobProgress = { ...versionJobProgress, [data.id]: data };
} catch {
// ignore transient errors
// ignore parse errors
}
}
});
es.addEventListener('job-done', (event) => {
if (stopped) return;
try {
const data = JSON.parse(event.data) as IndexingJob;
const next = { ...versionJobProgress };
delete next[data.id];
versionJobProgress = next;
void loadVersions();
void refreshRepo();
} catch {
// ignore parse errors
}
});
es.addEventListener('job-failed', (event) => {
if (stopped) return;
try {
const data = JSON.parse(event.data) as IndexingJob;
const next = { ...versionJobProgress };
delete next[data.id];
versionJobProgress = next;
void loadVersions();
void refreshRepo();
} catch {
// ignore parse errors
}
});
es.onerror = () => {
if (stopped) return;
es.close();
// Fall back to a single fetch for resilience
(async () => {
try {
const res = await fetch(
`/api/v1/jobs?repositoryId=${encodeURIComponent(repo.id)}&limit=1000`
);
if (!res.ok || stopped) return;
const d = await res.json();
const map: Record<string, IndexingJob> = {};
for (const job of (d.jobs ?? []) as IndexingJob[]) {
map[job.id] = job;
}
if (!stopped) versionJobProgress = map;
} catch {
// ignore errors
}
})();
};
void poll();
const interval = setInterval(poll, 2000);
return () => {
stopped = true;
clearInterval(interval);
es.close();
};
});
@@ -620,7 +647,10 @@
{@const job = versionJobProgress[activeVersionJobs[version.tag]!]}
<div class="mt-2">
<div class="flex justify-between text-xs text-gray-500">
<span>{(job?.processedFiles ?? 0).toLocaleString()} / {(job?.totalFiles ?? 0).toLocaleString()} files</span>
<span>
{#if job?.stageDetail}{job.stageDetail}{:else}{(job?.processedFiles ?? 0).toLocaleString()} / {(job?.totalFiles ?? 0).toLocaleString()} files{/if}
{#if job?.stage}{' - ' + stageLabels[job.stage] ?? job.stage}{/if}
</span>
<span>{job?.progress ?? 0}%</span>
</div>
<div class="mt-1 h-1.5 w-full rounded-full bg-gray-200">

View File

@@ -5,7 +5,9 @@ import { EmbeddingSettingsDtoMapper } from '$lib/server/mappers/embedding-settin
import { EmbeddingSettingsService } from '$lib/server/services/embedding-settings.service.js';
export const load: PageServerLoad = async () => {
const service = new EmbeddingSettingsService(getClient());
const db = getClient();
const service = new EmbeddingSettingsService(db);
const settings = EmbeddingSettingsDtoMapper.toDto(service.getSettings());
let localProviderAvailable = false;
@@ -15,8 +17,30 @@ export const load: PageServerLoad = async () => {
localProviderAvailable = false;
}
// Read indexing concurrency setting
let indexingConcurrency = 2;
const concurrencyRow = db
.prepare<[], { value: string }>(
"SELECT value FROM settings WHERE key = 'indexing.concurrency'"
)
.get();
if (concurrencyRow && concurrencyRow.value) {
try {
const parsed = JSON.parse(concurrencyRow.value);
if (typeof parsed === 'object' && parsed !== null && typeof parsed.value === 'number') {
indexingConcurrency = parsed.value;
} else if (typeof parsed === 'number') {
indexingConcurrency = parsed;
}
} catch {
indexingConcurrency = 2;
}
}
return {
settings,
localProviderAvailable
localProviderAvailable,
indexingConcurrency
};
};

View File

@@ -66,12 +66,23 @@
let saveError = $state<string | null>(null);
let saveStatusTimer: ReturnType<typeof setTimeout> | null = null;
let concurrencyInput = $state<number>(0);
let concurrencySaving = $state(false);
let concurrencySaveStatus = $state<'idle' | 'ok' | 'error'>('idle');
let concurrencySaveError = $state<string | null>(null);
let concurrencySaveStatusTimer: ReturnType<typeof setTimeout> | null = null;
$effect(() => {
concurrencyInput = data.indexingConcurrency;
});
const currentSettings = $derived(settingsOverride ?? data.settings);
const activeProfile = $derived(currentSettings.activeProfile);
const activeConfigEntries = $derived(activeProfile?.configEntries ?? []);
onDestroy(() => {
if (saveStatusTimer) clearTimeout(saveStatusTimer);
if (concurrencySaveStatusTimer) clearTimeout(concurrencySaveStatusTimer);
});
// ---------------------------------------------------------------------------
@@ -159,6 +170,38 @@
void save();
}
async function saveConcurrency() {
concurrencySaving = true;
concurrencySaveStatus = 'idle';
concurrencySaveError = null;
try {
const res = await fetch('/api/v1/settings/indexing', {
method: 'PUT',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ concurrency: concurrencyInput })
});
if (res.ok) {
const updated = await res.json();
concurrencyInput = updated.concurrency;
concurrencySaveStatus = 'ok';
if (concurrencySaveStatusTimer) clearTimeout(concurrencySaveStatusTimer);
concurrencySaveStatusTimer = setTimeout(() => {
concurrencySaveStatus = 'idle';
concurrencySaveStatusTimer = null;
}, 3000);
} else {
const data = await res.json();
concurrencySaveStatus = 'error';
concurrencySaveError = data.error ?? 'Save failed';
}
} catch (e) {
concurrencySaveStatus = 'error';
concurrencySaveError = (e as Error).message;
} finally {
concurrencySaving = false;
}
}
function getOpenAiProfile(settings: EmbeddingSettingsDto): EmbeddingProfileDto | null {
return settings.profiles.find((profile) => profile.providerKind === 'openai-compatible') ?? null;
}
@@ -482,6 +525,45 @@
</div>
{/if}
<!-- Indexing section -->
<div class="space-y-3 rounded-lg border border-gray-200 bg-white p-4">
<div>
<label for="concurrency" class="block text-sm font-medium text-gray-700">
Concurrent Workers
</label>
<p class="mt-0.5 text-xs text-gray-500">
Number of parallel indexing workers. Range: 1 to 8.
</p>
</div>
<div class="flex items-center gap-3">
<input
id="concurrency"
type="number"
min="1"
max="8"
inputmode="numeric"
bind:value={concurrencyInput}
disabled={concurrencySaving}
class="w-20 rounded-lg border border-gray-300 px-3 py-2 text-sm focus:border-blue-500 focus:outline-none disabled:opacity-50"
/>
<button
type="button"
onclick={saveConcurrency}
disabled={concurrencySaving}
class="rounded-lg bg-blue-600 px-3 py-2 text-sm text-white hover:bg-blue-700 disabled:opacity-50"
>
{concurrencySaving ? 'Saving…' : 'Save'}
</button>
{#if concurrencySaveStatus === 'ok'}
<span class="text-sm text-green-600">✓ Saved</span>
{:else if concurrencySaveStatus === 'error'}
<span class="text-sm text-red-600">{concurrencySaveError}</span>
{/if}
</div>
</div>
<!-- Save feedback banners -->
{#if saveStatus === 'ok'}
<div