- Preserve phase results on partial retry and keep interrupted phase context after restart.
- Avoid webhook bookkeeping crashes when retention deletes stale jobs.
- Add deeper unit, integration, and e2e coverage around queue seams.
- Require verify job to pass before publish runs.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
188 lines
5.0 KiB
TypeScript
188 lines
5.0 KiB
TypeScript
import { createServer } from 'node:http';
|
|
|
|
import { JobQueue, type JobData, type QueueConfig, type QueueStreamEvent, type StreamOptions } from '../../src/index.js';
|
|
import { cleanupDir, createDbPath, createTempDir, waitFor } from '../helpers.js';
|
|
|
|
/**
 * Handle returned by the harness for one opened queue event stream.
 *
 * `events` and `eventNames` are filled in lockstep as SSE blocks are parsed,
 * so index `i` of one corresponds to index `i` of the other.
 */
interface HarnessStream<TData extends JobData> {
  /** Parsed JSON payload of every SSE block received so far. */
  events: Array<QueueStreamEvent<TData>>;
  /** The `event:` name of every SSE block received so far. */
  eventNames: Array<string>;
  /** Cancels the underlying reader and waits for the read loop to finish. */
  stop: () => Promise<void>;
}
|
|
|
|
/** One webhook request recorded by the harness's local HTTP server. */
interface WebhookCapture {
  /** Value of the `x-jobqueue-event` request header, or `''` when the header is absent. */
  event: string;
  /** Raw request body text. */
  body: string;
  /** `body` parsed as JSON, or the raw body string when it is not valid JSON. */
  payload: unknown;
}
|
|
|
|
/**
 * Parses Server-Sent Events text into named events with JSON payloads.
 *
 * Blocks are separated by a blank line. Within a block the first `event` field
 * names the event, and all `data` fields are joined with `\n` before being
 * parsed as JSON. Both LF and CRLF line endings are accepted, the single space
 * after a field's colon is optional, and blocks made up solely of comment
 * lines (for example `: keep-alive`) are skipped rather than rejected.
 *
 * @param chunk - Text containing zero or more complete SSE blocks.
 * @returns One entry per parsed block, in input order.
 * @throws Error when a non-comment block has no `event` name or no `data`.
 * @throws SyntaxError when the joined `data` is not valid JSON.
 */
function parseSseChunk<TData extends JobData>(chunk: string): Array<{ event: string; payload: QueueStreamEvent<TData> }> {
  const parsed: Array<{ event: string; payload: QueueStreamEvent<TData> }> = [];

  for (const rawBlock of chunk.split(/\r?\n\r?\n/)) {
    const block = rawBlock.trim();
    if (!block) {
      continue;
    }

    let event: string | undefined;
    const dataLines: string[] = [];
    let sawField = false;

    for (const line of block.split(/\r?\n/)) {
      // A leading colon marks an SSE comment line; it carries no field.
      if (line.startsWith(':')) {
        continue;
      }

      sawField = true;
      const colon = line.indexOf(':');
      const field = colon === -1 ? line : line.slice(0, colon);
      let value = colon === -1 ? '' : line.slice(colon + 1);
      // Only one leading space is part of the separator; further spaces belong to the value.
      if (value.startsWith(' ')) {
        value = value.slice(1);
      }

      if (field === 'event') {
        // Keep the first event name in the block.
        if (event === undefined) {
          event = value;
        }
      } else if (field === 'data') {
        dataLines.push(value);
      }
    }

    // Comment-only blocks (heartbeats) are not events and not errors.
    if (!sawField) {
      continue;
    }

    const data = dataLines.join('\n');
    if (!event || !data) {
      throw new Error(`Unable to parse SSE block: ${block}`);
    }

    parsed.push({
      event,
      payload: JSON.parse(data) as QueueStreamEvent<TData>,
    });
  }

  return parsed;
}
|
|
|
|
/**
 * End-to-end test harness that owns a temporary directory, a `JobQueue`
 * instance, an optional local webhook capture server, and every event-stream
 * reader opened through it. Call `cleanup()` to release all of them.
 */
export class JobQueueHarness<TData extends JobData = JobData> {
  /** Temporary directory used for the queue database; removed by `cleanup()`. */
  public readonly dir = createTempDir('jobqueue-e2e-');
  /** Webhook requests recorded by the local capture server, in the order they were answered. */
  public readonly webhooks: WebhookCapture[] = [];

  private queue: JobQueue<TData> | null = null;
  private server: ReturnType<typeof createServer> | null = null;
  private webhookUrl: string | null = null;
  private readonly streams: Array<{ stop: () => Promise<void> }> = [];

  /**
   * Creates the queue with a database path inside the harness directory.
   *
   * When `config.webhook` is set, a local capture server is started first and
   * its URL is used unless `config.webhook.url` is already provided.
   *
   * @param config - Queue configuration without `dbPath`.
   * @param options - `webhookDelayMs` delays each webhook response by that many milliseconds (default 0).
   * @returns The newly created queue.
   */
  public async start(
    config: Omit<QueueConfig<TData>, 'dbPath'>,
    options: { webhookDelayMs?: number } = {},
  ): Promise<JobQueue<TData>> {
    if (config.webhook) {
      await this.startWebhookServer(options.webhookDelayMs ?? 0);
    }

    this.queue = new JobQueue<TData>({
      ...config,
      dbPath: createDbPath(this.dir),
      webhook:
        config.webhook && this.webhookUrl
          ? {
              ...config.webhook,
              url: config.webhook.url ?? this.webhookUrl,
            }
          : config.webhook,
    });

    return this.queue;
  }

  /**
   * Returns the started queue.
   *
   * @throws Error when `start()` has not produced a queue yet (or `cleanup()` has released it).
   */
  public getQueue(): JobQueue<TData> {
    if (!this.queue) {
      throw new Error('Harness queue has not been started');
    }

    return this.queue;
  }

  /**
   * Opens the queue's event stream and collects every parsed SSE event.
   *
   * Bytes are decoded with a single streaming decoder and buffered until a
   * blank-line block terminator arrives, so an event (or a multi-byte
   * character) split across two reads is still parsed correctly.
   *
   * @param options - Passed through to `createEventStream`.
   * @returns Live `events` / `eventNames` arrays plus a `stop` function. `stop`
   *   cancels the reader, waits for the read loop, and rethrows any parse
   *   error the loop hit.
   */
  public async createStream(options: StreamOptions = {}): Promise<HarnessStream<TData>> {
    const events: QueueStreamEvent<TData>[] = [];
    const eventNames: string[] = [];
    const reader = this.getQueue().createEventStream(options).getReader();
    const decoder = new TextDecoder();
    let buffered = '';
    let active = true;

    const record = (text: string): void => {
      for (const parsed of parseSseChunk<TData>(text)) {
        eventNames.push(parsed.event);
        events.push(parsed.payload);
      }
    };

    const readLoop = (async () => {
      while (active) {
        const { value, done } = await reader.read();
        if (done) {
          break;
        }

        if (!value) {
          continue;
        }

        // `stream: true` keeps an incomplete multi-byte sequence for the next read.
        buffered += decoder.decode(value, { stream: true });

        // Only hand complete blocks (up to the last blank-line terminator) to the parser.
        let consumed = 0;
        for (const match of buffered.matchAll(/\r?\n\r?\n/g)) {
          consumed = (match.index ?? 0) + match[0].length;
        }

        if (consumed === 0) {
          continue;
        }

        const complete = buffered.slice(0, consumed);
        buffered = buffered.slice(consumed);
        record(complete);
      }

      // The stream ended on its own: whatever is left must be a final block.
      // After stop(), a trailing partial block is just truncated data and is dropped.
      if (active) {
        buffered += decoder.decode();
        if (buffered.trim()) {
          record(buffered);
        }
      }
    })();

    // Mark the promise as handled so a parse failure does not surface as an
    // unhandled rejection before stop() is called; stop() still rethrows it.
    readLoop.catch(() => undefined);

    const stop = async (): Promise<void> => {
      active = false;
      await reader.cancel();
      await readLoop;
    };

    this.streams.push({ stop });
    return { events, eventNames, stop };
  }

  /**
   * Waits until the job with `id` reports the given `status`.
   *
   * @param id - Job identifier passed to `getJob`.
   * @param status - Status value to wait for.
   * @param timeoutMs - Passed to `waitFor` as its timeout.
   */
  public async waitForJobStatus(id: string, status: string, timeoutMs = 4_000): Promise<void> {
    await waitFor(() => this.getQueue().getJob(id)?.status === status, { timeoutMs });
  }

  /**
   * Waits until `getJob(id)` no longer returns a job.
   *
   * @param id - Job identifier passed to `getJob`.
   * @param timeoutMs - Passed to `waitFor` as its timeout.
   */
  public async waitForJobDeletion(id: string, timeoutMs = 4_000): Promise<void> {
    // Loose equality so either `null` or `undefined` counts as "deleted".
    await waitFor(() => this.getQueue().getJob(id) == null, { timeoutMs });
  }

  /**
   * Stops all streams, shuts the queue down, closes the webhook server and
   * removes the temp directory.
   *
   * Every step runs even if an earlier one fails, so one failing stream cannot
   * leak the queue, the server or the directory. The first error encountered
   * is rethrown once all steps have run.
   */
  public async cleanup(): Promise<void> {
    const failures: unknown[] = [];

    for (const stream of this.streams.splice(0)) {
      try {
        await stream.stop();
      } catch (error) {
        failures.push(error);
      }
    }

    const queue = this.queue;
    this.queue = null;
    if (queue) {
      try {
        await queue.shutdown();
      } catch (error) {
        failures.push(error);
      }
    }

    const server = this.server;
    this.server = null;
    if (server) {
      try {
        await new Promise<void>((resolve, reject) => {
          server.close((error) => {
            if (error) {
              reject(error);
              return;
            }

            resolve();
          });
        });
      } catch (error) {
        failures.push(error);
      }
    }

    try {
      cleanupDir(this.dir);
    } catch (error) {
      failures.push(error);
    }

    if (failures.length > 0) {
      throw failures[0];
    }
  }

  /**
   * Starts an HTTP server on an ephemeral 127.0.0.1 port that records each
   * request into `webhooks` and answers 202 after `delayMs` milliseconds.
   *
   * @param delayMs - Delay between receiving the full body and responding.
   * @throws Error when the server cannot bind or reports no usable address.
   */
  private async startWebhookServer(delayMs: number): Promise<void> {
    const server = createServer((request, response) => {
      let body = '';
      // Let the stream decode UTF-8 so multi-byte characters split across chunks stay intact.
      request.setEncoding('utf8');
      request.on('data', (chunk: string) => {
        body += chunk;
      });
      request.on('end', () => {
        setTimeout(() => {
          let payload: unknown = null;
          try {
            payload = JSON.parse(body);
          } catch {
            // Not JSON: keep the raw text as the payload.
            payload = body;
          }

          this.webhooks.push({
            event: String(request.headers['x-jobqueue-event'] ?? ''),
            body,
            payload,
          });
          response.writeHead(202).end();
        }, delayMs);
      });
    });

    await new Promise<void>((resolve, reject) => {
      // Without this listener a bind failure would be an uncaught 'error' event
      // and the promise would never settle.
      server.once('error', reject);
      server.listen(0, '127.0.0.1', () => {
        server.off('error', reject);
        resolve();
      });
    });

    this.server = server;

    const address = server.address();
    if (!address || typeof address === 'string') {
      throw new Error('Webhook server address unavailable');
    }

    this.webhookUrl = `http://127.0.0.1:${address.port}/hook`;
  }
}
|