- Preserve phase results on partial retry and keep interrupted phase context after restart. - Avoid webhook bookkeeping crashes when retention deletes stale jobs. - Add deeper unit, integration, and e2e coverage around queue seams. - Require verify job to pass before publish runs. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
256 lines
5.4 KiB
Markdown
256 lines
5.4 KiB
Markdown
# jobqueue
|
|
|
|
Framework-agnostic async job queue for Node.js with:
|
|
|
|
- **SQLite persistence** via `better-sqlite3` + WAL mode
|
|
- **Multi-phase pipelines** with per-phase progress and phase result passing
|
|
- **Concurrency control** with a worker pool
|
|
- **Automatic retry** with fixed, linear, or exponential backoff
|
|
- **SSE helpers** for progress/completion streaming
|
|
- **Outbound webhooks** with optional HMAC-SHA256 signing
|
|
- **Retention cleanup** for stale and expired jobs
|
|
- **ESM-only**, **Node 20+**
|
|
|
|
Detailed architecture and runtime docs live in [`docs/`](./docs/README.md).
|
|
|
|
## Install
|
|
|
|
```bash
|
|
npm install jobqueue --registry=https://git.sal.giize.com/api/packages/mozempk/npm/
|
|
```
|
|
|
|
To make installs/publishes use Gitea by default:
|
|
|
|
```ini
|
|
registry=https://git.sal.giize.com/api/packages/mozempk/npm/
|
|
//git.sal.giize.com/api/packages/mozempk/npm/:_authToken=${NODE_AUTH_TOKEN}
|
|
```
|
|
|
|
## Quick start
|
|
|
|
```ts
|
|
import { JobQueue } from 'jobqueue';
|
|
|
|
const queue = new JobQueue<{ url: string }>({
|
|
dbPath: './data/jobs.db',
|
|
phases: ['download', 'process', 'upload'],
|
|
concurrency: 2,
|
|
retry: {
|
|
maxAttempts: 3,
|
|
strategy: 'exponential',
|
|
baseDelayMs: 1_000,
|
|
maxDelayMs: 30_000,
|
|
classifyError: async (error) => {
|
|
if (error instanceof Error && error.message.includes('ECONNREFUSED')) {
|
|
return 'recoverable';
|
|
}
|
|
|
|
return 'fatal';
|
|
},
|
|
},
|
|
webhook: {
|
|
url: 'https://example.com/hooks/jobs',
|
|
events: ['job:completed', 'job:failed'],
|
|
secret: process.env.JOBQUEUE_WEBHOOK_SECRET,
|
|
},
|
|
retention: {
|
|
staleAfterMs: 7 * 24 * 60 * 60 * 1000,
|
|
deleteAfterMs: 30 * 24 * 60 * 60 * 1000,
|
|
onDelete: async (job) => {
|
|
console.log(`Deleting ${job.id}`);
|
|
},
|
|
},
|
|
});
|
|
|
|
queue.handle('download', async (job, ctx) => {
|
|
await ctx.progress(10, 'Downloading source');
|
|
return { filePath: `/tmp/${job.id}.mp4` };
|
|
});
|
|
|
|
queue.handle('process', async (_job, ctx) => {
|
|
const download = ctx.phaseResult<{ filePath: string }>('download');
|
|
await ctx.progress(50, 'Processing file');
|
|
return { outputPath: `${download?.filePath}.json` };
|
|
});
|
|
|
|
queue.handle('upload', async (_job, ctx) => {
|
|
const processed = ctx.phaseResult<{ outputPath: string }>('process');
|
|
await ctx.progress(100, 'Uploading result');
|
|
return { uploaded: Boolean(processed?.outputPath) };
|
|
});
|
|
|
|
queue.on('job:completed', (job) => {
|
|
console.log('completed', job.id, job.phaseResults);
|
|
});
|
|
|
|
const jobId = await queue.enqueue({
|
|
url: 'https://example.com/video',
|
|
});
|
|
```
|
|
|
|
## API
|
|
|
|
### `new JobQueue(config)`
|
|
|
|
```ts
|
|
interface QueueConfig<TData> {
|
|
dbPath: string;
|
|
phases: readonly string[];
|
|
concurrency?: number;
|
|
retry?: RetryConfig<TData>;
|
|
retention?: RetentionConfig<TData>;
|
|
webhook?: WebhookConfig;
|
|
shutdownTimeoutMs?: number;
|
|
}
|
|
```
|
|
|
|
### `queue.handle(phaseName, handler)`
|
|
|
|
Register one handler per phase.
|
|
|
|
```ts
|
|
type PhaseHandler<TData> = (
|
|
job: JobRecord<TData>,
|
|
context: PhaseContext<TData>,
|
|
) => Promise<JsonValue | undefined> | JsonValue | undefined;
|
|
```
|
|
|
|
### `queue.enqueue(data, options?)`
|
|
|
|
Creates a new job and returns its ID.
|
|
|
|
```ts
|
|
await queue.enqueue(
|
|
{ url: 'https://example.com/video' },
|
|
{
|
|
scheduledAt: new Date(Date.now() + 5_000),
|
|
maxAttempts: 5,
|
|
webhookUrl: 'https://example.com/job-specific-hook',
|
|
},
|
|
);
|
|
```
|
|
|
|
### `queue.getJob(id)` / `queue.listJobs(options)`
|
|
|
|
Inspect persisted jobs.
|
|
|
|
### `queue.retry(id, options?)`
|
|
|
|
Manual retry. By default it restarts from the first phase.
|
|
|
|
```ts
|
|
await queue.retry(jobId, { fromStart: true });
|
|
```
|
|
|
|
### `queue.cancel(id)`
|
|
|
|
Cooperative cancellation. Active handlers receive an `AbortSignal` at `ctx.signal`.
|
|
|
|
### `queue.createEventStream(options?)`
|
|
|
|
Returns a web `ReadableStream<Uint8Array>` ready to plug into Express, Hono, SvelteKit, Fastify, or plain Node responses.
|
|
|
|
```ts
|
|
const stream = queue.createEventStream({
|
|
jobId,
|
|
includeSnapshot: true,
|
|
});
|
|
```
|
|
|
|
Use headers from `SseSerializer.headers()`.
|
|
|
|
### `queue.shutdown()`
|
|
|
|
Stops retention timers, waits for active jobs, then closes SQLite cleanly.
|
|
|
|
## Retry behavior
|
|
|
|
`maxAttempts` includes the first attempt.
|
|
|
|
- `maxAttempts: 1` → never auto-retry
|
|
- `maxAttempts: 3` → first run + 2 retries
|
|
|
|
Error classification decides whether a failure is retryable:
|
|
|
|
```ts
|
|
retry: {
|
|
maxAttempts: 3,
|
|
classifyError: async (error) => {
|
|
if (error instanceof Error && /timeout|econnrefused/i.test(error.message)) {
|
|
return 'recoverable';
|
|
}
|
|
|
|
return 'fatal';
|
|
},
|
|
}
|
|
```
|
|
|
|
## Webhooks
|
|
|
|
Supported outbound events:
|
|
|
|
- `job:completed`
|
|
- `job:failed`
|
|
- `job:retrying`
|
|
- `job:cancelled`
|
|
- `job:stale`
|
|
|
|
Payload shape:
|
|
|
|
```json
|
|
{
|
|
"event": "job:completed",
|
|
"emittedAt": "2026-05-15T22:00:00.000Z",
|
|
"job": {
|
|
"id": "..."
|
|
}
|
|
}
|
|
```
|
|
|
|
If `secret` is configured, jobqueue sends:
|
|
|
|
```text
|
|
X-JobQueue-Event: job:completed
|
|
X-JobQueue-Signature: <hex hmac sha256>
|
|
```
|
|
|
|
## SSE events
|
|
|
|
Stream payloads are emitted with these event names:
|
|
|
|
- `snapshot`
|
|
- `job:enqueued`
|
|
- `job:started`
|
|
- `job:progress`
|
|
- `job:phase:completed`
|
|
- `job:completed`
|
|
- `job:failed`
|
|
- `job:retrying`
|
|
- `job:cancelled`
|
|
- `job:stale`
|
|
- `job:deleted`
|
|
- `job:webhook:delivered`
|
|
- `job:webhook:failed`
|
|
- `ping`
|
|
|
|
## Development
|
|
|
|
```bash
|
|
npm install
|
|
npm run lint
|
|
npm test
|
|
npm run build
|
|
```
|
|
|
|
## Publish from Gitea Actions
|
|
|
|
Create repo secret:
|
|
|
|
- `JOBQUEUE_NPM_TOKEN` → token with package publish permissions
|
|
|
|
Publishing flow:
|
|
|
|
1. Push code to `mozempk/jobqueue`
|
|
2. Create tag: `git tag v0.1.0 && git push origin v0.1.0`
|
|
3. Gitea Actions runs tests, build, and `npm publish`
|