fix(whisper): handle model warmup events
All checks were successful
Build & Push Docker Image / test (push) Successful in 12s
Build & Push Docker Image / build-and-push (push) Successful in 52s

- Ignore backend model lifecycle webhooks so model warmup does not
  mark jobs done early
- Parse batched SSE messages and relay model load states during
  submit retries

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
2026-05-15 00:08:32 +02:00
parent f70cefc5e9
commit 1072679360
4 changed files with 249 additions and 87 deletions

View File

@@ -1,4 +1,32 @@
import type { ModelStatus } from '$lib/types.js'; import type { ModelStateTag, ModelStatus } from '$lib/types.js';
const MODEL_STATES = new Set<ModelStateTag>(['unloaded', 'loading', 'waiting_for_gpu', 'ready']);
function isModelStateTag(value: unknown): value is ModelStateTag {
return typeof value === 'string' && MODEL_STATES.has(value as ModelStateTag);
}
function extractSseMessages(buffer: string): { messages: { eventType: string; data: string }[]; rest: string } {
const normalized = buffer.replace(/\r/g, '');
const chunks = normalized.split('\n\n');
const rest = chunks.pop() ?? '';
const messages = chunks
.map((chunk) => {
let eventType = '';
const dataLines: string[] = [];
for (const line of chunk.split('\n')) {
if (line.startsWith('event:')) {
eventType = line.slice(6).trim();
} else if (line.startsWith('data:')) {
dataLines.push(line.slice(5).trim());
}
}
return { eventType, data: dataLines.join('\n') };
})
.filter((message) => message.data.length > 0);
return { messages, rest };
}
function whisperUrl() { function whisperUrl() {
return process.env.WHISPER_URL ?? 'http://localhost:8080'; return process.env.WHISPER_URL ?? 'http://localhost:8080';
@@ -22,7 +50,10 @@ export async function getModelStatus(): Promise<ModelStatus> {
* SSE connection fails or closes without that event, so the retry loop can * SSE connection fails or closes without that event, so the retry loop can
* try again without hanging indefinitely. * try again without hanging indefinitely.
*/ */
async function waitForModelReady(timeoutMs: number): Promise<void> { async function waitForModelReady(
timeoutMs: number,
onStateChange?: (state: ModelStateTag) => void
): Promise<void> {
const { default: fetch } = await import('node-fetch'); const { default: fetch } = await import('node-fetch');
const ac = new AbortController(); const ac = new AbortController();
return new Promise((resolve) => { return new Promise((resolve) => {
@@ -47,17 +78,18 @@ async function waitForModelReady(timeoutMs: number): Promise<void> {
for await (const chunk of res.body) { for await (const chunk of res.body) {
if (ac.signal.aborted) break; if (ac.signal.aborted) break;
buf += chunk.toString(); buf += chunk.toString();
const lines = buf.split('\n'); const { messages, rest } = extractSseMessages(buf);
buf = lines.pop() ?? ''; buf = rest;
for (const line of lines) { for (const message of messages) {
if (!line.startsWith('data:')) continue;
try { try {
const payload = JSON.parse(line.slice(5).trim()); const payload = JSON.parse(message.data) as { state?: unknown };
if (!isModelStateTag(payload.state)) continue;
if (payload.state === 'ready') { if (payload.state === 'ready') {
clearTimeout(timer); clearTimeout(timer);
finish(); finish();
return; return;
} }
onStateChange?.(payload.state);
} catch { /* ignore parse errors */ } } catch { /* ignore parse errors */ }
} }
} }
@@ -83,7 +115,7 @@ export async function submitJob(
wavPath: string, wavPath: string,
webhookUrl: string, webhookUrl: string,
language?: string, language?: string,
onModelWaiting?: (state: string, retryAfterSecs: number) => void, onModelWaiting?: (state: ModelStateTag, retryAfterSecs: number) => void,
maxAttempts = 20 maxAttempts = 20
): Promise<string> { ): Promise<string> {
const FormData = (await import('form-data')).default; const FormData = (await import('form-data')).default;
@@ -116,10 +148,15 @@ export async function submitJob(
state?: string; state?: string;
retry_after_secs?: number; retry_after_secs?: number;
}; };
const state = body.state ?? 'unloaded'; const state = isModelStateTag(body.state) ? body.state : 'unloaded';
const waitSecs = body.retry_after_secs ?? parseInt(res.headers.get('Retry-After') ?? '15'); const waitSecs = body.retry_after_secs ?? parseInt(res.headers.get('Retry-After') ?? '15');
onModelWaiting?.(state, waitSecs); onModelWaiting?.(state, waitSecs);
await waitForModelReady((waitSecs + 1) * 1000); let lastState = state;
await waitForModelReady((waitSecs + 1) * 1000, (nextState) => {
if (nextState === lastState) return;
lastState = nextState;
onModelWaiting?.(nextState, waitSecs);
});
continue; continue;
} }
@@ -169,20 +206,12 @@ export async function streamJob(
let buf = ''; let buf = '';
for await (const chunk of res.body) { for await (const chunk of res.body) {
buf += chunk.toString(); buf += chunk.toString();
const lines = buf.split('\n'); const { messages, rest } = extractSseMessages(buf);
buf = lines.pop() ?? ''; buf = rest;
let eventType = '';
let dataLine = '';
for (const line of lines) {
if (line.startsWith('event:')) eventType = line.slice(6).trim();
else if (line.startsWith('data:')) dataLine = line.slice(5).trim();
}
if (!dataLine) continue;
for (const message of messages) {
try { try {
const payload = JSON.parse(dataLine); const payload = JSON.parse(message.data);
if (payload.type === 'progress') { if (payload.type === 'progress') {
onProgress(payload.percent ?? 0, payload.chunk ?? 0, payload.total ?? 0); onProgress(payload.percent ?? 0, payload.chunk ?? 0, payload.total ?? 0);
} else if (payload.type === 'done') { } else if (payload.type === 'done') {
@@ -194,6 +223,7 @@ export async function streamJob(
} }
} catch { /* ignore parse errors */ } } catch { /* ignore parse errors */ }
} }
}
} }
/** Check if the whisper server is healthy. */ /** Check if the whisper server is healthy. */

View File

@@ -6,62 +6,85 @@ import { cleanupJobTmp } from '$lib/server/downloader.js';
import { emitProgress } from '$lib/server/pipeline.js'; import { emitProgress } from '$lib/server/pipeline.js';
import type { Segment, WhisperJob } from '$lib/types.js'; import type { Segment, WhisperJob } from '$lib/types.js';
const WHISPER_JOB_STATUSES = new Set<WhisperJob['status']>([
'queued',
'running',
'done',
'failed',
'cancelled'
]);
function isWhisperJobWebhook(payload: unknown): payload is WhisperJob {
if (!payload || typeof payload !== 'object') return false;
const candidate = payload as Record<string, unknown>;
return (
typeof candidate.id === 'string' &&
typeof candidate.status === 'string' &&
WHISPER_JOB_STATUSES.has(candidate.status as WhisperJob['status'])
);
}
export async function POST({ params, request }) { export async function POST({ params, request }) {
const jobId = params.jobId; const jobId = params.jobId;
const job = getJob(jobId); const job = getJob(jobId);
if (!job) throw error(404, 'Job not found'); if (!job) throw error(404, 'Job not found');
const whisperJob = (await request.json()) as WhisperJob; const payload = (await request.json()) as unknown;
if (!isWhisperJobWebhook(payload)) {
// whisper-rtx2080 also fires model lifecycle events to registered job webhooks.
return json({ ok: true, ignored: 'not_a_job_event' });
}
const whisperJob = payload;
// Discard the result if the job was cancelled locally while whisper was running // Discard the result if the job was cancelled locally while whisper was running
if (job.status === 'cancelled') { if (job.status === 'cancelled') {
return json({ ok: true }); return json({ ok: true });
} }
// Ignore stale callbacks from a previous whisper job after a local retry/reset. // Ignore stale callbacks from a previous whisper job after a local retry/reset.
if (job.whisperJobId && whisperJob.id !== job.whisperJobId) { if (job.whisperJobId && whisperJob.id !== job.whisperJobId) {
return json({ ok: true, ignored: 'stale_whisper_job' }); return json({ ok: true, ignored: 'stale_whisper_job' });
} }
// Ignore replayed success callbacks after the transcript is already persisted. // Ignore replayed success callbacks after the transcript is already persisted.
if (job.status === 'done' && job.segmentsJson) { if (job.status === 'done' && job.segmentsJson) {
return json({ ok: true, ignored: 'duplicate_webhook' }); return json({ ok: true, ignored: 'duplicate_webhook' });
} }
if (whisperJob.status === 'failed' || whisperJob.status === 'cancelled') { if (whisperJob.status === 'failed' || whisperJob.status === 'cancelled') {
const msg = whisperJob.error ?? `Whisper job ${whisperJob.status}`; const msg = whisperJob.error ?? `Whisper job ${whisperJob.status}`;
updateJob({ id: jobId, status: 'failed', error: msg }); updateJob({ id: jobId, status: 'failed', error: msg });
emitProgress(jobId, { type: 'error', message: msg }); emitProgress(jobId, { type: 'error', message: msg });
return json({ ok: true }); return json({ ok: true });
} }
try { try {
setJobStatus(jobId, 'processing', 90); setJobStatus(jobId, 'processing', 90);
emitProgress(jobId, { type: 'status', status: 'processing', progress: 90 }); emitProgress(jobId, { type: 'status', status: 'processing', progress: 90 });
const segments = (whisperJob.segments ?? []) as Segment[]; const segments = (whisperJob.segments ?? []) as Segment[];
const paths = await writeOutputs(segments, job.title, jobId); const paths = await writeOutputs(segments, job.title, jobId);
const outputDir = paths.srt.replace(/\/[^/]+$/, ''); const outputDir = paths.srt.replace(/\/[^/]+$/, '');
updateJob({ updateJob({
id: jobId, id: jobId,
status: 'done', status: 'done',
progress: 100, progress: 100,
segmentsJson: JSON.stringify(segments), segmentsJson: JSON.stringify(segments),
outputDir outputDir
}); });
emitProgress(jobId, { type: 'done', status: 'done' }); emitProgress(jobId, { type: 'done', status: 'done' });
await sendNotification(jobId, '✅ Transcript ready', job.title); await sendNotification(jobId, '✅ Transcript ready', job.title);
await cleanupJobTmp(jobId); await cleanupJobTmp(jobId);
return json({ ok: true }); return json({ ok: true });
} catch (err: unknown) { } catch (err: unknown) {
const message = err instanceof Error ? err.message : String(err); const message = err instanceof Error ? err.message : String(err);
updateJob({ id: jobId, status: 'failed', error: message }); updateJob({ id: jobId, status: 'failed', error: message });
emitProgress(jobId, { type: 'error', message }); emitProgress(jobId, { type: 'error', message });
return json({ ok: false, error: message }, { status: 500 }); return json({ ok: false, error: message }, { status: 500 });
} }
} }

View File

@@ -106,6 +106,64 @@ describe('POST /api/webhook/[jobId] — job not found', () => {
}); });
}); });
// ── Ignore backend model lifecycle webhooks ─────────────────────────────────────
describe('POST /api/webhook/[jobId] — non-job webhook payloads', () => {
it('ignores model_ready events sent to job webhooks', async () => {
mockGetJob.mockReturnValue(makeJob('job-model-ready'));
const res = await POST(
makeEvent('job-model-ready', {
type: 'model_ready',
loaded_at: new Date().toISOString()
}) as any
);
expect(res.status).toBe(200);
expect(await res.json()).toEqual({ ok: true, ignored: 'not_a_job_event' });
expect(mockSetJobStatus).not.toHaveBeenCalled();
expect(mockUpdateJob).not.toHaveBeenCalled();
expect(mockWriteOutputs).not.toHaveBeenCalled();
expect(mockSendNotification).not.toHaveBeenCalled();
});
it('ignores model_unloaded events sent to job webhooks', async () => {
mockGetJob.mockReturnValue(makeJob('job-model-unloaded'));
const res = await POST(
makeEvent('job-model-unloaded', {
type: 'model_unloaded',
unloaded_at: new Date().toISOString()
}) as any
);
expect(res.status).toBe(200);
expect(await res.json()).toEqual({ ok: true, ignored: 'not_a_job_event' });
expect(mockSetJobStatus).not.toHaveBeenCalled();
expect(mockUpdateJob).not.toHaveBeenCalled();
expect(mockWriteOutputs).not.toHaveBeenCalled();
expect(mockSendNotification).not.toHaveBeenCalled();
});
it('ignores payloads with invalid status values', async () => {
mockGetJob.mockReturnValue(makeJob('job-invalid-status'));
const res = await POST(
makeEvent('job-invalid-status', {
id: 'bogus-whisper-id',
status: 'model_ready',
segments: []
}) as any
);
expect(res.status).toBe(200);
expect(await res.json()).toEqual({ ok: true, ignored: 'not_a_job_event' });
expect(mockSetJobStatus).not.toHaveBeenCalled();
expect(mockUpdateJob).not.toHaveBeenCalled();
expect(mockWriteOutputs).not.toHaveBeenCalled();
});
});
// ── Local cancellation guard ────────────────────────────────────────────────── // ── Local cancellation guard ──────────────────────────────────────────────────
describe('POST /api/webhook/[jobId] — locally cancelled job', () => { describe('POST /api/webhook/[jobId] — locally cancelled job', () => {

View File

@@ -354,6 +354,32 @@ describe('submitJob — SSE-triggered retry', () => {
const id = await submitJob('/tmp/audio.wav', 'http://host/webhook'); const id = await submitJob('/tmp/audio.wav', 'http://host/webhook');
expect(id).toBe('stream-closed-id'); expect(id).toBe('stream-closed-id');
}); });
it('relays intermediate model states from /model/events while waiting to retry', async () => {
let jobCallIdx = 0;
mocks.fetch.mockImplementation((url: string) => {
if (String(url).includes('/model/events')) {
return Promise.resolve({
ok: true,
status: 200,
body: Readable.from([
'data: {"state":"loading"}\n\ndata: {"state":"waiting_for_gpu"}\n\ndata: {"state":"ready"}\n\n'
])
});
}
jobCallIdx++;
if (jobCallIdx === 1) return Promise.resolve(make503('unloaded', 30));
return Promise.resolve(make202('state-relay-id'));
});
const onModelWaiting = vi.fn();
const id = await submitJob('/tmp/audio.wav', 'http://host/webhook', undefined, onModelWaiting);
expect(id).toBe('state-relay-id');
expect(onModelWaiting).toHaveBeenNthCalledWith(1, 'unloaded', 30);
expect(onModelWaiting).toHaveBeenNthCalledWith(2, 'loading', 30);
expect(onModelWaiting).toHaveBeenNthCalledWith(3, 'waiting_for_gpu', 30);
expect(onModelWaiting).toHaveBeenCalledTimes(3);
});
}); });
// ── unloadModel ─────────────────────────────────────────────────────────────── // ── unloadModel ───────────────────────────────────────────────────────────────
@@ -509,6 +535,10 @@ function makeSSEResponse(lines: string[]) {
return { ok: true, body }; return { ok: true, body };
} }
function makeSSEChunkResponse(chunks: string[]) {
return { ok: true, body: Readable.from(chunks) };
}
describe('streamJob — SSE event parsing', () => { describe('streamJob — SSE event parsing', () => {
it('calls onProgress for progress events with percent, chunk, total', async () => { it('calls onProgress for progress events with percent, chunk, total', async () => {
const onProgress = vi.fn(); const onProgress = vi.fn();
@@ -602,6 +632,27 @@ describe('streamJob — SSE event parsing', () => {
expect(onProgress).toHaveBeenNthCalledWith(3, 75, 3, 4); expect(onProgress).toHaveBeenNthCalledWith(3, 75, 3, 4);
}); });
it('handles multiple SSE events delivered in a single chunk', async () => {
const onProgress = vi.fn();
const onDone = vi.fn();
const onError = vi.fn();
mocks.fetch.mockResolvedValue(
makeSSEChunkResponse([
'data: {"type":"progress","percent":25,"chunk":1,"total":2}\n\n' +
'data: {"type":"progress","percent":50,"chunk":2,"total":2}\n\n' +
'data: {"type":"done","job":{}}\n\n'
])
);
await streamJob('whisper-id', onProgress, onDone, onError);
expect(onProgress).toHaveBeenCalledTimes(2);
expect(onProgress).toHaveBeenNthCalledWith(1, 25, 1, 2);
expect(onProgress).toHaveBeenNthCalledWith(2, 50, 2, 2);
expect(onDone).toHaveBeenCalledOnce();
expect(onError).not.toHaveBeenCalled();
});
it('defaults chunk and total to 0 when missing from progress event', async () => { it('defaults chunk and total to 0 when missing from progress event', async () => {
const onProgress = vi.fn(); const onProgress = vi.fn();
const onDone = vi.fn(); const onDone = vi.fn();