feat: whisper-side cancellation + SSE-triggered retry
All checks were successful
Build & Push Docker Image / build-and-push (push) Successful in 48s

- Add cancelJob() to whisper.ts: sends DELETE /jobs/:id to the whisper
  server (best-effort, errors silently ignored)
- DELETE /api/jobs/[id] now calls cancelJob() when cancelling an active
  job that has a whisperJobId, stopping GPU use immediately
- Webhook handler guards against locally-cancelled jobs: returns ok early
  so whisper's late completion cannot overwrite cancelled status or send
  a phantom 'Transcript ready' notification
- Replace blind sleep(Retry-After + 1s) in submitJob() with
  waitForModelReady(): subscribes to /model/events SSE and proceeds as
  soon as state:ready arrives, or immediately if the stream closes
  without it; falls back to the Retry-After timeout if SSE is unreachable
- Refactor retry tests to use URL-aware makeJobFetch() helper; add 7 new
  tests (3 SSE-triggered retry, 3 cancelJob, 1 webhook cancelled-guard)
  — 144/144 passing

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
Giancarmine Salucci
2026-05-09 00:40:40 +02:00
parent 01845bec25
commit 04142b17a8
5 changed files with 267 additions and 73 deletions

View File

@@ -4,8 +4,6 @@ function whisperUrl() {
return process.env.WHISPER_URL ?? 'http://localhost:8080'; return process.env.WHISPER_URL ?? 'http://localhost:8080';
} }
const sleep = (ms: number) => new Promise((r) => setTimeout(r, ms));
/** Get the current model state from whisper-rtx2080. */ /** Get the current model state from whisper-rtx2080. */
export async function getModelStatus(): Promise<ModelStatus> { export async function getModelStatus(): Promise<ModelStatus> {
const { default: fetch } = await import('node-fetch'); const { default: fetch } = await import('node-fetch');
@@ -16,11 +14,69 @@ export async function getModelStatus(): Promise<ModelStatus> {
return res.json() as Promise<ModelStatus>; return res.json() as Promise<ModelStatus>;
} }
/**
 * Returns true when `line` is an SSE `data:` line whose JSON payload carries
 * state:"ready". Non-data lines and malformed JSON yield false.
 */
function isModelReadyEvent(line: string): boolean {
	if (!line.startsWith('data:')) return false;
	try {
		const payload: unknown = JSON.parse(line.slice(5).trim());
		return (
			typeof payload === 'object' &&
			payload !== null &&
			(payload as { state?: unknown }).state === 'ready'
		);
	} catch {
		// Malformed JSON is not a ready event; keep reading the stream.
		return false;
	}
}

/**
 * Wait for the whisper model to become ready, for at most `timeoutMs`.
 *
 * Subscribes to the /model/events SSE stream and resolves on whichever of
 * these happens first:
 *  - a `data:` payload with state:"ready" arrives;
 *  - the stream ends (or the response has no body) without that event, so
 *    the caller's retry loop can try again straight away;
 *  - `timeoutMs` elapses. This is the only exit when the SSE request fails,
 *    errors mid-stream, or is answered with a non-2xx status.
 *
 * SSE errors never reject the returned promise. The SSE request is aborted
 * and the fallback timer cleared as soon as the promise settles.
 *
 * @param timeoutMs Upper bound on the wait, in milliseconds.
 */
async function waitForModelReady(timeoutMs: number): Promise<void> {
	const { default: fetch } = await import('node-fetch');
	const ac = new AbortController();

	return new Promise<void>((resolve) => {
		let done = false;

		// Single exit point: settle once, stop the timer, drop the SSE connection.
		const finish = () => {
			if (done) return;
			done = true;
			clearTimeout(timer);
			ac.abort();
			resolve();
		};

		const timer = setTimeout(finish, timeoutMs);

		fetch(`${whisperUrl()}/model/events`, { signal: ac.signal as AbortSignal })
			.then(async (res) => {
				// An error response (e.g. 404 from a server without /model/events)
				// has a body that ends at once; treating that as "stream closed"
				// would make the caller retry with no back-off. Leave the fallback
				// timer in charge instead.
				if (!res.ok) return;
				if (!res.body) return finish();

				let buf = '';
				for await (const chunk of res.body) {
					if (done) return;
					buf += chunk.toString();
					const lines = buf.split('\n');
					// The last element is a partial line; keep it for the next chunk.
					buf = lines.pop() ?? '';
					if (lines.some(isModelReadyEvent)) return finish();
				}

				// Stream ended without a ready event → let the caller retry immediately.
				finish();
			})
			.catch(() => {
				// SSE unreachable, aborted, or failed mid-stream — the fallback timer
				// will fire eventually.
			});
	});
}
/** /**
* Submit an audio file to whisper-rtx2080. Returns the whisper job id. * Submit an audio file to whisper-rtx2080. Returns the whisper job id.
* *
* Handles 503 (model not ready) transparently: retries using the * Handles 503 (model not ready) transparently: retries after subscribing to
* `Retry-After` header until the model loads or maxAttempts is exhausted. * /model/events SSE — proceeds as soon as state:"ready" arrives, or after the
* Retry-After timeout elapses (whichever comes first).
* Calls `onModelWaiting` on each 503 so the caller can surface the wait to the user. * Calls `onModelWaiting` on each 503 so the caller can surface the wait to the user.
*/ */
export async function submitJob( export async function submitJob(
@@ -60,7 +116,7 @@ export async function submitJob(
const state = body.state ?? 'unloaded'; const state = body.state ?? 'unloaded';
const waitSecs = body.retry_after_secs ?? parseInt(res.headers.get('Retry-After') ?? '15'); const waitSecs = body.retry_after_secs ?? parseInt(res.headers.get('Retry-After') ?? '15');
onModelWaiting?.(state, waitSecs); onModelWaiting?.(state, waitSecs);
await sleep((waitSecs + 1) * 1000); await waitForModelReady((waitSecs + 1) * 1000);
continue; continue;
} }
@@ -71,6 +127,20 @@ export async function submitJob(
throw new Error(`Whisper model did not become ready after ${maxAttempts} attempts`); throw new Error(`Whisper model did not become ready after ${maxAttempts} attempts`);
} }
/**
 * Cancel a queued or running job on the whisper server (best-effort).
 *
 * Sends `DELETE /jobs/:id` with a 5 second timeout. The job id is
 * percent-encoded so it always stays a single path segment. Any failure
 * (network error, timeout, failed import) is swallowed and the response
 * status is not inspected, because the local job status is already set to
 * cancelled by the time this is called.
 *
 * @param whisperJobId Job id previously returned by the whisper server.
 */
export async function cancelJob(whisperJobId: string): Promise<void> {
	try {
		const { default: fetch } = await import('node-fetch');
		const url = `${whisperUrl()}/jobs/${encodeURIComponent(whisperJobId)}`;
		await fetch(url, {
			method: 'DELETE',
			signal: AbortSignal.timeout(5000)
		});
	} catch {
		/* best-effort: nothing to recover, the job is already cancelled locally */
	}
}
/** Open an SSE stream from whisper and call onProgress/onDone callbacks. */ /** Open an SSE stream from whisper and call onProgress/onDone callbacks. */
export async function streamJob( export async function streamJob(
whisperJobId: string, whisperJobId: string,

View File

@@ -1,5 +1,6 @@
import { json, error } from '@sveltejs/kit'; import { json, error } from '@sveltejs/kit';
import { getJob, setJobStatus, deleteJob } from '$lib/server/db.js'; import { getJob, setJobStatus, deleteJob } from '$lib/server/db.js';
import { cancelJob } from '$lib/server/whisper.js';
import { rm } from 'fs/promises'; import { rm } from 'fs/promises';
export async function GET({ params }) { export async function GET({ params }) {
@@ -17,6 +18,8 @@ export async function DELETE({ params }) {
if (ACTIVE.has(job.status)) { if (ACTIVE.has(job.status)) {
// Cancel active job (keeps DB record) // Cancel active job (keeps DB record)
setJobStatus(params.id, 'cancelled', 0); setJobStatus(params.id, 'cancelled', 0);
// Best-effort: tell whisper to drop the queued job so it stops using GPU
if (job.whisperJobId) cancelJob(job.whisperJobId).catch(() => {});
} else { } else {
// Hard-delete terminal job + clean up output files // Hard-delete terminal job + clean up output files
deleteJob(params.id); deleteJob(params.id);

View File

@@ -12,6 +12,11 @@ const jobId = params.jobId;
const job = getJob(jobId); const job = getJob(jobId);
if (!job) throw error(404, 'Job not found'); if (!job) throw error(404, 'Job not found');
// Discard the result if the job was cancelled locally while whisper was running
if (job.status === 'cancelled') {
return json({ ok: true });
}
const whisperJob = (await request.json()) as WhisperJob; const whisperJob = (await request.json()) as WhisperJob;
if (whisperJob.status === 'failed' || whisperJob.status === 'cancelled') { if (whisperJob.status === 'failed' || whisperJob.status === 'cancelled') {

View File

@@ -113,6 +113,25 @@ describe('POST /api/webhook/[jobId] — job not found', () => {
}); });
}); });
// ── Local cancellation guard ──────────────────────────────────────────────────
describe('POST /api/webhook/[jobId] — locally cancelled job', () => {
	it('returns ok without processing when the local job is already cancelled', async () => {
		// The local record is already cancelled when whisper's late "done" webhook arrives.
		mockGetJob.mockReturnValue({ ...makeJob('job-lc'), status: 'cancelled' });
		const event = makeEvent('job-lc', makeWhisperJob({ status: 'done' }));

		const response = await POST(event as any);

		expect(response.status).toBe(200);
		expect(await response.json()).toEqual({ ok: true });

		// None of the side-effecting collaborators may run: outputs, status, notifications.
		const untouched = [mockSetJobStatus, mockUpdateJob, mockWriteOutputs, mockSendNotification];
		for (const collaborator of untouched) {
			expect(collaborator).not.toHaveBeenCalled();
		}
	});
});
// ── Whisper job failed / cancelled ─────────────────────────────────────────── // ── Whisper job failed / cancelled ───────────────────────────────────────────
describe('POST /api/webhook/[jobId] — whisper failure', () => { describe('POST /api/webhook/[jobId] — whisper failure', () => {

View File

@@ -21,7 +21,7 @@ vi.mock('form-data', () => ({
vi.mock('fs', () => ({ createReadStream: vi.fn(() => 'STREAM_PLACEHOLDER') })); vi.mock('fs', () => ({ createReadStream: vi.fn(() => 'STREAM_PLACEHOLDER') }));
import { submitJob, streamJob, getModelStatus } from '$lib/server/whisper.js'; import { submitJob, streamJob, getModelStatus, cancelJob } from '$lib/server/whisper.js';
afterEach(() => vi.clearAllMocks()); afterEach(() => vi.clearAllMocks());
@@ -125,115 +125,121 @@ function make202(job_id: string) {
return { status: 202, json: () => Promise.resolve({ job_id }) }; return { status: 202, json: () => Promise.resolve({ job_id }) };
} }
/**
 * Build a URL-aware fetch stub for the retry tests.
 *
 * - Any URL containing `/model/events` resolves to a 200 response whose body
 *   is null, so waitForModelReady finishes at once and the retry loop needs
 *   no timer advancement.
 * - Every other URL resolves to the next entry of `responses`, in the order
 *   given (undefined once the queue is exhausted).
 */
function makeJobFetch(...responses: object[]) {
	let cursor = 0;
	const isModelEvents = (target: string) => String(target).includes('/model/events');
	return (url: string) =>
		isModelEvents(url)
			? Promise.resolve({ ok: true, status: 200, body: null })
			: Promise.resolve(responses[cursor++]);
}
describe('submitJob — 503 retry behavior', () => { describe('submitJob — 503 retry behavior', () => {
beforeEach(() => vi.useFakeTimers()); beforeEach(() => vi.useFakeTimers());
afterEach(() => vi.useRealTimers()); afterEach(() => vi.useRealTimers());
it('calls onModelWaiting with state and retryAfterSecs on first 503', async () => { it('calls onModelWaiting with state and retryAfterSecs on first 503', async () => {
mocks.fetch mocks.fetch.mockImplementation(makeJobFetch(make503('unloaded', 30), make202('job-1')));
.mockResolvedValueOnce(make503('unloaded', 30))
.mockResolvedValueOnce(make202('job-1'));
const onModelWaiting = vi.fn(); const onModelWaiting = vi.fn();
const p = submitJob('/tmp/audio.wav', 'http://host/webhook', undefined, onModelWaiting); const id = await submitJob('/tmp/audio.wav', 'http://host/webhook', undefined, onModelWaiting);
await vi.runAllTimersAsync(); expect(id).toBe('job-1');
await expect(p).resolves.toBe('job-1');
expect(onModelWaiting).toHaveBeenCalledOnce(); expect(onModelWaiting).toHaveBeenCalledOnce();
expect(onModelWaiting).toHaveBeenCalledWith('unloaded', 30); expect(onModelWaiting).toHaveBeenCalledWith('unloaded', 30);
}); });
it('retries and returns job_id once model becomes ready', async () => { it('retries and returns job_id once model becomes ready', async () => {
mocks.fetch mocks.fetch.mockImplementation(makeJobFetch(make503('loading', 10), make202('ready-id')));
.mockResolvedValueOnce(make503('loading', 10))
.mockResolvedValueOnce(make202('ready-id'));
const p = submitJob('/tmp/audio.wav', 'http://host/webhook'); const id = await submitJob('/tmp/audio.wav', 'http://host/webhook');
await vi.runAllTimersAsync(); expect(id).toBe('ready-id');
const jobCalls = mocks.fetch.mock.calls.filter(([url]) => String(url).endsWith('/jobs'));
await expect(p).resolves.toBe('ready-id'); expect(jobCalls).toHaveLength(2);
expect(mocks.fetch).toHaveBeenCalledTimes(2);
}); });
it('calls onModelWaiting once per 503, not on success', async () => { it('calls onModelWaiting once per 503, not on success', async () => {
mocks.fetch mocks.fetch.mockImplementation(
.mockResolvedValueOnce(make503('loading', 0)) makeJobFetch(make503('loading', 0), make503('loading', 0), make202('final-id'))
.mockResolvedValueOnce(make503('loading', 0)) );
.mockResolvedValueOnce(make202('final-id'));
const onModelWaiting = vi.fn(); const onModelWaiting = vi.fn();
const p = submitJob('/tmp/audio.wav', 'http://host/webhook', undefined, onModelWaiting, 10); const id = await submitJob('/tmp/audio.wav', 'http://host/webhook', undefined, onModelWaiting, 10);
await vi.runAllTimersAsync(); expect(id).toBe('final-id');
await expect(p).resolves.toBe('final-id');
expect(onModelWaiting).toHaveBeenCalledTimes(2); expect(onModelWaiting).toHaveBeenCalledTimes(2);
}); });
it('passes the correct state for each 503 response', async () => { it('passes the correct state for each 503 response', async () => {
mocks.fetch mocks.fetch.mockImplementation(
.mockResolvedValueOnce(make503('unloaded', 0)) makeJobFetch(
.mockResolvedValueOnce(make503('loading', 0)) make503('unloaded', 0),
.mockResolvedValueOnce(make503('waiting_for_gpu', 0)) make503('loading', 0),
.mockResolvedValueOnce(make202('job-x')); make503('waiting_for_gpu', 0),
make202('job-x')
)
);
const onModelWaiting = vi.fn(); const onModelWaiting = vi.fn();
const p = submitJob('/tmp/audio.wav', 'http://host/webhook', undefined, onModelWaiting, 10); const id = await submitJob('/tmp/audio.wav', 'http://host/webhook', undefined, onModelWaiting, 10);
await vi.runAllTimersAsync(); expect(id).toBe('job-x');
await expect(p).resolves.toBe('job-x');
expect(onModelWaiting).toHaveBeenNthCalledWith(1, 'unloaded', 0); expect(onModelWaiting).toHaveBeenNthCalledWith(1, 'unloaded', 0);
expect(onModelWaiting).toHaveBeenNthCalledWith(2, 'loading', 0); expect(onModelWaiting).toHaveBeenNthCalledWith(2, 'loading', 0);
expect(onModelWaiting).toHaveBeenNthCalledWith(3, 'waiting_for_gpu', 0); expect(onModelWaiting).toHaveBeenNthCalledWith(3, 'waiting_for_gpu', 0);
}); });
it('falls back to Retry-After header when body lacks retry_after_secs', async () => { it('falls back to Retry-After header when body lacks retry_after_secs', async () => {
// Body with no retry_after_secs — only header mocks.fetch.mockImplementation(
mocks.fetch makeJobFetch(
.mockResolvedValueOnce({ {
status: 503, status: 503,
json: () => Promise.resolve({ state: 'loading' }), json: () => Promise.resolve({ state: 'loading' }),
headers: { get: (h: string) => (h.toLowerCase() === 'retry-after' ? '7' : null) } headers: { get: (h: string) => (h.toLowerCase() === 'retry-after' ? '7' : null) }
}) },
.mockResolvedValueOnce(make202('fallback-id')); make202('fallback-id')
)
);
const onModelWaiting = vi.fn(); const onModelWaiting = vi.fn();
const p = submitJob('/tmp/audio.wav', 'http://host/webhook', undefined, onModelWaiting); const id = await submitJob('/tmp/audio.wav', 'http://host/webhook', undefined, onModelWaiting);
await vi.runAllTimersAsync(); expect(id).toBe('fallback-id');
await expect(p).resolves.toBe('fallback-id');
expect(onModelWaiting).toHaveBeenCalledWith('loading', 7); expect(onModelWaiting).toHaveBeenCalledWith('loading', 7);
}); });
it('falls back to 15s when both body and header are absent', async () => { it('falls back to 15s when both body and header are absent', async () => {
mocks.fetch mocks.fetch.mockImplementation(
.mockResolvedValueOnce({ makeJobFetch(
status: 503, {
json: () => Promise.resolve({ state: 'unloaded' }), status: 503,
headers: { get: () => null } json: () => Promise.resolve({ state: 'unloaded' }),
}) headers: { get: () => null }
.mockResolvedValueOnce(make202('default-wait-id')); },
make202('default-wait-id')
)
);
const onModelWaiting = vi.fn(); const onModelWaiting = vi.fn();
const p = submitJob('/tmp/audio.wav', 'http://host/webhook', undefined, onModelWaiting); const id = await submitJob('/tmp/audio.wav', 'http://host/webhook', undefined, onModelWaiting);
await vi.runAllTimersAsync(); expect(id).toBe('default-wait-id');
await expect(p).resolves.toBe('default-wait-id');
expect(onModelWaiting).toHaveBeenCalledWith('unloaded', 15); expect(onModelWaiting).toHaveBeenCalledWith('unloaded', 15);
}); });
it('throws after maxAttempts 503 responses', async () => { it('throws after maxAttempts 503 responses', async () => {
mocks.fetch.mockResolvedValue(make503('loading', 0)); mocks.fetch.mockImplementation(
makeJobFetch(make503('loading', 0), make503('loading', 0), make503('loading', 0))
);
// Attach .rejects handler BEFORE advancing timers so the rejection await expect(
// is always handled before Vitest's unhandled-rejection detector fires.
const expectation = expect(
submitJob('/tmp/audio.wav', 'http://host/webhook', undefined, undefined, 3) submitJob('/tmp/audio.wav', 'http://host/webhook', undefined, undefined, 3)
).rejects.toThrow(/did not become ready after 3 attempts/i); ).rejects.toThrow(/did not become ready after 3 attempts/i);
await vi.runAllTimersAsync(); const jobCalls = mocks.fetch.mock.calls.filter(([url]) => String(url).endsWith('/jobs'));
await expectation; expect(jobCalls).toHaveLength(3);
expect(mocks.fetch).toHaveBeenCalledTimes(3);
}); });
it('does NOT call onModelWaiting for non-503 errors', async () => { it('does NOT call onModelWaiting for non-503 errors', async () => {
@@ -262,14 +268,105 @@ describe('submitJob — 503 retry behavior', () => {
}); });
it('works correctly without an onModelWaiting callback', async () => { it('works correctly without an onModelWaiting callback', async () => {
mocks.fetch mocks.fetch.mockImplementation(makeJobFetch(make503('unloaded', 0), make202('no-cb-id')));
.mockResolvedValueOnce(make503('unloaded', 0))
.mockResolvedValueOnce(make202('no-cb-id'));
const id = await submitJob('/tmp/audio.wav', 'http://host/webhook');
expect(id).toBe('no-cb-id');
});
});
// ── submitJob — SSE-triggered retry ──────────────────────────────────────────
describe('submitJob — SSE-triggered retry', () => {
	beforeEach(() => vi.useFakeTimers());
	afterEach(() => vi.useRealTimers());

	/**
	 * Route the fetch mock: /model/events calls get whatever `sse()` produces,
	 * the first job call gets a 503 ('loading', `retryAfterSecs`) and every
	 * later job call gets a 202 carrying `jobId`.
	 */
	const stubWhisper = (sse: () => Promise<unknown>, retryAfterSecs: number, jobId: string) => {
		let jobCalls = 0;
		mocks.fetch.mockImplementation((url: string) => {
			if (String(url).includes('/model/events')) return sse();
			jobCalls += 1;
			return jobCalls === 1
				? Promise.resolve(make503('loading', retryAfterSecs))
				: Promise.resolve(make202(jobId));
		});
	};

	it('retries immediately when /model/events fires state:ready before Retry-After timeout', async () => {
		// SSE stream that immediately emits model_ready
		stubWhisper(
			() =>
				Promise.resolve({
					ok: true,
					status: 200,
					body: Readable.from(['data: {"state":"ready"}\n\n'])
				}),
			30,
			'sse-triggered-id'
		);

		// Ready arrives before the 31s fallback — no timer advancement needed
		await expect(submitJob('/tmp/audio.wav', 'http://host/webhook')).resolves.toBe(
			'sse-triggered-id'
		);

		const sseCalls = mocks.fetch.mock.calls.filter(([url]) =>
			String(url).includes('/model/events')
		);
		expect(sseCalls).toHaveLength(1);
	});

	it('falls back to Retry-After sleep when /model/events is unreachable', async () => {
		stubWhisper(() => Promise.reject(new Error('Connection refused')), 5, 'fallback-sleep-id');

		// SSE failed → only the Retry-After timer can release the retry loop
		const pending = submitJob('/tmp/audio.wav', 'http://host/webhook');
		await vi.runAllTimersAsync();

		await expect(pending).resolves.toBe('fallback-sleep-id');
	});

	it('proceeds immediately when SSE stream closes without model_ready', async () => {
		// Empty stream — closes without emitting anything
		stubWhisper(
			() => Promise.resolve({ ok: true, status: 200, body: Readable.from([]) }),
			30,
			'stream-closed-id'
		);

		// Stream closed without model_ready → must not wait out the full 31s timeout
		await expect(submitJob('/tmp/audio.wav', 'http://host/webhook')).resolves.toBe(
			'stream-closed-id'
		);
	});
});
// ── cancelJob ─────────────────────────────────────────────────────────────────
describe('cancelJob', () => {
	it('sends DELETE to the correct whisper job URL', async () => {
		mocks.fetch.mockResolvedValue({ ok: true, status: 200 });

		await cancelJob('whisper-job-abc');

		expect(mocks.fetch).toHaveBeenCalledWith(
			expect.stringContaining('/jobs/whisper-job-abc'),
			expect.objectContaining({ method: 'DELETE' })
		);
	});

	it('uses the configured WHISPER_URL', async () => {
		vi.stubEnv('WHISPER_URL', 'http://gpu-box:9090');
		try {
			mocks.fetch.mockResolvedValue({ ok: true, status: 200 });

			await cancelJob('job-xyz');

			expect(mocks.fetch).toHaveBeenCalledWith(
				'http://gpu-box:9090/jobs/job-xyz',
				expect.objectContaining({ method: 'DELETE' })
			);
		} finally {
			// Restore the environment even if an assertion above fails, so the
			// stubbed WHISPER_URL cannot leak into later tests.
			vi.unstubAllEnvs();
		}
	});

	it('swallows errors silently (best-effort)', async () => {
		mocks.fetch.mockRejectedValue(new Error('Connection refused'));

		// cancelJob returns Promise<void>; a swallowed failure resolves to undefined.
		await expect(cancelJob('dead-job')).resolves.toBeUndefined();
	});
});