# Architecture
This document explains how jobqueue is structured, what each module owns, and how data moves through the system.
## 1. Top-level structure

`JobQueue` is the orchestrator. Everything else is a collaborator around four concerns:

- Persistence - `SqliteStorage`
- Execution - `WorkerPool` + `PhaseRunner`
- Notifications - `TypedEventBus`, `SseSerializer`, `WebhookDispatcher`
- Lifecycle management - `RetryStrategy`, `RetentionScheduler`, shutdown logic
```mermaid
flowchart LR
    A[Consumer app] -->|enqueue / retry / cancel / list| B[JobQueue]
    B --> C[SqliteStorage]
    B --> D[WorkerPool]
    D --> E[PhaseRunner]
    E --> F[Phase handlers registered by consumer]
    B --> G[TypedEventBus]
    G --> H[SSE stream subscribers]
    B --> I[WebhookDispatcher]
    I --> J[Webhook endpoints]
    B --> K[RetentionScheduler]
    K --> C
```
## 2. Module responsibilities

| Module | Responsibility | Key behavior |
|---|---|---|
| `src/JobQueue.ts` | Public API and orchestration | Owns startup, enqueue, cancel, retry, pumping, event emission, webhook dispatch, shutdown |
| `src/storage/SqliteStorage.ts` | SQLite persistence | Creates schema, claims jobs, persists progress, completion, failure, retry, cancellation, stale/delete |
| `src/processor/WorkerPool.ts` | Concurrency limit | Wraps `p-limit`, tracks running promises, supports drain with timeout |
| `src/processor/PhaseRunner.ts` | Multi-phase execution | Runs handlers in order, computes overall progress, stops on cancellation |
| `src/retry/RetryStrategy.ts` | Retry policy | Classifies errors, computes backoff, decides retry vs fail |
| `src/events/EventBus.ts` | In-process pub/sub | Strongly typed wrapper around Node `EventEmitter` |
| `src/events/SseSerializer.ts` | SSE formatting | Serializes event name + JSON payload into SSE wire format |
| `src/webhook/WebhookDispatcher.ts` | Outbound HTTP callbacks | Sends POST requests, signs payloads, retries transient failures |
| `src/retention/RetentionScheduler.ts` | Background cleanup | Periodically marks old jobs stale and later deletes them |
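
The `WorkerPool` row packs three behaviors into one line: a concurrency cap, tracking of in-flight promises, and a drain that gives up after a timeout. The sketch below shows those three together. It is a hypothetical, dependency-free stand-in. `BoundedPool` is an illustrative name, and the limiter is hand-rolled rather than wrapping `p-limit`, so this is not the module's actual code.

```typescript
// Hypothetical sketch of a bounded worker pool with drain-with-timeout.
// The limiter is hand-rolled here; the real WorkerPool wraps p-limit instead.
type Task<T> = () => Promise<T>;

class BoundedPool {
  private readonly concurrency: number;
  private active = 0;
  // Start functions for tasks waiting on a free slot, in FIFO order.
  private readonly waiting: Array<() => void> = [];
  // Every accepted task that has not settled yet (running or waiting).
  private readonly inFlight = new Set<Promise<unknown>>();

  constructor(concurrency: number) {
    if (!Number.isInteger(concurrency) || concurrency < 1) {
      throw new RangeError("concurrency must be a positive integer");
    }
    this.concurrency = concurrency;
  }

  run<T>(task: Task<T>): Promise<T> {
    const result = new Promise<T>((resolve, reject) => {
      const start = (): void => {
        this.active++;
        // Promise.resolve().then(task) also turns synchronous throws into rejections.
        Promise.resolve()
          .then(task)
          .then(resolve, reject)
          .finally(() => {
            this.active--;
            this.waiting.shift()?.(); // hand the freed slot to the next waiter
          });
      };
      if (this.active < this.concurrency) start();
      else this.waiting.push(start);
    });
    this.inFlight.add(result);
    const forget = (): void => {
      this.inFlight.delete(result);
    };
    result.then(forget, forget);
    return result;
  }

  // Resolves true if every task accepted so far settles within timeoutMs, else false.
  async drain(timeoutMs: number): Promise<boolean> {
    let timer: ReturnType<typeof setTimeout> | undefined;
    const timedOut = new Promise<boolean>((resolve) => {
      timer = setTimeout(() => resolve(false), timeoutMs);
    });
    const settled = Promise.allSettled([...this.inFlight]).then(() => true);
    try {
      return await Promise.race([settled, timedOut]);
    } finally {
      clearTimeout(timer);
    }
  }
}
```

`drain` only waits for tasks accepted before it was called. A shutdown path built on it would stop accepting new work first, then drain, then decide what to do with tasks that outlived the timeout.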
## 3. Public API surface

Exports from `src/index.ts` expose both high-level and low-level building blocks:

- `JobQueue`
- `SqliteStorage`
- `WorkerPool`
- `PhaseRunner`
- `RetryStrategy`
- `TypedEventBus`
- `SseSerializer`
- `WebhookDispatcher`
- `RetentionScheduler`
- shared queue/job/event types

That split makes the package usable in two modes:

- Normal mode - instantiate `JobQueue` and let it coordinate everything
- Advanced mode - reuse lower-level pieces independently in custom orchestration
## 4. Persistence model

`SqliteStorage` keeps a single `jobs` table. The queue is effectively modeled as persisted state transitions on that table.

### Important columns

| Column | Meaning |
|---|---|
| `id` | Stable job identifier |
| `status` | `pending`, `active`, `completed`, `failed`, `cancelled`, `stale` |
| `data` | Original enqueue payload, JSON-encoded |
| `current_phase` | Phase currently executing, or the last failed/retried phase |
| `phases_json` | Array of per-phase state objects |
| `phase_results` | JSON object keyed by phase name |
| `progress` / `progress_message` | Latest overall progress snapshot |
| `error_json` | Persisted failure metadata |
| `retry_count` / `max_attempts` | Retry bookkeeping |
| `webhook_url` / `webhook_sent` | Delivery configuration and latest success flag |
| `scheduled_at` | Delayed execution / retry wake-up time |
| `completed_at` / `cancelled_at` / `updated_at` | Lifecycle timestamps |
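
Together, `status` and `scheduled_at` decide which row a worker may claim next. The in-memory sketch below models that selection. It borrows its name from the `claimPendingJob` transition in the state diagram, but its signature is invented here. It assumes `scheduled_at` holds epoch milliseconds, with `null` meaning "eligible now", and that eligible jobs are taken oldest first by a hypothetical `seq` field, which is not one of the documented columns. In `SqliteStorage`, the equivalent select-and-update would need to run as one transaction so two claimers cannot take the same row. This sketch does not show that SQL.

```typescript
type JobStatus = "pending" | "active" | "completed" | "failed" | "cancelled" | "stale";

// Subset of a jobs row relevant to claiming.
interface JobRow {
  id: string;
  status: JobStatus;
  scheduled_at: number | null; // epoch ms; null = eligible immediately (assumption)
  updated_at: number;
  seq: number; // hypothetical insertion-order tiebreaker, not a documented column
}

// Marks the oldest due pending row as active and returns it.
// Returns undefined when nothing is due yet.
function claimPendingJob(rows: JobRow[], now: number): JobRow | undefined {
  const eligible = rows.filter(
    // Delayed jobs and retries stay pending until their wake-up time passes.
    (row) => row.status === "pending" && (row.scheduled_at ?? 0) <= now,
  );
  eligible.sort((a, b) => a.seq - b.seq);
  const claimed = eligible[0];
  if (claimed) {
    claimed.status = "active";
    claimed.updated_at = now;
  }
  return claimed;
}
```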
### Why SQLite works well here
- queue selection is simple and local
- state transitions are small, synchronous writes
- WAL mode supports concurrent reads while jobs are executing
- no separate broker is required for a single-process runtime
## 5. Phase model

Each job stores an array of `JobPhaseState` entries:

| Field | Meaning |
|---|---|
| `name` | Phase identifier from `QueueConfig.phases` |
| `status` | `pending`, `active`, `completed`, `failed`, `cancelled` |
| `progress` | Per-phase progress percentage |
| `message` | Human-oriented phase status |
| `startedAt` / `completedAt` | Phase timestamps |
| `error` | Last phase-level error string |
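`PhaseRunner` walks those phases sequentially and computes overall progress as:

`((phaseIndex + phaseProgress / 100) / totalPhases) * 100`

where `phaseIndex` is zero-based, so the first phase at 0% reports 0 and the last phase at 100% reports 100.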
That design gives one stable persisted representation for:

- single-step jobs (`phases: ['run']`)
- multi-step pipelines (`['download', 'process', 'upload']`)
- retries that restart only unfinished phases
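
Here is a worked case of the progress formula. With three phases, a job halfway through the second phase (`phaseIndex` 1, `phaseProgress` 50) reports ((1 + 0.5) / 3) * 100 = 50. The sketch below pairs the formula with one plausible way to choose a retry's restart point. It assumes a retry resumes at the first phase whose status is not `completed` and leaves earlier phases, and their stored results, untouched. `overallProgress` and `resumeIndex` are illustrative names, not the `PhaseRunner` API.

```typescript
type PhaseStatus = "pending" | "active" | "completed" | "failed" | "cancelled";

interface JobPhaseState {
  name: string;
  status: PhaseStatus;
  progress: number; // 0..100 within this phase
}

// Overall job progress in percent; phaseIndex is zero-based.
function overallProgress(phaseIndex: number, phaseProgress: number, totalPhases: number): number {
  return ((phaseIndex + phaseProgress / 100) / totalPhases) * 100;
}

// Index of the first phase a retry would run, or -1 if every phase completed.
// Assumes completed phases are skipped so their results are preserved.
function resumeIndex(phases: JobPhaseState[]): number {
  return phases.findIndex((phase) => phase.status !== "completed");
}
```

Under these assumptions, a resumed job starts reporting from `overallProgress(resumeIndex(phases), 0, phases.length)` instead of dropping back to 0.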
## 6. Event model

`JobQueue` emits in-process typed events first. The SSE stream and webhook flow are adapters on top of that state machine.

### Core queue events

- `job:enqueued`
- `job:started`
- `job:progress`
- `job:phase:completed`
- `job:completed`
- `job:failed`
- `job:retrying`
- `job:cancelled`
- `job:stale`
- `job:deleted`
- `job:webhook:delivered`
- `job:webhook:failed`
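
`TypedEventBus` is described as a strongly typed wrapper around Node's `EventEmitter`. The core idea is an event-name-to-payload map that the compiler checks on both `on` and `emit`, and it can be sketched without Node at all. The class name `MiniTypedBus`, the `QueueEventMap` subset, and the `{ job }` payload wrapper are assumptions for illustration. Only the `deletedJobId` field comes from this document.

```typescript
interface Job {
  id: string;
  status: string;
}

// Hypothetical subset of the event map; payload shapes are illustrative.
type QueueEventMap = {
  "job:enqueued": { job: Job };
  "job:completed": { job: Job };
  "job:deleted": { deletedJobId: string };
};

type AnyListener = (payload: any) => void;

class MiniTypedBus<Events extends object> {
  // Listener sets are stored untyped; the public methods restore the types.
  private readonly listeners = new Map<keyof Events, Set<AnyListener>>();

  // Registers a listener and returns a function that removes it.
  on<K extends keyof Events>(event: K, listener: (payload: Events[K]) => void): () => void {
    const set = this.listeners.get(event) ?? new Set<AnyListener>();
    this.listeners.set(event, set);
    set.add(listener);
    return () => {
      set.delete(listener);
    };
  }

  emit<K extends keyof Events>(event: K, payload: Events[K]): void {
    // Copy first so listeners can unsubscribe while being called.
    for (const listener of [...(this.listeners.get(event) ?? [])]) listener(payload);
  }
}
```

With this shape, `bus.emit("job:deleted", { job })` fails to compile, which is the guarantee a typed wrapper adds over a plain string-keyed emitter.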
### Event ordering rule

The queue persists state before emitting the corresponding event. That means listeners observe already-persisted state, not speculative state.

This is important for consumers that mix:

- `queue.on(...)`
- `queue.getJob(id)`
- `queue.listJobs(...)`
- `queue.createEventStream(...)`
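
The ordering rule fits in a few lines: write the new state first, then notify. A listener that reads back from storage, as `queue.getJob(id)` would, then sees the same status the event announced. `store`, `listeners`, and `transition` are hypothetical stand-ins for `SqliteStorage` and the event bus.

```typescript
type Status = "pending" | "active" | "completed";

interface StoredJob {
  id: string;
  status: Status;
}

const store = new Map<string, StoredJob>();
const listeners: Array<(eventName: string, job: StoredJob) => void> = [];

// Persist first, emit second: listeners never observe state that is not yet stored.
function transition(id: string, next: Status, eventName: string): void {
  const current = store.get(id);
  if (!current) throw new Error(`unknown job ${id}`);
  const updated: StoredJob = { ...current, status: next };
  store.set(id, updated); // 1. persist
  for (const listener of listeners) listener(eventName, updated); // 2. emit
}
```

Reversing the two numbered steps would let a listener that calls back into storage read the previous status.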
## 7. SSE model

`createEventStream()` creates a web `ReadableStream<Uint8Array>`.

Behavior:

- An optional snapshot of current jobs is sent first
- The queue subscribes the stream to in-process events
- Each event is serialized as an `event: <name>` line plus a JSON `data: ...` line
- Keepalive `ping` events are emitted on an interval
- Cancelling the reader removes subscriptions and the keepalive timer
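
The behavior list maps onto a compact sketch of the wire format and stream lifecycle. It uses a generic `subscribe` callback in place of the queue's event bus. The `snapshot` event name and the `{ timestamp }` ping payload field are guesses. The `event:`/`data:` framing, the keepalive ping, and cleanup on cancel come from this document. `ReadableStream`, `TextEncoder`, and `TextDecoder` are globals in Node 18+ and in browsers.

```typescript
const encoder = new TextEncoder();

// One SSE frame: an `event:` line, a single `data:` line, then a blank line.
// JSON.stringify without indentation escapes newlines, so one data line is enough.
function serializeSse(name: string, payload: unknown): string {
  return `event: ${name}\ndata: ${JSON.stringify(payload)}\n\n`;
}

type Send = (name: string, payload: unknown) => void;

// Hypothetical stream factory: snapshot first, then live events, plus keepalive pings.
function createSseStream(
  subscribe: (send: Send) => () => void, // returns an unsubscribe function
  keepaliveMs: number,
  snapshot?: unknown,
): ReadableStream<Uint8Array> {
  let unsubscribe: () => void = () => {};
  let keepalive: ReturnType<typeof setInterval> | undefined;
  return new ReadableStream<Uint8Array>({
    start(controller) {
      const send: Send = (name, payload) => {
        controller.enqueue(encoder.encode(serializeSse(name, payload)));
      };
      if (snapshot !== undefined) send("snapshot", snapshot);
      unsubscribe = subscribe(send);
      keepalive = setInterval(() => send("ping", { timestamp: Date.now() }), keepaliveMs);
    },
    // Runs when the consumer cancels the reader: drop the subscription and the timer.
    cancel() {
      clearInterval(keepalive);
      unsubscribe();
    },
  });
}
```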
### Payload shapes

Most runtime events include a full job object. Two notable exceptions:

| Event | Payload detail |
|---|---|
| `job:deleted` | Includes `deletedJobId` because the record was removed from storage |
| `ping` | No job payload, only a timestamp |
## 8. Webhook model

Webhooks are outbound notifications, not part of the core execution loop.

### Flow

- `JobQueue` decides whether an event should trigger a webhook
- `WebhookDispatcher` POSTs JSON to the queue-level or job-level URL
- An optional HMAC SHA-256 signature is attached as `X-JobQueue-Signature`
- 5xx and transport failures retry with exponential backoff
- Success sets `webhook_sent = 1` and emits `job:webhook:delivered`
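
The signing and retry steps can be sketched with Node's built-in `node:crypto`. Several details are assumptions. The signature is taken to be a hex-encoded HMAC-SHA256 of the exact request body, sent bare in `X-JobQueue-Signature` (the real header may add a prefix). The attempt limit and base delay are parameters with no documented defaults. Any status that is neither 2xx nor 5xx is treated as final. `send` is an injected function standing in for the dispatcher's HTTP client.

```typescript
import { createHmac, timingSafeEqual } from "node:crypto";

// Hex HMAC-SHA256 over the exact body bytes the receiver will see.
function signBody(body: string, secret: string): string {
  return createHmac("sha256", secret).update(body).digest("hex");
}

// Receiver-side check; timingSafeEqual avoids leaking how many bytes matched.
function verifyBody(body: string, secret: string, signatureHex: string): boolean {
  const expected = Buffer.from(signBody(body, secret), "hex");
  const received = Buffer.from(signatureHex, "hex");
  return expected.length === received.length && timingSafeEqual(expected, received);
}

// Exponential backoff: base, 2*base, 4*base, ... for attempt 0, 1, 2, ...
function backoffMs(attempt: number, baseMs: number): number {
  return baseMs * 2 ** attempt;
}

// `send` resolves with an HTTP status or rejects on a transport failure.
async function deliverWithRetry(
  send: () => Promise<number>,
  maxAttempts: number,
  baseMs: number,
  sleep: (ms: number) => Promise<void> = (ms) => new Promise((resolve) => setTimeout(resolve, ms)),
): Promise<boolean> {
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    let retryable = true; // transport failures are retryable
    try {
      const status = await send();
      if (status >= 200 && status < 300) return true;
      retryable = status >= 500 && status < 600;
    } catch {
      // network error, timeout, DNS failure: keep retryable = true
    }
    if (!retryable || attempt === maxAttempts - 1) return false;
    await sleep(backoffMs(attempt, baseMs));
  }
  return false;
}
```

Injecting `sleep` keeps the backoff schedule testable without real waits.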
### Scope

Supported webhook-triggering events:

- `job:completed`
- `job:failed`
- `job:retrying`
- `job:cancelled`
- `job:stale`
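
The scope list and the queue-level/job-level URL choice combine into a small routing decision. The sketch assumes a job-level URL overrides the queue-level one, and that an event outside the supported set never produces a request. `webhookTarget` is an illustrative name.

```typescript
const WEBHOOK_EVENTS: ReadonlySet<string> = new Set([
  "job:completed",
  "job:failed",
  "job:retrying",
  "job:cancelled",
  "job:stale",
]);

// Returns the URL to POST to, or undefined when no webhook should be sent.
function webhookTarget(
  eventName: string,
  jobWebhookUrl: string | null,
  queueWebhookUrl: string | undefined,
): string | undefined {
  if (!WEBHOOK_EVENTS.has(eventName)) return undefined;
  return jobWebhookUrl ?? queueWebhookUrl; // assumption: job-level URL wins
}
```

Because `job:webhook:delivered` and `job:webhook:failed` are not in the set, delivery outcomes cannot trigger further webhooks.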
## 9. Retention model

Retention is deliberately two-stage:

```mermaid
flowchart LR
    A[completed / failed / cancelled] -->|older than staleAfterMs| B[stale]
    B -->|older than deleteAfterMs| C[deleted]
```

Why two stages:

- consumers get a visible grace period before deletion
- `onStale` and `onDelete` hooks can clean external artifacts
- `job:stale` is externally observable before hard deletion
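
A single retention sweep can be modeled as a pure planning step that the scheduler runs on each tick. This sketch assumes both ages are measured from `updated_at`, and that marking a job stale bumps `updated_at`, so `deleteAfterMs` counts from the moment the job became stale. The real scheduler may measure from other timestamps. Calls to `onStale`/`onDelete` are left out because their signatures are not documented here. `planRetention` is an illustrative name.

```typescript
type JobStatus = "pending" | "active" | "completed" | "failed" | "cancelled" | "stale";

interface RetentionRow {
  id: string;
  status: JobStatus;
  updated_at: number; // epoch ms
}

type RetentionAction = { id: string; action: "mark-stale" | "delete" };

const TERMINAL: ReadonlySet<JobStatus> = new Set<JobStatus>(["completed", "failed", "cancelled"]);

// Decides, for one sweep at time `now`, which rows advance a retention stage.
function planRetention(
  rows: RetentionRow[],
  now: number,
  staleAfterMs: number,
  deleteAfterMs: number,
): RetentionAction[] {
  const actions: RetentionAction[] = [];
  for (const row of rows) {
    const age = now - row.updated_at;
    if (TERMINAL.has(row.status) && age > staleAfterMs) {
      actions.push({ id: row.id, action: "mark-stale" });
    } else if (row.status === "stale" && age > deleteAfterMs) {
      actions.push({ id: row.id, action: "delete" });
    }
    // pending and active jobs are never touched by retention
  }
  return actions;
}
```

A job advances at most one stage per sweep, so it stays visible as `stale` for at least one tick before deletion.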
## 10. State machine

```mermaid
stateDiagram-v2
    [*] --> pending : enqueue
    pending --> active : claimPendingJob
    pending --> cancelled : cancel
    active --> completed : all phases succeed
    active --> failed : fatal error / retries exhausted
    active --> pending : recoverable error + retry
    active --> cancelled : cancel / abort
    completed --> stale : retention mark
    failed --> stale : retention mark
    cancelled --> stale : retention mark
    stale --> [*] : retention delete
    failed --> pending : manual retry
    cancelled --> pending : manual retry
    stale --> pending : manual retry
```
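
The diagram translates directly into a transition table that code could check before writing a new status. This sketch is derived only from the edges in the diagram. Retention delete is modeled as removing the row rather than as a status, matching `stale --> [*]`. `ALLOWED` and `canTransition` are illustrative names.

```typescript
type JobStatus = "pending" | "active" | "completed" | "failed" | "cancelled" | "stale";

// Outgoing edges from the state diagram (retention delete removes the row, so it is not listed).
const ALLOWED: Record<JobStatus, readonly JobStatus[]> = {
  pending: ["active", "cancelled"],
  active: ["completed", "failed", "pending", "cancelled"],
  completed: ["stale"],
  failed: ["stale", "pending"],
  cancelled: ["stale", "pending"],
  stale: ["pending"],
};

function canTransition(from: JobStatus, to: JobStatus): boolean {
  return ALLOWED[from].includes(to);
}
```

The diagram has no manual-retry edge out of `completed`, so `canTransition("completed", "pending")` is `false` while the same check from `failed`, `cancelled`, or `stale` is `true`.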
## 11. Build and packaging

- ESM only
- Node 20 target
- bundled with `tsup`
- type declarations emitted from the same entrypoint
- tests run under Vitest in the Node environment
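
The build bullets correspond to a small `tsup` configuration. The fragment below is an assumption about how those settings could be expressed, not the repository's actual config file. `entry`, `format`, `target`, and `dts` are standard `tsup` options, and the entrypoint is the `src/index.ts` named in section 3.

```typescript
// tsup.config.ts (sketch, not the repository's actual file)
import { defineConfig } from "tsup";

export default defineConfig({
  entry: ["src/index.ts"], // single public entrypoint
  format: ["esm"], // ESM only
  target: "node20", // Node 20 target
  dts: true, // emit type declarations from the same entrypoint
});
```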
## 12. LikeC4 companion charts

See `jobqueue.c4` for:
- system landscape
- library/container view
- runtime/component view
- enqueue-to-complete dynamic flow
- retry dynamic flow