import { createServer } from 'node:http';

import { JobQueue, SqliteStorage } from '../src/index.js';
import { cleanupDir, createDbPath, createTempDir, waitFor } from './helpers.js';

/**
 * End-to-end tests for `JobQueue`.
 *
 * Every test creates its own temporary directory and database path, and removes
 * the directory in a `finally` block so a failing assertion does not leave
 * files behind. Queues (and raw storage handles) are likewise released in
 * `finally` blocks.
 */
describe('JobQueue', () => {
  // Happy path: both phases run in order, the second phase can read the first
  // phase's result, and lifecycle events fire in the expected sequence.
  it('runs multi-phase jobs to completion', async () => {
    const dir = createTempDir();
    const queue = new JobQueue<{ url: string }>({
      dbPath: createDbPath(dir),
      phases: ['download', 'process'],
      concurrency: 1,
    });
    const events: string[] = [];

    queue.on('job:started', () => events.push('started'));
    queue.on('job:phase:completed', (_, phase) => events.push(`phase:${phase.name}`));
    queue.on('job:completed', () => events.push('completed'));

    queue.handle('download', async (_job, ctx) => {
      await ctx.progress(50, 'downloading');
      return { filePath: '/tmp/video.mp4' };
    });
    queue.handle('process', async (_job, ctx) => {
      expect(ctx.phaseResult<{ filePath: string }>('download')?.filePath).toBe('/tmp/video.mp4');
      await ctx.progress(25, 'processing');
      return { outputPath: '/tmp/video.txt' };
    });

    try {
      const jobId = await queue.enqueue({ url: 'https://example.com/video' });
      await waitFor(() => queue.getJob(jobId)?.status === 'completed');

      const job = queue.getJob(jobId);
      expect(job?.status).toBe('completed');
      expect(job?.phaseResults.download).toEqual({ filePath: '/tmp/video.mp4' });
      expect(job?.phaseResults.process).toEqual({ outputPath: '/tmp/video.txt' });
      expect(events).toEqual(['started', 'phase:download', 'phase:process', 'completed']);
    } finally {
      await queue.shutdown();
      cleanupDir(dir);
    }
  });

  // The handler throws a 'recoverable' error on its first attempt only; the
  // job is expected to complete after exactly one retry.
  it('retries recoverable failures and eventually completes', async () => {
    const dir = createTempDir();
    const queue = new JobQueue<{ url: string }>({
      dbPath: createDbPath(dir),
      phases: ['run'],
      concurrency: 1,
      retry: {
        maxAttempts: 3,
        baseDelayMs: 10,
        classifyError: async (error) =>
          error instanceof Error && error.message === 'recoverable' ? 'recoverable' : 'fatal',
      },
    });
    let attempts = 0;
    let retries = 0;

    queue.on('job:retrying', () => {
      retries += 1;
    });
    queue.handle('run', async () => {
      attempts += 1;
      if (attempts === 1) {
        throw new Error('recoverable');
      }
      return { ok: true };
    });

    try {
      const jobId = await queue.enqueue({ url: 'https://example.com/video' });
      await waitFor(() => queue.getJob(jobId)?.status === 'completed', { timeoutMs: 4_000 });

      const job = queue.getJob(jobId);
      expect(job?.status).toBe('completed');
      expect(job?.retryCount).toBe(1);
      expect(retries).toBe(1);
    } finally {
      await queue.shutdown();
      cleanupDir(dir);
    }
  });

  // Reads the event stream until a 'job:completed' frame shows up (or the
  // waitFor timeout fires), then checks that progress was streamed as well.
  it('streams queue events as SSE', async () => {
    const dir = createTempDir();
    const queue = new JobQueue<{ url: string }>({
      dbPath: createDbPath(dir),
      phases: ['run'],
      concurrency: 1,
    });
    const stream = queue.createEventStream({ includeSnapshot: false });
    const reader = stream.getReader();

    // One streaming decoder and one accumulated string: an SSE frame (or a
    // multi-byte character) split across two chunks is still decoded and
    // matched correctly, which per-chunk decoding and matching cannot do.
    const decoder = new TextDecoder();
    let received = '';
    const sawCompletion = (): boolean => received.includes('event: job:completed');

    const readPromise = (async () => {
      while (true) {
        const { value, done } = await reader.read();
        if (done) {
          return;
        }
        if (value) {
          received += decoder.decode(value, { stream: true });
          if (sawCompletion()) {
            return;
          }
        }
      }
    })();

    queue.handle('run', async (_job, ctx) => {
      await ctx.progress(100, 'done');
      return { ok: true };
    });

    try {
      await queue.enqueue({ url: 'https://example.com/video' });
      await Promise.race([readPromise, waitFor(sawCompletion, { timeoutMs: 4_000 })]);
    } finally {
      await reader.cancel();
      await queue.shutdown();
      cleanupDir(dir);
    }

    expect(received).toContain('event: job:completed');
    expect(received).toContain('event: job:progress');
  });

  // The job is scheduled a minute into the future so it is still pending when
  // cancel() is called.
  it('marks unfinished phases as cancelled when cancelling a pending job', async () => {
    const dir = createTempDir();
    const queue = new JobQueue<{ url: string }>({
      dbPath: createDbPath(dir),
      phases: ['download', 'process'],
      concurrency: 1,
    });

    try {
      const jobId = await queue.enqueue(
        { url: 'https://example.com/video' },
        { scheduledAt: new Date(Date.now() + 60_000) },
      );
      const cancelled = await queue.cancel(jobId);

      expect(cancelled.status).toBe('cancelled');
      expect(cancelled.phases.map((phase) => phase.status)).toEqual(['cancelled', 'cancelled']);
    } finally {
      await queue.shutdown();
      cleanupDir(dir);
    }
  });

  // Cancels from inside the 'download' phase-completed listener and checks
  // that the 'process' handler never starts and the event fires exactly once.
  it('emits job:cancelled once when cancelling after a phase completes', async () => {
    const dir = createTempDir();
    const queue = new JobQueue<{ url: string }>({
      dbPath: createDbPath(dir),
      phases: ['download', 'process'],
      concurrency: 1,
    });
    let processStarted = false;
    let cancelledEvents = 0;

    queue.on('job:phase:completed', (job, phase) => {
      if (phase.name === 'download') {
        void queue.cancel(job.id);
      }
    });
    queue.on('job:cancelled', () => {
      cancelledEvents += 1;
    });

    queue.handle('download', async (_job, ctx) => {
      await ctx.progress(100, 'downloaded');
      return { filePath: '/tmp/video.mp4' };
    });
    queue.handle('process', async () => {
      processStarted = true;
      return { outputPath: '/tmp/video.txt' };
    });

    try {
      const jobId = await queue.enqueue({ url: 'https://example.com/video' });
      await waitFor(() => queue.getJob(jobId)?.status === 'cancelled');

      expect(cancelledEvents).toBe(1);
      expect(processStarted).toBe(false);
      expect(queue.getJob(jobId)?.phases.map((phase) => phase.status)).toEqual([
        'completed',
        'cancelled',
      ]);
    } finally {
      await queue.shutdown();
      cleanupDir(dir);
    }
  });

  // The local webhook endpoint answers 50 ms after it has received the full
  // request body, so shutdown() can only finish quickly if it abandons the
  // delivery instead of waiting for it.
  it('waits for in-flight webhooks during shutdown', async () => {
    const dir = createTempDir();
    const deliveries: string[] = [];
    const server = createServer((request, response) => {
      let body = '';
      request.on('data', (chunk) => {
        body += chunk.toString();
      });
      request.on('end', () => {
        setTimeout(() => {
          deliveries.push(body);
          response.writeHead(202).end();
        }, 50);
      });
    });

    // Promise<void> is spelled out: with an inferred Promise<unknown> the
    // zero-argument resolve() call does not type-check.
    await new Promise<void>((resolve) => {
      server.listen(0, '127.0.0.1', () => resolve());
    });

    const address = server.address();
    if (!address || typeof address === 'string') {
      throw new Error('Server address unavailable');
    }

    const queue = new JobQueue<{ url: string }>({
      dbPath: createDbPath(dir),
      phases: ['run'],
      concurrency: 1,
      webhook: {
        url: `http://127.0.0.1:${address.port}/hook`,
        events: ['job:completed'],
      },
    });
    let deliveredEvents = 0;

    queue.on('job:webhook:delivered', () => {
      deliveredEvents += 1;
    });
    queue.handle('run', async () => ({ ok: true }));

    // Tracks whether the test body already asked the queue to shut down, so
    // the cleanup path shuts it down only when an earlier step failed.
    let shutdownRequested = false;

    try {
      const jobId = await queue.enqueue({ url: 'https://example.com/video' });
      await waitFor(() => queue.getJob(jobId)?.status === 'completed');

      shutdownRequested = true;
      const startedAt = Date.now();
      await queue.shutdown();

      expect(Date.now() - startedAt).toBeGreaterThanOrEqual(40);
      expect(deliveredEvents).toBe(1);
      expect(deliveries).toHaveLength(1);
    } finally {
      try {
        if (!shutdownRequested) {
          await queue.shutdown();
        }
      } finally {
        await new Promise<void>((resolve, reject) => {
          server.close((error) => {
            if (error) {
              reject(error);
              return;
            }
            resolve();
          });
        });
        cleanupDir(dir);
      }
    }
  });

  // The handler only settles once its abort signal fires, so a 10 ms shutdown
  // budget is expected to be exceeded; the spies verify cleanup still happens.
  it('cleans up listeners and storage when shutdown times out', async () => {
    const dir = createTempDir();
    const queue = new JobQueue<{ url: string }>({
      dbPath: createDbPath(dir),
      phases: ['run'],
      concurrency: 1,
    });
    // Widen the type so the test can spy on the queue's `storage` and `events`
    // members.
    const internalQueue = queue as JobQueue<{ url: string }> & {
      storage: { close: () => void };
      events: { removeAllListeners: () => void };
    };
    const closeSpy = vi.spyOn(internalQueue.storage, 'close');
    const removeAllListenersSpy = vi.spyOn(internalQueue.events, 'removeAllListeners');

    queue.handle(
      'run',
      async (_job, ctx) =>
        await new Promise((resolve) => {
          ctx.signal.addEventListener(
            'abort',
            () => {
              resolve({ ok: false });
            },
            { once: true },
          );
        }),
    );

    try {
      const jobId = await queue.enqueue({ url: 'https://example.com/video' });
      await waitFor(() => queue.getJob(jobId)?.status === 'active');

      await expect(queue.shutdown(10)).rejects.toThrow(/Timed out waiting for workers to drain/);
      expect(removeAllListenersSpy).toHaveBeenCalledTimes(1);
      expect(closeSpy).toHaveBeenCalledTimes(1);
    } finally {
      cleanupDir(dir);
    }
  });

  // 'process' fails on its first run; retrying with fromStart: false must not
  // re-run 'download' and must still expose its stored result.
  it('preserves completed phase results when retrying from partial progress', async () => {
    const dir = createTempDir();
    const queue = new JobQueue<{ url: string }>({
      dbPath: createDbPath(dir),
      phases: ['download', 'process'],
      concurrency: 1,
    });
    let downloadRuns = 0;
    let processRuns = 0;
    let resumedFilePath: string | undefined;

    queue.handle('download', async () => {
      downloadRuns += 1;
      return { filePath: '/tmp/video.mp4' };
    });
    queue.handle('process', async (_job, ctx) => {
      processRuns += 1;
      const filePath = ctx.phaseResult<{ filePath: string }>('download')?.filePath;
      if (processRuns === 1) {
        expect(filePath).toBe('/tmp/video.mp4');
        throw new Error('fatal');
      }
      resumedFilePath = filePath;
      return { outputPath: `${filePath}.json` };
    });

    try {
      const jobId = await queue.enqueue({ url: 'https://example.com/video' });
      await waitFor(() => queue.getJob(jobId)?.status === 'failed');

      await queue.retry(jobId, { fromStart: false });
      await waitFor(() => queue.getJob(jobId)?.status === 'completed');

      expect(downloadRuns).toBe(1);
      expect(processRuns).toBe(2);
      expect(resumedFilePath).toBe('/tmp/video.mp4');
      expect(queue.getJob(jobId)?.phaseResults.download).toEqual({ filePath: '/tmp/video.mp4' });
    } finally {
      await queue.shutdown();
      cleanupDir(dir);
    }
  });

  // Seeds the database directly through SqliteStorage with a claimed job that
  // has partial progress, then opens a fresh JobQueue on the same file.
  it('restores interrupted active jobs as failed with phase context on restart', async () => {
    const dir = createTempDir();
    const dbPath = createDbPath(dir);
    const storage = new SqliteStorage<{ url: string }>(dbPath);
    // Lets the cleanup path close the raw storage handle when a seeding step
    // fails before the explicit close() below is reached.
    let storageOpen = true;

    try {
      const job = storage.createJob(
        'job-1',
        { url: 'https://example.com/video' },
        [
          {
            name: 'run',
            status: 'pending',
            progress: 0,
            message: null,
            startedAt: null,
            completedAt: null,
            error: null,
          },
        ],
        {},
        1,
      );
      expect(storage.claimPendingJob(job.id)).toBe(true);
      storage.saveProgress(
        job.id,
        'run',
        [
          {
            name: 'run',
            status: 'active',
            progress: 25,
            message: 'working',
            startedAt: new Date().toISOString(),
            completedAt: null,
            error: null,
          },
        ],
        25,
        'working',
      );
      storage.close();
      storageOpen = false;

      const restarted = new JobQueue<{ url: string }>({
        dbPath,
        phases: ['run'],
        concurrency: 1,
      });

      try {
        const restartedJob = restarted.getJob(job.id);
        expect(restartedJob?.status).toBe('failed');
        expect(restartedJob?.error?.phase).toBe('run');
        expect(restartedJob?.error?.attempt).toBe(1);
      } finally {
        await restarted.shutdown();
      }
    } finally {
      if (storageOpen) {
        storage.close();
      }
      cleanupDir(dir);
    }
  });

  // The job is scheduled 50 ms ahead; the handler records when it actually
  // ran, with 10 ms of slack allowed for timer imprecision.
  it('executes scheduled jobs when their wakeup time arrives', async () => {
    const dir = createTempDir();
    const queue = new JobQueue<{ url: string }>({
      dbPath: createDbPath(dir),
      phases: ['run'],
      concurrency: 1,
    });
    const startedAt: number[] = [];
    const scheduledAt = Date.now() + 50;

    queue.handle('run', async () => {
      startedAt.push(Date.now());
      return { ok: true };
    });

    try {
      const jobId = await queue.enqueue(
        { url: 'https://example.com/video' },
        { scheduledAt: new Date(scheduledAt) },
      );
      await waitFor(() => queue.getJob(jobId)?.status === 'completed', { timeoutMs: 4_000 });

      expect(startedAt).toHaveLength(1);
      expect(startedAt[0]).toBeGreaterThanOrEqual(scheduledAt - 10);
    } finally {
      await queue.shutdown();
      cleanupDir(dir);
    }
  });
});