Giancarmine Salucci a9429e2118 fix: harden queue lifecycle and publish gate
- Preserve phase results on partial retry and keep interrupted phase
  context after restart.
- Avoid webhook bookkeeping crashes when retention deletes stale jobs.
- Add deeper unit, integration, and e2e coverage around queue seams.
- Require verify job to pass before publish runs.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-05-16 18:39:19 +02:00
2026-05-16 00:51:54 +02:00

jobqueue

Framework-agnostic async job queue for Node.js with:

  • SQLite persistence via better-sqlite3 + WAL mode
  • Multi-phase pipelines with per-phase progress and phase result passing
  • Concurrency control with a worker pool
  • Automatic retry with fixed, linear, or exponential backoff
  • SSE helpers for progress/completion streaming
  • Outbound webhooks with optional HMAC-SHA256 signing
  • Retention cleanup for stale and expired jobs
  • ESM-only, Node 20+

Detailed architecture and runtime docs live in docs/.

Install

npm install jobqueue --registry=https://git.sal.giize.com/api/packages/mozempk/npm/

To make installs and publishes use the Gitea registry by default, add this to .npmrc:

registry=https://git.sal.giize.com/api/packages/mozempk/npm/
//git.sal.giize.com/api/packages/mozempk/npm/:_authToken=${NODE_AUTH_TOKEN}

Quick start

import { JobQueue } from 'jobqueue';

const queue = new JobQueue<{ url: string }>({
  dbPath: './data/jobs.db',
  phases: ['download', 'process', 'upload'],
  concurrency: 2,
  retry: {
    maxAttempts: 3,
    strategy: 'exponential',
    baseDelayMs: 1_000,
    maxDelayMs: 30_000,
    classifyError: async (error) => {
      if (error instanceof Error && error.message.includes('ECONNREFUSED')) {
        return 'recoverable';
      }

      return 'fatal';
    },
  },
  webhook: {
    url: 'https://example.com/hooks/jobs',
    events: ['job:completed', 'job:failed'],
    secret: process.env.JOBQUEUE_WEBHOOK_SECRET,
  },
  retention: {
    staleAfterMs: 7 * 24 * 60 * 60 * 1000,
    deleteAfterMs: 30 * 24 * 60 * 60 * 1000,
    onDelete: async (job) => {
      console.log(`Deleting ${job.id}`);
    },
  },
});

queue.handle('download', async (job, ctx) => {
  await ctx.progress(10, 'Downloading source');
  return { filePath: `/tmp/${job.id}.mp4` };
});

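queue.handle('process', async (_job, ctx) => {
  const download = ctx.phaseResult<{ filePath: string }>('download');
  // Fail fast instead of producing "undefined.json" when the result is missing.
  if (!download) throw new Error('Missing result from download phase');
  await ctx.progress(50, 'Processing file');
  return { outputPath: `${download.filePath}.json` };
});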

queue.handle('upload', async (_job, ctx) => {
  const processed = ctx.phaseResult<{ outputPath: string }>('process');
  await ctx.progress(100, 'Uploading result');
  return { uploaded: Boolean(processed?.outputPath) };
});

queue.on('job:completed', (job) => {
  console.log('completed', job.id, job.phaseResults);
});

const jobId = await queue.enqueue({
  url: 'https://example.com/video',
});

API

new JobQueue(config)

interface QueueConfig<TData> {
  dbPath: string;
  phases: readonly string[];
  concurrency?: number;
  retry?: RetryConfig<TData>;
  retention?: RetentionConfig<TData>;
  webhook?: WebhookConfig;
  shutdownTimeoutMs?: number;
}

queue.handle(phaseName, handler)

Register one handler per phase.

type PhaseHandler<TData> = (
  job: JobRecord<TData>,
  context: PhaseContext<TData>,
) => Promise<JsonValue | undefined> | JsonValue | undefined;

queue.enqueue(data, options?)

Creates a new job and returns its ID.

await queue.enqueue(
  { url: 'https://example.com/video' },
  {
    scheduledAt: new Date(Date.now() + 5_000),
    maxAttempts: 5,
    webhookUrl: 'https://example.com/job-specific-hook',
  },
);

queue.getJob(id) / queue.listJobs(options)

Inspect persisted jobs.

queue.retry(id, options?)

Manual retry. By default it restarts from the first phase.

await queue.retry(jobId, { fromStart: true });

queue.cancel(id)

Cooperative cancellation. Active handlers receive an AbortSignal at ctx.signal.
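
Cooperative cancellation only works if the handler actually looks at the signal. The sketch below shows one way a handler body might do that. It is an illustration, not jobqueue's implementation: CancellableContext and processChunks are hypothetical names, and only the signal property is taken from the description above.

```typescript
// Hypothetical minimal context shape: only `signal` comes from the README text.
interface CancellableContext {
  signal: AbortSignal;
}

// Process chunks one at a time and stop as soon as cancellation is requested.
async function processChunks(
  chunks: readonly string[],
  ctx: CancellableContext,
): Promise<string[]> {
  const done: string[] = [];
  for (const chunk of chunks) {
    // Throws the signal's abort reason if the signal is already aborted.
    ctx.signal.throwIfAborted();
    // Yield to the event loop so an abort can be observed between chunks.
    await new Promise<void>((resolve) => setTimeout(resolve, 0));
    done.push(chunk.toUpperCase());
  }
  return done;
}
```

Inside a real phase handler, the same check would sit between expensive steps, and ctx.signal could also be passed to abortable APIs such as fetch.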

queue.createEventStream(options?)

Returns a web ReadableStream<Uint8Array> ready to plug into Express, Hono, SvelteKit, Fastify, or plain Node responses.

const stream = queue.createEventStream({
  jobId,
  includeSnapshot: true,
});

Use headers from SseSerializer.headers().
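
A web ReadableStream cannot be piped into a Node response directly, so plain Node servers need a small adapter. The sketch below uses Readable.fromWeb from node:stream for that. pipeWebStream is a hypothetical helper name. Writing the headers is left to the caller because the return shape of SseSerializer.headers() is not documented in this README.

```typescript
import { Readable, Writable } from 'node:stream';
import { ReadableStream } from 'node:stream/web';

// Hypothetical helper: adapt a web stream to a Node writable (e.g. an HTTP response).
// Depending on TypeScript lib settings, a stream typed as the global ReadableStream
// may need a cast to the node:stream/web type.
function pipeWebStream(
  stream: ReadableStream<Uint8Array>,
  destination: Writable,
): Readable {
  const source = Readable.fromWeb(stream);
  // If the destination closes first (client disconnect), tear down the source,
  // which in turn cancels the underlying web stream.
  destination.once('close', () => source.destroy());
  source.pipe(destination);
  return source;
}
```

With a plain Node response, the caller would presumably write the status and SSE headers first, then call pipeWebStream(queue.createEventStream({ jobId }), res).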

queue.shutdown()

Stops retention timers, waits for active jobs, then closes SQLite cleanly.

Retry behavior

maxAttempts includes the first attempt.

  • maxAttempts: 1 → never auto-retry
  • maxAttempts: 3 → first run + 2 retries

Error classification decides whether a failure is retryable:

retry: {
  maxAttempts: 3,
  classifyError: async (error) => {
    if (error instanceof Error && /timeout|econnrefused/i.test(error.message)) {
      return 'recoverable';
    }

    return 'fatal';
  },
}
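
To make the interaction between maxAttempts, classifyError, and backoff concrete, here is a standalone sketch of a retry loop. It is not jobqueue's source: RetrySketchConfig, backoffDelayMs, and runWithRetry are hypothetical names, and the delay formulas for the three strategies are assumptions about how such strategies are commonly defined.

```typescript
type Strategy = 'fixed' | 'linear' | 'exponential';
type ErrorClass = 'recoverable' | 'fatal';

// Hypothetical config shape mirroring the option names used in this README.
interface RetrySketchConfig {
  maxAttempts: number;
  strategy: Strategy;
  baseDelayMs: number;
  maxDelayMs: number;
  classifyError: (error: unknown) => Promise<ErrorClass> | ErrorClass;
}

// Delay before retry number `retry` (1 = first retry), capped at maxDelayMs.
// Assumed formulas: fixed = base, linear = base * n, exponential = base * 2^(n-1).
function backoffDelayMs(
  retry: number,
  cfg: Pick<RetrySketchConfig, 'strategy' | 'baseDelayMs' | 'maxDelayMs'>,
): number {
  const raw =
    cfg.strategy === 'fixed'
      ? cfg.baseDelayMs
      : cfg.strategy === 'linear'
        ? cfg.baseDelayMs * retry
        : cfg.baseDelayMs * 2 ** (retry - 1);
  return Math.min(raw, cfg.maxDelayMs);
}

// Run `task` until it succeeds, attempts are exhausted, or an error is fatal.
async function runWithRetry<T>(
  task: (attempt: number) => Promise<T>,
  cfg: RetrySketchConfig,
  sleep: (ms: number) => Promise<void> = (ms) =>
    new Promise<void>((resolve) => setTimeout(resolve, ms)),
): Promise<T> {
  for (let attempt = 1; ; attempt++) {
    try {
      return await task(attempt);
    } catch (error) {
      // maxAttempts counts the first run, so attempt === maxAttempts is the last try.
      const exhausted = attempt >= cfg.maxAttempts;
      if (exhausted || (await cfg.classifyError(error)) === 'fatal') throw error;
      await sleep(backoffDelayMs(attempt, cfg));
    }
  }
}
```

Under these assumptions, maxAttempts: 3 with baseDelayMs: 1_000 and the exponential strategy waits 1 s before the second attempt and 2 s before the third.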

Webhooks

Supported outbound events:

  • job:completed
  • job:failed
  • job:retrying
  • job:cancelled
  • job:stale

Payload shape:

{
  "event": "job:completed",
  "emittedAt": "2026-05-15T22:00:00.000Z",
  "job": {
    "id": "..."
  }
}

If secret is configured, jobqueue sends:

X-JobQueue-Event: job:completed
X-JobQueue-Signature: <hex hmac sha256>
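
On the receiving side, the signature should be checked before the payload is trusted. The sketch below shows one way to do that with node:crypto. It assumes the signature is an HMAC-SHA256 over the raw request body, hex-encoded. The README does not state which bytes are signed, so confirm this against docs/ before relying on it. signBody and verifySignature are hypothetical helper names.

```typescript
import { createHmac, timingSafeEqual } from 'node:crypto';

// Assumption: the signed message is the raw request body, and the digest is hex.
function signBody(rawBody: string, secret: string): string {
  return createHmac('sha256', secret).update(rawBody, 'utf8').digest('hex');
}

// Compare the received X-JobQueue-Signature value against the expected digest.
function verifySignature(
  rawBody: string,
  signatureHex: string,
  secret: string,
): boolean {
  const expected = Buffer.from(signBody(rawBody, secret), 'hex');
  const received = Buffer.from(signatureHex, 'hex');
  // timingSafeEqual throws on length mismatch, so check lengths first.
  return received.length === expected.length && timingSafeEqual(received, expected);
}
```

Verification must run on the exact bytes received, before any JSON parsing and re-serialization, since re-serialized JSON can differ from what was signed.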

SSE events

Stream payloads are emitted with these event names:

  • snapshot
  • job:enqueued
  • job:started
  • job:progress
  • job:phase:completed
  • job:completed
  • job:failed
  • job:retrying
  • job:cancelled
  • job:stale
  • job:deleted
  • job:webhook:delivered
  • job:webhook:failed
  • ping
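
Because the event names above contain colons, a consumer that parses the stream by hand has to split each line on the first colon only. The sketch below parses a complete chunk of SSE text following the standard text/event-stream framing. parseSseFrames is a hypothetical helper, not part of jobqueue, and it makes no assumption about the format of the data payload.

```typescript
interface SseFrame {
  event: string;
  data: string;
}

// Parse fully received SSE text into frames (not an incremental parser).
function parseSseFrames(text: string): SseFrame[] {
  const frames: SseFrame[] = [];
  // Frames are separated by a blank line.
  for (const block of text.split(/\r?\n\r?\n/)) {
    let event = 'message'; // default event name per the SSE format
    const dataLines: string[] = [];
    for (const line of block.split(/\r?\n/)) {
      if (line.startsWith(':')) continue; // comment line, e.g. a keep-alive
      const idx = line.indexOf(':'); // first colon only: values may contain colons
      const field = idx === -1 ? line : line.slice(0, idx);
      let value = idx === -1 ? '' : line.slice(idx + 1);
      if (value.startsWith(' ')) value = value.slice(1);
      if (field === 'event') event = value;
      else if (field === 'data') dataLines.push(value);
    }
    // As in browsers, a frame without data is not dispatched.
    if (dataLines.length > 0) frames.push({ event, data: dataLines.join('\n') });
  }
  return frames;
}
```

In a browser, EventSource does this parsing already, and listeners can be attached per event name, for example source.addEventListener('job:progress', handler).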

Development

npm install
npm run lint
npm test
npm run build

Publish from Gitea Actions

Create a repository secret:

  • JOBQUEUE_NPM_TOKEN → token with package publish permissions

Publishing flow:

  1. Push code to mozempk/jobqueue
  2. Create tag: git tag v0.1.0 && git push origin v0.1.0
  3. Gitea Actions runs tests and build; npm publish runs only after the verify job passes

License

MIT