diff --git a/src/lib/server/whisper.ts b/src/lib/server/whisper.ts index 8591176..ab307a0 100644 --- a/src/lib/server/whisper.ts +++ b/src/lib/server/whisper.ts @@ -4,8 +4,6 @@ function whisperUrl() { return process.env.WHISPER_URL ?? 'http://localhost:8080'; } -const sleep = (ms: number) => new Promise((r) => setTimeout(r, ms)); - /** Get the current model state from whisper-rtx2080. */ export async function getModelStatus(): Promise { const { default: fetch } = await import('node-fetch'); @@ -16,11 +14,69 @@ export async function getModelStatus(): Promise { return res.json() as Promise; } +/** + * Wait for the whisper model to become ready. + * + * Subscribes to /model/events SSE and resolves as soon as a payload with + * state:"ready" arrives. Falls back to a plain timeout (`timeoutMs`) if the + * SSE connection fails or closes without that event, so the retry loop can + * try again without hanging indefinitely. + */ +async function waitForModelReady(timeoutMs: number): Promise { + const { default: fetch } = await import('node-fetch'); + const ac = new AbortController(); + return new Promise((resolve) => { + let done = false; + const finish = () => { + if (!done) { + done = true; + ac.abort(); + resolve(); + } + }; + + const timer = setTimeout(finish, timeoutMs); + + fetch(`${whisperUrl()}/model/events`, { signal: ac.signal as AbortSignal }) + .then(async (res) => { + if (!res.body) { + clearTimeout(timer); + return finish(); + } + let buf = ''; + for await (const chunk of res.body) { + if (ac.signal.aborted) break; + buf += chunk.toString(); + const lines = buf.split('\n'); + buf = lines.pop() ?? ''; + for (const line of lines) { + if (!line.startsWith('data:')) continue; + try { + const payload = JSON.parse(line.slice(5).trim()); + if (payload.state === 'ready') { + clearTimeout(timer); + finish(); + return; + } + } catch { /* ignore parse errors */ } + } + } + // Stream ended without model_ready → proceed to retry immediately + clearTimeout(timer); + finish(); + }) + .catch(() => { + // SSE unreachable — fallback timer will fire eventually + }); + }); +} + /** * Submit an audio file to whisper-rtx2080. Returns the whisper job id. * - * Handles 503 (model not ready) transparently: retries using the - * `Retry-After` header until the model loads or maxAttempts is exhausted. + * Handles 503 (model not ready) transparently: retries after subscribing to + * /model/events SSE — proceeds as soon as state:"ready" arrives, or after the + * Retry-After timeout elapses (whichever comes first). * Calls `onModelWaiting` on each 503 so the caller can surface the wait to the user. */ export async function submitJob( @@ -60,7 +116,7 @@ export async function submitJob( const state = body.state ?? 'unloaded'; const waitSecs = body.retry_after_secs ?? parseInt(res.headers.get('Retry-After') ?? '15'); onModelWaiting?.(state, waitSecs); - await sleep((waitSecs + 1) * 1000); + await waitForModelReady((waitSecs + 1) * 1000); continue; } @@ -71,6 +127,20 @@ export async function submitJob( throw new Error(`Whisper model did not become ready after ${maxAttempts} attempts`); } +/** + * Cancel a queued or running job on the whisper server (best-effort). + * Errors are silently ignored — local job status is already set to cancelled. + */ +export async function cancelJob(whisperJobId: string): Promise { + try { + const { default: fetch } = await import('node-fetch'); + await fetch(`${whisperUrl()}/jobs/${whisperJobId}`, { + method: 'DELETE', + signal: AbortSignal.timeout(5000) + }); + } catch { /* best-effort */ } +} + /** Open an SSE stream from whisper and call onProgress/onDone callbacks. */ export async function streamJob( whisperJobId: string, diff --git a/src/routes/api/jobs/[id]/+server.ts b/src/routes/api/jobs/[id]/+server.ts index 21af0cb..ab1f6e7 100644 --- a/src/routes/api/jobs/[id]/+server.ts +++ b/src/routes/api/jobs/[id]/+server.ts @@ -1,5 +1,6 @@ import { json, error } from '@sveltejs/kit'; import { getJob, setJobStatus, deleteJob } from '$lib/server/db.js'; +import { cancelJob } from '$lib/server/whisper.js'; import { rm } from 'fs/promises'; export async function GET({ params }) { @@ -17,6 +18,8 @@ export async function DELETE({ params }) { if (ACTIVE.has(job.status)) { // Cancel active job (keeps DB record) setJobStatus(params.id, 'cancelled', 0); + // Best-effort: tell whisper to drop the queued job so it stops using GPU + if (job.whisperJobId) cancelJob(job.whisperJobId).catch(() => {}); } else { // Hard-delete terminal job + clean up output files deleteJob(params.id); diff --git a/src/routes/api/webhook/[jobId]/+server.ts b/src/routes/api/webhook/[jobId]/+server.ts index d9d49dd..5db77a4 100644 --- a/src/routes/api/webhook/[jobId]/+server.ts +++ b/src/routes/api/webhook/[jobId]/+server.ts @@ -12,6 +12,11 @@ const jobId = params.jobId; const job = getJob(jobId); if (!job) throw error(404, 'Job not found'); +// Discard the result if the job was cancelled locally while whisper was running +if (job.status === 'cancelled') { +return json({ ok: true }); +} + const whisperJob = (await request.json()) as WhisperJob; if (whisperJob.status === 'failed' || whisperJob.status === 'cancelled') { diff --git a/src/tests/webhook.test.ts b/src/tests/webhook.test.ts index eb83e71..56a2cba 100644 --- a/src/tests/webhook.test.ts +++ b/src/tests/webhook.test.ts @@ -113,6 +113,25 @@ describe('POST /api/webhook/[jobId] — job not found', () => { }); }); +// ── Local cancellation guard ────────────────────────────────────────────────── + +describe('POST /api/webhook/[jobId] — locally cancelled job', () => { + it('returns ok without processing when the local job is already cancelled', async () => { + mockGetJob.mockReturnValue({ ...makeJob('job-lc'), status: 'cancelled' }); + const payload = makeWhisperJob({ status: 'done' }); + + const res = await POST(makeEvent('job-lc', payload) as any); + expect(res.status).toBe(200); + expect(await res.json()).toEqual({ ok: true }); + + // Must not touch outputs, status, or notifications + expect(mockSetJobStatus).not.toHaveBeenCalled(); + expect(mockUpdateJob).not.toHaveBeenCalled(); + expect(mockWriteOutputs).not.toHaveBeenCalled(); + expect(mockSendNotification).not.toHaveBeenCalled(); + }); +}); + // ── Whisper job failed / cancelled ─────────────────────────────────────────── describe('POST /api/webhook/[jobId] — whisper failure', () => { diff --git a/src/tests/whisper.test.ts b/src/tests/whisper.test.ts index dfc3de0..7a77824 100644 --- a/src/tests/whisper.test.ts +++ b/src/tests/whisper.test.ts @@ -21,7 +21,7 @@ vi.mock('form-data', () => ({ vi.mock('fs', () => ({ createReadStream: vi.fn(() => 'STREAM_PLACEHOLDER') })); -import { submitJob, streamJob, getModelStatus } from '$lib/server/whisper.js'; +import { submitJob, streamJob, getModelStatus, cancelJob } from '$lib/server/whisper.js'; afterEach(() => vi.clearAllMocks()); @@ -125,115 +125,121 @@ function make202(job_id: string) { return { status: 202, json: () => Promise.resolve({ job_id }) }; } +/** + * URL-aware fetch mock: /model/events calls resolve immediately with no body + * (causing waitForModelReady to call finish() right away, so the retry loop + * proceeds without any real timer delay), and /jobs calls consume the + * provided response queue in order. + */ +function makeJobFetch(...responses: object[]) { + let idx = 0; + return (url: string) => { + if (String(url).includes('/model/events')) { + return Promise.resolve({ ok: true, status: 200, body: null }); + } + return Promise.resolve(responses[idx++]); + }; +} + describe('submitJob — 503 retry behavior', () => { beforeEach(() => vi.useFakeTimers()); afterEach(() => vi.useRealTimers()); it('calls onModelWaiting with state and retryAfterSecs on first 503', async () => { - mocks.fetch - .mockResolvedValueOnce(make503('unloaded', 30)) - .mockResolvedValueOnce(make202('job-1')); + mocks.fetch.mockImplementation(makeJobFetch(make503('unloaded', 30), make202('job-1'))); const onModelWaiting = vi.fn(); - const p = submitJob('/tmp/audio.wav', 'http://host/webhook', undefined, onModelWaiting); - await vi.runAllTimersAsync(); - - await expect(p).resolves.toBe('job-1'); + const id = await submitJob('/tmp/audio.wav', 'http://host/webhook', undefined, onModelWaiting); + expect(id).toBe('job-1'); expect(onModelWaiting).toHaveBeenCalledOnce(); expect(onModelWaiting).toHaveBeenCalledWith('unloaded', 30); }); it('retries and returns job_id once model becomes ready', async () => { - mocks.fetch - .mockResolvedValueOnce(make503('loading', 10)) - .mockResolvedValueOnce(make202('ready-id')); + mocks.fetch.mockImplementation(makeJobFetch(make503('loading', 10), make202('ready-id'))); - const p = submitJob('/tmp/audio.wav', 'http://host/webhook'); - await vi.runAllTimersAsync(); - - await expect(p).resolves.toBe('ready-id'); - expect(mocks.fetch).toHaveBeenCalledTimes(2); + const id = await submitJob('/tmp/audio.wav', 'http://host/webhook'); + expect(id).toBe('ready-id'); + const jobCalls = mocks.fetch.mock.calls.filter(([url]) => String(url).endsWith('/jobs')); + expect(jobCalls).toHaveLength(2); }); it('calls onModelWaiting once per 503, not on success', async () => { - mocks.fetch - .mockResolvedValueOnce(make503('loading', 0)) - .mockResolvedValueOnce(make503('loading', 0)) - .mockResolvedValueOnce(make202('final-id')); + mocks.fetch.mockImplementation( + makeJobFetch(make503('loading', 0), make503('loading', 0), make202('final-id')) + ); const onModelWaiting = vi.fn(); - const p = submitJob('/tmp/audio.wav', 'http://host/webhook', undefined, onModelWaiting, 10); - await vi.runAllTimersAsync(); - - await expect(p).resolves.toBe('final-id'); + const id = await submitJob('/tmp/audio.wav', 'http://host/webhook', undefined, onModelWaiting, 10); + expect(id).toBe('final-id'); expect(onModelWaiting).toHaveBeenCalledTimes(2); }); it('passes the correct state for each 503 response', async () => { - mocks.fetch - .mockResolvedValueOnce(make503('unloaded', 0)) - .mockResolvedValueOnce(make503('loading', 0)) - .mockResolvedValueOnce(make503('waiting_for_gpu', 0)) - .mockResolvedValueOnce(make202('job-x')); + mocks.fetch.mockImplementation( + makeJobFetch( + make503('unloaded', 0), + make503('loading', 0), + make503('waiting_for_gpu', 0), + make202('job-x') + ) + ); const onModelWaiting = vi.fn(); - const p = submitJob('/tmp/audio.wav', 'http://host/webhook', undefined, onModelWaiting, 10); - await vi.runAllTimersAsync(); - - await expect(p).resolves.toBe('job-x'); + const id = await submitJob('/tmp/audio.wav', 'http://host/webhook', undefined, onModelWaiting, 10); + expect(id).toBe('job-x'); expect(onModelWaiting).toHaveBeenNthCalledWith(1, 'unloaded', 0); expect(onModelWaiting).toHaveBeenNthCalledWith(2, 'loading', 0); expect(onModelWaiting).toHaveBeenNthCalledWith(3, 'waiting_for_gpu', 0); }); it('falls back to Retry-After header when body lacks retry_after_secs', async () => { - // Body with no retry_after_secs — only header - mocks.fetch - .mockResolvedValueOnce({ - status: 503, - json: () => Promise.resolve({ state: 'loading' }), - headers: { get: (h: string) => (h.toLowerCase() === 'retry-after' ? '7' : null) } - }) - .mockResolvedValueOnce(make202('fallback-id')); + mocks.fetch.mockImplementation( + makeJobFetch( + { + status: 503, + json: () => Promise.resolve({ state: 'loading' }), + headers: { get: (h: string) => (h.toLowerCase() === 'retry-after' ? '7' : null) } + }, + make202('fallback-id') + ) + ); const onModelWaiting = vi.fn(); - const p = submitJob('/tmp/audio.wav', 'http://host/webhook', undefined, onModelWaiting); - await vi.runAllTimersAsync(); - - await expect(p).resolves.toBe('fallback-id'); + const id = await submitJob('/tmp/audio.wav', 'http://host/webhook', undefined, onModelWaiting); + expect(id).toBe('fallback-id'); expect(onModelWaiting).toHaveBeenCalledWith('loading', 7); }); it('falls back to 15s when both body and header are absent', async () => { - mocks.fetch - .mockResolvedValueOnce({ - status: 503, - json: () => Promise.resolve({ state: 'unloaded' }), - headers: { get: () => null } - }) - .mockResolvedValueOnce(make202('default-wait-id')); + mocks.fetch.mockImplementation( + makeJobFetch( + { + status: 503, + json: () => Promise.resolve({ state: 'unloaded' }), + headers: { get: () => null } + }, + make202('default-wait-id') + ) + ); const onModelWaiting = vi.fn(); - const p = submitJob('/tmp/audio.wav', 'http://host/webhook', undefined, onModelWaiting); - await vi.runAllTimersAsync(); - - await expect(p).resolves.toBe('default-wait-id'); + const id = await submitJob('/tmp/audio.wav', 'http://host/webhook', undefined, onModelWaiting); + expect(id).toBe('default-wait-id'); expect(onModelWaiting).toHaveBeenCalledWith('unloaded', 15); }); it('throws after maxAttempts 503 responses', async () => { - mocks.fetch.mockResolvedValue(make503('loading', 0)); + mocks.fetch.mockImplementation( + makeJobFetch(make503('loading', 0), make503('loading', 0), make503('loading', 0)) + ); - // Attach .rejects handler BEFORE advancing timers so the rejection - // is always handled before Vitest's unhandled-rejection detector fires. - const expectation = expect( + await expect( submitJob('/tmp/audio.wav', 'http://host/webhook', undefined, undefined, 3) ).rejects.toThrow(/did not become ready after 3 attempts/i); - await vi.runAllTimersAsync(); - await expectation; - - expect(mocks.fetch).toHaveBeenCalledTimes(3); + const jobCalls = mocks.fetch.mock.calls.filter(([url]) => String(url).endsWith('/jobs')); + expect(jobCalls).toHaveLength(3); }); it('does NOT call onModelWaiting for non-503 errors', async () => { @@ -262,14 +268,105 @@ describe('submitJob — 503 retry behavior', () => { }); it('works correctly without an onModelWaiting callback', async () => { - mocks.fetch - .mockResolvedValueOnce(make503('unloaded', 0)) - .mockResolvedValueOnce(make202('no-cb-id')); + mocks.fetch.mockImplementation(makeJobFetch(make503('unloaded', 0), make202('no-cb-id'))); + const id = await submitJob('/tmp/audio.wav', 'http://host/webhook'); + expect(id).toBe('no-cb-id'); + }); +}); + +// ── submitJob — SSE-triggered retry ────────────────────────────────────────── + +describe('submitJob — SSE-triggered retry', () => { + beforeEach(() => vi.useFakeTimers()); + afterEach(() => vi.useRealTimers()); + + it('retries immediately when /model/events fires state:ready before Retry-After timeout', async () => { + let jobCallIdx = 0; + mocks.fetch.mockImplementation((url: string) => { + if (String(url).includes('/model/events')) { + // SSE stream that immediately emits model_ready + return Promise.resolve({ + ok: true, + status: 200, + body: Readable.from(['data: {"state":"ready"}\n\n']) + }); + } + jobCallIdx++; + if (jobCallIdx === 1) return Promise.resolve(make503('loading', 30)); + return Promise.resolve(make202('sse-triggered-id')); + }); + + // SSE fires model_ready before the 31s timeout — no timer advancement needed + const id = await submitJob('/tmp/audio.wav', 'http://host/webhook'); + expect(id).toBe('sse-triggered-id'); + const sseCalls = mocks.fetch.mock.calls.filter(([url]) => + String(url).includes('/model/events') + ); + expect(sseCalls).toHaveLength(1); + }); + + it('falls back to Retry-After sleep when /model/events is unreachable', async () => { + let jobCallIdx = 0; + mocks.fetch.mockImplementation((url: string) => { + if (String(url).includes('/model/events')) { + return Promise.reject(new Error('Connection refused')); + } + jobCallIdx++; + if (jobCallIdx === 1) return Promise.resolve(make503('loading', 5)); + return Promise.resolve(make202('fallback-sleep-id')); + }); + + // SSE failed → must wait for the Retry-After timer const p = submitJob('/tmp/audio.wav', 'http://host/webhook'); await vi.runAllTimersAsync(); + await expect(p).resolves.toBe('fallback-sleep-id'); + }); - await expect(p).resolves.toBe('no-cb-id'); + it('proceeds immediately when SSE stream closes without model_ready', async () => { + let jobCallIdx = 0; + mocks.fetch.mockImplementation((url: string) => { + if (String(url).includes('/model/events')) { + // Empty stream — closes without emitting anything + return Promise.resolve({ ok: true, status: 200, body: Readable.from([]) }); + } + jobCallIdx++; + if (jobCallIdx === 1) return Promise.resolve(make503('loading', 30)); + return Promise.resolve(make202('stream-closed-id')); + }); + + // Stream closed without model_ready → should not wait the full 31s timeout + const id = await submitJob('/tmp/audio.wav', 'http://host/webhook'); + expect(id).toBe('stream-closed-id'); + }); +}); + +// ── cancelJob ───────────────────────────────────────────────────────────────── + +describe('cancelJob', () => { + it('sends DELETE to the correct whisper job URL', async () => { + mocks.fetch.mockResolvedValue({ ok: true, status: 200 }); + await cancelJob('whisper-job-abc'); + expect(mocks.fetch).toHaveBeenCalledWith( + expect.stringContaining('/jobs/whisper-job-abc'), + expect.objectContaining({ method: 'DELETE' }) + ); + }); + + it('uses the configured WHISPER_URL', async () => { + vi.stubEnv('WHISPER_URL', 'http://gpu-box:9090'); + mocks.fetch.mockResolvedValue({ ok: true, status: 200 }); + await cancelJob('job-xyz'); + expect(mocks.fetch).toHaveBeenCalledWith( + 'http://gpu-box:9090/jobs/job-xyz', + expect.objectContaining({ method: 'DELETE' }) + ); + vi.unstubAllEnvs(); + }); + + it('swallows errors silently (best-effort)', async () => { + mocks.fetch.mockRejectedValue(new Error('Connection refused')); + await expect(cancelJob('dead-job')).resolves.not.toThrow(); }); });