jobqueue/docs/runtime-lifecycle.md
Giancarmine Salucci a9429e2118 fix: harden queue lifecycle and publish gate
- Preserve phase results on partial retry and keep interrupted phase
  context after restart.
- Avoid webhook bookkeeping crashes when retention deletes stale jobs.
- Add deeper unit, integration, and e2e coverage around queue seams.
- Require verify job to pass before publish runs.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-05-16 18:39:19 +02:00

# Runtime lifecycle
This document follows one queue instance from construction through shutdown.
## 1. Construction
When `new JobQueue(config)` runs, the constructor does more than store config:
1. normalizes config defaults
2. opens SQLite and enables WAL mode
3. creates retry strategy
4. creates worker pool
5. optionally creates webhook dispatcher
6. resets any previously `active` jobs to `failed`
7. optionally starts retention scheduler
8. requests an initial pump
### Why `resetActiveJobs()` exists
`jobqueue` is single-process. If the process dies mid-job, no other worker can safely finish that in-flight job. On the next boot, the queue therefore marks those orphaned jobs `failed` so they do not stay stuck in `active`.
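A minimal sketch of the boot-time reset, assuming an in-memory `Map` stands in for the SQLite jobs table. The `JobRow` shape and the `error` text are hypothetical; in SQLite this would presumably be a single `UPDATE` over rows whose status is `active`.

```typescript
type JobStatus = "pending" | "active" | "completed" | "failed" | "cancelled" | "stale";

interface JobRow {
  id: string;
  status: JobStatus;
  error?: string; // hypothetical column holding the failure reason
}

// Marks every job a previous process left in `active` as `failed`.
// Returns the ids that were reset so the caller can log them.
function resetActiveJobs(jobs: Map<string, JobRow>): string[] {
  const reset: string[] = [];
  for (const job of jobs.values()) {
    if (job.status === "active") {
      job.status = "failed";
      job.error = "interrupted: process restarted while job was active";
      reset.push(job.id);
    }
  }
  return reset;
}

// Usage: one job orphaned by a crash, one job that never started.
const jobs = new Map<string, JobRow>([
  ["a", { id: "a", status: "active" }],
  ["b", { id: "b", status: "pending" }],
]);
const resetIds = resetActiveJobs(jobs); // ["a"]
```

Pending jobs are untouched: they were never started, so the pump can still run them normally.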
## 2. Enqueue path
```mermaid
sequenceDiagram
participant App as Consumer app
participant Queue as JobQueue
participant DB as SqliteStorage
participant Pump as Pump loop
App->>Queue: enqueue(data, options)
Queue->>DB: createJob(...)
DB-->>Queue: JobRecord(status=pending)
Queue-->>App: jobId
Queue->>Queue: emit job:enqueued
Queue->>Pump: requestPump()
```
Key points:
- enqueue is durable first: the job row is written to SQLite before the job id is returned, and execution happens asynchronously afterwards
- a job can be scheduled for the future with `scheduledAt`
- a per-job webhook URL overrides the queue-level webhook URL
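The enqueue path can be sketched as below. `MiniQueue`, `JobRecord`, and the in-memory `rows` map are hypothetical stand-ins for `JobQueue` and `SqliteStorage`; resolving the webhook URL at lookup time rather than at enqueue time is an assumption.

```typescript
interface EnqueueOptions {
  scheduledAt?: Date;  // earliest time the job may start
  webhookUrl?: string; // per-job override of the queue-level webhook URL
}

interface JobRecord {
  id: string;
  status: "pending";
  data: unknown;
  scheduledAt: number; // epoch milliseconds
  webhookUrl?: string; // only set when the caller overrides it
}

// Hypothetical in-memory stand-in for JobQueue + SqliteStorage.
class MiniQueue {
  readonly rows = new Map<string, JobRecord>();
  readonly emitted: string[] = [];
  pumpRequests = 0;
  private nextId = 1;
  private readonly queueWebhookUrl: string | undefined;

  constructor(queueWebhookUrl?: string) {
    this.queueWebhookUrl = queueWebhookUrl;
  }

  enqueue(data: unknown, options: EnqueueOptions = {}): string {
    const id = `job-${this.nextId++}`;
    // Durable first: the row exists before the caller ever sees the id.
    this.rows.set(id, {
      id,
      status: "pending",
      data,
      scheduledAt: (options.scheduledAt ?? new Date()).getTime(),
      webhookUrl: options.webhookUrl,
    });
    this.emitted.push(`job:enqueued ${id}`);
    this.requestPump(); // execution happens later, asynchronously
    return id;
  }

  // Per-job URL wins; otherwise fall back to the queue-level URL.
  webhookUrlFor(id: string): string | undefined {
    return this.rows.get(id)?.webhookUrl ?? this.queueWebhookUrl;
  }

  private requestPump(): void {
    this.pumpRequests++; // the real pump would be scheduled, not run inline
  }
}

const q = new MiniQueue("https://example.com/hooks/default");
const later = q.enqueue({ file: "a.csv" }, { scheduledAt: new Date(Date.now() + 60_000) });
const custom = q.enqueue({ file: "b.csv" }, { webhookUrl: "https://example.com/hooks/b" });
```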
## 3. Pumping and dispatch
The queue uses a **pump loop** that runs whenever work may be available, rather than a worker thread that blocks waiting for jobs.
### Pump rules
1. stop immediately if the queue is closed
2. if another pump is already running, request a repump and return
3. while the worker pool has capacity:
   - read runnable `pending` jobs whose `scheduled_at <= now`
   - try to claim each job atomically
   - emit `job:started`
   - hand the claimed job to `WorkerPool`
4. schedule a wake-up for the next delayed job
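A minimal sketch of the pump rules, assuming an in-memory job list and a synchronous pass. `Pump`, `QueuedJob`, and `nextWakeupAt` are hypothetical; a real pump awaits storage reads, and that await is the window in which a second `requestPump()` call takes the repump branch.

```typescript
interface QueuedJob {
  id: string;
  status: "pending" | "active" | "completed";
  scheduledAt: number; // epoch milliseconds
}

class Pump {
  closed = false;
  readonly started: string[] = [];    // stands in for job:started + WorkerPool hand-off
  nextWakeupAt: number | null = null; // stands in for the wake-up timer
  private pumping = false;
  private repumpRequested = false;
  private readonly jobs: QueuedJob[];
  private readonly capacity: number;
  private readonly now: () => number;

  constructor(jobs: QueuedJob[], capacity: number, now: () => number = Date.now) {
    this.jobs = jobs;
    this.capacity = capacity;
    this.now = now;
  }

  requestPump(): void {
    if (this.closed) return;        // rule 1: a closed queue never starts work
    if (this.pumping) {             // rule 2: coalesce into the running pass
      this.repumpRequested = true;
      return;
    }
    this.pumping = true;
    try {
      do {
        this.repumpRequested = false;
        this.fillCapacity();        // rule 3
      } while (this.repumpRequested && !this.closed);
      this.scheduleWakeup();        // rule 4
    } finally {
      this.pumping = false;
    }
  }

  private fillCapacity(): void {
    const t = this.now();
    const candidates = this.jobs.filter((j) => j.status === "pending" && j.scheduledAt <= t);
    for (const job of candidates) {
      if (this.activeCount() >= this.capacity) return;
      if (job.status !== "pending") continue; // claim step: only pending rows flip to active
      job.status = "active";
      this.started.push(job.id);
    }
  }

  private activeCount(): number {
    return this.jobs.filter((j) => j.status === "active").length;
  }

  private scheduleWakeup(): void {
    const t = this.now();
    const future = this.jobs
      .filter((j) => j.status === "pending" && j.scheduledAt > t)
      .map((j) => j.scheduledAt);
    this.nextWakeupAt = future.length > 0 ? Math.min(...future) : null;
  }
}

// Usage: capacity 2, a fixed clock at t = 1000, one job delayed until t = 5000.
const queued: QueuedJob[] = [
  { id: "a", status: "pending", scheduledAt: 0 },
  { id: "b", status: "pending", scheduledAt: 500 },
  { id: "c", status: "pending", scheduledAt: 900 },
  { id: "d", status: "pending", scheduledAt: 5_000 },
];
const pump = new Pump(queued, 2, () => 1_000);
pump.requestPump(); // starts a and b; c waits for capacity, d waits for the wake-up
```

Job `c` is runnable but not covered by the wake-up timer; it starts on the next pump request, which a finishing worker would trigger.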
### Why `claimPendingJob()` matters
The queue lists candidates first, then claims them one by one with a status transition in SQLite. That second step is what prevents the same pending row from being started twice.
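The claim step amounts to a compare-and-set on the job's status. The sketch below assumes an in-memory map; the SQL in the comment is an assumed shape, not taken from the source. Because SQLite allows only one writer at a time and the status check and update happen in one statement, only one claim can succeed.

```typescript
interface ClaimRow {
  id: string;
  status: "pending" | "active";
  startedAt?: number;
}

// In-memory stand-in for a conditional UPDATE. In SQLite the same idea would
// presumably look like (assumed, not taken from the source):
//   UPDATE jobs SET status = 'active', started_at = ?
//   WHERE id = ? AND status = 'pending'
// with the claim succeeding only when exactly one row changed.
function claimPendingJob(rows: Map<string, ClaimRow>, id: string, now: number): boolean {
  const row = rows.get(id);
  if (row === undefined || row.status !== "pending") {
    return false; // already claimed elsewhere, or no longer present
  }
  row.status = "active";
  row.startedAt = now;
  return true;
}

// Two pump passes that both listed j1 as a candidate:
const claimRows = new Map<string, ClaimRow>([["j1", { id: "j1", status: "pending" }]]);
const firstClaim = claimPendingJob(claimRows, "j1", 1);  // true
const secondClaim = claimPendingJob(claimRows, "j1", 2); // false
```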
## 4. Job execution
Each claimed job gets its own `AbortController`. `PhaseRunner` then executes configured phases in order.
```mermaid
sequenceDiagram
participant Queue as JobQueue
participant Runner as PhaseRunner
participant Handler as Phase handler
participant DB as SqliteStorage
participant Events as TypedEventBus
Queue->>Runner: run(job, signal)
loop for each phase
Runner->>DB: saveProgress(on phase start)
Runner->>Handler: handler(job, context)
Handler->>Runner: ctx.progress(...)
Runner->>DB: saveProgress(...)
Runner->>Events: job:progress
Handler-->>Runner: phase result
Runner->>DB: savePhaseCompletion(...)
Runner->>Events: job:phase:completed
end
Queue->>DB: completeJob(...)
Queue->>Events: job:completed
```
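The loop in the diagram can be sketched as follows. `runPhases`, `Phase`, and `PhaseContext` are hypothetical, and the `log` array stands in for both the SQLite writes and the event-bus emits so their order is visible. Checking `signal.aborted` only between phases is an assumption; handlers can also watch `ctx.signal` themselves.

```typescript
interface PhaseContext {
  signal: AbortSignal;
  progress(percent: number): void;
}

interface Phase {
  name: string;
  handler: (job: { id: string }, ctx: PhaseContext) => Promise<unknown>;
}

async function runPhases(
  job: { id: string },
  phases: Phase[],
  signal: AbortSignal,
  log: string[], // stands in for SqliteStorage writes and TypedEventBus emits
): Promise<Record<string, unknown>> {
  const results: Record<string, unknown> = {};
  for (const phase of phases) {
    // Stop between phases once the job's AbortController has fired.
    if (signal.aborted) throw new Error(`job ${job.id} aborted before ${phase.name}`);
    log.push(`saveProgress ${phase.name} 0`); // persist "phase started"
    const ctx: PhaseContext = {
      signal,
      progress: (percent) => {
        log.push(`saveProgress ${phase.name} ${percent}`); // persist first...
        log.push(`job:progress ${phase.name} ${percent}`); // ...then emit
      },
    };
    results[phase.name] = await phase.handler(job, ctx);
    log.push(`savePhaseCompletion ${phase.name}`);
    log.push(`job:phase:completed ${phase.name}`);
  }
  return results; // the queue then calls completeJob() and emits job:completed
}

const log: string[] = [];
const controller = new AbortController();
const run = runPhases(
  { id: "j1" },
  [
    { name: "download", handler: async (_job, ctx) => { ctx.progress(50); return { bytes: 1024 }; } },
    { name: "upload", handler: async () => "ok" },
  ],
  controller.signal,
  log,
);
```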
## 5. Progress semantics
Progress exists at two levels:
- **phase progress** - what the current handler reports
- **overall progress** - computed from phase index + phase progress
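Example for three phases (percentages; overall progress rounded to the nearest integer):

| Phase | Phase index | Reported phase progress (%) | Computed overall progress (%) |
| --- | --- | --- | --- |
| `download` | 0 | 50 | 17 |
| `process` | 1 | 25 | 42 |
| `upload` | 2 | 80 | 93 |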
`ctx.progress()` persists that state immediately, then emits `job:progress`.
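The table is consistent with weighting every phase equally: overall progress = (phase index + phase progress / 100) / phase count × 100, rounded. A sketch of that computation, assuming rounding to the nearest integer and a hypothetical `overallProgress` name:

```typescript
// Overall progress for a job whose phases all carry equal weight (assumption).
// phaseIndex is zero-based; phaseProgress is the 0-100 value from ctx.progress().
function overallProgress(phaseIndex: number, phaseProgress: number, phaseCount: number): number {
  const clamped = Math.min(100, Math.max(0, phaseProgress)); // guard against out-of-range reports
  return Math.round(((phaseIndex + clamped / 100) / phaseCount) * 100);
}

overallProgress(0, 50, 3); // 17 (download)
overallProgress(1, 25, 3); // 42 (process)
overallProgress(2, 80, 3); // 93 (upload)
```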
## 6. Result passing between phases
Each handler can return JSON-serializable data. `PhaseRunner` stores that in `phaseResults` and exposes it to later handlers via:
- `ctx.phaseResult(phaseName)`
- `ctx.phaseResults()`
This is the mechanism that turns the queue from "single task runner" into "multi-stage pipeline engine".
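A sketch of both sides of result passing, assuming a plain object as the store. `recordResult` and `makeResultContext` are hypothetical; the JSON round-trip approximates persisting results in SQLite and shows why results must be JSON-serializable.

```typescript
type PhaseResults = Record<string, unknown>;

interface ResultContext {
  phaseResult<T = unknown>(phaseName: string): T | undefined;
  phaseResults(): PhaseResults;
}

// Hypothetical write side: store a handler's return value. The JSON round-trip
// mimics persistence; a function value would make JSON.parse throw here.
function recordResult(store: PhaseResults, phaseName: string, value: unknown): void {
  store[phaseName] = value === undefined ? null : JSON.parse(JSON.stringify(value));
}

// Hypothetical read side handed to later phases through `ctx`.
function makeResultContext(store: PhaseResults): ResultContext {
  return {
    phaseResult<T = unknown>(phaseName: string): T | undefined {
      return store[phaseName] as T | undefined;
    },
    phaseResults(): PhaseResults {
      return { ...store }; // shallow copy so callers cannot add or remove entries
    },
  };
}

const store: PhaseResults = {};
recordResult(store, "download", { path: "/tmp/input.csv", fetchedAt: new Date(0) });
const ctx = makeResultContext(store);
const download = ctx.phaseResult<{ path: string; fetchedAt: string }>("download");
// download?.fetchedAt is the string "1970-01-01T00:00:00.000Z", not a Date
```

A `Date` returned by one phase reaches the next phase as an ISO string, so handlers should return plain data.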
## 7. Retry path
When a handler throws, `JobQueue.handleFailure()` decides between retry and terminal failure.
```mermaid
sequenceDiagram
participant Handler as Phase handler
participant Queue as JobQueue
participant Retry as RetryStrategy
participant DB as SqliteStorage
participant Events as TypedEventBus
Handler-->>Queue: throws error
Queue->>Retry: shouldRetry(error, currentJob)
alt recoverable and attempts remain
Queue->>DB: scheduleRetry(...)
Queue->>Events: job:retrying
Queue->>Queue: requestPump()
else fatal or exhausted
Queue->>DB: failJob(...)
Queue->>Events: job:failed
end
```
### Retry details
- `maxAttempts` includes the initial attempt
- the default error disposition is `fatal`: an error is retried only if it is classified as recoverable
- backoff can be `fixed`, `linear`, or `exponential`
- recoverable retries keep the job in `pending` with a future `scheduled_at`
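The retry decision and backoff can be sketched as follows. `RetryConfig.baseDelayMs`, the `disposition` tag on the error, and the exact backoff formulas (base, base × retry, base × 2^(retry - 1)) are assumptions; only the three backoff names, the `fatal` default, and the attempt counting come from the text above.

```typescript
type Backoff = "fixed" | "linear" | "exponential";
type Disposition = "fatal" | "recoverable";

interface RetryConfig {
  maxAttempts: number; // includes the initial attempt
  backoff: Backoff;
  baseDelayMs: number; // hypothetical name for the backoff unit
}

// Errors count as fatal unless explicitly tagged recoverable (tag is hypothetical).
function dispositionOf(error: unknown): Disposition {
  if (typeof error === "object" && error !== null &&
      (error as { disposition?: unknown }).disposition === "recoverable") {
    return "recoverable";
  }
  return "fatal";
}

// `attemptsMade` counts the attempt that just failed, so it is 1 after the first failure.
function shouldRetry(error: unknown, attemptsMade: number, cfg: RetryConfig): boolean {
  return dispositionOf(error) === "recoverable" && attemptsMade < cfg.maxAttempts;
}

// Delay before retry number `retry` (1 = first retry).
function backoffDelay(retry: number, cfg: RetryConfig): number {
  if (cfg.backoff === "fixed") return cfg.baseDelayMs;
  if (cfg.backoff === "linear") return cfg.baseDelayMs * retry;
  return cfg.baseDelayMs * 2 ** (retry - 1); // exponential
}

// A retried job stays `pending`; only its scheduled_at moves into the future.
function nextScheduledAt(now: number, attemptsMade: number, cfg: RetryConfig): number {
  return now + backoffDelay(attemptsMade, cfg);
}

const cfg: RetryConfig = { maxAttempts: 3, backoff: "exponential", baseDelayMs: 1_000 };
const timeout = Object.assign(new Error("upstream timeout"), { disposition: "recoverable" as const });
shouldRetry(timeout, 1, cfg);                // true: 1 of 3 attempts used
shouldRetry(timeout, 3, cfg);                // false: attempts exhausted
shouldRetry(new Error("bad input"), 1, cfg); // false: fatal by default
nextScheduledAt(10_000, 2, cfg);             // 12000 (second retry waits 2 s)
```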
## 8. Cancellation path
Cancellation is cooperative:
1. `queue.cancel(id)` aborts the job's `AbortController`, if one exists
2. unfinished phases are persisted as `cancelled`
3. job status becomes `cancelled`
4. `job:cancelled` is emitted
5. later phase-runner cancellation callbacks become no-ops if the job is already cancelled
That last rule matters: without it, a cancel request arriving between phase transitions could emit duplicate `job:cancelled` events. The implementation guards against that.
### Pending cancellation
If a job is cancelled before it starts, **all unfinished phases are also marked `cancelled`**. That keeps the persisted phase graph aligned with the top-level job status.
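A sketch of the cancellation rules, including the duplicate-event guard and pending-job phase marking. `Canceller` and `TrackedJob` are hypothetical, and routing both `queue.cancel(id)` and the runner's abort callback through one `cancel()` method is a simplification.

```typescript
type TrackedStatus = "pending" | "active" | "completed" | "failed" | "cancelled";
type PhaseStatus = "pending" | "active" | "completed" | "cancelled";

interface TrackedJob {
  id: string;
  status: TrackedStatus;
  phases: Record<string, PhaseStatus>;
}

class Canceller {
  readonly emitted: string[] = [];
  private readonly controllers = new Map<string, AbortController>();

  // Called when a job is claimed; a job cancelled while pending has no controller.
  register(jobId: string): AbortSignal {
    const controller = new AbortController();
    this.controllers.set(jobId, controller);
    return controller.signal;
  }

  cancel(job: TrackedJob): boolean {
    // Already cancelled: emit nothing. Skipping other terminal states is an assumption.
    if (job.status === "cancelled" || job.status === "completed" || job.status === "failed") {
      return false;
    }
    this.controllers.get(job.id)?.abort(); // cooperative: the handler must observe the signal
    this.controllers.delete(job.id);
    for (const name of Object.keys(job.phases)) {
      if (job.phases[name] !== "completed") job.phases[name] = "cancelled";
    }
    job.status = "cancelled";
    this.emitted.push(`job:cancelled ${job.id}`);
    return true;
  }
}

const canceller = new Canceller();
const signal = canceller.register("j1");
const running: TrackedJob = {
  id: "j1",
  status: "active",
  phases: { download: "completed", process: "active", upload: "pending" },
};
canceller.cancel(running); // queue.cancel("j1"): true
canceller.cancel(running); // late runner callback: false, no second event

const waiting: TrackedJob = { id: "j2", status: "pending", phases: { download: "pending", upload: "pending" } };
canceller.cancel(waiting); // every unfinished phase becomes "cancelled"
```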
## 9. SSE stream lifecycle
`createEventStream()` creates a stream over queue events.
### Stream startup
1. optional snapshot is written first
2. event listeners are attached
3. periodic `ping` keepalive starts
### Stream shutdown
- cancelling the reader removes all attached listeners
- keepalive timer is cleared
- no queue state is modified
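The startup and shutdown steps map naturally onto a `ReadableStream` with `start` and `cancel` callbacks. This sketch assumes a hypothetical `MiniBus` in place of `TypedEventBus`, string chunks (a server would typically pipe them through a `TextEncoderStream`), and assumed event names `snapshot` and `ping`.

```typescript
type Listener = (payload: unknown) => void;

// Hypothetical minimal event bus standing in for TypedEventBus.
class MiniBus {
  private readonly listeners = new Map<string, Set<Listener>>();
  on(event: string, fn: Listener): void {
    const set = this.listeners.get(event) ?? new Set<Listener>();
    set.add(fn);
    this.listeners.set(event, set);
  }
  off(event: string, fn: Listener): void {
    this.listeners.get(event)?.delete(fn);
  }
  emit(event: string, payload: unknown): void {
    this.listeners.get(event)?.forEach((fn) => fn(payload));
  }
  listenerCount(): number {
    let n = 0;
    this.listeners.forEach((set) => { n += set.size; });
    return n;
  }
}

// One SSE frame: a named event plus a JSON data line, terminated by a blank line.
function sseFrame(event: string, data: unknown): string {
  return `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
}

function createEventStream(
  bus: MiniBus,
  events: string[],
  options: { snapshot?: unknown; pingMs?: number } = {},
): ReadableStream<string> {
  const attached: Array<[string, Listener]> = [];
  let timer: ReturnType<typeof setInterval> | undefined;
  return new ReadableStream<string>({
    start(controller) {
      // 1. Snapshot first, so a client sees current state before live events.
      if (options.snapshot !== undefined) {
        controller.enqueue(sseFrame("snapshot", options.snapshot));
      }
      // 2. Forward each requested queue event as an SSE frame.
      for (const event of events) {
        const fn: Listener = (payload) => controller.enqueue(sseFrame(event, payload));
        bus.on(event, fn);
        attached.push([event, fn]);
      }
      // 3. Keepalive so intermediaries do not drop an idle connection.
      timer = setInterval(() => controller.enqueue(sseFrame("ping", Date.now())), options.pingMs ?? 15_000);
    },
    cancel() {
      // Reader cancelled: detach listeners and stop the timer. Queue state is untouched.
      for (const [event, fn] of attached) bus.off(event, fn);
      attached.length = 0;
      if (timer !== undefined) clearInterval(timer);
    },
  });
}
```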
## 10. Webhook lifecycle
Webhooks are triggered off queue events, but they are not the primary source of truth. SQLite is.
### Completion + webhook ordering
For a successful job:
1. `completeJob()` persists `status = completed`
2. `job:completed` is emitted
3. webhook dispatch is scheduled
4. successful delivery marks `webhook_sent = 1`
5. `job:webhook:delivered` is emitted
This means webhook state becomes visible in a **later event**, not inside the original `job:completed` event.
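A sketch of that ordering, assuming a hypothetical `completeJob` helper whose `deliver` callback stands in for the HTTP request. The `job:webhook:failed` name is an assumption.

```typescript
interface WebhookJob {
  id: string;
  status: "active" | "completed";
  webhookSent: boolean;
}

type Emit = (event: string, snapshot: WebhookJob) => void;

function completeJob(
  job: WebhookJob,
  emit: Emit,
  deliver: (job: WebhookJob) => Promise<void>, // hypothetical HTTP POST to the webhook URL
): Promise<void> {
  job.status = "completed";          // 1. persist the terminal status
  emit("job:completed", { ...job }); // 2. snapshot still has webhookSent = false
  return Promise.resolve()           // 3. dispatch runs after the event handlers
    .then(() => deliver(job))
    .then(
      () => {
        job.webhookSent = true;                    // 4. record successful delivery
        emit("job:webhook:delivered", { ...job }); // 5. new state arrives in a later event
      },
      () => emit("job:webhook:failed", { ...job }),
    );
}
```

A listener on `job:completed` therefore always sees `webhookSent: false`; it has to wait for the delivery event to learn the outcome.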
### Shutdown interaction
The queue tracks in-flight webhook promises and waits for them during shutdown. This avoids closing SQLite while a webhook for a completed job still needs to update `webhook_sent` or emit its delivery or failure event.
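Tracking in-flight deliveries takes little more than a `Set` of promises. `WebhookTracker` and `closeAfterWebhooks` are hypothetical names; wrapping each delivery so the tracked promise never rejects is a design choice that keeps `drain()` from failing on one bad webhook.

```typescript
class WebhookTracker {
  private readonly inFlight = new Set<Promise<void>>();

  get size(): number {
    return this.inFlight.size;
  }

  track(delivery: Promise<unknown>): void {
    // Settle-only wrapper: failures are reported elsewhere, so drain() never rejects.
    const entry: Promise<void> = delivery.then(
      () => { this.inFlight.delete(entry); },
      () => { this.inFlight.delete(entry); },
    );
    this.inFlight.add(entry);
  }

  // Wait until nothing is in flight, including deliveries tracked while waiting.
  async drain(): Promise<void> {
    while (this.inFlight.size > 0) {
      await Promise.all(Array.from(this.inFlight));
    }
  }
}

// During shutdown: drain webhooks before closing storage.
async function closeAfterWebhooks(tracker: WebhookTracker, closeStorage: () => void): Promise<void> {
  await tracker.drain();
  closeStorage(); // no delivery can still be writing webhook_sent at this point
}
```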
## 11. Retention lifecycle
Retention runs independently from job execution:
1. compute stale cutoff and delete cutoff
2. mark eligible terminal jobs as `stale`
3. run optional `onStale(job)` callback
4. emit `job:stale`
5. delete stale jobs past delete cutoff
6. run optional `onDelete(job)` callback
7. emit `job:deleted`
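A sketch of one retention pass. `RetentionConfig`, its `staleAfterMs`/`deleteAfterMs` fields, and measuring both cutoffs from a `finishedAt` timestamp are assumptions; the order of steps follows the list above.

```typescript
type RetainedStatus = "pending" | "active" | "completed" | "failed" | "cancelled" | "stale";

interface RetainedJob {
  id: string;
  status: RetainedStatus;
  finishedAt?: number; // epoch ms when the job became terminal (assumed field)
}

interface RetentionConfig {
  staleAfterMs: number;  // hypothetical: age at which terminal jobs turn stale
  deleteAfterMs: number; // hypothetical: age at which stale jobs are deleted
  onStale?: (job: RetainedJob) => void;
  onDelete?: (job: RetainedJob) => void;
}

const TERMINAL: RetainedStatus[] = ["completed", "failed", "cancelled"];

function runRetention(
  jobs: Map<string, RetainedJob>,
  cfg: RetentionConfig,
  now: number,
  emit: (event: string, jobId: string) => void,
): void {
  // 1. Both cutoffs are computed once per pass.
  const staleCutoff = now - cfg.staleAfterMs;
  const deleteCutoff = now - cfg.deleteAfterMs;
  const olderThan = (job: RetainedJob, cutoff: number) =>
    job.finishedAt !== undefined && job.finishedAt <= cutoff;

  // 2-4. Mark old terminal jobs stale, then notify.
  for (const job of jobs.values()) {
    if (TERMINAL.includes(job.status) && olderThan(job, staleCutoff)) {
      job.status = "stale";
      cfg.onStale?.(job);
      emit("job:stale", job.id);
    }
  }
  // 5-7. Delete stale jobs past the delete cutoff (copy first: the map is mutated).
  for (const job of Array.from(jobs.values())) {
    if (job.status === "stale" && olderThan(job, deleteCutoff)) {
      jobs.delete(job.id);
      cfg.onDelete?.(job);
      emit("job:deleted", job.id);
    }
  }
}

// Usage at now = 100000 with staleAfterMs = 10000 and deleteAfterMs = 50000.
const retained = new Map<string, RetainedJob>([
  ["recent", { id: "recent", status: "completed", finishedAt: 95_000 }],
  ["old", { id: "old", status: "failed", finishedAt: 80_000 }],
  ["ancient", { id: "ancient", status: "completed", finishedAt: 10_000 }],
  ["running", { id: "running", status: "active" }],
]);
const retentionEvents: string[] = [];
runRetention(retained, { staleAfterMs: 10_000, deleteAfterMs: 50_000 }, 100_000, (e, id) => {
  retentionEvents.push(`${e} ${id}`);
});
```

In this run `old` becomes stale but survives, while `ancient` is marked stale and deleted in the same pass; active jobs are never considered.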
## 12. Shutdown lifecycle
Shutdown has two responsibilities:
1. **stop new work** - mark queue closed, stop retention, clear wake-up timer
2. **tear down safely** - wait for workers, wait for webhooks, remove listeners, close storage
### Current behavior
```mermaid
flowchart TD
A["shutdown()"] --> B[closed = true]
B --> C[stop retention + clear wakeup timer]
C --> D{workers drained in time?}
D -- yes --> E[drain pending webhooks]
D -- no --> F[abort active controllers]
F --> G[best-effort second drain]
G --> E
E --> H[remove listeners]
H --> I[close SQLite]
I --> J{timeout happened?}
J -- no --> K[resolve]
J -- yes --> L[rethrow timeout error after cleanup]
```
### Important nuance
If a handler ignores its `AbortSignal`, shutdown can still time out. Cleanup (removing listeners and closing SQLite) still runs in that case and the timeout error is rethrown afterwards, but graceful completion depends on the handler cooperating.
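A sketch of the shutdown sequence from the flowchart. `ShutdownDeps` and `settleWithin` are hypothetical, and reusing the same timeout for the best-effort second drain is an assumption.

```typescript
interface ShutdownDeps {
  stopIntake(): void; // closed = true, stop retention, clear wake-up timer
  drainWorkers(): Promise<void>;
  abortActive(): void;
  drainWebhooks(): Promise<void>;
  removeListeners(): void;
  closeStorage(): void;
}

// Resolves true if `p` settles within `ms`, false on timeout. Never rejects.
function settleWithin(p: Promise<unknown>, ms: number): Promise<boolean> {
  return new Promise<boolean>((resolve) => {
    const timer = setTimeout(() => resolve(false), ms);
    const done = () => { clearTimeout(timer); resolve(true); };
    p.then(done, done);
  });
}

async function shutdown(deps: ShutdownDeps, timeoutMs: number): Promise<void> {
  deps.stopIntake(); // stop new work first
  let timedOut = false;
  try {
    if (!(await settleWithin(deps.drainWorkers(), timeoutMs))) {
      timedOut = true;
      deps.abortActive(); // ask cooperative handlers to stop
      await settleWithin(deps.drainWorkers(), timeoutMs); // best-effort second drain
    }
    await deps.drainWebhooks();
  } finally {
    // Tear down even when workers never finished.
    deps.removeListeners();
    deps.closeStorage();
  }
  if (timedOut) throw new Error(`shutdown timed out after ${timeoutMs}ms`);
}
```

Passing a `drainWorkers` that never settles reproduces the nuance above: listeners are removed and storage is closed, and only then does the timeout error surface.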