/**
 * Integration tests for SSE streaming endpoints and the indexing settings API
 * (TRUEREF-0022).
 *
 * Uses the same mock / in-memory DB pattern as api-contract.integration.test.ts.
 */

import { beforeEach, describe, expect, it, vi } from 'vitest';
import Database from 'better-sqlite3';
import { readFileSync } from 'node:fs';
import { cpus } from 'node:os';
import { join } from 'node:path';
import type { ProgressBroadcaster as BroadcasterType } from '$lib/server/pipeline/progress-broadcaster.js';

// ---------------------------------------------------------------------------
// Module-level mocks (must be hoisted to the top of the file)
// ---------------------------------------------------------------------------

// Both variables are read lazily by the mock factories below, so each test can
// swap them in `beforeEach` (or set the broadcaster to null mid-test).
let db: Database.Database;
let mockBroadcaster: BroadcasterType | null = null;

// Each module is mocked under both its extensionless and `.js` specifier.
vi.mock('$lib/server/db/client', () => ({
	getClient: () => db
}));

vi.mock('$lib/server/db/client.js', () => ({
	getClient: () => db
}));

vi.mock('$lib/server/pipeline/startup', () => ({
	getQueue: () => null,
	getPool: () => null
}));

vi.mock('$lib/server/pipeline/startup.js', () => ({
	getQueue: () => null,
	getPool: () => null
}));

// Keep the real ProgressBroadcaster class; only the singleton accessor is replaced.
vi.mock('$lib/server/pipeline/progress-broadcaster', async (importOriginal) => {
	const original =
		await importOriginal<typeof import('$lib/server/pipeline/progress-broadcaster.js')>();
	return {
		...original,
		getBroadcaster: () => mockBroadcaster
	};
});

vi.mock('$lib/server/pipeline/progress-broadcaster.js', async (importOriginal) => {
	const original =
		await importOriginal<typeof import('$lib/server/pipeline/progress-broadcaster.js')>();
	return {
		...original,
		getBroadcaster: () => mockBroadcaster
	};
});

// ---------------------------------------------------------------------------
// Imports (after mocks are registered)
// ---------------------------------------------------------------------------

import { ProgressBroadcaster } from '$lib/server/pipeline/progress-broadcaster.js';
import { GET as getJobStream } from './jobs/[id]/stream/+server.js';
import { GET as getJobsStream } from './jobs/stream/+server.js';
import {
	GET as getIndexingSettings,
	PUT as putIndexingSettings
} from './settings/indexing/+server.js';

// ---------------------------------------------------------------------------
// DB factory
// ---------------------------------------------------------------------------

/** Migration files applied, in this order, to every fresh test database. */
const MIGRATION_FILES = [
	'0000_large_master_chief.sql',
	'0001_quick_nighthawk.sql',
	'0002_silky_stellaris.sql',
	'0003_multiversion_config.sql',
	'0004_complete_sentry.sql',
	'0005_fix_stage_defaults.sql'
] as const;

/**
 * Create an in-memory SQLite database with foreign keys enabled and all
 * migrations in {@link MIGRATION_FILES} applied.
 *
 * @returns A ready-to-use database handle, isolated from every other test.
 */
function createTestDb(): Database.Database {
	const client = new Database(':memory:');
	client.pragma('foreign_keys = ON');

	const migrationsFolder = join(import.meta.dirname, '../../../lib/server/db/migrations');

	for (const migrationFile of MIGRATION_FILES) {
		const sql = readFileSync(join(migrationsFolder, migrationFile), 'utf-8');
		// Migration files separate statements with a breakpoint marker; run each one individually.
		const statements = sql
			.split('--> statement-breakpoint')
			.map((s) => s.trim())
			.filter(Boolean);
		for (const stmt of statements) {
			client.exec(stmt);
		}
	}

	return client;
}

// ---------------------------------------------------------------------------
// Fixtures
// ---------------------------------------------------------------------------

/** Current time in whole seconds, captured once so all seeded rows share a timestamp. */
const NOW_S = Math.floor(Date.now() / 1000);

/**
 * Insert a repository row in the `indexed` state.
 *
 * @param client Database to insert into.
 * @param id Repository id; defaults to `/test/repo`.
 * @returns The id of the inserted repository.
 */
function seedRepo(client: Database.Database, id = '/test/repo'): string {
	client
		.prepare(
			`INSERT INTO repositories (id, title, source, source_url, state, created_at, updated_at) VALUES (?, 'Test Repo', 'local', '/tmp/repo', 'indexed', ?, ?)`
		)
		.run(id, NOW_S, NOW_S);
	return id;
}

/** Column overrides accepted by {@link seedJob}. */
type JobOverrides = Partial<{
	id: string;
	repository_id: string;
	status: string;
	stage: string;
	progress: number;
	total_files: number;
	processed_files: number;
	error: string | null;
}>;

/**
 * Insert an indexing job row. Unspecified columns default to a freshly queued
 * job for `/test/repo` with zeroed counters and no error.
 *
 * @param client Database to insert into.
 * @param overrides Column values to use instead of the defaults.
 * @returns The id of the inserted job (a random UUID unless overridden).
 */
function seedJob(client: Database.Database, overrides: JobOverrides = {}): string {
	const id = overrides.id ?? crypto.randomUUID();
	client
		.prepare(
			`INSERT INTO indexing_jobs (id, repository_id, version_id, status, progress, total_files, processed_files, stage, stage_detail, error, started_at, completed_at, created_at) VALUES (?, ?, null, ?, ?, ?, ?, ?, null, ?, null, null, ?)`
		)
		.run(
			id,
			overrides.repository_id ?? '/test/repo',
			overrides.status ?? 'queued',
			overrides.progress ?? 0,
			overrides.total_files ?? 0,
			overrides.processed_files ?? 0,
			overrides.stage ?? 'queued',
			overrides.error ?? null,
			NOW_S
		);
	return id;
}

/**
 * Build a minimal SvelteKit-compatible RequestEvent for route handlers.
 *
 * The request is a `PUT` carrying the JSON-serialized body when `opts.body` is
 * provided, otherwise a body-less `GET`.
 *
 * @typeParam T Event type of the handler under test; defaults to the event of
 *   the single-job stream handler.
 * @param opts.params Route params (defaults to `{}`).
 * @param opts.url Absolute request URL (defaults to a job stream URL).
 * @param opts.headers Request headers.
 * @param opts.body Value to JSON-serialize as the request body.
 */
function makeEvent<T = Parameters<typeof getJobStream>[0]>(opts: {
	params?: Record<string, string>;
	url?: string;
	headers?: Record<string, string>;
	body?: unknown;
}): T {
	const url = new URL(opts.url ?? 'http://localhost/api/v1/jobs/test/stream');
	const headers = new Headers(opts.headers ?? {});
	// Compare against undefined rather than truthiness so falsy JSON bodies
	// (0, '', false, null) are still sent.
	const hasBody = opts.body !== undefined;

	// Only a subset of RequestEvent is populated, hence the double assertion.
	return {
		params: opts.params ?? {},
		url,
		request: new Request(url.toString(), {
			method: hasBody ? 'PUT' : 'GET',
			headers,
			body: hasBody ? JSON.stringify(opts.body) : undefined
		}),
		route: { id: null },
		locals: {},
		platform: undefined,
		cookies: {} as never,
		fetch,
		getClientAddress: () => '127.0.0.1',
		setHeaders: vi.fn(),
		isDataRequest: false,
		isSubRequest: false,
		depends: vi.fn(),
		untrack: vi.fn()
	} as unknown as T;
}

// ---------------------------------------------------------------------------
// Helpers: reading chunks from a response body
// ---------------------------------------------------------------------------

/**
 * Read up to `maxChunks` chunks from a streaming response body.
 *
 * @param response Response whose body is consumed.
 * @param maxChunks Upper bound on chunks read, so a stream that never closes
 *   cannot loop forever (a single pending `read()` can still block).
 * @returns The concatenated chunk text, and whether the stream reported
 *   `done` within the chunk budget.
 * @throws Error if the response has no body.
 */
async function readChunks(
	response: Response,
	maxChunks: number
): Promise<{ text: string; closed: boolean }> {
	const reader = response.body?.getReader();
	if (!reader) throw new Error('Response has no body');

	let text = '';
	try {
		for (let i = 0; i < maxChunks; i++) {
			const { done, value } = await reader.read();
			if (done) return { text, closed: true };
			// Stream enqueues strings directly — no TextDecoder needed
			text += String(value ?? '');
		}
		return { text, closed: false };
	} finally {
		reader.releaseLock();
	}
}

/**
 * Read only the first chunk of a response body.
 *
 * @returns The chunk as a string, or `''` if the stream was already closed.
 * @throws Error if the response has no body.
 */
async function readFirstChunk(response: Response): Promise<string> {
	const { text } = await readChunks(response, 1);
	return text;
}

// ---------------------------------------------------------------------------
// Test group 1: GET /api/v1/jobs/:id/stream
// ---------------------------------------------------------------------------

describe('GET /api/v1/jobs/:id/stream', () => {
	beforeEach(() => {
		db = createTestDb();
		mockBroadcaster = new ProgressBroadcaster();
	});

	it('returns 404 when the job does not exist', async () => {
		seedRepo(db);

		const response = await getJobStream(makeEvent({ params: { id: 'non-existent-job-id' } }));

		expect(response.status).toBe(404);
	});

	it('returns 503 when broadcaster is not initialized', async () => {
		mockBroadcaster = null;
		seedRepo(db);
		const jobId = seedJob(db);

		const response = await getJobStream(makeEvent({ params: { id: jobId } }));

		expect(response.status).toBe(503);
	});

	it('returns 200 with Content-Type: text/event-stream', async () => {
		seedRepo(db);
		const jobId = seedJob(db);

		const response = await getJobStream(makeEvent({ params: { id: jobId } }));

		expect(response.status).toBe(200);
		expect(response.headers.get('Content-Type')).toContain('text/event-stream');
	});

	it('first chunk contains the initial job state as a data event', async () => {
		seedRepo(db);
		const jobId = seedJob(db, { status: 'running', progress: 42 });

		const response = await getJobStream(makeEvent({ params: { id: jobId } }));
		const text = await readFirstChunk(response);

		expect(text).toContain('data:');
		// The initial event carries jobId and status
		expect(text).toContain(jobId);
		expect(text).toContain('running');
	});

	it('closes the stream immediately when job status is "done"', async () => {
		seedRepo(db);
		const jobId = seedJob(db, { status: 'done' });

		const response = await getJobStream(makeEvent({ params: { id: jobId } }));
		expect(response.status).toBe(200);

		const { text, closed } = await readChunks(response, 10);

		// The stream must actually close, not merely run out the chunk budget.
		expect(closed).toBe(true);
		expect(text).toContain(jobId);
	});

	it('closes the stream immediately when job status is "failed"', async () => {
		seedRepo(db);
		const jobId = seedJob(db, { status: 'failed', error: 'something went wrong' });

		const response = await getJobStream(makeEvent({ params: { id: jobId } }));
		expect(response.status).toBe(200);

		const { text, closed } = await readChunks(response, 10);

		expect(closed).toBe(true);
		expect(text).toContain('failed');
	});

	it('replays last cached event when Last-Event-ID header is provided', async () => {
		seedRepo(db);
		const jobId = seedJob(db, { status: 'running' });

		// Pre-seed a cached event in the broadcaster
		mockBroadcaster!.broadcast(jobId, '/test/repo', 'progress', { stage: 'parsing', progress: 50 });

		const response = await getJobStream(
			makeEvent({
				params: { id: jobId },
				headers: { 'Last-Event-ID': '1' }
			})
		);
		expect(response.status).toBe(200);

		// Two chunks are enough to cover both the initial state and the replay
		const { text } = await readChunks(response, 2);

		// The replay event should include the cached event data
		expect(text).toContain('progress');
	});
});

// ---------------------------------------------------------------------------
// Test group 2: GET /api/v1/jobs/stream
// ---------------------------------------------------------------------------

describe('GET /api/v1/jobs/stream', () => {
	/** Build an event for the all-jobs stream, optionally with a query string. */
	function makeJobsStreamEvent(query = ''): Parameters<typeof getJobsStream>[0] {
		return makeEvent<Parameters<typeof getJobsStream>[0]>({
			url: `http://localhost/api/v1/jobs/stream${query}`
		});
	}

	beforeEach(() => {
		db = createTestDb();
		mockBroadcaster = new ProgressBroadcaster();
	});

	it('returns 200 with Content-Type: text/event-stream', async () => {
		const response = await getJobsStream(makeJobsStreamEvent());

		expect(response.status).toBe(200);
		expect(response.headers.get('Content-Type')).toContain('text/event-stream');
	});

	it('returns 503 when broadcaster is not initialized', async () => {
		mockBroadcaster = null;

		const response = await getJobsStream(makeJobsStreamEvent());

		expect(response.status).toBe(503);
	});

	it('uses subscribeRepository when ?repositoryId= is provided', async () => {
		const subscribeSpy = vi.spyOn(mockBroadcaster!, 'subscribeRepository');

		await getJobsStream(makeJobsStreamEvent('?repositoryId=/test/repo'));

		expect(subscribeSpy).toHaveBeenCalledWith('/test/repo');
	});

	it('uses subscribeAll when no repositoryId query param is present', async () => {
		const subscribeSpy = vi.spyOn(mockBroadcaster!, 'subscribeAll');

		await getJobsStream(makeJobsStreamEvent());

		expect(subscribeSpy).toHaveBeenCalled();
	});

	it('broadcasts to stream subscribers for the correct repository', async () => {
		seedRepo(db, '/repo/alpha');

		const response = await getJobsStream(makeJobsStreamEvent('?repositoryId=/repo/alpha'));

		// Broadcast an event for this repository
		mockBroadcaster!.broadcast('job-123', '/repo/alpha', 'progress', { stage: 'parsing' });

		const text = await readFirstChunk(response);

		expect(text).toContain('progress');
	});
});

// ---------------------------------------------------------------------------
// Test group 3: GET /api/v1/settings/indexing
// ---------------------------------------------------------------------------

describe('GET /api/v1/settings/indexing', () => {
	/** Build an event for the settings GET handler. */
	function makeGetEvent(): Parameters<typeof getIndexingSettings>[0] {
		return makeEvent<Parameters<typeof getIndexingSettings>[0]>({});
	}

	/** Store `raw`, JSON-serialized, under the `indexing.concurrency` settings key. */
	function storeConcurrencySetting(raw: unknown): void {
		db.prepare(
			"INSERT INTO settings (key, value, updated_at) VALUES ('indexing.concurrency', ?, ?)"
		).run(JSON.stringify(raw), NOW_S);
	}

	beforeEach(() => {
		db = createTestDb();
	});

	it('returns { concurrency: 2 } when no setting exists in DB', async () => {
		const response = await getIndexingSettings(makeGetEvent());
		const body = await response.json();

		expect(response.status).toBe(200);
		expect(body).toEqual({ concurrency: 2 });
	});

	it('returns the stored concurrency when a setting exists', async () => {
		storeConcurrencySetting(4);

		const response = await getIndexingSettings(makeGetEvent());
		const body = await response.json();

		expect(body.concurrency).toBe(4);
	});

	it('parses JSON-wrapped value correctly: {"value": 5}', async () => {
		storeConcurrencySetting({ value: 5 });

		const response = await getIndexingSettings(makeGetEvent());
		const body = await response.json();

		expect(body.concurrency).toBe(5);
	});
});

// ---------------------------------------------------------------------------
// Test group 4: PUT /api/v1/settings/indexing
// ---------------------------------------------------------------------------

describe('PUT /api/v1/settings/indexing', () => {
	/**
	 * Upper bound this suite expects the endpoint to clamp to: max(cpus - 1, 1).
	 * Expected values go through this bound so the tests do not depend on the
	 * core count of the machine running them.
	 */
	function maxConcurrency(): number {
		return Math.max(cpus().length - 1, 1);
	}

	/** Build a JSON `PUT` event for the settings handler. */
	function makePutEvent(body: unknown): Parameters<typeof putIndexingSettings>[0] {
		return makeEvent<Parameters<typeof putIndexingSettings>[0]>({
			url: 'http://localhost/api/v1/settings/indexing',
			headers: { 'Content-Type': 'application/json' },
			body
		});
	}

	beforeEach(() => {
		db = createTestDb();
	});

	it('returns 200 with { concurrency } for a valid integer input', async () => {
		const response = await putIndexingSettings(makePutEvent({ concurrency: 3 }));
		const body = await response.json();

		expect(response.status).toBe(200);
		// 3 is only attainable on machines with at least 4 cores.
		expect(body.concurrency).toBe(Math.min(3, maxConcurrency()));
	});

	it('persists the new concurrency to the settings table', async () => {
		await putIndexingSettings(makePutEvent({ concurrency: 3 }));

		const row = db
			.prepare<[], { value: string }>(
				"SELECT value FROM settings WHERE key = 'indexing.concurrency'"
			)
			.get();

		expect(row).toBeDefined();
		const parsed = JSON.parse(row!.value);
		expect(parsed.value).toBe(Math.min(3, maxConcurrency()));
	});

	it('clamps to minimum of 1', async () => {
		const response = await putIndexingSettings(makePutEvent({ concurrency: 0 }));
		const body = await response.json();

		expect(body.concurrency).toBeGreaterThanOrEqual(1);
	});

	it('clamps to maximum of max(cpus-1, 1)', async () => {
		// Pass an absurdly large value; it must be clamped
		const response = await putIndexingSettings(makePutEvent({ concurrency: 99999 }));
		const body = await response.json();

		expect(body.concurrency).toBeLessThanOrEqual(maxConcurrency());
	});

	it('returns 400 for NaN concurrency (non-numeric string)', async () => {
		// NOTE(review): the flow below is how the handler was understood when this
		// test was written — confirm against the handler itself.
		// `parseInt(String(body.concurrency ?? 2), 10)` yields NaN for a non-numeric
		// string, NaN survives the Math.max(1, Math.min(NaN, max)) clamp, and the
		// subsequent `isNaN(concurrency)` check returns 400.
		const response = await putIndexingSettings(makePutEvent({ concurrency: 'not-a-number' }));

		expect(response.status).toBe(400);
	});

	it('uses concurrency=2 as default when body.concurrency is missing', async () => {
		const response = await putIndexingSettings(makePutEvent({}));
		const body = await response.json();

		// Default is 2 per the code: `body.concurrency ?? 2` (still subject to the clamp)
		expect(body.concurrency).toBe(Math.min(2, maxConcurrency()));
	});
});