Files
jobqueue/src/JobQueue.ts
Giancarmine Salucci a9429e2118 fix: harden queue lifecycle and publish gate
- Preserve phase results on partial retry and keep interrupted phase
  context after restart.
- Avoid webhook bookkeeping crashes when retention deletes stale jobs.
- Add deeper unit, integration, and e2e coverage around queue seams.
- Require verify job to pass before publish runs.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-05-16 18:39:19 +02:00

781 lines
23 KiB
TypeScript

import { CancellationError } from './util/errors.js';
import { TypedEventBus } from './events/EventBus.js';
import { SseSerializer } from './events/SseSerializer.js';
import { WorkerPool } from './processor/WorkerPool.js';
import { PhaseRunner } from './processor/PhaseRunner.js';
import { RetryStrategy } from './retry/RetryStrategy.js';
import { SqliteStorage } from './storage/SqliteStorage.js';
import { RetentionScheduler } from './retention/RetentionScheduler.js';
import { WebhookDispatcher } from './webhook/WebhookDispatcher.js';
import { createJobId } from './util/id.js';
import type {
EnqueueOptions,
JobData,
JobFailure,
JobPhaseState,
JobProgressEvent,
JobQueueEvents,
JobRecord,
ListJobsOptions,
PhaseHandler,
QueueConfig,
QueueStreamEvent,
RetryOptions,
StreamOptions,
WebhookDispatchError,
WebhookEventName,
} from './types.js';
const DEFAULT_CONCURRENCY = 1;
const DEFAULT_SHUTDOWN_TIMEOUT_MS = 30_000;
function now(): string {
return new Date().toISOString();
}
function createInitialPhases(phaseNames: readonly string[]): JobPhaseState[] {
return phaseNames.map((name) => ({
name,
status: 'pending',
progress: 0,
message: null,
startedAt: null,
completedAt: null,
error: null,
}));
}
function clonePhases(phases: JobPhaseState[]): JobPhaseState[] {
return phases.map((phase) => ({ ...phase }));
}
function normalizeQueueConfig<TData extends JobData>(config: QueueConfig<TData>): QueueConfig<TData> {
const phases = [...new Set(config.phases)];
if (phases.length === 0) {
throw new Error('QueueConfig.phases must contain at least one phase');
}
return {
...config,
phases,
concurrency: config.concurrency ?? DEFAULT_CONCURRENCY,
shutdownTimeoutMs: config.shutdownTimeoutMs ?? DEFAULT_SHUTDOWN_TIMEOUT_MS,
};
}
export class JobQueue<TData extends JobData = JobData> {
private readonly config: QueueConfig<TData>;
private readonly storage: SqliteStorage<TData>;
private readonly events = new TypedEventBus<JobQueueEvents<TData>>();
private readonly retryStrategy: RetryStrategy<TData>;
private readonly workerPool: WorkerPool;
private readonly handlers = new Map<string, PhaseHandler<TData>>();
private readonly serializer = new SseSerializer();
private readonly controllers = new Map<string, AbortController>();
private readonly pendingWebhookDispatches = new Set<Promise<void>>();
private readonly webhookDispatcher?: WebhookDispatcher<TData>;
private readonly retentionScheduler?: RetentionScheduler<TData>;
private wakeupTimer: NodeJS.Timeout | null = null;
private closed = false;
private pumping = false;
private repumpRequested = false;
public constructor(queueConfig: QueueConfig<TData>) {
this.config = normalizeQueueConfig(queueConfig);
this.storage = new SqliteStorage<TData>(this.config.dbPath);
this.retryStrategy = new RetryStrategy(this.config.retry);
this.workerPool = new WorkerPool(this.config.concurrency ?? DEFAULT_CONCURRENCY);
this.webhookDispatcher = this.config.webhook
? new WebhookDispatcher<TData>(this.config.webhook)
: undefined;
this.storage.resetActiveJobs('Interrupted by process restart');
if (this.config.retention) {
this.retentionScheduler = new RetentionScheduler(this.config.retention, {
markStale: async (cutoffIso) => this.markStaleJobs(cutoffIso),
deleteStale: async (cutoffIso) => this.deleteStaleJobs(cutoffIso),
});
this.retentionScheduler.start();
}
this.requestPump();
}
public handle(phaseName: string, handler: PhaseHandler<TData>): this {
if (!this.config.phases.includes(phaseName)) {
throw new Error(`Phase "${phaseName}" is not defined in queue config`);
}
this.handlers.set(phaseName, handler);
return this;
}
public on<TName extends keyof JobQueueEvents<TData> & string>(
event: TName,
listener: (...args: JobQueueEvents<TData>[TName]) => void,
): () => void {
return this.events.on(event, listener);
}
public once<TName extends keyof JobQueueEvents<TData> & string>(
event: TName,
listener: (...args: JobQueueEvents<TData>[TName]) => void,
): () => void {
return this.events.once(event, listener);
}
public async enqueue(data: TData, options: EnqueueOptions = {}): Promise<string> {
this.ensureOpen();
const id = options.id ?? createJobId();
const job = this.storage.createJob(
id,
data,
createInitialPhases(this.config.phases),
options,
options.maxAttempts ?? this.retryStrategy.defaultMaxAttempts(),
);
this.events.emit('job:enqueued', job);
this.requestPump();
return id;
}
public getJob(id: string): JobRecord<TData> | null {
return this.storage.getJob(id);
}
public listJobs(options: ListJobsOptions = {}): JobRecord<TData>[] {
return this.storage.listJobs(options);
}
public async retry(id: string, options: RetryOptions = {}): Promise<JobRecord<TData>> {
this.ensureOpen();
const existing = this.requireJob(id);
if (!['failed', 'cancelled', 'stale'].includes(existing.status)) {
throw new Error(`Job ${id} cannot be retried from status ${existing.status}`);
}
const phases =
options.fromStart === false
? clonePhases(existing.phases).map<JobPhaseState>((phase) =>
phase.status === 'completed'
? phase
: {
...phase,
status: 'pending',
progress: 0,
message: null,
completedAt: null,
error: null,
},
)
: createInitialPhases(this.config.phases);
const retried = this.storage.resetForRetry(
id,
phases,
options.fromStart === false ? existing.phaseResults : {},
existing.maxAttempts,
options.scheduledAt instanceof Date ? options.scheduledAt.toISOString() : options.scheduledAt ?? null,
);
this.events.emit('job:enqueued', retried);
this.requestPump();
return retried;
}
public async cancel(id: string): Promise<JobRecord<TData>> {
const job = this.requireJob(id);
if (!['pending', 'active'].includes(job.status)) {
return job;
}
const phases = this.markUnfinishedPhasesCancelled(clonePhases(job.phases));
const controller = this.controllers.get(id);
controller?.abort(new CancellationError());
const cancelled = this.storage.cancelJob(id, phases);
this.events.emit('job:cancelled', cancelled);
this.queueWebhookDispatch('job:cancelled', cancelled);
this.requestPump();
return cancelled;
}
public createEventStream(options: StreamOptions = {}): ReadableStream<Uint8Array> {
const encoder = new TextEncoder();
const keepAliveMs = options.keepAliveMs ?? 30_000;
let cleanup = () => {};
return new ReadableStream<Uint8Array>({
start: (controller) => {
if (options.includeSnapshot !== false) {
if (options.jobId) {
const job = this.getJob(options.jobId);
if (job) {
this.enqueueStreamEvent(controller, encoder, 'snapshot', {
type: 'snapshot',
jobId: job.id,
job,
timestamp: now(),
});
}
} else {
for (const job of this.listJobs({ limit: 100 })) {
this.enqueueStreamEvent(controller, encoder, 'snapshot', {
type: 'snapshot',
jobId: job.id,
job,
timestamp: now(),
});
}
}
}
const unsubscribers = [
this.on('job:enqueued', (job) => this.pushJobEvent(controller, encoder, 'job:enqueued', job, options.jobId)),
this.on('job:started', (job) => this.pushJobEvent(controller, encoder, 'job:started', job, options.jobId)),
this.on('job:progress', (job, progress) => {
if (!this.matchesStreamJob(options.jobId, job.id)) {
return;
}
this.enqueueStreamEvent(controller, encoder, 'job:progress', {
type: 'job:progress',
jobId: job.id,
job,
progress,
timestamp: now(),
});
}),
this.on('job:phase:completed', (job, phase) => {
if (!this.matchesStreamJob(options.jobId, job.id)) {
return;
}
this.enqueueStreamEvent(controller, encoder, 'job:phase:completed', {
type: 'job:phase:completed',
jobId: job.id,
job,
phase,
timestamp: now(),
});
}),
this.on('job:completed', (job) => this.pushJobEvent(controller, encoder, 'job:completed', job, options.jobId)),
this.on('job:failed', (job, failure) => {
if (!this.matchesStreamJob(options.jobId, job.id)) {
return;
}
this.enqueueStreamEvent(controller, encoder, 'job:failed', {
type: 'job:failed',
jobId: job.id,
job,
failure,
timestamp: now(),
});
}),
this.on('job:retrying', (job, retry) => {
if (!this.matchesStreamJob(options.jobId, job.id)) {
return;
}
this.enqueueStreamEvent(controller, encoder, 'job:retrying', {
type: 'job:retrying',
jobId: job.id,
job,
retry,
timestamp: now(),
});
}),
this.on('job:cancelled', (job) => this.pushJobEvent(controller, encoder, 'job:cancelled', job, options.jobId)),
this.on('job:stale', (job) => this.pushJobEvent(controller, encoder, 'job:stale', job, options.jobId)),
this.on('job:deleted', (jobId) => {
if (!this.matchesStreamJob(options.jobId, jobId)) {
return;
}
this.enqueueStreamEvent(controller, encoder, 'job:deleted', {
type: 'job:deleted',
deletedJobId: jobId,
jobId,
timestamp: now(),
});
}),
this.on('job:webhook:delivered', (job, webhook) => {
if (!this.matchesStreamJob(options.jobId, job.id)) {
return;
}
this.enqueueStreamEvent(controller, encoder, 'job:webhook:delivered', {
type: 'job:webhook:delivered',
jobId: job.id,
job,
webhook,
timestamp: now(),
});
}),
this.on('job:webhook:failed', (job, webhook) => {
if (!this.matchesStreamJob(options.jobId, job.id)) {
return;
}
this.enqueueStreamEvent(controller, encoder, 'job:webhook:failed', {
type: 'job:webhook:failed',
jobId: job.id,
job,
webhook,
timestamp: now(),
});
}),
];
const keepAlive = setInterval(() => {
this.enqueueStreamEvent(controller, encoder, 'ping', {
type: 'ping',
timestamp: now(),
});
}, keepAliveMs);
keepAlive.unref?.();
cleanup = () => {
clearInterval(keepAlive);
for (const unsubscribe of unsubscribers) {
unsubscribe();
}
};
},
cancel: () => {
cleanup();
return undefined;
},
});
}
public async shutdown(timeoutMs = this.config.shutdownTimeoutMs): Promise<void> {
if (this.closed) {
return;
}
this.closed = true;
this.retentionScheduler?.stop();
if (this.wakeupTimer) {
clearTimeout(this.wakeupTimer);
this.wakeupTimer = null;
}
let drainError: unknown;
try {
await this.workerPool.drain(timeoutMs);
} catch (error) {
drainError = error;
for (const controller of this.controllers.values()) {
if (!controller.signal.aborted) {
controller.abort(new CancellationError('Job queue shut down before active jobs finished'));
}
}
try {
await this.workerPool.drain(250);
} catch {
// Best effort: cleanup still needs to run after timeout.
}
} finally {
await this.drainWebhookDispatches();
this.events.removeAllListeners();
this.storage.close();
}
if (drainError) {
throw drainError;
}
}
private async pump(): Promise<void> {
if (this.closed) {
return;
}
if (this.pumping) {
this.repumpRequested = true;
return;
}
this.pumping = true;
try {
do {
this.repumpRequested = false;
while (this.workerPool.hasCapacity()) {
const candidates = this.storage.listRunnableJobs(this.workerPool.availableSlots);
if (candidates.length === 0) {
break;
}
let dispatched = 0;
for (const candidate of candidates) {
if (!this.workerPool.hasCapacity()) {
break;
}
if (!this.storage.claimPendingJob(candidate.id)) {
continue;
}
const claimed = this.requireJob(candidate.id);
this.events.emit('job:started', claimed);
this.workerPool
.run(async () => this.processJob(claimed.id))
.finally(() => {
this.requestPump();
});
dispatched += 1;
}
if (dispatched === 0) {
break;
}
}
this.scheduleWakeup();
} while (this.repumpRequested);
} finally {
this.pumping = false;
}
}
private requestPump(): void {
if (this.closed) {
return;
}
void this.pump();
}
private scheduleWakeup(): void {
if (this.wakeupTimer) {
clearTimeout(this.wakeupTimer);
this.wakeupTimer = null;
}
const nextScheduledAt = this.storage.getNextScheduledAt();
if (!nextScheduledAt) {
return;
}
const delay = Math.max(new Date(nextScheduledAt).getTime() - Date.now(), 0);
this.wakeupTimer = setTimeout(() => {
this.wakeupTimer = null;
this.requestPump();
}, delay);
this.wakeupTimer.unref?.();
}
private async processJob(jobId: string): Promise<void> {
const job = this.requireJob(jobId);
const controller = new AbortController();
this.controllers.set(jobId, controller);
try {
const runner = new PhaseRunner<TData>({
handlers: this.handlers,
phases: this.config.phases,
onPhaseStarted: async (phaseName, phases) =>
this.storage.saveProgress(jobId, phaseName, phases, this.requireJob(jobId).progress, null),
onProgress: async (phaseName, phases, phaseProgress, overallProgress, message, details) => {
const updated = this.storage.saveProgress(jobId, phaseName, phases, overallProgress, message ?? null);
const progressEvent: JobProgressEvent = {
jobId,
phase: phaseName,
phaseProgress,
overallProgress,
message: message ?? null,
timestamp: now(),
...(details !== undefined ? { details } : {}),
};
this.events.emit('job:progress', updated, progressEvent);
},
onPhaseCompleted: async (phaseName, phases, phaseResults, overallProgress) => {
const updated = this.storage.savePhaseCompletion(
jobId,
phases,
phaseResults,
null,
overallProgress,
null,
);
const completedPhase = updated.phases.find((phase) => phase.name === phaseName);
if (!completedPhase) {
throw new Error(`Completed phase ${phaseName} not found`);
}
this.events.emit('job:phase:completed', updated, completedPhase);
return updated;
},
onCancelled: async (_phaseName, phases) => {
const current = this.requireJob(jobId);
if (current.status === 'cancelled') {
return;
}
const cancelled = this.storage.cancelJob(
jobId,
this.markUnfinishedPhasesCancelled(clonePhases(phases)),
);
this.events.emit('job:cancelled', cancelled);
this.queueWebhookDispatch('job:cancelled', cancelled);
},
});
const result = await runner.run(job, controller.signal);
const latest = this.requireJob(jobId);
if (latest.status === 'cancelled') {
return;
}
const completed = this.storage.completeJob(jobId, result.phases, result.phaseResults);
this.events.emit('job:completed', completed);
this.queueWebhookDispatch('job:completed', completed);
} catch (error) {
if (error instanceof CancellationError) {
const current = this.requireJob(jobId);
if (current.status !== 'cancelled') {
const cancelled = this.storage.cancelJob(
jobId,
this.markUnfinishedPhasesCancelled(clonePhases(current.phases)),
);
this.events.emit('job:cancelled', cancelled);
this.queueWebhookDispatch('job:cancelled', cancelled);
}
return;
}
await this.handleFailure(jobId, error);
} finally {
this.controllers.delete(jobId);
}
}
private async handleFailure(jobId: string, error: unknown): Promise<void> {
const current = this.requireJob(jobId);
const recoverability = await this.retryStrategy.shouldRetry(error, current);
const phaseName = current.currentPhase;
const phases = clonePhases(current.phases);
const activePhase = phaseName ? phases.find((phase) => phase.name === phaseName) : undefined;
if (activePhase) {
activePhase.error = error instanceof Error ? error.message : String(error);
}
const failure: JobFailure = {
message: error instanceof Error ? error.message : String(error),
phase: phaseName,
recoverable: recoverability.disposition === 'recoverable',
timestamp: now(),
attempt: current.retryCount + 1,
};
if (recoverability.retry && activePhase) {
activePhase.status = 'pending';
activePhase.progress = 0;
activePhase.message = null;
activePhase.completedAt = null;
const nextRunAt = new Date(Date.now() + recoverability.delayMs).toISOString();
const pending = this.storage.scheduleRetry(
jobId,
phases,
current.progress,
failure,
current.retryCount + 1,
nextRunAt,
);
const retry = {
jobId,
phase: phaseName,
attempt: pending.retryCount,
delayMs: recoverability.delayMs,
nextRunAt,
timestamp: now(),
};
this.events.emit('job:retrying', pending, retry);
this.queueWebhookDispatch('job:retrying', pending);
this.requestPump();
return;
}
if (activePhase) {
activePhase.status = 'failed';
activePhase.completedAt = now();
}
const failed = this.storage.failJob(jobId, phases, failure);
this.events.emit('job:failed', failed, failure);
this.queueWebhookDispatch('job:failed', failed);
}
private async markStaleJobs(cutoffIso: string): Promise<JobRecord<TData>[]> {
const staleJobs = this.storage.markStaleJobs(cutoffIso);
for (const job of staleJobs) {
if (this.config.retention?.onStale) {
await this.config.retention.onStale(job);
}
this.events.emit('job:stale', job);
this.queueWebhookDispatch('job:stale', job);
}
return staleJobs;
}
private async deleteStaleJobs(cutoffIso: string): Promise<JobRecord<TData>[]> {
const deletedJobs = this.storage.deleteStaleJobs(cutoffIso);
for (const job of deletedJobs) {
if (this.config.retention?.onDelete) {
await this.config.retention.onDelete(job);
}
this.events.emit('job:deleted', job.id);
}
return deletedJobs;
}
private async dispatchWebhook(event: WebhookEventName, job: JobRecord<TData>): Promise<void> {
if (!this.webhookDispatcher?.supports(event, job)) {
return;
}
try {
const result = await this.webhookDispatcher.dispatch(event, job);
this.storage.markWebhookSent(job.id);
const refreshed = this.getJob(job.id);
if (refreshed) {
this.events.emit('job:webhook:delivered', refreshed, result);
}
} catch (error) {
const dispatchError =
error instanceof Error && 'dispatchError' in error
? ((error as { dispatchError?: WebhookDispatchError }).dispatchError ?? {
event,
jobId: job.id,
message: error.message,
finalAttempt: 1,
})
: {
event,
jobId: job.id,
message: error instanceof Error ? error.message : String(error),
finalAttempt: 1,
};
const refreshed = this.getJob(job.id);
if (refreshed) {
this.events.emit('job:webhook:failed', refreshed, dispatchError);
}
}
}
private queueWebhookDispatch(event: WebhookEventName, job: JobRecord<TData>): void {
if (!this.webhookDispatcher?.supports(event, job)) {
return;
}
const pending = this.dispatchWebhook(event, job).finally(() => {
this.pendingWebhookDispatches.delete(pending);
});
this.pendingWebhookDispatches.add(pending);
}
private async drainWebhookDispatches(): Promise<void> {
while (this.pendingWebhookDispatches.size > 0) {
await Promise.allSettled([...this.pendingWebhookDispatches]);
}
}
private markUnfinishedPhasesCancelled(phases: JobPhaseState[]): JobPhaseState[] {
const cancelledAt = now();
return phases.map((phase) => {
if (phase.status === 'completed' || phase.status === 'failed' || phase.status === 'cancelled') {
return phase;
}
return {
...phase,
status: 'cancelled',
completedAt: phase.completedAt ?? cancelledAt,
error: null,
};
});
}
private pushJobEvent(
controller: ReadableStreamDefaultController<Uint8Array>,
encoder: { encode: (input?: string) => Uint8Array },
type: Exclude<QueueStreamEvent['type'], 'snapshot' | 'job:progress' | 'job:phase:completed' | 'job:failed' | 'job:retrying' | 'job:deleted' | 'job:webhook:delivered' | 'job:webhook:failed' | 'ping'>,
job: JobRecord<TData>,
jobIdFilter?: string,
): void {
if (!this.matchesStreamJob(jobIdFilter, job.id)) {
return;
}
this.enqueueStreamEvent(controller, encoder, type, {
type,
jobId: job.id,
job,
timestamp: now(),
});
}
private enqueueStreamEvent(
controller: ReadableStreamDefaultController<Uint8Array>,
encoder: { encode: (input?: string) => Uint8Array },
eventName: string,
payload: QueueStreamEvent<TData>,
): void {
controller.enqueue(encoder.encode(this.serializer.event(eventName, payload)));
}
private matchesStreamJob(jobIdFilter: string | undefined, jobId: string): boolean {
return !jobIdFilter || jobIdFilter === jobId;
}
private requireJob(id: string): JobRecord<TData> {
const job = this.storage.getJob(id);
if (!job) {
throw new Error(`Job ${id} not found`);
}
return job;
}
private ensureOpen(): void {
if (this.closed) {
throw new Error('JobQueue is shut down');
}
}
}