fix: FormData stream exhausted on retry + undefined segments crash
Two bugs triggered together when the model was unloaded during a job: 1. submitJob() created FormData/createReadStream once outside the retry loop. After a 503, the audio ReadStream was consumed and subsequent retries sent an empty body to whisper, causing it to return segments:undefined. 2. webhook handler cast whisperJob.segments as Segment[] without guarding against undefined, so deduplicateSegments(undefined) crashed with 'Cannot read properties of undefined (reading 'map')' — stored as job.error. Fixes: - Move FormData + createReadStream inside the retry loop (fresh stream per attempt) - Use (whisperJob.segments ?? []) in webhook handler - Add Array.isArray guard at top of deduplicateSegments() as belt-and-suspenders Tests: - New: verifies createReadStream called once per attempt (3 attempts = 3 streams) - New: webhook handles segments:undefined without throwing - New: webhook handles segments:null without throwing - 150/150 passing Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
@@ -83,6 +83,7 @@ function ngramDedup(segments: Segment[]): Segment[] {
|
|||||||
// ── Full deduplication pipeline ──────────────────────────────────────────────
|
// ── Full deduplication pipeline ──────────────────────────────────────────────
|
||||||
|
|
||||||
export function deduplicateSegments(segments: Segment[]): Segment[] {
|
export function deduplicateSegments(segments: Segment[]): Segment[] {
|
||||||
|
if (!Array.isArray(segments)) return [];
|
||||||
// 1. Collapse repeats within each segment's text
|
// 1. Collapse repeats within each segment's text
|
||||||
let result = segments.map((s) => ({
|
let result = segments.map((s) => ({
|
||||||
...s,
|
...s,
|
||||||
|
|||||||
@@ -90,13 +90,16 @@ export async function submitJob(
|
|||||||
const { createReadStream } = await import('fs');
|
const { createReadStream } = await import('fs');
|
||||||
const { default: fetch } = await import('node-fetch');
|
const { default: fetch } = await import('node-fetch');
|
||||||
|
|
||||||
const form = new FormData();
|
|
||||||
form.append('audio', createReadStream(wavPath));
|
|
||||||
form.append('task', 'transcribe');
|
|
||||||
form.append('webhook_url', webhookUrl);
|
|
||||||
if (language) form.append('language', language);
|
|
||||||
|
|
||||||
for (let attempt = 1; attempt <= maxAttempts; attempt++) {
|
for (let attempt = 1; attempt <= maxAttempts; attempt++) {
|
||||||
|
// Recreate form with a fresh readable stream on every attempt.
|
||||||
|
// A consumed ReadStream cannot be rewound, so reusing it across retries
|
||||||
|
// would send an empty body to whisper after the first 503.
|
||||||
|
const form = new FormData();
|
||||||
|
form.append('audio', createReadStream(wavPath));
|
||||||
|
form.append('task', 'transcribe');
|
||||||
|
form.append('webhook_url', webhookUrl);
|
||||||
|
if (language) form.append('language', language);
|
||||||
|
|
||||||
const res = await fetch(`${whisperUrl()}/jobs`, {
|
const res = await fetch(`${whisperUrl()}/jobs`, {
|
||||||
method: 'POST',
|
method: 'POST',
|
||||||
body: form,
|
body: form,
|
||||||
|
|||||||
@@ -30,7 +30,7 @@ try {
|
|||||||
setJobStatus(jobId, 'processing', 90);
|
setJobStatus(jobId, 'processing', 90);
|
||||||
emitProgress(jobId, { type: 'status', status: 'processing', progress: 90 });
|
emitProgress(jobId, { type: 'status', status: 'processing', progress: 90 });
|
||||||
|
|
||||||
const rawSegments = whisperJob.segments as Segment[];
|
const rawSegments = (whisperJob.segments ?? []) as Segment[];
|
||||||
const segments = deduplicateSegments(rawSegments);
|
const segments = deduplicateSegments(rawSegments);
|
||||||
|
|
||||||
const paths = await writeOutputs(segments, job.title, jobId);
|
const paths = await writeOutputs(segments, job.title, jobId);
|
||||||
|
|||||||
@@ -289,6 +289,34 @@ describe('POST /api/webhook/[jobId] — empty segments', () => {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// ── Undefined / missing segments (model returned no segments field) ───────────
|
||||||
|
|
||||||
|
describe('POST /api/webhook/[jobId] — undefined segments', () => {
|
||||||
|
it('completes the job as done when segments field is absent from whisper payload', async () => {
|
||||||
|
mockGetJob.mockReturnValue(makeJob('job-noseg'));
|
||||||
|
// Simulate whisper returning a result without a segments field
|
||||||
|
const payload = { ...makeWhisperJob(), segments: undefined as unknown as never[] };
|
||||||
|
|
||||||
|
const res = await POST(makeEvent('job-noseg', payload) as any);
|
||||||
|
expect(res.status).toBe(200);
|
||||||
|
expect(await res.json()).toEqual({ ok: true });
|
||||||
|
expect(mockUpdateJob).toHaveBeenCalledWith(
|
||||||
|
expect.objectContaining({ status: 'done', id: 'job-noseg' })
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('does not throw "cannot read properties of undefined" when segments is null', async () => {
|
||||||
|
mockGetJob.mockReturnValue(makeJob('job-nullseg'));
|
||||||
|
const payload = { ...makeWhisperJob(), segments: null as unknown as never[] };
|
||||||
|
|
||||||
|
// Must NOT throw — previously crashed with "Cannot read properties of undefined (reading 'map')"
|
||||||
|
await expect(POST(makeEvent('job-nullseg', payload) as any)).resolves.toBeDefined();
|
||||||
|
expect(mockUpdateJob).toHaveBeenCalledWith(
|
||||||
|
expect.objectContaining({ status: 'done' })
|
||||||
|
);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
// ── Internal error handling ───────────────────────────────────────────────────
|
// ── Internal error handling ───────────────────────────────────────────────────
|
||||||
|
|
||||||
describe('POST /api/webhook/[jobId] — internal errors', () => {
|
describe('POST /api/webhook/[jobId] — internal errors', () => {
|
||||||
|
|||||||
@@ -6,7 +6,8 @@ import { Readable } from 'stream';
|
|||||||
const mocks = vi.hoisted(() => ({
|
const mocks = vi.hoisted(() => ({
|
||||||
fetch: vi.fn(),
|
fetch: vi.fn(),
|
||||||
append: vi.fn(),
|
append: vi.fn(),
|
||||||
getHeaders: vi.fn(() => ({ 'content-type': 'multipart/form-data; boundary=test' }))
|
getHeaders: vi.fn(() => ({ 'content-type': 'multipart/form-data; boundary=test' })),
|
||||||
|
createReadStream: vi.fn(() => 'STREAM_PLACEHOLDER')
|
||||||
}));
|
}));
|
||||||
|
|
||||||
vi.mock('node-fetch', () => ({ default: mocks.fetch }));
|
vi.mock('node-fetch', () => ({ default: mocks.fetch }));
|
||||||
@@ -19,7 +20,7 @@ vi.mock('form-data', () => ({
|
|||||||
})
|
})
|
||||||
}));
|
}));
|
||||||
|
|
||||||
vi.mock('fs', () => ({ createReadStream: vi.fn(() => 'STREAM_PLACEHOLDER') }));
|
vi.mock('fs', () => ({ createReadStream: mocks.createReadStream }));
|
||||||
|
|
||||||
import { submitJob, streamJob, getModelStatus, cancelJob, unloadModel } from '$lib/server/whisper.js';
|
import { submitJob, streamJob, getModelStatus, cancelJob, unloadModel } from '$lib/server/whisper.js';
|
||||||
|
|
||||||
@@ -255,6 +256,20 @@ describe('submitJob — 503 retry behavior', () => {
|
|||||||
expect(onModelWaiting).not.toHaveBeenCalled();
|
expect(onModelWaiting).not.toHaveBeenCalled();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('creates a fresh ReadStream for each attempt (stream not reused across retries)', async () => {
|
||||||
|
mocks.fetch.mockImplementation(
|
||||||
|
makeJobFetch(make503('loading', 0), make503('loading', 0), make202('fresh-stream-id'))
|
||||||
|
);
|
||||||
|
|
||||||
|
const id = await submitJob('/tmp/audio.wav', 'http://host/webhook', undefined, undefined, 10);
|
||||||
|
expect(id).toBe('fresh-stream-id');
|
||||||
|
// 3 attempts → 3 separate createReadStream calls, one fresh stream per form
|
||||||
|
expect(mocks.createReadStream).toHaveBeenCalledTimes(3);
|
||||||
|
expect(mocks.createReadStream).toHaveBeenNthCalledWith(1, '/tmp/audio.wav');
|
||||||
|
expect(mocks.createReadStream).toHaveBeenNthCalledWith(2, '/tmp/audio.wav');
|
||||||
|
expect(mocks.createReadStream).toHaveBeenNthCalledWith(3, '/tmp/audio.wav');
|
||||||
|
});
|
||||||
|
|
||||||
it('does NOT retry on non-503 errors (throws immediately)', async () => {
|
it('does NOT retry on non-503 errors (throws immediately)', async () => {
|
||||||
mocks.fetch.mockResolvedValue({
|
mocks.fetch.mockResolvedValue({
|
||||||
status: 400,
|
status: 400,
|
||||||
|
|||||||
Reference in New Issue
Block a user