feat: add reusable jobqueue library
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
721
src/JobQueue.ts
Normal file
721
src/JobQueue.ts
Normal file
@@ -0,0 +1,721 @@
|
||||
import { CancellationError } from './util/errors.js';
|
||||
import { TypedEventBus } from './events/EventBus.js';
|
||||
import { SseSerializer } from './events/SseSerializer.js';
|
||||
import { WorkerPool } from './processor/WorkerPool.js';
|
||||
import { PhaseRunner } from './processor/PhaseRunner.js';
|
||||
import { RetryStrategy } from './retry/RetryStrategy.js';
|
||||
import { SqliteStorage } from './storage/SqliteStorage.js';
|
||||
import { RetentionScheduler } from './retention/RetentionScheduler.js';
|
||||
import { WebhookDispatcher } from './webhook/WebhookDispatcher.js';
|
||||
import { createJobId } from './util/id.js';
|
||||
|
||||
import type {
|
||||
EnqueueOptions,
|
||||
JobData,
|
||||
JobFailure,
|
||||
JobPhaseState,
|
||||
JobProgressEvent,
|
||||
JobQueueEvents,
|
||||
JobRecord,
|
||||
ListJobsOptions,
|
||||
PhaseHandler,
|
||||
QueueConfig,
|
||||
QueueStreamEvent,
|
||||
RetryOptions,
|
||||
StreamOptions,
|
||||
WebhookDispatchError,
|
||||
WebhookEventName,
|
||||
} from './types.js';
|
||||
|
||||
const DEFAULT_CONCURRENCY = 1;
|
||||
const DEFAULT_SHUTDOWN_TIMEOUT_MS = 30_000;
|
||||
|
||||
function now(): string {
|
||||
return new Date().toISOString();
|
||||
}
|
||||
|
||||
function createInitialPhases(phaseNames: readonly string[]): JobPhaseState[] {
|
||||
return phaseNames.map((name) => ({
|
||||
name,
|
||||
status: 'pending',
|
||||
progress: 0,
|
||||
message: null,
|
||||
startedAt: null,
|
||||
completedAt: null,
|
||||
error: null,
|
||||
}));
|
||||
}
|
||||
|
||||
function clonePhases(phases: JobPhaseState[]): JobPhaseState[] {
|
||||
return phases.map((phase) => ({ ...phase }));
|
||||
}
|
||||
|
||||
function normalizeQueueConfig<TData extends JobData>(config: QueueConfig<TData>): QueueConfig<TData> {
|
||||
const phases = [...new Set(config.phases)];
|
||||
|
||||
if (phases.length === 0) {
|
||||
throw new Error('QueueConfig.phases must contain at least one phase');
|
||||
}
|
||||
|
||||
return {
|
||||
...config,
|
||||
phases,
|
||||
concurrency: config.concurrency ?? DEFAULT_CONCURRENCY,
|
||||
shutdownTimeoutMs: config.shutdownTimeoutMs ?? DEFAULT_SHUTDOWN_TIMEOUT_MS,
|
||||
};
|
||||
}
|
||||
|
||||
export class JobQueue<TData extends JobData = JobData> {
|
||||
private readonly config: QueueConfig<TData>;
|
||||
private readonly storage: SqliteStorage<TData>;
|
||||
private readonly events = new TypedEventBus<JobQueueEvents<TData>>();
|
||||
private readonly retryStrategy: RetryStrategy<TData>;
|
||||
private readonly workerPool: WorkerPool;
|
||||
private readonly handlers = new Map<string, PhaseHandler<TData>>();
|
||||
private readonly serializer = new SseSerializer();
|
||||
private readonly controllers = new Map<string, AbortController>();
|
||||
private readonly webhookDispatcher?: WebhookDispatcher<TData>;
|
||||
private readonly retentionScheduler?: RetentionScheduler<TData>;
|
||||
private wakeupTimer: NodeJS.Timeout | null = null;
|
||||
private closed = false;
|
||||
private pumping = false;
|
||||
private repumpRequested = false;
|
||||
|
||||
public constructor(queueConfig: QueueConfig<TData>) {
|
||||
this.config = normalizeQueueConfig(queueConfig);
|
||||
this.storage = new SqliteStorage<TData>(this.config.dbPath);
|
||||
this.retryStrategy = new RetryStrategy(this.config.retry);
|
||||
this.workerPool = new WorkerPool(this.config.concurrency ?? DEFAULT_CONCURRENCY);
|
||||
this.webhookDispatcher = this.config.webhook
|
||||
? new WebhookDispatcher<TData>(this.config.webhook)
|
||||
: undefined;
|
||||
|
||||
this.storage.resetActiveJobs('Interrupted by process restart');
|
||||
|
||||
if (this.config.retention) {
|
||||
this.retentionScheduler = new RetentionScheduler(this.config.retention, {
|
||||
markStale: async (cutoffIso) => this.markStaleJobs(cutoffIso),
|
||||
deleteStale: async (cutoffIso) => this.deleteStaleJobs(cutoffIso),
|
||||
});
|
||||
this.retentionScheduler.start();
|
||||
}
|
||||
|
||||
this.requestPump();
|
||||
}
|
||||
|
||||
public handle(phaseName: string, handler: PhaseHandler<TData>): this {
|
||||
if (!this.config.phases.includes(phaseName)) {
|
||||
throw new Error(`Phase "${phaseName}" is not defined in queue config`);
|
||||
}
|
||||
|
||||
this.handlers.set(phaseName, handler);
|
||||
return this;
|
||||
}
|
||||
|
||||
public on<TName extends keyof JobQueueEvents<TData> & string>(
|
||||
event: TName,
|
||||
listener: (...args: JobQueueEvents<TData>[TName]) => void,
|
||||
): () => void {
|
||||
return this.events.on(event, listener);
|
||||
}
|
||||
|
||||
public once<TName extends keyof JobQueueEvents<TData> & string>(
|
||||
event: TName,
|
||||
listener: (...args: JobQueueEvents<TData>[TName]) => void,
|
||||
): () => void {
|
||||
return this.events.once(event, listener);
|
||||
}
|
||||
|
||||
public async enqueue(data: TData, options: EnqueueOptions = {}): Promise<string> {
|
||||
this.ensureOpen();
|
||||
const id = options.id ?? createJobId();
|
||||
const job = this.storage.createJob(
|
||||
id,
|
||||
data,
|
||||
createInitialPhases(this.config.phases),
|
||||
options,
|
||||
options.maxAttempts ?? this.retryStrategy.defaultMaxAttempts(),
|
||||
);
|
||||
|
||||
this.events.emit('job:enqueued', job);
|
||||
this.requestPump();
|
||||
|
||||
return id;
|
||||
}
|
||||
|
||||
public getJob(id: string): JobRecord<TData> | null {
|
||||
return this.storage.getJob(id);
|
||||
}
|
||||
|
||||
public listJobs(options: ListJobsOptions = {}): JobRecord<TData>[] {
|
||||
return this.storage.listJobs(options);
|
||||
}
|
||||
|
||||
public async retry(id: string, options: RetryOptions = {}): Promise<JobRecord<TData>> {
|
||||
this.ensureOpen();
|
||||
const existing = this.requireJob(id);
|
||||
|
||||
if (!['failed', 'cancelled', 'stale'].includes(existing.status)) {
|
||||
throw new Error(`Job ${id} cannot be retried from status ${existing.status}`);
|
||||
}
|
||||
|
||||
const phases =
|
||||
options.fromStart === false
|
||||
? clonePhases(existing.phases).map<JobPhaseState>((phase) =>
|
||||
phase.status === 'completed'
|
||||
? phase
|
||||
: {
|
||||
...phase,
|
||||
status: 'pending',
|
||||
progress: 0,
|
||||
message: null,
|
||||
completedAt: null,
|
||||
error: null,
|
||||
},
|
||||
)
|
||||
: createInitialPhases(this.config.phases);
|
||||
const retried = this.storage.resetForRetry(
|
||||
id,
|
||||
phases,
|
||||
existing.maxAttempts,
|
||||
options.scheduledAt instanceof Date ? options.scheduledAt.toISOString() : options.scheduledAt ?? null,
|
||||
);
|
||||
|
||||
this.events.emit('job:enqueued', retried);
|
||||
this.requestPump();
|
||||
|
||||
return retried;
|
||||
}
|
||||
|
||||
public async cancel(id: string): Promise<JobRecord<TData>> {
|
||||
const job = this.requireJob(id);
|
||||
|
||||
if (!['pending', 'active'].includes(job.status)) {
|
||||
return job;
|
||||
}
|
||||
|
||||
const phases = clonePhases(job.phases);
|
||||
const activePhaseIndex = phases.findIndex((phase) => phase.status === 'active');
|
||||
if (activePhaseIndex >= 0) {
|
||||
phases[activePhaseIndex] = {
|
||||
...phases[activePhaseIndex],
|
||||
status: 'cancelled',
|
||||
completedAt: now(),
|
||||
};
|
||||
}
|
||||
|
||||
const controller = this.controllers.get(id);
|
||||
controller?.abort(new CancellationError());
|
||||
|
||||
const cancelled = this.storage.cancelJob(id, phases);
|
||||
this.events.emit('job:cancelled', cancelled);
|
||||
void this.dispatchWebhook('job:cancelled', cancelled);
|
||||
this.requestPump();
|
||||
|
||||
return cancelled;
|
||||
}
|
||||
|
||||
public createEventStream(options: StreamOptions = {}): ReadableStream<Uint8Array> {
|
||||
const encoder = new TextEncoder();
|
||||
const keepAliveMs = options.keepAliveMs ?? 30_000;
|
||||
let cleanup = () => {};
|
||||
|
||||
return new ReadableStream<Uint8Array>({
|
||||
start: (controller) => {
|
||||
if (options.includeSnapshot !== false) {
|
||||
if (options.jobId) {
|
||||
const job = this.getJob(options.jobId);
|
||||
if (job) {
|
||||
this.enqueueStreamEvent(controller, encoder, 'snapshot', {
|
||||
type: 'snapshot',
|
||||
jobId: job.id,
|
||||
job,
|
||||
timestamp: now(),
|
||||
});
|
||||
}
|
||||
} else {
|
||||
for (const job of this.listJobs({ limit: 100 })) {
|
||||
this.enqueueStreamEvent(controller, encoder, 'snapshot', {
|
||||
type: 'snapshot',
|
||||
jobId: job.id,
|
||||
job,
|
||||
timestamp: now(),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const unsubscribers = [
|
||||
this.on('job:enqueued', (job) => this.pushJobEvent(controller, encoder, 'job:enqueued', job, options.jobId)),
|
||||
this.on('job:started', (job) => this.pushJobEvent(controller, encoder, 'job:started', job, options.jobId)),
|
||||
this.on('job:progress', (job, progress) => {
|
||||
if (!this.matchesStreamJob(options.jobId, job.id)) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.enqueueStreamEvent(controller, encoder, 'job:progress', {
|
||||
type: 'job:progress',
|
||||
jobId: job.id,
|
||||
job,
|
||||
progress,
|
||||
timestamp: now(),
|
||||
});
|
||||
}),
|
||||
this.on('job:phase:completed', (job, phase) => {
|
||||
if (!this.matchesStreamJob(options.jobId, job.id)) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.enqueueStreamEvent(controller, encoder, 'job:phase:completed', {
|
||||
type: 'job:phase:completed',
|
||||
jobId: job.id,
|
||||
job,
|
||||
phase,
|
||||
timestamp: now(),
|
||||
});
|
||||
}),
|
||||
this.on('job:completed', (job) => this.pushJobEvent(controller, encoder, 'job:completed', job, options.jobId)),
|
||||
this.on('job:failed', (job, failure) => {
|
||||
if (!this.matchesStreamJob(options.jobId, job.id)) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.enqueueStreamEvent(controller, encoder, 'job:failed', {
|
||||
type: 'job:failed',
|
||||
jobId: job.id,
|
||||
job,
|
||||
failure,
|
||||
timestamp: now(),
|
||||
});
|
||||
}),
|
||||
this.on('job:retrying', (job, retry) => {
|
||||
if (!this.matchesStreamJob(options.jobId, job.id)) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.enqueueStreamEvent(controller, encoder, 'job:retrying', {
|
||||
type: 'job:retrying',
|
||||
jobId: job.id,
|
||||
job,
|
||||
retry,
|
||||
timestamp: now(),
|
||||
});
|
||||
}),
|
||||
this.on('job:cancelled', (job) => this.pushJobEvent(controller, encoder, 'job:cancelled', job, options.jobId)),
|
||||
this.on('job:stale', (job) => this.pushJobEvent(controller, encoder, 'job:stale', job, options.jobId)),
|
||||
this.on('job:deleted', (jobId) => {
|
||||
if (!this.matchesStreamJob(options.jobId, jobId)) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.enqueueStreamEvent(controller, encoder, 'job:deleted', {
|
||||
type: 'job:deleted',
|
||||
deletedJobId: jobId,
|
||||
jobId,
|
||||
timestamp: now(),
|
||||
});
|
||||
}),
|
||||
this.on('job:webhook:delivered', (job, webhook) => {
|
||||
if (!this.matchesStreamJob(options.jobId, job.id)) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.enqueueStreamEvent(controller, encoder, 'job:webhook:delivered', {
|
||||
type: 'job:webhook:delivered',
|
||||
jobId: job.id,
|
||||
job,
|
||||
webhook,
|
||||
timestamp: now(),
|
||||
});
|
||||
}),
|
||||
this.on('job:webhook:failed', (job, webhook) => {
|
||||
if (!this.matchesStreamJob(options.jobId, job.id)) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.enqueueStreamEvent(controller, encoder, 'job:webhook:failed', {
|
||||
type: 'job:webhook:failed',
|
||||
jobId: job.id,
|
||||
job,
|
||||
webhook,
|
||||
timestamp: now(),
|
||||
});
|
||||
}),
|
||||
];
|
||||
|
||||
const keepAlive = setInterval(() => {
|
||||
this.enqueueStreamEvent(controller, encoder, 'ping', {
|
||||
type: 'ping',
|
||||
timestamp: now(),
|
||||
});
|
||||
}, keepAliveMs);
|
||||
keepAlive.unref?.();
|
||||
|
||||
cleanup = () => {
|
||||
clearInterval(keepAlive);
|
||||
for (const unsubscribe of unsubscribers) {
|
||||
unsubscribe();
|
||||
}
|
||||
};
|
||||
},
|
||||
cancel: () => {
|
||||
cleanup();
|
||||
return undefined;
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
public async shutdown(timeoutMs = this.config.shutdownTimeoutMs): Promise<void> {
|
||||
if (this.closed) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.closed = true;
|
||||
this.retentionScheduler?.stop();
|
||||
|
||||
if (this.wakeupTimer) {
|
||||
clearTimeout(this.wakeupTimer);
|
||||
this.wakeupTimer = null;
|
||||
}
|
||||
|
||||
await this.workerPool.drain(timeoutMs);
|
||||
this.events.removeAllListeners();
|
||||
this.storage.close();
|
||||
}
|
||||
|
||||
private async pump(): Promise<void> {
|
||||
if (this.closed) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (this.pumping) {
|
||||
this.repumpRequested = true;
|
||||
return;
|
||||
}
|
||||
|
||||
this.pumping = true;
|
||||
|
||||
try {
|
||||
do {
|
||||
this.repumpRequested = false;
|
||||
|
||||
while (this.workerPool.hasCapacity()) {
|
||||
const candidates = this.storage.listRunnableJobs(this.workerPool.availableSlots);
|
||||
|
||||
if (candidates.length === 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
let dispatched = 0;
|
||||
|
||||
for (const candidate of candidates) {
|
||||
if (!this.workerPool.hasCapacity()) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (!this.storage.claimPendingJob(candidate.id)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const claimed = this.requireJob(candidate.id);
|
||||
this.events.emit('job:started', claimed);
|
||||
this.workerPool
|
||||
.run(async () => this.processJob(claimed.id))
|
||||
.finally(() => {
|
||||
this.requestPump();
|
||||
});
|
||||
dispatched += 1;
|
||||
}
|
||||
|
||||
if (dispatched === 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
this.scheduleWakeup();
|
||||
} while (this.repumpRequested);
|
||||
} finally {
|
||||
this.pumping = false;
|
||||
}
|
||||
}
|
||||
|
||||
private requestPump(): void {
|
||||
if (this.closed) {
|
||||
return;
|
||||
}
|
||||
|
||||
void this.pump();
|
||||
}
|
||||
|
||||
private scheduleWakeup(): void {
|
||||
if (this.wakeupTimer) {
|
||||
clearTimeout(this.wakeupTimer);
|
||||
this.wakeupTimer = null;
|
||||
}
|
||||
|
||||
const nextScheduledAt = this.storage.getNextScheduledAt();
|
||||
if (!nextScheduledAt) {
|
||||
return;
|
||||
}
|
||||
|
||||
const delay = Math.max(new Date(nextScheduledAt).getTime() - Date.now(), 0);
|
||||
this.wakeupTimer = setTimeout(() => {
|
||||
this.wakeupTimer = null;
|
||||
this.requestPump();
|
||||
}, delay);
|
||||
this.wakeupTimer.unref?.();
|
||||
}
|
||||
|
||||
private async processJob(jobId: string): Promise<void> {
|
||||
const job = this.requireJob(jobId);
|
||||
const controller = new AbortController();
|
||||
this.controllers.set(jobId, controller);
|
||||
|
||||
try {
|
||||
const runner = new PhaseRunner<TData>({
|
||||
handlers: this.handlers,
|
||||
phases: this.config.phases,
|
||||
onPhaseStarted: async (phaseName, phases) =>
|
||||
this.storage.saveProgress(jobId, phaseName, phases, this.requireJob(jobId).progress, null),
|
||||
onProgress: async (phaseName, phases, phaseProgress, overallProgress, message, details) => {
|
||||
const updated = this.storage.saveProgress(jobId, phaseName, phases, overallProgress, message ?? null);
|
||||
const progressEvent: JobProgressEvent = {
|
||||
jobId,
|
||||
phase: phaseName,
|
||||
phaseProgress,
|
||||
overallProgress,
|
||||
message: message ?? null,
|
||||
timestamp: now(),
|
||||
...(details !== undefined ? { details } : {}),
|
||||
};
|
||||
|
||||
this.events.emit('job:progress', updated, progressEvent);
|
||||
},
|
||||
onPhaseCompleted: async (phaseName, phases, phaseResults, overallProgress) => {
|
||||
const updated = this.storage.savePhaseCompletion(
|
||||
jobId,
|
||||
phases,
|
||||
phaseResults,
|
||||
null,
|
||||
overallProgress,
|
||||
null,
|
||||
);
|
||||
const completedPhase = updated.phases.find((phase) => phase.name === phaseName);
|
||||
if (!completedPhase) {
|
||||
throw new Error(`Completed phase ${phaseName} not found`);
|
||||
}
|
||||
|
||||
this.events.emit('job:phase:completed', updated, completedPhase);
|
||||
return updated;
|
||||
},
|
||||
onCancelled: async (phaseName, phases) => {
|
||||
const activePhases = clonePhases(phases);
|
||||
if (phaseName) {
|
||||
const active = activePhases.find((phase) => phase.name === phaseName);
|
||||
if (active) {
|
||||
active.status = 'cancelled';
|
||||
active.completedAt = now();
|
||||
}
|
||||
}
|
||||
|
||||
const cancelled = this.storage.cancelJob(jobId, activePhases);
|
||||
this.events.emit('job:cancelled', cancelled);
|
||||
void this.dispatchWebhook('job:cancelled', cancelled);
|
||||
},
|
||||
});
|
||||
|
||||
const result = await runner.run(job, controller.signal);
|
||||
const latest = this.requireJob(jobId);
|
||||
|
||||
if (latest.status === 'cancelled') {
|
||||
return;
|
||||
}
|
||||
|
||||
const completed = this.storage.completeJob(jobId, result.phases, result.phaseResults);
|
||||
this.events.emit('job:completed', completed);
|
||||
void this.dispatchWebhook('job:completed', completed);
|
||||
} catch (error) {
|
||||
if (error instanceof CancellationError) {
|
||||
const current = this.requireJob(jobId);
|
||||
if (current.status !== 'cancelled') {
|
||||
const cancelled = this.storage.cancelJob(jobId, clonePhases(current.phases));
|
||||
this.events.emit('job:cancelled', cancelled);
|
||||
void this.dispatchWebhook('job:cancelled', cancelled);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
await this.handleFailure(jobId, error);
|
||||
} finally {
|
||||
this.controllers.delete(jobId);
|
||||
}
|
||||
}
|
||||
|
||||
private async handleFailure(jobId: string, error: unknown): Promise<void> {
|
||||
const current = this.requireJob(jobId);
|
||||
const recoverability = await this.retryStrategy.shouldRetry(error, current);
|
||||
const phaseName = current.currentPhase;
|
||||
const phases = clonePhases(current.phases);
|
||||
const activePhase = phaseName ? phases.find((phase) => phase.name === phaseName) : undefined;
|
||||
|
||||
if (activePhase) {
|
||||
activePhase.error = error instanceof Error ? error.message : String(error);
|
||||
}
|
||||
|
||||
const failure: JobFailure = {
|
||||
message: error instanceof Error ? error.message : String(error),
|
||||
phase: phaseName,
|
||||
recoverable: recoverability.disposition === 'recoverable',
|
||||
timestamp: now(),
|
||||
attempt: current.retryCount + 1,
|
||||
};
|
||||
|
||||
if (recoverability.retry && activePhase) {
|
||||
activePhase.status = 'pending';
|
||||
activePhase.progress = 0;
|
||||
activePhase.message = null;
|
||||
activePhase.completedAt = null;
|
||||
|
||||
const nextRunAt = new Date(Date.now() + recoverability.delayMs).toISOString();
|
||||
const pending = this.storage.scheduleRetry(
|
||||
jobId,
|
||||
phases,
|
||||
current.progress,
|
||||
failure,
|
||||
current.retryCount + 1,
|
||||
nextRunAt,
|
||||
);
|
||||
const retry = {
|
||||
jobId,
|
||||
phase: phaseName,
|
||||
attempt: pending.retryCount,
|
||||
delayMs: recoverability.delayMs,
|
||||
nextRunAt,
|
||||
timestamp: now(),
|
||||
};
|
||||
|
||||
this.events.emit('job:retrying', pending, retry);
|
||||
void this.dispatchWebhook('job:retrying', pending);
|
||||
this.requestPump();
|
||||
return;
|
||||
}
|
||||
|
||||
if (activePhase) {
|
||||
activePhase.status = 'failed';
|
||||
activePhase.completedAt = now();
|
||||
}
|
||||
|
||||
const failed = this.storage.failJob(jobId, phases, failure);
|
||||
this.events.emit('job:failed', failed, failure);
|
||||
void this.dispatchWebhook('job:failed', failed);
|
||||
}
|
||||
|
||||
private async markStaleJobs(cutoffIso: string): Promise<JobRecord<TData>[]> {
|
||||
const staleJobs = this.storage.markStaleJobs(cutoffIso);
|
||||
|
||||
for (const job of staleJobs) {
|
||||
if (this.config.retention?.onStale) {
|
||||
await this.config.retention.onStale(job);
|
||||
}
|
||||
|
||||
this.events.emit('job:stale', job);
|
||||
void this.dispatchWebhook('job:stale', job);
|
||||
}
|
||||
|
||||
return staleJobs;
|
||||
}
|
||||
|
||||
private async deleteStaleJobs(cutoffIso: string): Promise<JobRecord<TData>[]> {
|
||||
const deletedJobs = this.storage.deleteStaleJobs(cutoffIso);
|
||||
|
||||
for (const job of deletedJobs) {
|
||||
if (this.config.retention?.onDelete) {
|
||||
await this.config.retention.onDelete(job);
|
||||
}
|
||||
|
||||
this.events.emit('job:deleted', job.id);
|
||||
}
|
||||
|
||||
return deletedJobs;
|
||||
}
|
||||
|
||||
private async dispatchWebhook(event: WebhookEventName, job: JobRecord<TData>): Promise<void> {
|
||||
if (!this.webhookDispatcher?.supports(event, job)) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const result = await this.webhookDispatcher.dispatch(event, job);
|
||||
this.storage.markWebhookSent(job.id);
|
||||
const refreshed = this.requireJob(job.id);
|
||||
this.events.emit('job:webhook:delivered', refreshed, result);
|
||||
} catch (error) {
|
||||
const dispatchError =
|
||||
error instanceof Error && 'dispatchError' in error
|
||||
? ((error as { dispatchError?: WebhookDispatchError }).dispatchError ?? {
|
||||
event,
|
||||
jobId: job.id,
|
||||
message: error.message,
|
||||
finalAttempt: 1,
|
||||
})
|
||||
: {
|
||||
event,
|
||||
jobId: job.id,
|
||||
message: error instanceof Error ? error.message : String(error),
|
||||
finalAttempt: 1,
|
||||
};
|
||||
|
||||
const refreshed = this.requireJob(job.id);
|
||||
this.events.emit('job:webhook:failed', refreshed, dispatchError);
|
||||
}
|
||||
}
|
||||
|
||||
private pushJobEvent(
|
||||
controller: ReadableStreamDefaultController<Uint8Array>,
|
||||
encoder: { encode: (input?: string) => Uint8Array },
|
||||
type: Exclude<QueueStreamEvent['type'], 'snapshot' | 'job:progress' | 'job:phase:completed' | 'job:failed' | 'job:retrying' | 'job:deleted' | 'job:webhook:delivered' | 'job:webhook:failed' | 'ping'>,
|
||||
job: JobRecord<TData>,
|
||||
jobIdFilter?: string,
|
||||
): void {
|
||||
if (!this.matchesStreamJob(jobIdFilter, job.id)) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.enqueueStreamEvent(controller, encoder, type, {
|
||||
type,
|
||||
jobId: job.id,
|
||||
job,
|
||||
timestamp: now(),
|
||||
});
|
||||
}
|
||||
|
||||
private enqueueStreamEvent(
|
||||
controller: ReadableStreamDefaultController<Uint8Array>,
|
||||
encoder: { encode: (input?: string) => Uint8Array },
|
||||
eventName: string,
|
||||
payload: QueueStreamEvent<TData>,
|
||||
): void {
|
||||
controller.enqueue(encoder.encode(this.serializer.event(eventName, payload)));
|
||||
}
|
||||
|
||||
private matchesStreamJob(jobIdFilter: string | undefined, jobId: string): boolean {
|
||||
return !jobIdFilter || jobIdFilter === jobId;
|
||||
}
|
||||
|
||||
private requireJob(id: string): JobRecord<TData> {
|
||||
const job = this.storage.getJob(id);
|
||||
|
||||
if (!job) {
|
||||
throw new Error(`Job ${id} not found`);
|
||||
}
|
||||
|
||||
return job;
|
||||
}
|
||||
|
||||
private ensureOpen(): void {
|
||||
if (this.closed) {
|
||||
throw new Error('JobQueue is shut down');
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user