import { createServer } from 'node:http';

import {
  JobQueue,
  type JobData,
  type QueueConfig,
  type QueueStreamEvent,
  type StreamOptions,
} from '../../src/index.js';
import { cleanupDir, createDbPath, createTempDir, waitFor } from '../helpers.js';

/** Handle returned by {@link JobQueueHarness.createStream}. */
interface HarnessStream<TData extends JobData = JobData> {
  /** Parsed `data:` payloads, appended in arrival order. */
  events: QueueStreamEvent<TData>[];
  /** `event:` names, index-aligned with {@link HarnessStream.events}. */
  eventNames: string[];
  /** Cancels the underlying reader and waits for the read loop to finish. */
  stop: () => Promise<void>;
}

/** One HTTP request recorded by the harness webhook server. */
interface WebhookCapture {
  /** Value of the `x-jobqueue-event` request header (`''` when absent). */
  event: string;
  /** Raw request body decoded as UTF-8. */
  body: string;
  /** `JSON.parse(body)`, or the raw body string when it is not valid JSON. */
  payload: unknown;
}

/**
 * Splits one decoded stream chunk into its SSE blocks.
 *
 * Blocks are separated by a blank line. Each block must contain an
 * `event: ` line and a `data: ` line; only the first of each is used.
 *
 * @param chunk - Decoded text of a single stream chunk.
 * @returns One `{ event, payload }` entry per non-empty block, in order.
 * @throws Error when a block lacks a non-empty `event: ` or `data: ` line.
 * @throws SyntaxError when the `data: ` value is not valid JSON.
 */
function parseSseChunk<TData extends JobData = JobData>(
  chunk: string,
): Array<{ event: string; payload: QueueStreamEvent<TData> }> {
  return chunk
    .split('\n\n')
    .map((block) => block.trim())
    .filter(Boolean)
    .map((block) => {
      const lines = block.split('\n');
      // 'event: ' is 7 characters long and 'data: ' is 6.
      const event = lines.find((line) => line.startsWith('event: '))?.slice(7);
      const data = lines.find((line) => line.startsWith('data: '))?.slice(6);
      if (!event || !data) {
        throw new Error(`Unable to parse SSE block: ${block}`);
      }
      return {
        event,
        // The payload shape is trusted here; no runtime validation is performed.
        payload: JSON.parse(data) as QueueStreamEvent<TData>,
      };
    });
}

/**
 * End-to-end test harness that owns a {@link JobQueue}, an optional local
 * webhook capture server, any event streams opened through it, and the
 * temporary directory holding the queue database.
 *
 * Call {@link JobQueueHarness.cleanup} when done to release all of them.
 */
export class JobQueueHarness<TData extends JobData = JobData> {
  /** Temporary directory created via `createTempDir('jobqueue-e2e-')`. */
  public readonly dir = createTempDir('jobqueue-e2e-');

  /** Every request received by the webhook server, in arrival order. */
  public readonly webhooks: WebhookCapture[] = [];

  private queue: JobQueue<TData> | null = null;
  private server: ReturnType<typeof createServer> | null = null;
  private webhookUrl: string | null = null;
  private readonly streams: Array<{ stop: () => Promise<void> }> = [];

  /**
   * Creates the queue under test.
   *
   * When `config.webhook` is set, a local capture server is started first and
   * its URL is used as `webhook.url` unless the config already provides one.
   *
   * @param config - Queue configuration; `dbPath` is supplied by the harness.
   * @param options - `webhookDelayMs` delays each webhook response (default 0).
   * @returns The newly constructed queue.
   */
  public async start(
    config: Omit<QueueConfig<TData>, 'dbPath'>,
    options: { webhookDelayMs?: number } = {},
  ): Promise<JobQueue<TData>> {
    if (config.webhook) {
      await this.startWebhookServer(options.webhookDelayMs ?? 0);
    }

    this.queue = new JobQueue<TData>({
      ...config,
      dbPath: createDbPath(this.dir),
      webhook:
        config.webhook && this.webhookUrl
          ? {
              ...config.webhook,
              url: config.webhook.url ?? this.webhookUrl,
            }
          : config.webhook,
    });
    return this.queue;
  }

  /**
   * Returns the queue created by {@link JobQueueHarness.start}.
   *
   * @throws Error when the harness has not been started.
   */
  public getQueue(): JobQueue<TData> {
    if (!this.queue) {
      throw new Error('Harness queue has not been started');
    }
    return this.queue;
  }

  /**
   * Opens an event stream on the queue and collects every parsed event.
   *
   * The returned arrays are filled in the background as chunks arrive. The
   * stream is also registered so {@link JobQueueHarness.cleanup} stops it.
   *
   * @param options - Passed through to `createEventStream`.
   * @returns The live `events` / `eventNames` arrays and a `stop` function.
   * @throws Error when the harness has not been started.
   */
  public async createStream(options: StreamOptions = {}): Promise<HarnessStream<TData>> {
    const events: QueueStreamEvent<TData>[] = [];
    const eventNames: string[] = [];
    const reader = this.getQueue().createEventStream(options).getReader();
    const decoder = new TextDecoder();
    let active = true;

    const readLoop = (async () => {
      while (active) {
        const { value, done } = await reader.read();
        if (done) {
          return;
        }
        if (!value) {
          continue;
        }
        // Each chunk is decoded and parsed on its own, so a chunk is expected
        // to carry only complete SSE blocks.
        for (const parsed of parseSseChunk<TData>(decoder.decode(value))) {
          eventNames.push(parsed.event);
          events.push(parsed.payload);
        }
      }
    })();

    const stop = async (): Promise<void> => {
      active = false;
      await reader.cancel();
      // Rejects here if the read loop failed (for example on a parse error).
      await readLoop;
    };

    this.streams.push({ stop });
    return { events, eventNames, stop };
  }

  /**
   * Waits (via `waitFor`) until the job with `id` reports `status`.
   *
   * @param id - Job identifier.
   * @param status - Status value to wait for.
   * @param timeoutMs - Forwarded to `waitFor` as `timeoutMs`.
   */
  public async waitForJobStatus(id: string, status: string, timeoutMs = 4_000): Promise<void> {
    await waitFor(() => this.getQueue().getJob(id)?.status === status, { timeoutMs });
  }

  /**
   * Waits (via `waitFor`) until `getJob(id)` returns `null`.
   *
   * @param id - Job identifier.
   * @param timeoutMs - Forwarded to `waitFor` as `timeoutMs`.
   */
  public async waitForJobDeletion(id: string, timeoutMs = 4_000): Promise<void> {
    await waitFor(() => this.getQueue().getJob(id) === null, { timeoutMs });
  }

  /**
   * Releases everything the harness owns: open streams, the queue, the
   * webhook server, and the temporary directory, in that order.
   *
   * Every step is attempted even when an earlier one fails, so a failing
   * stream or queue shutdown cannot leave the HTTP server listening or the
   * temp directory behind. The first failure, if any, is rethrown at the end.
   */
  public async cleanup(): Promise<void> {
    const failures: unknown[] = [];
    const attempt = async (step: () => unknown): Promise<void> => {
      try {
        await step();
      } catch (error) {
        failures.push(error);
      }
    };

    for (const stream of this.streams.splice(0)) {
      await attempt(() => stream.stop());
    }

    const queue = this.queue;
    this.queue = null;
    if (queue) {
      await attempt(() => queue.shutdown());
    }

    const server = this.server;
    this.server = null;
    this.webhookUrl = null;
    if (server) {
      await attempt(
        () =>
          new Promise<void>((resolve, reject) => {
            server.close((error) => {
              if (error) {
                reject(error);
                return;
              }
              resolve();
            });
          }),
      );
    }

    await attempt(() => cleanupDir(this.dir));

    if (failures.length > 0) {
      throw failures[0];
    }
  }

  /**
   * Starts an HTTP server on an ephemeral 127.0.0.1 port that records every
   * request into {@link JobQueueHarness.webhooks} and answers with 202.
   *
   * @param delayMs - Delay between receiving the full body and responding.
   * @throws Error when the server fails to listen or exposes no TCP address.
   */
  private async startWebhookServer(delayMs: number): Promise<void> {
    const server = createServer((request, response) => {
      // Collect raw buffers and decode once, so a multi-byte UTF-8 character
      // split across two chunks is not corrupted.
      const chunks: Buffer[] = [];
      request.on('data', (chunk: Buffer) => {
        chunks.push(chunk);
      });
      request.on('end', () => {
        const body = Buffer.concat(chunks).toString('utf8');
        setTimeout(() => {
          let payload: unknown = null;
          try {
            payload = JSON.parse(body);
          } catch {
            // Non-JSON bodies are kept verbatim.
            payload = body;
          }
          this.webhooks.push({
            event: String(request.headers['x-jobqueue-event'] ?? ''),
            body,
            payload,
          });
          response.writeHead(202).end();
        }, delayMs);
      });
    });

    await new Promise<void>((resolve, reject) => {
      // Without this listener a bind failure would be an uncaught 'error'
      // event and this promise would never settle.
      server.once('error', reject);
      server.listen(0, '127.0.0.1', () => {
        server.off('error', reject);
        resolve();
      });
    });
    // Registered before the address check so cleanup() can still close it.
    this.server = server;

    const address = server.address();
    if (!address || typeof address === 'string') {
      throw new Error('Webhook server address unavailable');
    }
    this.webhookUrl = `http://127.0.0.1:${address.port}/hook`;
  }
}