- Preserve phase results on partial retry and keep interrupted phase context after restart.
- Avoid webhook bookkeeping crashes when retention deletes stale jobs.
- Add deeper unit, integration, and e2e coverage around queue seams.
- Require verify job to pass before publish runs.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
449 lines
13 KiB
TypeScript
449 lines
13 KiB
TypeScript
import { createServer } from 'node:http';
|
|
|
|
import { JobQueue, SqliteStorage } from '../src/index.js';
|
|
import { cleanupDir, createDbPath, createTempDir, waitFor } from './helpers.js';
|
|
|
|
describe('JobQueue', () => {
|
|
// Happy path for a two-phase job: both handlers run, the second phase can read
// the first phase's result, and lifecycle events are emitted in order.
it('runs multi-phase jobs to completion', async () => {
  const tempDir = createTempDir();
  const jobQueue = new JobQueue<{ url: string }>({
    dbPath: createDbPath(tempDir),
    phases: ['download', 'process'],
    concurrency: 1,
  });

  // Ordered log of the lifecycle events observed during the run.
  const timeline: string[] = [];
  jobQueue.on('job:started', () => {
    timeline.push('started');
  });
  jobQueue.on('job:phase:completed', (_, phase) => {
    timeline.push(`phase:${phase.name}`);
  });
  jobQueue.on('job:completed', () => {
    timeline.push('completed');
  });

  jobQueue.handle('download', async (_job, ctx) => {
    await ctx.progress(50, 'downloading');
    return { filePath: '/tmp/video.mp4' };
  });

  jobQueue.handle('process', async (_job, ctx) => {
    // The result returned by the 'download' handler must be visible here.
    const downloaded = ctx.phaseResult<{ filePath: string }>('download');
    expect(downloaded?.filePath).toBe('/tmp/video.mp4');
    await ctx.progress(25, 'processing');
    return { outputPath: '/tmp/video.txt' };
  });

  try {
    const jobId = await jobQueue.enqueue({ url: 'https://example.com/video' });
    await waitFor(() => jobQueue.getJob(jobId)?.status === 'completed');

    const finished = jobQueue.getJob(jobId);
    expect(finished?.status).toBe('completed');
    expect(finished?.phaseResults.download).toEqual({ filePath: '/tmp/video.mp4' });
    expect(finished?.phaseResults.process).toEqual({ outputPath: '/tmp/video.txt' });
    expect(timeline).toEqual(['started', 'phase:download', 'phase:process', 'completed']);
  } finally {
    await jobQueue.shutdown();
    cleanupDir(tempDir);
  }
});
|
|
|
|
// A handler that throws a 'recoverable' error on its first attempt should be
// retried once and then complete.
it('retries recoverable failures and eventually completes', async () => {
  const tempDir = createTempDir();
  const jobQueue = new JobQueue<{ url: string }>({
    dbPath: createDbPath(tempDir),
    phases: ['run'],
    concurrency: 1,
    retry: {
      maxAttempts: 3,
      baseDelayMs: 10,
      // Only an Error whose message is exactly 'recoverable' is retryable.
      classifyError: async (error) => {
        if (error instanceof Error && error.message === 'recoverable') {
          return 'recoverable';
        }
        return 'fatal';
      },
    },
  });

  let handlerCalls = 0;
  let retryEvents = 0;

  jobQueue.on('job:retrying', () => {
    retryEvents += 1;
  });

  jobQueue.handle('run', async () => {
    handlerCalls += 1;
    const isFirstCall = handlerCalls === 1;
    if (isFirstCall) {
      throw new Error('recoverable');
    }
    return { ok: true };
  });

  try {
    const jobId = await jobQueue.enqueue({ url: 'https://example.com/video' });
    await waitFor(() => jobQueue.getJob(jobId)?.status === 'completed', { timeoutMs: 4_000 });

    const finished = jobQueue.getJob(jobId);
    expect(finished?.status).toBe('completed');
    expect(finished?.retryCount).toBe(1);
    expect(retryEvents).toBe(1);
  } finally {
    await jobQueue.shutdown();
    cleanupDir(tempDir);
  }
});
|
|
|
|
// The queue's event stream should carry progress and completion events in SSE
// framing ("event: <name>").
it('streams queue events as SSE', async () => {
  const dir = createTempDir();
  const queue = new JobQueue<{ url: string }>({
    dbPath: createDbPath(dir),
    phases: ['run'],
    concurrency: 1,
  });
  const stream = queue.createEventStream({ includeSnapshot: false });
  const reader = stream.getReader();
  const chunks: string[] = [];

  // A single decoder in streaming mode: a multi-byte character that is split
  // across two chunks is buffered and decoded correctly instead of being
  // replaced with U+FFFD.
  const decoder = new TextDecoder();

  // Search the concatenated text rather than each chunk on its own, because
  // stream chunk boundaries are not guaranteed to align with SSE frames.
  const sawCompleted = (): boolean => chunks.join('').includes('event: job:completed');

  // Background reader: collects decoded text until the stream ends or the
  // completion event has been seen.
  const readPromise = (async () => {
    for (;;) {
      const { value, done } = await reader.read();
      if (done) {
        return;
      }

      if (value) {
        chunks.push(decoder.decode(value, { stream: true }));
        if (sawCompleted()) {
          return;
        }
      }
    }
  })();

  queue.handle('run', async (_job, ctx) => {
    await ctx.progress(100, 'done');
    return { ok: true };
  });

  try {
    await queue.enqueue({ url: 'https://example.com/video' });
    // Whichever finishes first: the reader seeing the completion event, or
    // the polling wait (which rejects after the timeout).
    await Promise.race([readPromise, waitFor(sawCompleted, { timeoutMs: 4_000 })]);
  } finally {
    await reader.cancel();
    await queue.shutdown();
    cleanupDir(dir);
  }

  const received = chunks.join('');
  expect(received).toContain('event: job:completed');
  expect(received).toContain('event: job:progress');
});
|
|
|
|
// Cancelling a job before it has run should mark the job and every one of
// its phases as cancelled.
it('marks unfinished phases as cancelled when cancelling a pending job', async () => {
  const tempDir = createTempDir();
  const jobQueue = new JobQueue<{ url: string }>({
    dbPath: createDbPath(tempDir),
    phases: ['download', 'process'],
    concurrency: 1,
  });

  try {
    // The job is scheduled 60 s into the future and cancelled right away;
    // no handlers are registered in this test.
    const oneMinuteFromNow = new Date(Date.now() + 60_000);
    const jobId = await jobQueue.enqueue(
      { url: 'https://example.com/video' },
      { scheduledAt: oneMinuteFromNow },
    );

    const result = await jobQueue.cancel(jobId);
    const phaseStatuses = result.phases.map((entry) => entry.status);

    expect(result.status).toBe('cancelled');
    expect(phaseStatuses).toEqual(['cancelled', 'cancelled']);
  } finally {
    await jobQueue.shutdown();
    cleanupDir(tempDir);
  }
});
|
|
|
|
// Cancelling between phases: the finished phase stays 'completed', the next
// phase never starts, and exactly one 'job:cancelled' event is emitted.
it('emits job:cancelled once when cancelling after a phase completes', async () => {
  const tempDir = createTempDir();
  const jobQueue = new JobQueue<{ url: string }>({
    dbPath: createDbPath(tempDir),
    phases: ['download', 'process'],
    concurrency: 1,
  });

  let processHandlerRan = false;
  let cancelledCount = 0;

  // Request cancellation as soon as the 'download' phase reports completion.
  jobQueue.on('job:phase:completed', (job, phase) => {
    if (phase.name !== 'download') {
      return;
    }
    void jobQueue.cancel(job.id);
  });
  jobQueue.on('job:cancelled', () => {
    cancelledCount += 1;
  });

  jobQueue.handle('download', async (_job, ctx) => {
    await ctx.progress(100, 'downloaded');
    return { filePath: '/tmp/video.mp4' };
  });
  jobQueue.handle('process', async () => {
    // Reaching this handler is what the test asserts must not happen.
    processHandlerRan = true;
    return { outputPath: '/tmp/video.txt' };
  });

  try {
    const jobId = await jobQueue.enqueue({ url: 'https://example.com/video' });
    await waitFor(() => jobQueue.getJob(jobId)?.status === 'cancelled');

    expect(cancelledCount).toBe(1);
    expect(processHandlerRan).toBe(false);

    const phaseStatuses = jobQueue.getJob(jobId)?.phases.map((entry) => entry.status);
    expect(phaseStatuses).toEqual(['completed', 'cancelled']);
  } finally {
    await jobQueue.shutdown();
    cleanupDir(tempDir);
  }
});
|
|
|
|
// shutdown() must not resolve until a webhook delivery that is already in
// flight has been answered by the receiving server.
it('waits for in-flight webhooks during shutdown', async () => {
  const dir = createTempDir();
  const deliveries: string[] = [];

  // Local webhook receiver that buffers the request body and answers 202
  // only after 50 ms; the elapsed-time assertion below relies on this delay.
  const server = createServer((request, response) => {
    let body = '';
    request.on('data', (chunk) => {
      body += chunk.toString();
    });
    request.on('end', () => {
      setTimeout(() => {
        deliveries.push(body);
        response.writeHead(202).end();
      }, 50);
    });
  });

  // Port 0 lets the OS pick a free port.
  await new Promise<void>((resolve) => {
    server.listen(0, '127.0.0.1', () => resolve());
  });

  const address = server.address();
  if (!address || typeof address === 'string') {
    throw new Error('Server address unavailable');
  }

  const queue = new JobQueue<{ url: string }>({
    dbPath: createDbPath(dir),
    phases: ['run'],
    concurrency: 1,
    webhook: {
      url: `http://127.0.0.1:${address.port}/hook`,
      events: ['job:completed'],
    },
  });
  let deliveredEvents = 0;
  // Set just before the test's own shutdown() call so the cleanup path below
  // neither skips shutdown after an early failure nor calls it twice.
  let shutdownAttempted = false;

  queue.on('job:webhook:delivered', () => {
    deliveredEvents += 1;
  });
  queue.handle('run', async () => ({ ok: true }));

  try {
    const jobId = await queue.enqueue({ url: 'https://example.com/video' });
    await waitFor(() => queue.getJob(jobId)?.status === 'completed');

    const startedAt = Date.now();
    shutdownAttempted = true;
    await queue.shutdown();

    expect(Date.now() - startedAt).toBeGreaterThanOrEqual(40);
    expect(deliveredEvents).toBe(1);
    expect(deliveries).toHaveLength(1);
  } finally {
    try {
      // If the test failed before reaching its own shutdown() call, the
      // queue is still open; shut it down before its database directory is
      // removed.
      if (!shutdownAttempted) {
        await queue.shutdown();
      }
    } finally {
      await new Promise<void>((resolve, reject) => {
        server.close((error) => {
          if (error) {
            reject(error);
            return;
          }

          resolve();
        });
      });
      cleanupDir(dir);
    }
  }
});
|
|
|
|
// Even when shutdown() rejects because a worker did not drain within the
// given timeout, listeners must be removed and storage closed exactly once.
it('cleans up listeners and storage when shutdown times out', async () => {
  const tempDir = createTempDir();
  const jobQueue = new JobQueue<{ url: string }>({
    dbPath: createDbPath(tempDir),
    phases: ['run'],
    concurrency: 1,
  });

  // Widen the type so the internal collaborators can be spied on.
  const internals = jobQueue as JobQueue<{ url: string }> & {
    storage: { close: () => void };
    events: { removeAllListeners: () => void };
  };
  const storageCloseSpy = vi.spyOn(internals.storage, 'close');
  const listenerCleanupSpy = vi.spyOn(internals.events, 'removeAllListeners');

  // The handler settles only when its abort signal fires.
  jobQueue.handle('run', async (_job, ctx) => {
    return await new Promise((resolve) => {
      const onAbort = () => {
        resolve({ ok: false });
      };
      ctx.signal.addEventListener('abort', onAbort, { once: true });
    });
  });

  try {
    const jobId = await jobQueue.enqueue({ url: 'https://example.com/video' });
    await waitFor(() => jobQueue.getJob(jobId)?.status === 'active');

    const shutdownAttempt = jobQueue.shutdown(10);
    await expect(shutdownAttempt).rejects.toThrow(/Timed out waiting for workers to drain/);

    expect(listenerCleanupSpy).toHaveBeenCalledTimes(1);
    expect(storageCloseSpy).toHaveBeenCalledTimes(1);
  } finally {
    cleanupDir(tempDir);
  }
});
|
|
|
|
// Retrying with { fromStart: false } must skip the already-completed
// 'download' phase and still expose its result to the re-run 'process' phase.
it('preserves completed phase results when retrying from partial progress', async () => {
  const dir = createTempDir();
  const queue = new JobQueue<{ url: string }>({
    dbPath: createDbPath(dir),
    phases: ['download', 'process'],
    concurrency: 1,
  });
  let downloadRuns = 0;
  let processRuns = 0;
  // What each 'process' attempt saw as the download result. Both values are
  // asserted in the test body: an expect() inside the handler would only make
  // the job fail, which the first attempt does deliberately anyway, so a
  // failed in-handler assertion would go unnoticed.
  let firstRunFilePath: string | undefined;
  let resumedFilePath: string | undefined;

  queue.handle('download', async () => {
    downloadRuns += 1;
    return { filePath: '/tmp/video.mp4' };
  });
  queue.handle('process', async (_job, ctx) => {
    processRuns += 1;
    const filePath = ctx.phaseResult<{ filePath: string }>('download')?.filePath;

    if (processRuns === 1) {
      firstRunFilePath = filePath;
      // Fail the first attempt so the job ends up 'failed' with 'download'
      // already completed.
      throw new Error('fatal');
    }

    resumedFilePath = filePath;
    return { outputPath: `${filePath}.json` };
  });

  try {
    const jobId = await queue.enqueue({ url: 'https://example.com/video' });
    await waitFor(() => queue.getJob(jobId)?.status === 'failed');

    await queue.retry(jobId, { fromStart: false });
    await waitFor(() => queue.getJob(jobId)?.status === 'completed');

    expect(downloadRuns).toBe(1);
    expect(processRuns).toBe(2);
    expect(firstRunFilePath).toBe('/tmp/video.mp4');
    expect(resumedFilePath).toBe('/tmp/video.mp4');
    expect(queue.getJob(jobId)?.phaseResults.download).toEqual({ filePath: '/tmp/video.mp4' });
  } finally {
    await queue.shutdown();
    cleanupDir(dir);
  }
});
|
|
|
|
// A job left claimed and mid-phase in the database should be reported as
// 'failed', with the interrupted phase and attempt number on its error, when
// a new JobQueue is opened on the same database file.
it('restores interrupted active jobs as failed with phase context on restart', async () => {
  const dir = createTempDir();
  const dbPath = createDbPath(dir);
  const storage = new SqliteStorage<{ url: string }>(dbPath);
  // Tracks whether the raw storage handle still needs closing, so a failure
  // during setup does not leave it open while the directory is removed.
  let storageOpen = true;

  try {
    // Write the job directly through storage, with no queue involved.
    const job = storage.createJob(
      'job-1',
      { url: 'https://example.com/video' },
      [
        {
          name: 'run',
          status: 'pending',
          progress: 0,
          message: null,
          startedAt: null,
          completedAt: null,
          error: null,
        },
      ],
      {},
      1,
    );

    // Claim the job and record partial progress for the 'run' phase, then
    // close storage without ever completing it.
    expect(storage.claimPendingJob(job.id)).toBe(true);
    storage.saveProgress(
      job.id,
      'run',
      [
        {
          name: 'run',
          status: 'active',
          progress: 25,
          message: 'working',
          startedAt: new Date().toISOString(),
          completedAt: null,
          error: null,
        },
      ],
      25,
      'working',
    );
    // Clear the flag first so a throwing close() is not retried in cleanup.
    storageOpen = false;
    storage.close();

    const restarted = new JobQueue<{ url: string }>({
      dbPath,
      phases: ['run'],
      concurrency: 1,
    });

    try {
      const restartedJob = restarted.getJob(job.id);
      expect(restartedJob?.status).toBe('failed');
      expect(restartedJob?.error?.phase).toBe('run');
      expect(restartedJob?.error?.attempt).toBe(1);
    } finally {
      await restarted.shutdown();
    }
  } finally {
    try {
      if (storageOpen) {
        storage.close();
      }
    } finally {
      cleanupDir(dir);
    }
  }
});
|
|
|
|
// A job enqueued with a future scheduledAt should run exactly once, and not
// noticeably before that time.
it('executes scheduled jobs when their wakeup time arrives', async () => {
  const tempDir = createTempDir();
  const jobQueue = new JobQueue<{ url: string }>({
    dbPath: createDbPath(tempDir),
    phases: ['run'],
    concurrency: 1,
  });

  // Wall-clock time of every handler invocation.
  const runTimestamps: number[] = [];
  const dueAtMs = Date.now() + 50;

  jobQueue.handle('run', async () => {
    runTimestamps.push(Date.now());
    return { ok: true };
  });

  try {
    const jobId = await jobQueue.enqueue(
      { url: 'https://example.com/video' },
      { scheduledAt: new Date(dueAtMs) },
    );
    await waitFor(() => jobQueue.getJob(jobId)?.status === 'completed', { timeoutMs: 4_000 });

    expect(runTimestamps).toHaveLength(1);
    // The assertion tolerates the handler starting up to 10 ms early.
    const earliestAllowedMs = dueAtMs - 10;
    expect(runTimestamps[0]).toBeGreaterThanOrEqual(earliestAllowedMs);
  } finally {
    await jobQueue.shutdown();
    cleanupDir(tempDir);
  }
});
|
|
});
|