import { waitFor } from './helpers.js'; import { JobQueueHarness } from './e2e/JobQueueHarness.js'; describe('JobQueue e2e', () => { it('runs a multi-phase workflow with retry, SSE, and webhooks', async () => { const harness = new JobQueueHarness<{ url: string }>(); let processAttempts = 0; try { const queue = await harness.start( { phases: ['download', 'process', 'upload'], concurrency: 2, retry: { maxAttempts: 2, baseDelayMs: 5, classifyError: async (error) => error instanceof Error && error.message === 'recoverable' ? 'recoverable' : 'fatal', }, webhook: { events: ['job:retrying', 'job:completed'], }, }, { webhookDelayMs: 5 }, ); const stream = await harness.createStream({ includeSnapshot: false }); queue.handle('download', async (_job, ctx) => { await ctx.progress(100, 'downloaded'); return { filePath: '/tmp/video.mp4' }; }); queue.handle('process', async (_job, ctx) => { processAttempts += 1; const filePath = ctx.phaseResult<{ filePath: string }>('download')?.filePath; if (processAttempts === 1) { await ctx.progress(25, 'processing'); throw new Error('recoverable'); } await ctx.progress(100, 'processed'); return { outputPath: `${filePath}.json` }; }); queue.handle('upload', async (_job, ctx) => { const outputPath = ctx.phaseResult<{ outputPath: string }>('process')?.outputPath; await ctx.progress(100, 'uploaded'); return { uploaded: Boolean(outputPath) }; }); const jobId = await queue.enqueue({ url: 'https://example.com/video' }); await harness.waitForJobStatus(jobId, 'completed'); await waitFor(() => harness.webhooks.length >= 2); await stream.stop(); const job = queue.getJob(jobId); expect(job?.phaseResults.download).toEqual({ filePath: '/tmp/video.mp4' }); expect(job?.phaseResults.process).toEqual({ outputPath: '/tmp/video.mp4.json' }); expect(job?.phaseResults.upload).toEqual({ uploaded: true }); expect(harness.webhooks.map((entry) => entry.event)).toEqual(['job:retrying', 'job:completed']); expect(stream.eventNames).toContain('job:retrying'); expect(stream.eventNames).toContain('job:completed'); expect(stream.eventNames.filter((event) => event === 'job:phase:completed')).toHaveLength(3); } finally { await harness.cleanup(); } }); it('survives stale-job webhook completion after retention deletes the job', async () => { const harness = new JobQueueHarness<{ url: string }>(); try { const queue = await harness.start( { phases: ['run'], concurrency: 1, webhook: { events: ['job:stale'], }, retention: { staleAfterMs: 20, deleteAfterMs: 40, intervalMs: 10, }, }, { webhookDelayMs: 80 }, ); const stream = await harness.createStream({ includeSnapshot: false }); queue.handle('run', async () => ({ ok: true })); const firstJob = await queue.enqueue({ url: 'https://example.com/one' }); await harness.waitForJobStatus(firstJob, 'completed'); await harness.waitForJobDeletion(firstJob); await waitFor(() => harness.webhooks.length >= 1, { timeoutMs: 4_000 }); const secondJob = await queue.enqueue({ url: 'https://example.com/two' }); await harness.waitForJobStatus(secondJob, 'completed'); await stream.stop(); expect(harness.webhooks[0]?.event).toBe('job:stale'); expect(stream.eventNames).toContain('job:stale'); expect(stream.eventNames).toContain('job:deleted'); } finally { await harness.cleanup(); } }); });