diff --git a/src/lib/server/whisper.ts b/src/lib/server/whisper.ts index 4455ce9..32f442b 100644 --- a/src/lib/server/whisper.ts +++ b/src/lib/server/whisper.ts @@ -1,4 +1,32 @@ -import type { ModelStatus } from '$lib/types.js'; +import type { ModelStateTag, ModelStatus } from '$lib/types.js'; + +const MODEL_STATES = new Set(['unloaded', 'loading', 'waiting_for_gpu', 'ready']); + +function isModelStateTag(value: unknown): value is ModelStateTag { + return typeof value === 'string' && MODEL_STATES.has(value as ModelStateTag); +} + +function extractSseMessages(buffer: string): { messages: { eventType: string; data: string }[]; rest: string } { + const normalized = buffer.replace(/\r/g, ''); + const chunks = normalized.split('\n\n'); + const rest = chunks.pop() ?? ''; + const messages = chunks + .map((chunk) => { + let eventType = ''; + const dataLines: string[] = []; + for (const line of chunk.split('\n')) { + if (line.startsWith('event:')) { + eventType = line.slice(6).trim(); + } else if (line.startsWith('data:')) { + dataLines.push(line.slice(5).trim()); + } + } + return { eventType, data: dataLines.join('\n') }; + }) + .filter((message) => message.data.length > 0); + + return { messages, rest }; +} function whisperUrl() { return process.env.WHISPER_URL ?? 'http://localhost:8080'; @@ -22,7 +50,10 @@ export async function getModelStatus(): Promise { * SSE connection fails or closes without that event, so the retry loop can * try again without hanging indefinitely. */ -async function waitForModelReady(timeoutMs: number): Promise { +async function waitForModelReady( + timeoutMs: number, + onStateChange?: (state: ModelStateTag) => void +): Promise { const { default: fetch } = await import('node-fetch'); const ac = new AbortController(); return new Promise((resolve) => { @@ -47,17 +78,18 @@ async function waitForModelReady(timeoutMs: number): Promise { for await (const chunk of res.body) { if (ac.signal.aborted) break; buf += chunk.toString(); - const lines = buf.split('\n'); - buf = lines.pop() ?? ''; - for (const line of lines) { - if (!line.startsWith('data:')) continue; + const { messages, rest } = extractSseMessages(buf); + buf = rest; + for (const message of messages) { try { - const payload = JSON.parse(line.slice(5).trim()); + const payload = JSON.parse(message.data) as { state?: unknown }; + if (!isModelStateTag(payload.state)) continue; if (payload.state === 'ready') { clearTimeout(timer); finish(); return; } + onStateChange?.(payload.state); } catch { /* ignore parse errors */ } } } @@ -83,7 +115,7 @@ export async function submitJob( wavPath: string, webhookUrl: string, language?: string, - onModelWaiting?: (state: string, retryAfterSecs: number) => void, + onModelWaiting?: (state: ModelStateTag, retryAfterSecs: number) => void, maxAttempts = 20 ): Promise { const FormData = (await import('form-data')).default; @@ -116,10 +148,15 @@ export async function submitJob( state?: string; retry_after_secs?: number; }; - const state = body.state ?? 'unloaded'; + const state = isModelStateTag(body.state) ? body.state : 'unloaded'; const waitSecs = body.retry_after_secs ?? parseInt(res.headers.get('Retry-After') ?? '15'); onModelWaiting?.(state, waitSecs); - await waitForModelReady((waitSecs + 1) * 1000); + let lastState = state; + await waitForModelReady((waitSecs + 1) * 1000, (nextState) => { + if (nextState === lastState) return; + lastState = nextState; + onModelWaiting?.(nextState, waitSecs); + }); continue; } @@ -169,30 +206,23 @@ export async function streamJob( let buf = ''; for await (const chunk of res.body) { buf += chunk.toString(); - const lines = buf.split('\n'); - buf = lines.pop() ?? ''; + const { messages, rest } = extractSseMessages(buf); + buf = rest; - let eventType = ''; - let dataLine = ''; - for (const line of lines) { - if (line.startsWith('event:')) eventType = line.slice(6).trim(); - else if (line.startsWith('data:')) dataLine = line.slice(5).trim(); + for (const message of messages) { + try { + const payload = JSON.parse(message.data); + if (payload.type === 'progress') { + onProgress(payload.percent ?? 0, payload.chunk ?? 0, payload.total ?? 0); + } else if (payload.type === 'done') { + onDone(); + return; + } else if (payload.type === 'error') { + onError(payload.message ?? 'unknown error'); + return; + } + } catch { /* ignore parse errors */ } } - - if (!dataLine) continue; - - try { - const payload = JSON.parse(dataLine); - if (payload.type === 'progress') { - onProgress(payload.percent ?? 0, payload.chunk ?? 0, payload.total ?? 0); - } else if (payload.type === 'done') { - onDone(); - return; - } else if (payload.type === 'error') { - onError(payload.message ?? 'unknown error'); - return; - } - } catch { /* ignore parse errors */ } } } diff --git a/src/routes/api/webhook/[jobId]/+server.ts b/src/routes/api/webhook/[jobId]/+server.ts index e1fdb56..320fd00 100644 --- a/src/routes/api/webhook/[jobId]/+server.ts +++ b/src/routes/api/webhook/[jobId]/+server.ts @@ -6,62 +6,85 @@ import { cleanupJobTmp } from '$lib/server/downloader.js'; import { emitProgress } from '$lib/server/pipeline.js'; import type { Segment, WhisperJob } from '$lib/types.js'; +const WHISPER_JOB_STATUSES = new Set([ + 'queued', + 'running', + 'done', + 'failed', + 'cancelled' +]); + +function isWhisperJobWebhook(payload: unknown): payload is WhisperJob { + if (!payload || typeof payload !== 'object') return false; + const candidate = payload as Record; + return ( + typeof candidate.id === 'string' && + typeof candidate.status === 'string' && + WHISPER_JOB_STATUSES.has(candidate.status as WhisperJob['status']) + ); +} + export async function POST({ params, request }) { -const jobId = params.jobId; -const job = getJob(jobId); -if (!job) throw error(404, 'Job not found'); + const jobId = params.jobId; + const job = getJob(jobId); + if (!job) throw error(404, 'Job not found'); -const whisperJob = (await request.json()) as WhisperJob; + const payload = (await request.json()) as unknown; + if (!isWhisperJobWebhook(payload)) { + // whisper-rtx2080 also fires model lifecycle events to registered job webhooks. + return json({ ok: true, ignored: 'not_a_job_event' }); + } + const whisperJob = payload; -// Discard the result if the job was cancelled locally while whisper was running -if (job.status === 'cancelled') { -return json({ ok: true }); -} - -// Ignore stale callbacks from a previous whisper job after a local retry/reset. -if (job.whisperJobId && whisperJob.id !== job.whisperJobId) { -return json({ ok: true, ignored: 'stale_whisper_job' }); -} - -// Ignore replayed success callbacks after the transcript is already persisted. -if (job.status === 'done' && job.segmentsJson) { -return json({ ok: true, ignored: 'duplicate_webhook' }); -} - -if (whisperJob.status === 'failed' || whisperJob.status === 'cancelled') { -const msg = whisperJob.error ?? `Whisper job ${whisperJob.status}`; -updateJob({ id: jobId, status: 'failed', error: msg }); -emitProgress(jobId, { type: 'error', message: msg }); -return json({ ok: true }); -} - -try { -setJobStatus(jobId, 'processing', 90); -emitProgress(jobId, { type: 'status', status: 'processing', progress: 90 }); - -const segments = (whisperJob.segments ?? []) as Segment[]; - -const paths = await writeOutputs(segments, job.title, jobId); -const outputDir = paths.srt.replace(/\/[^/]+$/, ''); - -updateJob({ -id: jobId, -status: 'done', -progress: 100, -segmentsJson: JSON.stringify(segments), -outputDir -}); - -emitProgress(jobId, { type: 'done', status: 'done' }); - -await sendNotification(jobId, '✅ Transcript ready', job.title); -await cleanupJobTmp(jobId); - -return json({ ok: true }); -} catch (err: unknown) { -const message = err instanceof Error ? err.message : String(err); -updateJob({ id: jobId, status: 'failed', error: message }); -emitProgress(jobId, { type: 'error', message }); -return json({ ok: false, error: message }, { status: 500 }); -} + // Discard the result if the job was cancelled locally while whisper was running + if (job.status === 'cancelled') { + return json({ ok: true }); + } + + // Ignore stale callbacks from a previous whisper job after a local retry/reset. + if (job.whisperJobId && whisperJob.id !== job.whisperJobId) { + return json({ ok: true, ignored: 'stale_whisper_job' }); + } + + // Ignore replayed success callbacks after the transcript is already persisted. + if (job.status === 'done' && job.segmentsJson) { + return json({ ok: true, ignored: 'duplicate_webhook' }); + } + + if (whisperJob.status === 'failed' || whisperJob.status === 'cancelled') { + const msg = whisperJob.error ?? `Whisper job ${whisperJob.status}`; + updateJob({ id: jobId, status: 'failed', error: msg }); + emitProgress(jobId, { type: 'error', message: msg }); + return json({ ok: true }); + } + + try { + setJobStatus(jobId, 'processing', 90); + emitProgress(jobId, { type: 'status', status: 'processing', progress: 90 }); + + const segments = (whisperJob.segments ?? []) as Segment[]; + + const paths = await writeOutputs(segments, job.title, jobId); + const outputDir = paths.srt.replace(/\/[^/]+$/, ''); + + updateJob({ + id: jobId, + status: 'done', + progress: 100, + segmentsJson: JSON.stringify(segments), + outputDir + }); + + emitProgress(jobId, { type: 'done', status: 'done' }); + + await sendNotification(jobId, '✅ Transcript ready', job.title); + await cleanupJobTmp(jobId); + + return json({ ok: true }); + } catch (err: unknown) { + const message = err instanceof Error ? err.message : String(err); + updateJob({ id: jobId, status: 'failed', error: message }); + emitProgress(jobId, { type: 'error', message }); + return json({ ok: false, error: message }, { status: 500 }); + } } diff --git a/src/tests/webhook.test.ts b/src/tests/webhook.test.ts index 8e7982f..282d4c3 100644 --- a/src/tests/webhook.test.ts +++ b/src/tests/webhook.test.ts @@ -106,6 +106,64 @@ describe('POST /api/webhook/[jobId] — job not found', () => { }); }); +// ── Ignore backend model lifecycle webhooks ───────────────────────────────────── + +describe('POST /api/webhook/[jobId] — non-job webhook payloads', () => { + it('ignores model_ready events sent to job webhooks', async () => { + mockGetJob.mockReturnValue(makeJob('job-model-ready')); + + const res = await POST( + makeEvent('job-model-ready', { + type: 'model_ready', + loaded_at: new Date().toISOString() + }) as any + ); + + expect(res.status).toBe(200); + expect(await res.json()).toEqual({ ok: true, ignored: 'not_a_job_event' }); + expect(mockSetJobStatus).not.toHaveBeenCalled(); + expect(mockUpdateJob).not.toHaveBeenCalled(); + expect(mockWriteOutputs).not.toHaveBeenCalled(); + expect(mockSendNotification).not.toHaveBeenCalled(); + }); + + it('ignores model_unloaded events sent to job webhooks', async () => { + mockGetJob.mockReturnValue(makeJob('job-model-unloaded')); + + const res = await POST( + makeEvent('job-model-unloaded', { + type: 'model_unloaded', + unloaded_at: new Date().toISOString() + }) as any + ); + + expect(res.status).toBe(200); + expect(await res.json()).toEqual({ ok: true, ignored: 'not_a_job_event' }); + expect(mockSetJobStatus).not.toHaveBeenCalled(); + expect(mockUpdateJob).not.toHaveBeenCalled(); + expect(mockWriteOutputs).not.toHaveBeenCalled(); + expect(mockSendNotification).not.toHaveBeenCalled(); + }); + + it('ignores payloads with invalid status values', async () => { + mockGetJob.mockReturnValue(makeJob('job-invalid-status')); + + const res = await POST( + makeEvent('job-invalid-status', { + id: 'bogus-whisper-id', + status: 'model_ready', + segments: [] + }) as any + ); + + expect(res.status).toBe(200); + expect(await res.json()).toEqual({ ok: true, ignored: 'not_a_job_event' }); + expect(mockSetJobStatus).not.toHaveBeenCalled(); + expect(mockUpdateJob).not.toHaveBeenCalled(); + expect(mockWriteOutputs).not.toHaveBeenCalled(); + }); +}); + // ── Local cancellation guard ────────────────────────────────────────────────── describe('POST /api/webhook/[jobId] — locally cancelled job', () => { diff --git a/src/tests/whisper.test.ts b/src/tests/whisper.test.ts index caa1683..e420870 100644 --- a/src/tests/whisper.test.ts +++ b/src/tests/whisper.test.ts @@ -354,6 +354,32 @@ describe('submitJob — SSE-triggered retry', () => { const id = await submitJob('/tmp/audio.wav', 'http://host/webhook'); expect(id).toBe('stream-closed-id'); }); + + it('relays intermediate model states from /model/events while waiting to retry', async () => { + let jobCallIdx = 0; + mocks.fetch.mockImplementation((url: string) => { + if (String(url).includes('/model/events')) { + return Promise.resolve({ + ok: true, + status: 200, + body: Readable.from([ + 'data: {"state":"loading"}\n\ndata: {"state":"waiting_for_gpu"}\n\ndata: {"state":"ready"}\n\n' + ]) + }); + } + jobCallIdx++; + if (jobCallIdx === 1) return Promise.resolve(make503('unloaded', 30)); + return Promise.resolve(make202('state-relay-id')); + }); + + const onModelWaiting = vi.fn(); + const id = await submitJob('/tmp/audio.wav', 'http://host/webhook', undefined, onModelWaiting); + expect(id).toBe('state-relay-id'); + expect(onModelWaiting).toHaveBeenNthCalledWith(1, 'unloaded', 30); + expect(onModelWaiting).toHaveBeenNthCalledWith(2, 'loading', 30); + expect(onModelWaiting).toHaveBeenNthCalledWith(3, 'waiting_for_gpu', 30); + expect(onModelWaiting).toHaveBeenCalledTimes(3); + }); }); // ── unloadModel ─────────────────────────────────────────────────────────────── @@ -509,6 +535,10 @@ function makeSSEResponse(lines: string[]) { return { ok: true, body }; } +function makeSSEChunkResponse(chunks: string[]) { + return { ok: true, body: Readable.from(chunks) }; +} + describe('streamJob — SSE event parsing', () => { it('calls onProgress for progress events with percent, chunk, total', async () => { const onProgress = vi.fn(); @@ -602,6 +632,27 @@ describe('streamJob — SSE event parsing', () => { expect(onProgress).toHaveBeenNthCalledWith(3, 75, 3, 4); }); + it('handles multiple SSE events delivered in a single chunk', async () => { + const onProgress = vi.fn(); + const onDone = vi.fn(); + const onError = vi.fn(); + + mocks.fetch.mockResolvedValue( + makeSSEChunkResponse([ + 'data: {"type":"progress","percent":25,"chunk":1,"total":2}\n\n' + + 'data: {"type":"progress","percent":50,"chunk":2,"total":2}\n\n' + + 'data: {"type":"done","job":{}}\n\n' + ]) + ); + + await streamJob('whisper-id', onProgress, onDone, onError); + expect(onProgress).toHaveBeenCalledTimes(2); + expect(onProgress).toHaveBeenNthCalledWith(1, 25, 1, 2); + expect(onProgress).toHaveBeenNthCalledWith(2, 50, 2, 2); + expect(onDone).toHaveBeenCalledOnce(); + expect(onError).not.toHaveBeenCalled(); + }); + it('defaults chunk and total to 0 when missing from progress event', async () => { const onProgress = vi.fn(); const onDone = vi.fn();