Compare commits

..

3 Commits

Author SHA1 Message Date
Giancarmine Salucci
04142b17a8 feat: whisper-side cancellation + SSE-triggered retry
All checks were successful
Build & Push Docker Image / build-and-push (push) Successful in 48s
- Add cancelJob() to whisper.ts: sends DELETE /jobs/:id to the whisper
  server (best-effort, errors silently ignored)
- DELETE /api/jobs/[id] now calls cancelJob() when cancelling an active
  job that has a whisperJobId, stopping GPU use immediately
- Webhook handler guards against locally-cancelled jobs: returns ok early
  so whisper's late completion cannot overwrite cancelled status or send
  a phantom 'Transcript ready' notification
- Replace blind sleep(Retry-After + 1s) in submitJob() with
  waitForModelReady(): subscribes to /model/events SSE and proceeds as
  soon as state:ready arrives; falls back to the Retry-After timeout if
  SSE is unreachable or closes without model_ready
- Refactor retry tests to use URL-aware makeJobFetch() helper; add 7 new
  tests (3 SSE-triggered retry, 3 cancelJob, 1 webhook cancelled-guard)
  — 144/144 passing

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-05-09 00:40:40 +02:00
Giancarmine Salucci
01845bec25 test: comprehensive coverage for 503 retry loop and getModelStatus
submitJob — 503 retry behavior (10 new tests):
- calls onModelWaiting with correct state + retryAfterSecs on each 503
- retries until model ready and returns job_id
- tracks all three model states (unloaded, loading, waiting_for_gpu)
- uses retry_after_secs from response body
- falls back to Retry-After header when body field absent
- falls back to 15s when both body and header are absent
- throws after maxAttempts exhausted (fetch called exactly N times)
- does NOT call onModelWaiting for non-503 errors
- does NOT retry on non-503 errors (throws immediately, one fetch call)
- works correctly without an onModelWaiting callback

getModelStatus (6 new tests):
- returns parsed status for each model state tag
- includes optional fields (loaded_at, vram_*, retry_in_secs)
- calls the correct WHISPER_URL/model/status endpoint
- throws when server returns non-ok

Uses vi.useFakeTimers()/runAllTimersAsync() to eliminate real delays.
Rejection handler attached before timer advance to avoid unhandled-rejection
false positives from Vitest's detector.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-05-09 00:14:09 +02:00
Giancarmine Salucci
b90d57984c feat: model-on-demand lifecycle — retry on 503, live status pill, warming indicator
- whisper.ts: add getModelStatus(); fix submitJob() to retry on 503 using
  Retry-After header instead of throwing; optional onModelWaiting callback
  lets the pipeline surface model state to the UI during the wait
- pipeline.ts: pass onModelWaiting callback → emits model_warming SSE event
  so the job detail page can show 'Warming up model…' while waiting
- types.ts: add ModelStateTag union and ModelStatus interface
- api/model/status: GET route proxies whisper /model/status (falls back to
  {state:'unloaded'} if whisper unreachable)
- api/model/events: GET route relays whisper SSE stream to the browser;
  AbortController tied to request.signal cleans up on disconnect
- layout.svelte: status pill is now live — initial fetch + EventSource on
  /api/model/events; dot colour + label reflect real model state with a
  pulsing animation while loading or waiting_for_gpu
- jobs/[id]/+page.svelte: handle model_warming event type → show a yellow
  'Warming up model…' sub-label with spinner inside the progress card
- whisper.test.ts: update submitJob mocks to status:202 to match real API

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-05-09 00:08:21 +02:00
11 changed files with 644 additions and 21 deletions

View File

@@ -129,7 +129,9 @@ async function runJob(
emitProgress(jobId, { type: 'status', status: 'transcribing' }); emitProgress(jobId, { type: 'status', status: 'transcribing' });
const webhookUrl = `${WEBHOOK_BASE_URL}/api/webhook/${jobId}`; const webhookUrl = `${WEBHOOK_BASE_URL}/api/webhook/${jobId}`;
const whisperJobId = await submitJob(wavPath, webhookUrl, language); const whisperJobId = await submitJob(wavPath, webhookUrl, language, (state, retryAfterSecs) => {
emitProgress(jobId, { type: 'model_warming', state, retryAfterSecs });
});
updateJob({ id: jobId, whisperJobId }); updateJob({ id: jobId, whisperJobId });
// ── 5. Open SSE for live progress (non-blocking relay) ─────────────── // ── 5. Open SSE for live progress (non-blocking relay) ───────────────

View File

@@ -1,17 +1,90 @@
import { execFile } from 'child_process'; import type { ModelStatus } from '$lib/types.js';
import { promisify } from 'util';
const execFileAsync = promisify(execFile);
function whisperUrl() { function whisperUrl() {
return process.env.WHISPER_URL ?? 'http://localhost:8080'; return process.env.WHISPER_URL ?? 'http://localhost:8080';
} }
/** Submit an audio file to whisper-rtx2080. Returns the whisper job id. */ /** Get the current model state from whisper-rtx2080. */
export async function getModelStatus(): Promise<ModelStatus> {
const { default: fetch } = await import('node-fetch');
const res = await fetch(`${whisperUrl()}/model/status`, {
signal: AbortSignal.timeout(5000)
});
if (!res.ok) throw new Error(`/model/status returned ${res.status}`);
return res.json() as Promise<ModelStatus>;
}
/**
* Wait for the whisper model to become ready.
*
* Subscribes to /model/events SSE and resolves as soon as a payload with
* state:"ready" arrives. Falls back to a plain timeout (`timeoutMs`) if the
* SSE connection fails or closes without that event, so the retry loop can
* try again without hanging indefinitely.
*/
async function waitForModelReady(timeoutMs: number): Promise<void> {
const { default: fetch } = await import('node-fetch');
const ac = new AbortController();
return new Promise((resolve) => {
let done = false;
const finish = () => {
if (!done) {
done = true;
ac.abort();
resolve();
}
};
const timer = setTimeout(finish, timeoutMs);
fetch(`${whisperUrl()}/model/events`, { signal: ac.signal as AbortSignal })
.then(async (res) => {
if (!res.body) {
clearTimeout(timer);
return finish();
}
let buf = '';
for await (const chunk of res.body) {
if (ac.signal.aborted) break;
buf += chunk.toString();
const lines = buf.split('\n');
buf = lines.pop() ?? '';
for (const line of lines) {
if (!line.startsWith('data:')) continue;
try {
const payload = JSON.parse(line.slice(5).trim());
if (payload.state === 'ready') {
clearTimeout(timer);
finish();
return;
}
} catch { /* ignore parse errors */ }
}
}
// Stream ended without model_ready → proceed to retry immediately
clearTimeout(timer);
finish();
})
.catch(() => {
// SSE unreachable — fallback timer will fire eventually
});
});
}
/**
* Submit an audio file to whisper-rtx2080. Returns the whisper job id.
*
* Handles 503 (model not ready) transparently: retries after subscribing to
* /model/events SSE — proceeds as soon as state:"ready" arrives, or after the
* Retry-After timeout elapses (whichever comes first).
* Calls `onModelWaiting` on each 503 so the caller can surface the wait to the user.
*/
export async function submitJob( export async function submitJob(
wavPath: string, wavPath: string,
webhookUrl: string, webhookUrl: string,
language?: string language?: string,
onModelWaiting?: (state: string, retryAfterSecs: number) => void,
maxAttempts = 20
): Promise<string> { ): Promise<string> {
const FormData = (await import('form-data')).default; const FormData = (await import('form-data')).default;
const { createReadStream } = await import('fs'); const { createReadStream } = await import('fs');
@@ -23,19 +96,49 @@ export async function submitJob(
form.append('webhook_url', webhookUrl); form.append('webhook_url', webhookUrl);
if (language) form.append('language', language); if (language) form.append('language', language);
for (let attempt = 1; attempt <= maxAttempts; attempt++) {
const res = await fetch(`${whisperUrl()}/jobs`, { const res = await fetch(`${whisperUrl()}/jobs`, {
method: 'POST', method: 'POST',
body: form, body: form,
headers: form.getHeaders() headers: form.getHeaders()
}); });
if (!res.ok) { if (res.status === 202) {
const json = (await res.json()) as { job_id: string };
return json.job_id;
}
if (res.status === 503) {
const body = (await res.json().catch(() => ({}))) as {
state?: string;
retry_after_secs?: number;
};
const state = body.state ?? 'unloaded';
const waitSecs = body.retry_after_secs ?? parseInt(res.headers.get('Retry-After') ?? '15');
onModelWaiting?.(state, waitSecs);
await waitForModelReady((waitSecs + 1) * 1000);
continue;
}
const text = await res.text(); const text = await res.text();
throw new Error(`whisper /jobs returned ${res.status}: ${text}`); throw new Error(`whisper /jobs returned ${res.status}: ${text}`);
} }
const json = (await res.json()) as { job_id: string }; throw new Error(`Whisper model did not become ready after ${maxAttempts} attempts`);
return json.job_id; }
/**
* Cancel a queued or running job on the whisper server (best-effort).
* Errors are silently ignored — local job status is already set to cancelled.
*/
export async function cancelJob(whisperJobId: string): Promise<void> {
try {
const { default: fetch } = await import('node-fetch');
await fetch(`${whisperUrl()}/jobs/${whisperJobId}`, {
method: 'DELETE',
signal: AbortSignal.timeout(5000)
});
} catch { /* best-effort */ }
} }
/** Open an SSE stream from whisper and call onProgress/onDone callbacks. */ /** Open an SSE stream from whisper and call onProgress/onDone callbacks. */

View File

@@ -1,5 +1,17 @@
export type AudioMode = 'auto' | 'standard' | 'aggressive' | 'none'; export type AudioMode = 'auto' | 'standard' | 'aggressive' | 'none';
export type ModelStateTag = 'unloaded' | 'loading' | 'waiting_for_gpu' | 'ready';
export interface ModelStatus {
state: ModelStateTag;
loaded_at?: string;
vram_needed_mb?: number;
vram_free_mb?: number;
retry_in_secs?: number;
vram_used_mb?: number;
vram_total_mb?: number;
}
export type JobStatus = 'pending' | 'downloading' | 'preparing' | 'transcribing' | 'processing' | 'done' | 'failed' | 'cancelled'; export type JobStatus = 'pending' | 'downloading' | 'preparing' | 'transcribing' | 'processing' | 'done' | 'failed' | 'cancelled';
export interface Segment { export interface Segment {

View File

@@ -1,9 +1,10 @@
<script lang="ts"> <script lang="ts">
import '../app.css'; import '../app.css';
import { onMount } from 'svelte'; import { onMount, onDestroy } from 'svelte';
import { browser } from '$app/environment'; import { browser } from '$app/environment';
import { page } from '$app/stores'; import { page } from '$app/stores';
import { accent } from '$lib/accent.js'; import { accent } from '$lib/accent.js';
import type { ModelStatus } from '$lib/types.js';
let { children } = $props(); let { children } = $props();
@@ -11,8 +12,43 @@
// The store subscriber handles everything; just subscribing here keeps it alive. // The store subscriber handles everything; just subscribing here keeps it alive.
$effect(() => { void $accent; }); $effect(() => { void $accent; });
// ── Model status ───────────────────────────────────────
let modelStatus = $state<ModelStatus>({ state: 'unloaded' });
let modelEs: EventSource | null = null;
function refreshModelStatus() {
fetch('/api/model/status')
.then((r) => r.json())
.then((s) => (modelStatus = s as ModelStatus))
.catch(() => {});
}
function subscribeModelEvents() {
modelEs?.close();
modelEs = new EventSource('/api/model/events');
modelEs.addEventListener('model_loading', () => refreshModelStatus());
modelEs.addEventListener('model_ready', () => refreshModelStatus());
modelEs.addEventListener('model_unloaded', () => refreshModelStatus());
modelEs.addEventListener('model_waiting_for_gpu',() => refreshModelStatus());
modelEs.onerror = () => { /* browser reconnects automatically */ };
}
const modelStateMeta: Record<string, { dot: string; label: string; pulse: boolean }> = {
unloaded: { dot: 'var(--text-dim)', label: 'model unloaded', pulse: false },
loading: { dot: '#f0b429', label: 'model loading…', pulse: true },
waiting_for_gpu: { dot: '#f97316', label: 'waiting for GPU', pulse: true },
ready: { dot: '#5dd47a', label: 'whisper-large-v3',pulse: false }
};
const modelMeta = $derived(
modelStateMeta[modelStatus.state] ?? modelStateMeta.unloaded
);
// Push notification setup // Push notification setup
onMount(async () => { onMount(async () => {
refreshModelStatus();
subscribeModelEvents();
if (!browser || !('serviceWorker' in navigator) || !('PushManager' in window)) return; if (!browser || !('serviceWorker' in navigator) || !('PushManager' in window)) return;
try { try {
const reg = await navigator.serviceWorker.ready; const reg = await navigator.serviceWorker.ready;
@@ -42,6 +78,8 @@
} }
}); });
onDestroy(() => modelEs?.close());
function urlBase64ToUint8Array(base64: string): Uint8Array { function urlBase64ToUint8Array(base64: string): Uint8Array {
const pad = '='.repeat((4 - (base64.length % 4)) % 4); const pad = '='.repeat((4 - (base64.length % 4)) % 4);
const b64 = (base64 + pad).replace(/-/g, '+').replace(/_/g, '/'); const b64 = (base64 + pad).replace(/-/g, '+').replace(/_/g, '/');
@@ -135,8 +173,12 @@
<!-- Status dot --> <!-- Status dot -->
<div class="status-pill"> <div class="status-pill">
<div class="status-dot"></div> <div
<span>whisper-large-v3</span> class="status-dot"
class:pulse={modelMeta.pulse}
style="background: {modelMeta.dot}"
></div>
<span>{modelMeta.label}</span>
</div> </div>
</nav> </nav>
@@ -268,8 +310,15 @@
width: 6px; width: 6px;
height: 6px; height: 6px;
border-radius: 3px; border-radius: 3px;
background: #5dd47a;
flex-shrink: 0; flex-shrink: 0;
transition: background 0.4s;
}
.status-dot.pulse {
animation: dot-pulse 1.4s ease-in-out infinite;
}
@keyframes dot-pulse {
0%, 100% { opacity: 1; }
50% { opacity: 0.3; }
} }
/* ── Main content ─────────────────────────────────────── */ /* ── Main content ─────────────────────────────────────── */

View File

@@ -1,5 +1,6 @@
import { json, error } from '@sveltejs/kit'; import { json, error } from '@sveltejs/kit';
import { getJob, setJobStatus, deleteJob } from '$lib/server/db.js'; import { getJob, setJobStatus, deleteJob } from '$lib/server/db.js';
import { cancelJob } from '$lib/server/whisper.js';
import { rm } from 'fs/promises'; import { rm } from 'fs/promises';
export async function GET({ params }) { export async function GET({ params }) {
@@ -17,6 +18,8 @@ export async function DELETE({ params }) {
if (ACTIVE.has(job.status)) { if (ACTIVE.has(job.status)) {
// Cancel active job (keeps DB record) // Cancel active job (keeps DB record)
setJobStatus(params.id, 'cancelled', 0); setJobStatus(params.id, 'cancelled', 0);
// Best-effort: tell whisper to drop the queued job so it stops using GPU
if (job.whisperJobId) cancelJob(job.whisperJobId).catch(() => {});
} else { } else {
// Hard-delete terminal job + clean up output files // Hard-delete terminal job + clean up output files
deleteJob(params.id); deleteJob(params.id);

View File

@@ -0,0 +1,43 @@
const WHISPER_URL = process.env.WHISPER_URL ?? 'http://localhost:8080';
/** Relay the whisper /model/events SSE stream to the browser. */
export async function GET({ request }) {
const { default: fetch } = await import('node-fetch');
const ac = new AbortController();
request.signal.addEventListener('abort', () => ac.abort());
const stream = new ReadableStream({
async start(controller) {
try {
const upstream = await fetch(`${WHISPER_URL}/model/events`, {
signal: ac.signal as AbortSignal
});
if (!upstream.body) {
controller.close();
return;
}
for await (const chunk of upstream.body) {
if (ac.signal.aborted) break;
controller.enqueue(chunk instanceof Buffer ? chunk : Buffer.from(String(chunk)));
}
} catch {
// upstream closed, client disconnected, or whisper unreachable — all fine
} finally {
controller.close();
}
},
cancel() {
ac.abort();
}
});
return new Response(stream, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
'X-Accel-Buffering': 'no'
}
});
}

View File

@@ -0,0 +1,14 @@
import { getModelStatus } from '$lib/server/whisper.js';
export async function GET() {
try {
const status = await getModelStatus();
return new Response(JSON.stringify(status), {
headers: { 'Content-Type': 'application/json' }
});
} catch {
return new Response(JSON.stringify({ state: 'unloaded' }), {
headers: { 'Content-Type': 'application/json' }
});
}
}

View File

@@ -12,6 +12,11 @@ const jobId = params.jobId;
const job = getJob(jobId); const job = getJob(jobId);
if (!job) throw error(404, 'Job not found'); if (!job) throw error(404, 'Job not found');
// Discard the result if the job was cancelled locally while whisper was running
if (job.status === 'cancelled') {
return json({ ok: true });
}
const whisperJob = (await request.json()) as WhisperJob; const whisperJob = (await request.json()) as WhisperJob;
if (whisperJob.status === 'failed' || whisperJob.status === 'cancelled') { if (whisperJob.status === 'failed' || whisperJob.status === 'cancelled') {

View File

@@ -13,6 +13,7 @@
let segments = $state<Segment[]>([]); let segments = $state<Segment[]>([]);
let error = $state(''); let error = $state('');
let chunkInfo = $state({ chunk: 0, total: 0 }); let chunkInfo = $state({ chunk: 0, total: 0 });
let modelWarming = $state<{ state: string; retryAfterSecs: number } | null>(null);
let eventSource: EventSource | null = null; let eventSource: EventSource | null = null;
const statusLabel: Record<string, string> = { const statusLabel: Record<string, string> = {
@@ -83,8 +84,11 @@
try { try {
const data = JSON.parse(e.data); const data = JSON.parse(e.data);
if (data.type === 'progress') { if (data.type === 'progress') {
modelWarming = null;
chunkInfo = { chunk: data.chunk ?? 0, total: data.total ?? 0 }; chunkInfo = { chunk: data.chunk ?? 0, total: data.total ?? 0 };
if (job) job = { ...job, progress: data.progress ?? job.progress, status: 'transcribing' }; if (job) job = { ...job, progress: data.progress ?? job.progress, status: 'transcribing' };
} else if (data.type === 'model_warming') {
modelWarming = { state: data.state ?? 'loading', retryAfterSecs: data.retryAfterSecs ?? 30 };
} else if (data.type === 'status') { } else if (data.type === 'status') {
if (job) job = { ...job, status: data.status, progress: data.progress ?? job.progress }; if (job) job = { ...job, status: data.status, progress: data.progress ?? job.progress };
} else if (data.type === 'done') { } else if (data.type === 'done') {
@@ -215,6 +219,15 @@
{/if} {/if}
</div> </div>
{#if modelWarming}
<div class="warming-notice mono">
<svg width="12" height="12" viewBox="0 0 12 12" fill="none" style="flex-shrink:0; animation: spin 1.5s linear infinite">
<circle cx="6" cy="6" r="4.5" stroke="currentColor" stroke-width="1.4" fill="none" stroke-dasharray="14 8"/>
</svg>
Warming up model ({modelWarming.state.replace(/_/g, ' ')}) — retrying in {modelWarming.retryAfterSecs}s…
</div>
{/if}
<!-- Progress bar --> <!-- Progress bar -->
<div class="progress-bar-track"> <div class="progress-bar-track">
<div <div
@@ -484,6 +497,16 @@
color: var(--text-muted); color: var(--text-muted);
} }
.warming-notice {
display: flex;
align-items: center;
gap: 8px;
margin-top: 10px;
font-size: 11.5px;
color: #f0b429;
opacity: 0.9;
}
.progress-bar-track { .progress-bar-track {
height: 4px; height: 4px;
border-radius: 2px; border-radius: 2px;

View File

@@ -113,6 +113,25 @@ describe('POST /api/webhook/[jobId] — job not found', () => {
}); });
}); });
// ── Local cancellation guard ──────────────────────────────────────────────────
describe('POST /api/webhook/[jobId] — locally cancelled job', () => {
it('returns ok without processing when the local job is already cancelled', async () => {
mockGetJob.mockReturnValue({ ...makeJob('job-lc'), status: 'cancelled' });
const payload = makeWhisperJob({ status: 'done' });
const res = await POST(makeEvent('job-lc', payload) as any);
expect(res.status).toBe(200);
expect(await res.json()).toEqual({ ok: true });
// Must not touch outputs, status, or notifications
expect(mockSetJobStatus).not.toHaveBeenCalled();
expect(mockUpdateJob).not.toHaveBeenCalled();
expect(mockWriteOutputs).not.toHaveBeenCalled();
expect(mockSendNotification).not.toHaveBeenCalled();
});
});
// ── Whisper job failed / cancelled ─────────────────────────────────────────── // ── Whisper job failed / cancelled ───────────────────────────────────────────
describe('POST /api/webhook/[jobId] — whisper failure', () => { describe('POST /api/webhook/[jobId] — whisper failure', () => {

View File

@@ -1,4 +1,4 @@
import { describe, it, expect, vi, afterEach } from 'vitest'; import { describe, it, expect, vi, afterEach, beforeEach } from 'vitest';
import { Readable } from 'stream'; import { Readable } from 'stream';
// ── Hoist mocks so they're available inside vi.mock() factories ─────────────── // ── Hoist mocks so they're available inside vi.mock() factories ───────────────
@@ -21,7 +21,7 @@ vi.mock('form-data', () => ({
vi.mock('fs', () => ({ createReadStream: vi.fn(() => 'STREAM_PLACEHOLDER') })); vi.mock('fs', () => ({ createReadStream: vi.fn(() => 'STREAM_PLACEHOLDER') }));
import { submitJob, streamJob } from '$lib/server/whisper.js'; import { submitJob, streamJob, getModelStatus, cancelJob } from '$lib/server/whisper.js';
afterEach(() => vi.clearAllMocks()); afterEach(() => vi.clearAllMocks());
@@ -31,6 +31,7 @@ describe('submitJob', () => {
it('POSTs to /jobs and returns job_id', async () => { it('POSTs to /jobs and returns job_id', async () => {
mocks.fetch.mockResolvedValue({ mocks.fetch.mockResolvedValue({
ok: true, ok: true,
status: 202,
json: () => Promise.resolve({ job_id: 'whisper-job-abc' }) json: () => Promise.resolve({ job_id: 'whisper-job-abc' })
}); });
const id = await submitJob('/tmp/audio.wav', 'http://host/api/webhook/job-1'); const id = await submitJob('/tmp/audio.wav', 'http://host/api/webhook/job-1');
@@ -41,6 +42,7 @@ describe('submitJob', () => {
vi.stubEnv('WHISPER_URL', 'http://localhost:8091'); vi.stubEnv('WHISPER_URL', 'http://localhost:8091');
mocks.fetch.mockResolvedValue({ mocks.fetch.mockResolvedValue({
ok: true, ok: true,
status: 202,
json: () => Promise.resolve({ job_id: 'x' }) json: () => Promise.resolve({ job_id: 'x' })
}); });
await submitJob('/tmp/audio.wav', 'http://host/api/webhook/job-1'); await submitJob('/tmp/audio.wav', 'http://host/api/webhook/job-1');
@@ -54,6 +56,7 @@ describe('submitJob', () => {
it('includes task=transcribe in the form', async () => { it('includes task=transcribe in the form', async () => {
mocks.fetch.mockResolvedValue({ mocks.fetch.mockResolvedValue({
ok: true, ok: true,
status: 202,
json: () => Promise.resolve({ job_id: 'x' }) json: () => Promise.resolve({ job_id: 'x' })
}); });
await submitJob('/tmp/audio.wav', 'http://host/webhook'); await submitJob('/tmp/audio.wav', 'http://host/webhook');
@@ -63,6 +66,7 @@ describe('submitJob', () => {
it('includes webhook_url in the form', async () => { it('includes webhook_url in the form', async () => {
mocks.fetch.mockResolvedValue({ mocks.fetch.mockResolvedValue({
ok: true, ok: true,
status: 202,
json: () => Promise.resolve({ job_id: 'x' }) json: () => Promise.resolve({ job_id: 'x' })
}); });
await submitJob('/tmp/audio.wav', 'http://192.168.1.10:3000/api/webhook/job-99'); await submitJob('/tmp/audio.wav', 'http://192.168.1.10:3000/api/webhook/job-99');
@@ -75,6 +79,7 @@ describe('submitJob', () => {
it('includes language when provided', async () => { it('includes language when provided', async () => {
mocks.fetch.mockResolvedValue({ mocks.fetch.mockResolvedValue({
ok: true, ok: true,
status: 202,
json: () => Promise.resolve({ job_id: 'x' }) json: () => Promise.resolve({ job_id: 'x' })
}); });
await submitJob('/tmp/audio.wav', 'http://host/webhook', 'en'); await submitJob('/tmp/audio.wav', 'http://host/webhook', 'en');
@@ -84,6 +89,7 @@ describe('submitJob', () => {
it('omits language field when not provided', async () => { it('omits language field when not provided', async () => {
mocks.fetch.mockResolvedValue({ mocks.fetch.mockResolvedValue({
ok: true, ok: true,
status: 202,
json: () => Promise.resolve({ job_id: 'x' }) json: () => Promise.resolve({ job_id: 'x' })
}); });
await submitJob('/tmp/audio.wav', 'http://host/webhook'); await submitJob('/tmp/audio.wav', 'http://host/webhook');
@@ -101,6 +107,350 @@ describe('submitJob', () => {
}); });
}); });
// ── submitJob — 503 retry & model-warming behavior ───────────────────────────
/** Minimal 503 response the whisper server returns when model not ready. */
function make503(state: string, retry_after_secs: number, headerRetryAfter?: string) {
return {
status: 503,
json: () => Promise.resolve({ error: 'model_not_ready', state, retry_after_secs }),
headers: {
get: (h: string) =>
h.toLowerCase() === 'retry-after' ? (headerRetryAfter ?? String(retry_after_secs)) : null
}
};
}
function make202(job_id: string) {
return { status: 202, json: () => Promise.resolve({ job_id }) };
}
/**
* URL-aware fetch mock: /model/events calls resolve immediately with no body
* (causing waitForModelReady to call finish() right away, so the retry loop
* proceeds without any real timer delay), and /jobs calls consume the
* provided response queue in order.
*/
function makeJobFetch(...responses: object[]) {
let idx = 0;
return (url: string) => {
if (String(url).includes('/model/events')) {
return Promise.resolve({ ok: true, status: 200, body: null });
}
return Promise.resolve(responses[idx++]);
};
}
describe('submitJob — 503 retry behavior', () => {
beforeEach(() => vi.useFakeTimers());
afterEach(() => vi.useRealTimers());
it('calls onModelWaiting with state and retryAfterSecs on first 503', async () => {
mocks.fetch.mockImplementation(makeJobFetch(make503('unloaded', 30), make202('job-1')));
const onModelWaiting = vi.fn();
const id = await submitJob('/tmp/audio.wav', 'http://host/webhook', undefined, onModelWaiting);
expect(id).toBe('job-1');
expect(onModelWaiting).toHaveBeenCalledOnce();
expect(onModelWaiting).toHaveBeenCalledWith('unloaded', 30);
});
it('retries and returns job_id once model becomes ready', async () => {
mocks.fetch.mockImplementation(makeJobFetch(make503('loading', 10), make202('ready-id')));
const id = await submitJob('/tmp/audio.wav', 'http://host/webhook');
expect(id).toBe('ready-id');
const jobCalls = mocks.fetch.mock.calls.filter(([url]) => String(url).endsWith('/jobs'));
expect(jobCalls).toHaveLength(2);
});
it('calls onModelWaiting once per 503, not on success', async () => {
mocks.fetch.mockImplementation(
makeJobFetch(make503('loading', 0), make503('loading', 0), make202('final-id'))
);
const onModelWaiting = vi.fn();
const id = await submitJob('/tmp/audio.wav', 'http://host/webhook', undefined, onModelWaiting, 10);
expect(id).toBe('final-id');
expect(onModelWaiting).toHaveBeenCalledTimes(2);
});
it('passes the correct state for each 503 response', async () => {
mocks.fetch.mockImplementation(
makeJobFetch(
make503('unloaded', 0),
make503('loading', 0),
make503('waiting_for_gpu', 0),
make202('job-x')
)
);
const onModelWaiting = vi.fn();
const id = await submitJob('/tmp/audio.wav', 'http://host/webhook', undefined, onModelWaiting, 10);
expect(id).toBe('job-x');
expect(onModelWaiting).toHaveBeenNthCalledWith(1, 'unloaded', 0);
expect(onModelWaiting).toHaveBeenNthCalledWith(2, 'loading', 0);
expect(onModelWaiting).toHaveBeenNthCalledWith(3, 'waiting_for_gpu', 0);
});
it('falls back to Retry-After header when body lacks retry_after_secs', async () => {
mocks.fetch.mockImplementation(
makeJobFetch(
{
status: 503,
json: () => Promise.resolve({ state: 'loading' }),
headers: { get: (h: string) => (h.toLowerCase() === 'retry-after' ? '7' : null) }
},
make202('fallback-id')
)
);
const onModelWaiting = vi.fn();
const id = await submitJob('/tmp/audio.wav', 'http://host/webhook', undefined, onModelWaiting);
expect(id).toBe('fallback-id');
expect(onModelWaiting).toHaveBeenCalledWith('loading', 7);
});
it('falls back to 15s when both body and header are absent', async () => {
mocks.fetch.mockImplementation(
makeJobFetch(
{
status: 503,
json: () => Promise.resolve({ state: 'unloaded' }),
headers: { get: () => null }
},
make202('default-wait-id')
)
);
const onModelWaiting = vi.fn();
const id = await submitJob('/tmp/audio.wav', 'http://host/webhook', undefined, onModelWaiting);
expect(id).toBe('default-wait-id');
expect(onModelWaiting).toHaveBeenCalledWith('unloaded', 15);
});
it('throws after maxAttempts 503 responses', async () => {
mocks.fetch.mockImplementation(
makeJobFetch(make503('loading', 0), make503('loading', 0), make503('loading', 0))
);
await expect(
submitJob('/tmp/audio.wav', 'http://host/webhook', undefined, undefined, 3)
).rejects.toThrow(/did not become ready after 3 attempts/i);
const jobCalls = mocks.fetch.mock.calls.filter(([url]) => String(url).endsWith('/jobs'));
expect(jobCalls).toHaveLength(3);
});
it('does NOT call onModelWaiting for non-503 errors', async () => {
mocks.fetch.mockResolvedValue({
status: 500,
text: () => Promise.resolve('internal error')
});
const onModelWaiting = vi.fn();
await expect(
submitJob('/tmp/audio.wav', 'http://host/webhook', undefined, onModelWaiting)
).rejects.toThrow('500');
expect(onModelWaiting).not.toHaveBeenCalled();
});
it('does NOT retry on non-503 errors (throws immediately)', async () => {
mocks.fetch.mockResolvedValue({
status: 400,
text: () => Promise.resolve("missing 'audio' field")
});
await expect(
submitJob('/tmp/audio.wav', 'http://host/webhook', undefined, undefined, 10)
).rejects.toThrow('400');
expect(mocks.fetch).toHaveBeenCalledTimes(1);
});
it('works correctly without an onModelWaiting callback', async () => {
mocks.fetch.mockImplementation(makeJobFetch(make503('unloaded', 0), make202('no-cb-id')));
const id = await submitJob('/tmp/audio.wav', 'http://host/webhook');
expect(id).toBe('no-cb-id');
});
});
// ── submitJob — SSE-triggered retry ──────────────────────────────────────────
describe('submitJob — SSE-triggered retry', () => {
beforeEach(() => vi.useFakeTimers());
afterEach(() => vi.useRealTimers());
it('retries immediately when /model/events fires state:ready before Retry-After timeout', async () => {
let jobCallIdx = 0;
mocks.fetch.mockImplementation((url: string) => {
if (String(url).includes('/model/events')) {
// SSE stream that immediately emits model_ready
return Promise.resolve({
ok: true,
status: 200,
body: Readable.from(['data: {"state":"ready"}\n\n'])
});
}
jobCallIdx++;
if (jobCallIdx === 1) return Promise.resolve(make503('loading', 30));
return Promise.resolve(make202('sse-triggered-id'));
});
// SSE fires model_ready before the 31s timeout — no timer advancement needed
const id = await submitJob('/tmp/audio.wav', 'http://host/webhook');
expect(id).toBe('sse-triggered-id');
const sseCalls = mocks.fetch.mock.calls.filter(([url]) =>
String(url).includes('/model/events')
);
expect(sseCalls).toHaveLength(1);
});
it('falls back to Retry-After sleep when /model/events is unreachable', async () => {
let jobCallIdx = 0;
mocks.fetch.mockImplementation((url: string) => {
if (String(url).includes('/model/events')) {
return Promise.reject(new Error('Connection refused'));
}
jobCallIdx++;
if (jobCallIdx === 1) return Promise.resolve(make503('loading', 5));
return Promise.resolve(make202('fallback-sleep-id'));
});
// SSE failed → must wait for the Retry-After timer
const p = submitJob('/tmp/audio.wav', 'http://host/webhook');
await vi.runAllTimersAsync();
await expect(p).resolves.toBe('fallback-sleep-id');
});
it('proceeds immediately when SSE stream closes without model_ready', async () => {
let jobCallIdx = 0;
mocks.fetch.mockImplementation((url: string) => {
if (String(url).includes('/model/events')) {
// Empty stream — closes without emitting anything
return Promise.resolve({ ok: true, status: 200, body: Readable.from([]) });
}
jobCallIdx++;
if (jobCallIdx === 1) return Promise.resolve(make503('loading', 30));
return Promise.resolve(make202('stream-closed-id'));
});
// Stream closed without model_ready → should not wait the full 31s timeout
const id = await submitJob('/tmp/audio.wav', 'http://host/webhook');
expect(id).toBe('stream-closed-id');
});
});
// ── cancelJob ─────────────────────────────────────────────────────────────────
describe('cancelJob', () => {
it('sends DELETE to the correct whisper job URL', async () => {
mocks.fetch.mockResolvedValue({ ok: true, status: 200 });
await cancelJob('whisper-job-abc');
expect(mocks.fetch).toHaveBeenCalledWith(
expect.stringContaining('/jobs/whisper-job-abc'),
expect.objectContaining({ method: 'DELETE' })
);
});
it('uses the configured WHISPER_URL', async () => {
vi.stubEnv('WHISPER_URL', 'http://gpu-box:9090');
mocks.fetch.mockResolvedValue({ ok: true, status: 200 });
await cancelJob('job-xyz');
expect(mocks.fetch).toHaveBeenCalledWith(
'http://gpu-box:9090/jobs/job-xyz',
expect.objectContaining({ method: 'DELETE' })
);
vi.unstubAllEnvs();
});
it('swallows errors silently (best-effort)', async () => {
mocks.fetch.mockRejectedValue(new Error('Connection refused'));
await expect(cancelJob('dead-job')).resolves.not.toThrow();
});
});
// ── getModelStatus ────────────────────────────────────────────────────────────
describe('getModelStatus', () => {
it('returns parsed status when model is ready', async () => {
const readyStatus = {
state: 'ready',
loaded_at: '2026-05-09T00:00:00.000Z',
vram_used_mb: 4096,
vram_total_mb: 8192
};
mocks.fetch.mockResolvedValue({
ok: true,
json: () => Promise.resolve(readyStatus)
});
const status = await getModelStatus();
expect(status.state).toBe('ready');
expect(status.loaded_at).toBe('2026-05-09T00:00:00.000Z');
expect(status.vram_used_mb).toBe(4096);
});
it('returns parsed status when model is unloaded', async () => {
mocks.fetch.mockResolvedValue({
ok: true,
json: () => Promise.resolve({ state: 'unloaded' })
});
const status = await getModelStatus();
expect(status.state).toBe('unloaded');
});
it('returns parsed status when model is loading', async () => {
mocks.fetch.mockResolvedValue({
ok: true,
json: () => Promise.resolve({ state: 'loading' })
});
const status = await getModelStatus();
expect(status.state).toBe('loading');
});
it('returns parsed status when waiting_for_gpu with VRAM fields', async () => {
const waitingStatus = {
state: 'waiting_for_gpu',
vram_needed_mb: 3951,
vram_free_mb: 512,
retry_in_secs: 30
};
mocks.fetch.mockResolvedValue({
ok: true,
json: () => Promise.resolve(waitingStatus)
});
const status = await getModelStatus();
expect(status.state).toBe('waiting_for_gpu');
expect(status.vram_needed_mb).toBe(3951);
expect(status.vram_free_mb).toBe(512);
});
it('calls the correct WHISPER_URL endpoint', async () => {
vi.stubEnv('WHISPER_URL', 'http://gpu-box:9090');
mocks.fetch.mockResolvedValue({
ok: true,
json: () => Promise.resolve({ state: 'ready' })
});
await getModelStatus();
expect(mocks.fetch).toHaveBeenCalledWith(
'http://gpu-box:9090/model/status',
expect.objectContaining({ signal: expect.anything() })
);
vi.unstubAllEnvs();
});
it('throws when the server returns a non-ok response', async () => {
mocks.fetch.mockResolvedValue({ ok: false, status: 503 });
await expect(getModelStatus()).rejects.toThrow('/model/status');
});
});
// ── streamJob SSE parsing ───────────────────────────────────────────────────── // ── streamJob SSE parsing ─────────────────────────────────────────────────────
function makeSSEResponse(lines: string[]) { function makeSSEResponse(lines: string[]) {