- Preserve phase results on partial retry and keep interrupted phase context after restart.
- Avoid webhook bookkeeping crashes when retention deletes stale jobs.
- Add deeper unit, integration, and e2e coverage around queue seams.
- Require verify job to pass before publish runs.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
108 lines
3.8 KiB
TypeScript
108 lines
3.8 KiB
TypeScript
import { waitFor } from './helpers.js';
|
|
import { JobQueueHarness } from './e2e/JobQueueHarness.js';
|
|
|
|
/**
 * End-to-end scenarios for the job queue, driven through `JobQueueHarness`.
 *
 * Each case starts its own harness and always releases it in `finally`, so a
 * failed wait or assertion does not skip `harness.cleanup()`.
 */
describe('JobQueue e2e', () => {
  /**
   * Three-phase job whose middle phase throws `'recoverable'` on its first
   * invocation and succeeds on the second. Verifies the stored phase results,
   * the recorded webhook events, and the event names seen on the stream.
   */
  it('runs a multi-phase workflow with retry, SSE, and webhooks', async () => {
    const harness = new JobQueueHarness<{ url: string }>();
    // Number of times the 'process' handler has been invoked so far.
    let processRuns = 0;

    try {
      const queue = await harness.start(
        {
          phases: ['download', 'process', 'upload'],
          concurrency: 2,
          retry: {
            maxAttempts: 2,
            baseDelayMs: 5,
            // Only an Error whose message is exactly 'recoverable' is
            // classified as recoverable; everything else is 'fatal'.
            classifyError: async (error) => {
              if (error instanceof Error && error.message === 'recoverable') {
                return 'recoverable';
              }
              return 'fatal';
            },
          },
          webhook: { events: ['job:retrying', 'job:completed'] },
        },
        { webhookDelayMs: 5 },
      );
      const stream = await harness.createStream({ includeSnapshot: false });

      queue.handle('download', async (_job, ctx) => {
        await ctx.progress(100, 'downloaded');
        return { filePath: '/tmp/video.mp4' };
      });

      queue.handle('process', async (_job, ctx) => {
        processRuns += 1;
        // Read the earlier phase's result on every invocation, including the
        // one that fails, before deciding whether to throw.
        const downloaded = ctx.phaseResult<{ filePath: string }>('download');
        const filePath = downloaded?.filePath;

        const isFirstRun = processRuns === 1;
        if (isFirstRun) {
          await ctx.progress(25, 'processing');
          throw new Error('recoverable');
        }

        await ctx.progress(100, 'processed');
        return { outputPath: `${filePath}.json` };
      });

      queue.handle('upload', async (_job, ctx) => {
        const processed = ctx.phaseResult<{ outputPath: string }>('process');
        const outputPath = processed?.outputPath;
        await ctx.progress(100, 'uploaded');
        return { uploaded: Boolean(outputPath) };
      });

      const jobId = await queue.enqueue({ url: 'https://example.com/video' });
      await harness.waitForJobStatus(jobId, 'completed');
      // Two webhook events are subscribed above; wait until both are recorded.
      await waitFor(() => harness.webhooks.length >= 2);
      await stream.stop();

      const finishedJob = queue.getJob(jobId);
      expect(finishedJob?.phaseResults.download).toEqual({ filePath: '/tmp/video.mp4' });
      expect(finishedJob?.phaseResults.process).toEqual({ outputPath: '/tmp/video.mp4.json' });
      expect(finishedJob?.phaseResults.upload).toEqual({ uploaded: true });

      const webhookEvents = harness.webhooks.map((delivery) => delivery.event);
      expect(webhookEvents).toEqual(['job:retrying', 'job:completed']);

      expect(stream.eventNames).toContain('job:retrying');
      expect(stream.eventNames).toContain('job:completed');

      // Exactly one 'job:phase:completed' per phase is expected on the stream.
      const phaseCompletions = stream.eventNames.filter((name) => name === 'job:phase:completed');
      expect(phaseCompletions).toHaveLength(3);
    } finally {
      await harness.cleanup();
    }
  });

  /**
   * Single-phase job with aggressive retention settings. The configured
   * webhook delay (80 ms) is longer than `deleteAfterMs` (40 ms) — presumably
   * so the 'job:stale' webhook is still pending when the job is deleted, as
   * the test title suggests. A second job is then run to completion.
   */
  it('survives stale-job webhook completion after retention deletes the job', async () => {
    const harness = new JobQueueHarness<{ url: string }>();

    try {
      const queue = await harness.start(
        {
          phases: ['run'],
          concurrency: 1,
          webhook: { events: ['job:stale'] },
          retention: {
            staleAfterMs: 20,
            deleteAfterMs: 40,
            intervalMs: 10,
          },
        },
        { webhookDelayMs: 80 },
      );
      const stream = await harness.createStream({ includeSnapshot: false });

      queue.handle('run', async () => {
        return { ok: true };
      });

      const firstJobId = await queue.enqueue({ url: 'https://example.com/one' });
      await harness.waitForJobStatus(firstJobId, 'completed');
      await harness.waitForJobDeletion(firstJobId);
      // Wait (up to 4 s) for at least one webhook to be recorded after deletion.
      await waitFor(() => harness.webhooks.length >= 1, { timeoutMs: 4_000 });

      // A second job must still reach 'completed' after the first was deleted.
      const secondJobId = await queue.enqueue({ url: 'https://example.com/two' });
      await harness.waitForJobStatus(secondJobId, 'completed');
      await stream.stop();

      const firstWebhook = harness.webhooks[0];
      expect(firstWebhook?.event).toBe('job:stale');
      expect(stream.eventNames).toContain('job:stale');
      expect(stream.eventNames).toContain('job:deleted');
    } finally {
      await harness.cleanup();
    }
  });
});
|