fix(TRUEREF-0021): reduce event loop blocking, add busy_timeout, and add TRUEREF-0022 PRD

This commit is contained in:
U811073
2026-03-30 15:06:54 +02:00
committed by Giancarmine Salucci
parent f4fe8c6043
commit 6f3f4db19b
6 changed files with 577 additions and 34 deletions

View File

@@ -13,6 +13,7 @@ export function getClient(): Database.Database {
_client = new Database(env.DATABASE_URL);
_client.pragma('journal_mode = WAL');
_client.pragma('foreign_keys = ON');
_client.pragma('busy_timeout = 5000');
}
return _client;
}

View File

@@ -15,6 +15,10 @@ const client = new Database(env.DATABASE_URL);
// Use write-ahead logging so readers are not blocked by the writer.
client.pragma('journal_mode = WAL');
// Enforce foreign key constraints.
client.pragma('foreign_keys = ON');
// Wait up to 5 s when the DB is locked instead of failing immediately.
// Prevents SQLITE_BUSY errors when the indexing pipeline holds the write lock
// and an HTTP request arrives simultaneously.
client.pragma('busy_timeout = 5000');
// Single shared drizzle instance wrapping the raw better-sqlite3 client.
export const db = drizzle(client, { schema });

View File

@@ -215,7 +215,19 @@ export class IndexingPipeline {
this.updateJob(job.id, { processedFiles, progress: initialProgress });
}
// Yield the event loop and flush progress every N files.
// Lower = more responsive UI; higher = less overhead.
const YIELD_EVERY = 20;
for (const [i, file] of filesToProcess.entries()) {
// Yield the Node.js event loop periodically so the HTTP server can
// handle incoming requests (navigation, polling) between file parses.
// Without this, the synchronous parse + SQLite work blocks the thread
// entirely and the UI becomes unresponsive during indexing.
if (i > 0 && i % YIELD_EVERY === 0) {
await new Promise<void>((resolve) => setImmediate(resolve));
}
const checksum = file.sha || sha256(file.content);
// Create new document record.
@@ -247,16 +259,20 @@ export class IndexingPipeline {
newDocuments.push(newDoc);
newSnippets.push(...snippets);
// Count ALL files (including skipped unchanged ones) in progress.
// Write progress to the DB only on yield boundaries or the final file.
// Avoids a synchronous SQLite UPDATE on every single iteration.
const totalProcessed = diff.unchanged.length + i + 1;
const progress = calculateProgress(
totalProcessed,
totalFiles,
0,
0,
this.embeddingService !== null
);
this.updateJob(job.id, { processedFiles: totalProcessed, progress });
const isLast = i === filesToProcess.length - 1;
if (isLast || i % YIELD_EVERY === YIELD_EVERY - 1) {
const progress = calculateProgress(
totalProcessed,
totalFiles,
0,
0,
this.embeddingService !== null
);
this.updateJob(job.id, { processedFiles: totalProcessed, progress });
}
}
// After the loop processedFiles should reflect the full count.

View File

@@ -24,7 +24,7 @@ export const GET: RequestHandler = ({ url }) => {
const status = (url.searchParams.get('status') ?? undefined) as
| IndexingJob['status']
| undefined;
const limit = Math.min(parseInt(url.searchParams.get('limit') ?? '20', 10) || 20, 200);
const limit = Math.min(parseInt(url.searchParams.get('limit') ?? '20', 10) || 20, 1000);
const jobs = queue.listJobs({ repositoryId, status, limit });
const total = queue.countJobs({ repositoryId, status });

View File

@@ -55,6 +55,9 @@
// Active version indexing jobs: tag -> jobId
let activeVersionJobs = $state<Record<string, string | undefined>>({});
// Job progress data fed by the single shared poller (replaces per-version <IndexingProgress>).
let versionJobProgress = $state<Record<string, IndexingJob>>({});
// Remove confirm
let removeTag = $state<string | null>(null);
@@ -102,6 +105,66 @@
loadVersions();
});
// Single shared poller — one interval regardless of how many tags are active.
// This replaces the N per-version <IndexingProgress> components that each had
// their own setInterval, which caused ERR_INSUFFICIENT_RESOURCES and UI lockup
// when hundreds of tags were queued simultaneously.
$effect(() => {
const activeIds = new Set(
Object.values(activeVersionJobs).filter((id): id is string => !!id)
);
if (activeIds.size === 0) {
versionJobProgress = {};
return;
}
let stopped = false;
async function poll() {
if (stopped) return;
try {
const res = await fetch(
`/api/v1/jobs?repositoryId=${encodeURIComponent(repo.id)}&limit=1000`
);
if (!res.ok || stopped) return;
const d = await res.json();
// Build a jobId → job lookup from the response.
const map: Record<string, IndexingJob> = {};
for (const job of (d.jobs ?? []) as IndexingJob[]) {
map[job.id] = job;
}
if (!stopped) versionJobProgress = map;
// Retire completed jobs and trigger a single refresh.
let anyCompleted = false;
const nextJobs = { ...activeVersionJobs };
for (const [tag, jobId] of Object.entries(activeVersionJobs)) {
if (!jobId) continue;
const job = map[jobId];
if (job?.status === 'done' || job?.status === 'failed') {
delete nextJobs[tag];
anyCompleted = true;
}
}
if (anyCompleted && !stopped) {
activeVersionJobs = nextJobs;
void loadVersions();
void refreshRepo();
}
} catch {
// ignore transient errors
}
}
void poll();
const interval = setInterval(poll, 2000);
return () => {
stopped = true;
clearInterval(interval);
};
});
async function handleReindex() {
errorMessage = null;
successMessage = null;
@@ -265,23 +328,29 @@
errorMessage = null;
try {
const tags = [...selectedDiscoveredTags];
const responses = await Promise.all(
tags.map((tag) =>
fetch(`/api/v1/libs/${encodeURIComponent(repo.id)}/versions`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ tag, autoIndex: true })
})
)
);
const results = await Promise.all(responses.map((r) => (r.ok ? r.json() : null)));
const BATCH_SIZE = 5;
let next = { ...activeVersionJobs };
for (let i = 0; i < tags.length; i++) {
const result = results[i];
if (result?.job?.id) {
next = { ...next, [tags[i]]: result.job.id };
for (let i = 0; i < tags.length; i += BATCH_SIZE) {
const batch = tags.slice(i, i + BATCH_SIZE);
const responses = await Promise.all(
batch.map((tag) =>
fetch(`/api/v1/libs/${encodeURIComponent(repo.id)}/versions`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ tag, autoIndex: true })
})
)
);
const results = await Promise.all(responses.map((r) => (r.ok ? r.json() : null)));
for (let j = 0; j < batch.length; j++) {
const result = results[j];
if (result?.job?.id) {
next = { ...next, [batch[j]]: result.job.id };
}
}
}
activeVersionJobs = next;
showDiscoverPanel = false;
discoveredTags = [];
@@ -547,16 +616,23 @@
{/each}
</div>
{/if}
{#if !!activeVersionJobs[version.tag]}
<IndexingProgress
jobId={activeVersionJobs[version.tag]!}
oncomplete={() => {
const { [version.tag]: _, ...rest } = activeVersionJobs;
activeVersionJobs = rest;
loadVersions();
refreshRepo();
}}
/>
{#if activeVersionJobs[version.tag]}
{@const job = versionJobProgress[activeVersionJobs[version.tag]!]}
<div class="mt-2">
<div class="flex justify-between text-xs text-gray-500">
<span>{(job?.processedFiles ?? 0).toLocaleString()} / {(job?.totalFiles ?? 0).toLocaleString()} files</span>
<span>{job?.progress ?? 0}%</span>
</div>
<div class="mt-1 h-1.5 w-full rounded-full bg-gray-200">
<div
class="h-1.5 rounded-full bg-blue-600 transition-all duration-300"
style="width: {job?.progress ?? 0}%"
></div>
</div>
{#if job?.status === 'failed'}
<p class="mt-1 text-xs text-red-600">{job.error ?? 'Indexing failed.'}</p>
{/if}
</div>
{/if}
</div>
{/each}