diff --git a/.gitea/workflows/publish.yml b/.gitea/workflows/publish.yml index c8a3577..b1c8bcc 100644 --- a/.gitea/workflows/publish.yml +++ b/.gitea/workflows/publish.yml @@ -6,7 +6,7 @@ on: - 'v*' jobs: - test-and-publish: + verify: runs-on: ubuntu-latest steps: - name: Checkout @@ -30,6 +30,25 @@ jobs: - name: Build run: npm run build + publish: + runs-on: ubuntu-latest + needs: verify + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Setup Node + uses: actions/setup-node@v4 + with: + node-version: 20 + registry-url: https://git.sal.giize.com/api/packages/mozempk/npm/ + + - name: Install deps + run: npm ci + + - name: Build + run: npm run build + - name: Publish run: npm publish env: diff --git a/README.md b/README.md index 57c9179..7930f1b 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,8 @@ Framework-agnostic async job queue for Node.js with: - **Retention cleanup** for stale and expired jobs - **ESM-only**, **Node 20+** +Detailed architecture and runtime docs live in [`docs/`](./docs/README.md). + ## Install ```bash diff --git a/docs/README.md b/docs/README.md new file mode 100644 index 0000000..132a7f9 --- /dev/null +++ b/docs/README.md @@ -0,0 +1,39 @@ +# jobqueue docs + +Detailed architecture and runtime docs for `jobqueue`. + +## Doc map + +| File | Purpose | +| --- | --- | +| [`architecture.md`](./architecture.md) | Static architecture, module boundaries, data model, event model, state model | +| [`runtime-lifecycle.md`](./runtime-lifecycle.md) | Step-by-step runtime behavior from startup through shutdown | +| [`integration-findings.md`](./integration-findings.md) | Multi-agent scan results, verified bugs, fixes, and remaining behavioral notes | +| [`jobqueue.c4`](./jobqueue.c4) | LikeC4 source for landscape, container, component, and runtime views | + +## Mental model + +`jobqueue` is a **single-process orchestrator** around a SQLite-backed job table. + +1. Consumer code creates a `JobQueue`. +2. 
Jobs are persisted immediately in SQLite. +3. A pump claims runnable jobs and hands them to a concurrency-limited worker pool. +4. `PhaseRunner` executes configured phases and reports progress back through `JobQueue`. +5. `JobQueue` persists each state transition, emits typed events, formats SSE payloads, and optionally sends webhooks. +6. A retention scheduler can mark old jobs as `stale` and later delete them. + +## Rendering LikeC4 views + +The repository stores LikeC4 source in [`jobqueue.c4`](./jobqueue.c4). + +```bash +npx likec4 start docs/jobqueue.c4 +``` + +Recommended views in the file: + +- `index` - system landscape +- `library` - container view of `jobqueue` +- `runtime` - internal runtime/component view +- `enqueue-to-complete` - dynamic happy path +- `retry-flow` - dynamic retry path diff --git a/docs/architecture.md b/docs/architecture.md new file mode 100644 index 0000000..59f0922 --- /dev/null +++ b/docs/architecture.md @@ -0,0 +1,241 @@ +# Architecture + +This document explains how `jobqueue` is structured, what each module owns, and how data moves through the system. + +## 1. Top-level structure + +`JobQueue` is the orchestrator. Everything else is a collaborator around four concerns: + +1. **Persistence** - `SqliteStorage` +2. **Execution** - `WorkerPool` + `PhaseRunner` +3. **Notifications** - `TypedEventBus`, `SseSerializer`, `WebhookDispatcher` +4. **Lifecycle management** - `RetryStrategy`, `RetentionScheduler`, shutdown logic + +```mermaid +flowchart LR + A[Consumer app] -->|enqueue / retry / cancel / list| B[JobQueue] + B --> C[SqliteStorage] + B --> D[WorkerPool] + D --> E[PhaseRunner] + E --> F[Phase handlers registered by consumer] + B --> G[TypedEventBus] + G --> H[SSE stream subscribers] + B --> I[WebhookDispatcher] + I --> J[Webhook endpoints] + B --> K[RetentionScheduler] + K --> C +``` + +## 2. 
Module responsibilities + +| Module | Responsibility | Key behavior | +| --- | --- | --- | +| `src/JobQueue.ts` | Public API and orchestration | Owns startup, enqueue, cancel, retry, pumping, event emission, webhook dispatch, shutdown | +| `src/storage/SqliteStorage.ts` | SQLite persistence | Creates schema, claims jobs, persists progress, completion, failure, retry, cancellation, stale/delete | +| `src/processor/WorkerPool.ts` | Concurrency limit | Wraps `p-limit`, tracks running promises, supports drain with timeout | +| `src/processor/PhaseRunner.ts` | Multi-phase execution | Runs handlers in order, computes overall progress, stops on cancellation | +| `src/retry/RetryStrategy.ts` | Retry policy | Classifies errors, computes backoff, decides retry vs fail | +| `src/events/EventBus.ts` | In-process pub/sub | Strongly typed wrapper around Node `EventEmitter` | +| `src/events/SseSerializer.ts` | SSE formatting | Serializes event name + JSON payload into SSE wire format | +| `src/webhook/WebhookDispatcher.ts` | Outbound HTTP callbacks | Sends POST requests, signs payloads, retries transient failures | +| `src/retention/RetentionScheduler.ts` | Background cleanup | Periodically marks old jobs stale and later deletes them | + +## 3. Public API surface + +Exports from `src/index.ts` expose both high-level and low-level building blocks: + +- `JobQueue` +- `SqliteStorage` +- `WorkerPool` +- `PhaseRunner` +- `RetryStrategy` +- `TypedEventBus` +- `SseSerializer` +- `WebhookDispatcher` +- `RetentionScheduler` +- shared queue/job/event types + +That split makes the package usable in two modes: + +1. **Normal mode** - instantiate `JobQueue` and let it coordinate everything +2. **Advanced mode** - reuse lower-level pieces independently in custom orchestration + +## 4. Persistence model + +`SqliteStorage` keeps a single `jobs` table. The queue is effectively modeled as persisted state transitions on that table. 
+ +### Important columns + +| Column | Meaning | +| --- | --- | +| `id` | Stable job identifier | +| `status` | `pending`, `active`, `completed`, `failed`, `cancelled`, `stale` | +| `data` | Original enqueue payload, JSON-encoded | +| `current_phase` | Phase currently executing or last failed/retried phase | +| `phases_json` | Array of per-phase state objects | +| `phase_results` | JSON object keyed by phase name | +| `progress` / `progress_message` | Latest overall progress snapshot | +| `error_json` | Persisted failure metadata | +| `retry_count` / `max_attempts` | Retry bookkeeping | +| `webhook_url` / `webhook_sent` | Delivery configuration and latest success flag | +| `scheduled_at` | Delayed execution / retry wake-up time | +| `completed_at` / `cancelled_at` / `updated_at` | Lifecycle timestamps | + +### Why SQLite works well here + +- queue selection is simple and local +- state transitions are small, synchronous writes +- WAL mode supports concurrent reads while jobs are executing +- no separate broker is required for a single-process runtime + +## 5. Phase model + +Each job stores an array of `JobPhaseState` entries: + +| Field | Meaning | +| --- | --- | +| `name` | Phase identifier from `QueueConfig.phases` | +| `status` | `pending`, `active`, `completed`, `failed`, `cancelled` | +| `progress` | Per-phase progress percentage | +| `message` | Human-oriented phase status | +| `startedAt` / `completedAt` | Phase timestamps | +| `error` | Last phase-level error string | + +`PhaseRunner` walks those phases sequentially and computes overall progress as: + +```text +((phaseIndex + phaseProgress / 100) / totalPhases) * 100 +``` + +That design gives one stable persisted representation for: + +- single-step jobs (`phases: ['run']`) +- multi-step pipelines (`['download', 'process', 'upload']`) +- retries that restart only unfinished phases + +## 6. Event model + +`JobQueue` emits in-process typed events first. 
The SSE stream and webhook flow are adapters on top of that state machine. + +### Core queue events + +- `job:enqueued` +- `job:started` +- `job:progress` +- `job:phase:completed` +- `job:completed` +- `job:failed` +- `job:retrying` +- `job:cancelled` +- `job:stale` +- `job:deleted` +- `job:webhook:delivered` +- `job:webhook:failed` + +### Event ordering rule + +The queue persists state before emitting the corresponding event. That means listeners observe already-persisted state, not speculative state. + +This is important for consumers that mix: + +- `queue.on(...)` +- `queue.getJob(id)` +- `queue.listJobs(...)` +- `queue.createEventStream(...)` + +## 7. SSE model + +`createEventStream()` creates a web `ReadableStream`. + +Behavior: + +1. Optional snapshot of current jobs is sent first +2. Queue subscribes the stream to in-process events +3. Each event is serialized as `event: <name>` + JSON `data: ...` +4. Keepalive `ping` events are emitted on an interval +5. Cancelling the reader removes subscriptions and the keepalive timer + +### Payload shapes + +Most runtime events include a full `job` object. Two notable exceptions: + +| Event | Payload detail | +| --- | --- | +| `job:deleted` | Includes `deletedJobId` because the record was removed from storage | +| `ping` | No job payload, only a timestamp | + +## 8. Webhook model + +Webhooks are outbound notifications, not part of the core execution loop. + +### Flow + +1. `JobQueue` decides whether an event should trigger a webhook +2. `WebhookDispatcher` POSTs JSON to queue-level or job-level URL +3. Optional HMAC SHA-256 signature is attached as `X-JobQueue-Signature` +4. 5xx and transport failures retry with exponential backoff +5. Success sets `webhook_sent = 1` and emits `job:webhook:delivered` + +### Scope + +Supported webhook-triggering events: + +- `job:completed` +- `job:failed` +- `job:retrying` +- `job:cancelled` +- `job:stale` + +## 9. 
Retention model + +Retention is deliberately two-stage: + +```mermaid +flowchart LR + A[completed / failed / cancelled] -->|older than staleAfterMs| B[stale] + B -->|older than deleteAfterMs| C[deleted] +``` + +Why two stages: + +- consumers get a visible grace period before deletion +- `onStale` and `onDelete` hooks can clean external artifacts +- `job:stale` is externally observable before hard deletion + +## 10. State machine + +```mermaid +stateDiagram-v2 + [*] --> pending : enqueue + pending --> active : claimPendingJob + pending --> cancelled : cancel + active --> completed : all phases succeed + active --> failed : fatal error / retries exhausted + active --> pending : recoverable error + retry + active --> cancelled : cancel / abort + completed --> stale : retention mark + failed --> stale : retention mark + cancelled --> stale : retention mark + stale --> [*] : retention delete + failed --> pending : manual retry + cancelled --> pending : manual retry + stale --> pending : manual retry +``` + +## 11. Build and packaging + +- ESM only +- Node 20 target +- bundled with `tsup` +- type declarations emitted from the same entrypoint +- tests run under Vitest in Node environment + +## 12. LikeC4 companion charts + +See [`jobqueue.c4`](./jobqueue.c4) for: + +- system landscape +- library/container view +- runtime/component view +- enqueue-to-complete dynamic flow +- retry dynamic flow diff --git a/docs/integration-findings.md b/docs/integration-findings.md new file mode 100644 index 0000000..b4e34f1 --- /dev/null +++ b/docs/integration-findings.md @@ -0,0 +1,141 @@ +# Integration findings + +This document records what the multi-agent scan found, what was verified directly in source, and what changed. + +## 1. Scan method + +The repository was scanned through three independent passes: + +1. **lifecycle scan** - enqueue, scheduling, execution, retry, cancel, shutdown +2. **storage/events scan** - persistence, SSE, webhook, retention interaction +3. 
**code review scan** - cross-component defects only + +Those results were then checked against source before documenting or patching anything. + +## 2. Confirmed and fixed issues + +### A. Duplicate `job:cancelled` event at phase boundary + +**Observed risk** + +If `cancel()` landed after one phase finished but before the next phase started, two different code paths could emit `job:cancelled`: + +- direct `cancel(id)` call +- `PhaseRunner` cancellation callback on next loop iteration + +**Why it mattered** + +- duplicate event bus notifications +- duplicate SSE `job:cancelled` events +- duplicate `job:cancelled` webhooks + +**Fix** + +`JobQueue` now checks whether the job is already cancelled before the `onCancelled` callback persists or emits anything. + +### B. Pending-job cancellation left phases in `pending` + +**Observed risk** + +Cancelling a job before it started produced: + +- job status: `cancelled` +- phase states: still `pending` + +**Why it mattered** + +- persisted lifecycle shape was contradictory +- dashboards and tooling reading phases could not trust phase status + +**Fix** + +`JobQueue` now marks all unfinished phases as `cancelled` whenever a cancellation is persisted. + +### C. Shutdown could close before in-flight webhook bookkeeping finished + +**Observed risk** + +Webhook dispatch was previously fire-and-forget. A completed job could still be mid-delivery when `shutdown()` closed SQLite. + +**Why it mattered** + +- `webhook_sent` might not be written +- `job:webhook:delivered` / `job:webhook:failed` could be lost +- delivery bookkeeping could throw against a closed database + +**Fix** + +`JobQueue` now tracks pending webhook promises and drains them during shutdown before closing storage. + +### D. 
Shutdown timeout skipped cleanup + +**Observed risk** + +If `WorkerPool.drain(timeout)` threw, `shutdown()` exited before: + +- removing listeners +- closing storage + +**Why it mattered** + +- leaked resources +- left queue internals half-open after failed shutdown + +**Fix** + +Cleanup now runs in a `finally` path. On timeout, active controllers are aborted, cleanup still executes, and the timeout error is rethrown after teardown. + +## 3. Behavioral notes kept as documentation, not code changes + +These are real integration characteristics, but not all are bugs. + +### `job:completed` precedes `job:webhook:delivered` + +This is expected ordering: + +1. job completion is persisted +2. `job:completed` emits +3. webhook dispatch happens +4. `job:webhook:delivered` emits on success + +So `webhookSent` may still be `false` in the earlier completion event. Consumers should treat webhook delivery as a separate lifecycle step. + +### `job:deleted` does not contain full job payload + +This is intentional and pragmatic. Once a stale record is deleted, the queue only emits `deletedJobId`. The SSE contract reflects deletion, not a resurrected snapshot. + +### Webhooks are best-effort, not durable outbox delivery + +The package retries transient delivery errors, but it does **not** persist a webhook outbox with replay semantics. If a process dies after job completion and before webhook delivery completes, there is no durable re-dispatch queue. + +## 4. Regression coverage added + +New tests now cover: + +- cancelling a pending job marks unfinished phases cancelled +- cancelling on a phase boundary emits one cancellation event +- shutdown waits for in-flight webhooks +- shutdown cleanup still happens when worker drain times out + +## 5. Remaining risk areas + +No new blocking integration bugs were confirmed after patching, but these seams still deserve attention as the library grows: + +1. 
**Durable outbound delivery** - webhook outbox/idempotency keys if delivery guarantees become stronger +2. **Long-running non-cooperative handlers** - handlers that ignore `AbortSignal` can still force shutdown timeouts +3. **SSE scaling** - each stream currently subscribes directly to the in-process event bus +4. **Storage portability** - queue semantics are tightly coupled to SQLite row-level state transitions + +## 6. Second-scan fixes and coverage expansion + +The deeper follow-up scan confirmed three more issues that were patched: + +1. **Webhook completion after retention deletion** could throw when delivery bookkeeping re-fetched a deleted job. +2. **Partial retry (`fromStart: false`)** dropped completed phase results because retry reset cleared `phase_results`. +3. **Process restart recovery** dropped interrupted phase context in failure metadata. + +Coverage was expanded at three levels: + +- **Unit**: retry strategy, webhook retry policy, worker-pool drain timeout, storage retry/reset semantics +- **Integration**: partial retry behavior, scheduled wakeups, restart recovery, queue lifecycle edges +- **E2E harness**: realistic workflows covering SSE + webhooks + retries + retention deletion diff --git a/docs/jobqueue.c4 b/docs/jobqueue.c4 new file mode 100644 index 0000000..ffbd936 --- /dev/null +++ b/docs/jobqueue.c4 @@ -0,0 +1,140 @@ +specification { + element actor { + style { + shape person + } + } + + element system { + style { + shape rectangle + } + } + + element container { + style { + shape rectangle + } + } + + element component { + style { + shape component + } + } + + element database { + style { + shape storage + } + } + + relationship async { + color amber + line dotted + } +} + +model { + consumer = actor "Consumer application" + webhookReceiver = system "Webhook receiver" + + jobqueue = system "jobqueue" { + api = container "Public API" { + technology "ESM / TypeScript" + description "JobQueue constructor plus enqueue, retry, cancel, query, 
stream, shutdown APIs" + } + + runtime = container "Runtime orchestrator" { + technology "Node.js" + description "Coordinates persistence, execution, retries, events, SSE, webhooks, and shutdown" + + queue = component "JobQueue" + storage = component "SqliteStorage" + pool = component "WorkerPool" + runner = component "PhaseRunner" + retry = component "RetryStrategy" + events = component "TypedEventBus" + sse = component "SseSerializer" + retention = component "RetentionScheduler" + webhooks = component "WebhookDispatcher" + + queue -> storage "persists job state" + queue -> pool "dispatches runnable jobs" + queue -> runner "executes phase pipeline" + queue -> retry "classifies failures" + queue -> events "emits typed queue events" + queue -> sse "serializes SSE payloads" + queue -> retention "runs stale/delete cycle" + queue -[async]-> webhooks "dispatches outbound callbacks" + } + + sqlite = database "SQLite jobs database" { + technology "better-sqlite3 + WAL" + } + + handlers = container "Registered phase handlers" { + technology "Consumer-provided async functions" + } + + streams = container "SSE subscribers" { + technology "ReadableStream consumers" + } + + api -> runtime.queue "constructs and invokes" + runtime.storage -> sqlite "reads/writes rows" + runtime.runner -> handlers "invokes phase handlers" + runtime.events -> streams "pushes queue events" + } + + consumer -> jobqueue.api "enqueue / retry / cancel / inspect / subscribe" + jobqueue.runtime.webhooks -[async]-> webhookReceiver "POST job events" +} + +views { + view index { + title "jobqueue landscape" + include * + autoLayout LeftRight + } + + view library of jobqueue { + title "jobqueue containers" + include * + autoLayout LeftRight + } + + view runtime of jobqueue.runtime { + title "jobqueue runtime components" + include * + autoLayout LeftRight + } + + dynamic view enqueue-to-complete { + title "Enqueue to successful completion" + consumer -> jobqueue.api "enqueue()" + jobqueue.api -> 
jobqueue.runtime.queue "create job" + jobqueue.runtime.queue -> jobqueue.runtime.storage "persist pending row" + jobqueue.runtime.queue -> jobqueue.runtime.pool "schedule worker" + jobqueue.runtime.pool -> jobqueue.runtime.runner "run phases" + jobqueue.runtime.runner -> jobqueue.handlers "invoke handler(s)" + jobqueue.runtime.runner -> jobqueue.runtime.storage "persist progress + phase results" + jobqueue.runtime.queue -> jobqueue.runtime.events "emit queue events" + jobqueue.runtime.events -> jobqueue.streams "push SSE" + jobqueue.runtime.queue -> jobqueue.runtime.webhooks "send completion webhook" + jobqueue.runtime.webhooks -> webhookReceiver "POST payload" + jobqueue.runtime.queue -> jobqueue.runtime.storage "mark webhook_sent" + } + + dynamic view retry-flow { + title "Failure and retry flow" + jobqueue.runtime.runner -> jobqueue.handlers "invoke handler" + jobqueue.handlers -> jobqueue.runtime.queue "throw recoverable error" + jobqueue.runtime.queue -> jobqueue.runtime.retry "classify error" + jobqueue.runtime.retry -> jobqueue.runtime.queue "retry with delay" + jobqueue.runtime.queue -> jobqueue.runtime.storage "persist pending retry" + jobqueue.runtime.queue -> jobqueue.runtime.events "emit job:retrying" + jobqueue.runtime.events -> jobqueue.streams "push SSE" + jobqueue.runtime.queue -[async]-> jobqueue.runtime.webhooks "dispatch retry webhook" + } +} diff --git a/docs/runtime-lifecycle.md b/docs/runtime-lifecycle.md new file mode 100644 index 0000000..22127f5 --- /dev/null +++ b/docs/runtime-lifecycle.md @@ -0,0 +1,239 @@ +# Runtime lifecycle + +This document follows one queue instance from construction through shutdown. + +## 1. Construction + +When `new JobQueue(config)` runs, the constructor does more than store config: + +1. normalizes config defaults +2. opens SQLite and enables WAL mode +3. creates retry strategy +4. creates worker pool +5. optionally creates webhook dispatcher +6. resets any previously `active` jobs to `failed` +7. 
optionally starts retention scheduler +8. requests an initial pump + +### Why `resetActiveJobs()` exists + +`jobqueue` is single-process. If the process dies mid-job, there is no other worker that can safely finish that in-flight job. On next boot, the queue marks those orphaned jobs failed so they do not stay stuck in `active`. + +## 2. Enqueue path + +```mermaid +sequenceDiagram + participant App as Consumer app + participant Queue as JobQueue + participant DB as SqliteStorage + participant Pump as Pump loop + + App->>Queue: enqueue(data, options) + Queue->>DB: createJob(...) + DB-->>Queue: JobRecord(status=pending) + Queue-->>App: jobId + Queue->>Queue: emit job:enqueued + Queue->>Pump: requestPump() +``` + +Key points: + +- enqueue is durable first, asynchronous execution second +- a job can be scheduled for the future with `scheduledAt` +- a per-job webhook URL can override queue-level webhook URL + +## 3. Pumping and dispatch + +The queue uses a **pump loop**, not a constantly-blocking worker thread. + +### Pump rules + +1. stop immediately if queue is closed +2. if another pump is already running, request a repump and return +3. while worker pool has capacity: + - read runnable `pending` jobs whose `scheduled_at <= now` + - try to claim each job atomically + - emit `job:started` + - hand claimed job to `WorkerPool` +4. schedule a wake-up for the next delayed job + +### Why `claimPendingJob()` matters + +The queue lists candidates first, then claims them one by one with a status transition in SQLite. That second step is what prevents the same pending row from being started twice. + +## 4. Job execution + +Each claimed job gets its own `AbortController`. `PhaseRunner` then executes configured phases in order. 
+ +```mermaid +sequenceDiagram + participant Queue as JobQueue + participant Runner as PhaseRunner + participant Handler as Phase handler + participant DB as SqliteStorage + participant Events as TypedEventBus + + Queue->>Runner: run(job, signal) + loop for each phase + Runner->>DB: saveProgress(on phase start) + Runner->>Handler: handler(job, context) + Handler->>Runner: ctx.progress(...) + Runner->>DB: saveProgress(...) + Runner->>Events: job:progress + Handler-->>Runner: phase result + Runner->>DB: savePhaseCompletion(...) + Runner->>Events: job:phase:completed + end + Queue->>DB: completeJob(...) + Queue->>Events: job:completed +``` + +## 5. Progress semantics + +Progress exists at two levels: + +- **phase progress** - what the current handler reports +- **overall progress** - computed from phase index + phase progress + +Example for three phases: + +| Phase | Reported phase progress | Computed overall progress | +| --- | --- | --- | +| `download` | 50 | 17 | +| `process` | 25 | 42 | +| `upload` | 80 | 93 | + +`ctx.progress()` persists that state immediately, then emits `job:progress`. + +## 6. Result passing between phases + +Each handler can return JSON-serializable data. `PhaseRunner` stores that in `phaseResults` and exposes it to later handlers via: + +- `ctx.phaseResult(phaseName)` +- `ctx.phaseResults()` + +This is the mechanism that turns the queue from "single task runner" into "multi-stage pipeline engine". + +## 7. Retry path + +When a handler throws, `JobQueue.handleFailure()` decides between retry and terminal failure. + +```mermaid +sequenceDiagram + participant Handler as Phase handler + participant Queue as JobQueue + participant Retry as RetryStrategy + participant DB as SqliteStorage + participant Events as TypedEventBus + + Handler-->>Queue: throws error + Queue->>Retry: shouldRetry(error, currentJob) + alt recoverable and attempts remain + Queue->>DB: scheduleRetry(...) 
+ Queue->>Events: job:retrying + Queue->>Queue: requestPump() + else fatal or exhausted + Queue->>DB: failJob(...) + Queue->>Events: job:failed + end +``` + +### Retry details + +- `maxAttempts` includes the initial attempt +- default disposition is `fatal` +- backoff can be `fixed`, `linear`, or `exponential` +- recoverable retries keep the job in `pending` with a future `scheduled_at` + +## 8. Cancellation path + +Cancellation is cooperative: + +1. `queue.cancel(id)` aborts the job controller if one exists +2. unfinished phases are persisted as `cancelled` +3. job status becomes `cancelled` +4. `job:cancelled` is emitted +5. later phase-runner cancellation callbacks become no-ops if the job is already cancelled + +That last rule matters. Without it, a cancel request arriving between phase transitions could emit duplicate `job:cancelled` events. The current implementation now guards against that. + +### Pending cancellation + +If a job is cancelled before it starts, **all unfinished phases are also marked `cancelled`**. That keeps the persisted phase graph aligned with the top-level job status. + +## 9. SSE stream lifecycle + +`createEventStream()` creates a stream over queue events. + +### Stream startup + +1. optional snapshot is written first +2. event listeners are attached +3. periodic `ping` keepalive starts + +### Stream shutdown + +- cancelling the reader removes all attached listeners +- keepalive timer is cleared +- no queue state is modified + +## 10. Webhook lifecycle + +Webhooks are triggered off queue events, but they are not the primary source of truth. SQLite is. + +### Completion + webhook ordering + +For a successful job: + +1. `completeJob()` persists `status = completed` +2. `job:completed` is emitted +3. webhook dispatch is scheduled +4. successful delivery marks `webhook_sent = 1` +5. `job:webhook:delivered` is emitted + +This means webhook state becomes visible in a **later event**, not inside the original `job:completed` event. 
+ +### Shutdown interaction + +The queue now tracks in-flight webhook promises and waits for them during shutdown. That avoids closing SQLite while a completed webhook still needs to update `webhook_sent` or emit delivery/failure events. + +## 11. Retention lifecycle + +Retention runs independently from job execution: + +1. compute stale cutoff and delete cutoff +2. mark eligible terminal jobs as `stale` +3. run optional `onStale(job)` callback +4. emit `job:stale` +5. delete stale jobs past delete cutoff +6. run optional `onDelete(job)` callback +7. emit `job:deleted` + +## 12. Shutdown lifecycle + +Shutdown now has two responsibilities: + +1. **stop new work** - mark queue closed, stop retention, clear wake-up timer +2. **tear down safely** - wait for workers, wait for webhooks, remove listeners, close storage + +### Current behavior + +```mermaid +flowchart TD + A["shutdown()"] --> B[closed = true] + B --> C[stop retention + clear wakeup timer] + C --> D{workers drained in time?} + D -- yes --> E[drain pending webhooks] + D -- no --> F[abort active controllers] + F --> G[best-effort second drain] + G --> E + E --> H[remove listeners] + H --> I[close SQLite] + I --> J{timeout happened?} + J -- no --> K[resolve] + J -- yes --> L[rethrow timeout error after cleanup] +``` + +### Important nuance + +If a handler ignores `AbortSignal`, shutdown can still time out. The queue now guarantees cleanup still runs, but graceful completion still depends on handler cooperation. 
diff --git a/src/JobQueue.ts b/src/JobQueue.ts index 3a50e83..ca068ba 100644 --- a/src/JobQueue.ts +++ b/src/JobQueue.ts @@ -74,6 +74,7 @@ export class JobQueue { private readonly handlers = new Map>(); private readonly serializer = new SseSerializer(); private readonly controllers = new Map(); + private readonly pendingWebhookDispatches = new Set>(); private readonly webhookDispatcher?: WebhookDispatcher; private readonly retentionScheduler?: RetentionScheduler; private wakeupTimer: NodeJS.Timeout | null = null; @@ -177,6 +178,7 @@ export class JobQueue { const retried = this.storage.resetForRetry( id, phases, + options.fromStart === false ? existing.phaseResults : {}, existing.maxAttempts, options.scheduledAt instanceof Date ? options.scheduledAt.toISOString() : options.scheduledAt ?? null, ); @@ -194,22 +196,14 @@ export class JobQueue { return job; } - const phases = clonePhases(job.phases); - const activePhaseIndex = phases.findIndex((phase) => phase.status === 'active'); - if (activePhaseIndex >= 0) { - phases[activePhaseIndex] = { - ...phases[activePhaseIndex], - status: 'cancelled', - completedAt: now(), - }; - } + const phases = this.markUnfinishedPhasesCancelled(clonePhases(job.phases)); const controller = this.controllers.get(id); controller?.abort(new CancellationError()); const cancelled = this.storage.cancelJob(id, phases); this.events.emit('job:cancelled', cancelled); - void this.dispatchWebhook('job:cancelled', cancelled); + this.queueWebhookDispatch('job:cancelled', cancelled); this.requestPump(); return cancelled; @@ -378,9 +372,33 @@ export class JobQueue { this.wakeupTimer = null; } - await this.workerPool.drain(timeoutMs); - this.events.removeAllListeners(); - this.storage.close(); + let drainError: unknown; + + try { + await this.workerPool.drain(timeoutMs); + } catch (error) { + drainError = error; + + for (const controller of this.controllers.values()) { + if (!controller.signal.aborted) { + controller.abort(new CancellationError('Job queue 
shut down before active jobs finished')); + } + } + + try { + await this.workerPool.drain(250); + } catch { + // Best effort: cleanup still needs to run after timeout. + } + } finally { + await this.drainWebhookDispatches(); + this.events.removeAllListeners(); + this.storage.close(); + } + + if (drainError) { + throw drainError; + } } private async pump(): Promise { @@ -508,19 +526,18 @@ export class JobQueue { this.events.emit('job:phase:completed', updated, completedPhase); return updated; }, - onCancelled: async (phaseName, phases) => { - const activePhases = clonePhases(phases); - if (phaseName) { - const active = activePhases.find((phase) => phase.name === phaseName); - if (active) { - active.status = 'cancelled'; - active.completedAt = now(); - } + onCancelled: async (_phaseName, phases) => { + const current = this.requireJob(jobId); + if (current.status === 'cancelled') { + return; } - const cancelled = this.storage.cancelJob(jobId, activePhases); + const cancelled = this.storage.cancelJob( + jobId, + this.markUnfinishedPhasesCancelled(clonePhases(phases)), + ); this.events.emit('job:cancelled', cancelled); - void this.dispatchWebhook('job:cancelled', cancelled); + this.queueWebhookDispatch('job:cancelled', cancelled); }, }); @@ -533,14 +550,17 @@ export class JobQueue { const completed = this.storage.completeJob(jobId, result.phases, result.phaseResults); this.events.emit('job:completed', completed); - void this.dispatchWebhook('job:completed', completed); + this.queueWebhookDispatch('job:completed', completed); } catch (error) { if (error instanceof CancellationError) { const current = this.requireJob(jobId); if (current.status !== 'cancelled') { - const cancelled = this.storage.cancelJob(jobId, clonePhases(current.phases)); + const cancelled = this.storage.cancelJob( + jobId, + this.markUnfinishedPhasesCancelled(clonePhases(current.phases)), + ); this.events.emit('job:cancelled', cancelled); - void this.dispatchWebhook('job:cancelled', cancelled); + 
this.queueWebhookDispatch('job:cancelled', cancelled); } return; @@ -596,7 +616,7 @@ export class JobQueue { }; this.events.emit('job:retrying', pending, retry); - void this.dispatchWebhook('job:retrying', pending); + this.queueWebhookDispatch('job:retrying', pending); this.requestPump(); return; } @@ -608,7 +628,7 @@ export class JobQueue { const failed = this.storage.failJob(jobId, phases, failure); this.events.emit('job:failed', failed, failure); - void this.dispatchWebhook('job:failed', failed); + this.queueWebhookDispatch('job:failed', failed); } private async markStaleJobs(cutoffIso: string): Promise[]> { @@ -620,7 +640,7 @@ export class JobQueue { } this.events.emit('job:stale', job); - void this.dispatchWebhook('job:stale', job); + this.queueWebhookDispatch('job:stale', job); } return staleJobs; @@ -648,8 +668,10 @@ export class JobQueue { try { const result = await this.webhookDispatcher.dispatch(event, job); this.storage.markWebhookSent(job.id); - const refreshed = this.requireJob(job.id); - this.events.emit('job:webhook:delivered', refreshed, result); + const refreshed = this.getJob(job.id); + if (refreshed) { + this.events.emit('job:webhook:delivered', refreshed, result); + } } catch (error) { const dispatchError = error instanceof Error && 'dispatchError' in error @@ -666,11 +688,48 @@ export class JobQueue { finalAttempt: 1, }; - const refreshed = this.requireJob(job.id); - this.events.emit('job:webhook:failed', refreshed, dispatchError); + const refreshed = this.getJob(job.id); + if (refreshed) { + this.events.emit('job:webhook:failed', refreshed, dispatchError); + } } } + private queueWebhookDispatch(event: WebhookEventName, job: JobRecord): void { + if (!this.webhookDispatcher?.supports(event, job)) { + return; + } + + const pending = this.dispatchWebhook(event, job).finally(() => { + this.pendingWebhookDispatches.delete(pending); + }); + + this.pendingWebhookDispatches.add(pending); + } + + private async drainWebhookDispatches(): Promise { + while 
(this.pendingWebhookDispatches.size > 0) { + await Promise.allSettled([...this.pendingWebhookDispatches]); + } + } + + private markUnfinishedPhasesCancelled(phases: JobPhaseState[]): JobPhaseState[] { + const cancelledAt = now(); + + return phases.map((phase) => { + if (phase.status === 'completed' || phase.status === 'failed' || phase.status === 'cancelled') { + return phase; + } + + return { + ...phase, + status: 'cancelled', + completedAt: phase.completedAt ?? cancelledAt, + error: null, + }; + }); + } + private pushJobEvent( controller: ReadableStreamDefaultController, encoder: { encode: (input?: string) => Uint8Array }, diff --git a/src/storage/SqliteStorage.ts b/src/storage/SqliteStorage.ts index 9756be1..f2ebf8f 100644 --- a/src/storage/SqliteStorage.ts +++ b/src/storage/SqliteStorage.ts @@ -335,6 +335,7 @@ export class SqliteStorage { public resetForRetry( id: string, phases: JobPhaseState[], + phaseResults: Record, maxAttempts: number, scheduledAt: string | null, ): JobRecord { @@ -346,7 +347,7 @@ export class SqliteStorage { SET status = 'pending', current_phase = NULL, phases_json = ?, - phase_results = '{}', + phase_results = ?, progress = 0, progress_message = NULL, error_json = NULL, @@ -360,7 +361,7 @@ export class SqliteStorage { WHERE id = ? 
`, ) - .run(JSON.stringify(phases), maxAttempts, now, scheduledAt, id); + .run(JSON.stringify(phases), JSON.stringify(phaseResults), maxAttempts, now, scheduledAt, id); return this.mustGetJob(id); } @@ -402,14 +403,6 @@ export class SqliteStorage { } const now = new Date().toISOString(); - const failure: JobFailure = { - message, - phase: null, - recoverable: false, - timestamp: now, - attempt: 0, - }; - const statement = this.db.prepare( ` UPDATE jobs @@ -423,6 +416,14 @@ export class SqliteStorage { const transaction = this.db.transaction((rows: JobRow[]) => { for (const row of rows) { + const failure: JobFailure = { + message, + phase: row.current_phase, + recoverable: false, + timestamp: now, + attempt: row.retry_count + 1, + }; + statement.run(JSON.stringify(failure), now, now, row.id); } }); diff --git a/tests/JobQueue.e2e.test.ts b/tests/JobQueue.e2e.test.ts new file mode 100644 index 0000000..3c0e46a --- /dev/null +++ b/tests/JobQueue.e2e.test.ts @@ -0,0 +1,107 @@ +import { waitFor } from './helpers.js'; +import { JobQueueHarness } from './e2e/JobQueueHarness.js'; + +describe('JobQueue e2e', () => { + it('runs a multi-phase workflow with retry, SSE, and webhooks', async () => { + const harness = new JobQueueHarness<{ url: string }>(); + let processAttempts = 0; + + try { + const queue = await harness.start( + { + phases: ['download', 'process', 'upload'], + concurrency: 2, + retry: { + maxAttempts: 2, + baseDelayMs: 5, + classifyError: async (error) => + error instanceof Error && error.message === 'recoverable' ? 
'recoverable' : 'fatal', + }, + webhook: { + events: ['job:retrying', 'job:completed'], + }, + }, + { webhookDelayMs: 5 }, + ); + const stream = await harness.createStream({ includeSnapshot: false }); + + queue.handle('download', async (_job, ctx) => { + await ctx.progress(100, 'downloaded'); + return { filePath: '/tmp/video.mp4' }; + }); + queue.handle('process', async (_job, ctx) => { + processAttempts += 1; + const filePath = ctx.phaseResult<{ filePath: string }>('download')?.filePath; + + if (processAttempts === 1) { + await ctx.progress(25, 'processing'); + throw new Error('recoverable'); + } + + await ctx.progress(100, 'processed'); + return { outputPath: `${filePath}.json` }; + }); + queue.handle('upload', async (_job, ctx) => { + const outputPath = ctx.phaseResult<{ outputPath: string }>('process')?.outputPath; + await ctx.progress(100, 'uploaded'); + return { uploaded: Boolean(outputPath) }; + }); + + const jobId = await queue.enqueue({ url: 'https://example.com/video' }); + await harness.waitForJobStatus(jobId, 'completed'); + await waitFor(() => harness.webhooks.length >= 2); + await stream.stop(); + + const job = queue.getJob(jobId); + expect(job?.phaseResults.download).toEqual({ filePath: '/tmp/video.mp4' }); + expect(job?.phaseResults.process).toEqual({ outputPath: '/tmp/video.mp4.json' }); + expect(job?.phaseResults.upload).toEqual({ uploaded: true }); + expect(harness.webhooks.map((entry) => entry.event)).toEqual(['job:retrying', 'job:completed']); + expect(stream.eventNames).toContain('job:retrying'); + expect(stream.eventNames).toContain('job:completed'); + expect(stream.eventNames.filter((event) => event === 'job:phase:completed')).toHaveLength(3); + } finally { + await harness.cleanup(); + } + }); + + it('survives stale-job webhook completion after retention deletes the job', async () => { + const harness = new JobQueueHarness<{ url: string }>(); + + try { + const queue = await harness.start( + { + phases: ['run'], + concurrency: 1, + webhook: { 
+ events: ['job:stale'], + }, + retention: { + staleAfterMs: 20, + deleteAfterMs: 40, + intervalMs: 10, + }, + }, + { webhookDelayMs: 80 }, + ); + const stream = await harness.createStream({ includeSnapshot: false }); + + queue.handle('run', async () => ({ ok: true })); + + const firstJob = await queue.enqueue({ url: 'https://example.com/one' }); + await harness.waitForJobStatus(firstJob, 'completed'); + await harness.waitForJobDeletion(firstJob); + await waitFor(() => harness.webhooks.length >= 1, { timeoutMs: 4_000 }); + + const secondJob = await queue.enqueue({ url: 'https://example.com/two' }); + await harness.waitForJobStatus(secondJob, 'completed'); + await stream.stop(); + + expect(harness.webhooks[0]?.event).toBe('job:stale'); + expect(stream.eventNames).toContain('job:stale'); + expect(stream.eventNames).toContain('job:deleted'); + } finally { + await harness.cleanup(); + } + }); +}); diff --git a/tests/JobQueue.test.ts b/tests/JobQueue.test.ts index c3a7984..24d8c45 100644 --- a/tests/JobQueue.test.ts +++ b/tests/JobQueue.test.ts @@ -1,4 +1,6 @@ -import { JobQueue } from '../src/index.js'; +import { createServer } from 'node:http'; + +import { JobQueue, SqliteStorage } from '../src/index.js'; import { cleanupDir, createDbPath, createTempDir, waitFor } from './helpers.js'; describe('JobQueue', () => { @@ -132,4 +134,315 @@ describe('JobQueue', () => { expect(chunks.join('\n')).toContain('event: job:completed'); expect(chunks.join('\n')).toContain('event: job:progress'); }); + + it('marks unfinished phases as cancelled when cancelling a pending job', async () => { + const dir = createTempDir(); + const queue = new JobQueue<{ url: string }>({ + dbPath: createDbPath(dir), + phases: ['download', 'process'], + concurrency: 1, + }); + + try { + const jobId = await queue.enqueue( + { url: 'https://example.com/video' }, + { scheduledAt: new Date(Date.now() + 60_000) }, + ); + const cancelled = await queue.cancel(jobId); + + 
expect(cancelled.status).toBe('cancelled'); + expect(cancelled.phases.map((phase) => phase.status)).toEqual(['cancelled', 'cancelled']); + } finally { + await queue.shutdown(); + cleanupDir(dir); + } + }); + + it('emits job:cancelled once when cancelling after a phase completes', async () => { + const dir = createTempDir(); + const queue = new JobQueue<{ url: string }>({ + dbPath: createDbPath(dir), + phases: ['download', 'process'], + concurrency: 1, + }); + let processStarted = false; + let cancelledEvents = 0; + + queue.on('job:phase:completed', (job, phase) => { + if (phase.name === 'download') { + void queue.cancel(job.id); + } + }); + queue.on('job:cancelled', () => { + cancelledEvents += 1; + }); + + queue.handle('download', async (_job, ctx) => { + await ctx.progress(100, 'downloaded'); + return { filePath: '/tmp/video.mp4' }; + }); + queue.handle('process', async () => { + processStarted = true; + return { outputPath: '/tmp/video.txt' }; + }); + + try { + const jobId = await queue.enqueue({ url: 'https://example.com/video' }); + await waitFor(() => queue.getJob(jobId)?.status === 'cancelled'); + + expect(cancelledEvents).toBe(1); + expect(processStarted).toBe(false); + expect(queue.getJob(jobId)?.phases.map((phase) => phase.status)).toEqual([ + 'completed', + 'cancelled', + ]); + } finally { + await queue.shutdown(); + cleanupDir(dir); + } + }); + + it('waits for in-flight webhooks during shutdown', async () => { + const dir = createTempDir(); + const deliveries: string[] = []; + const server = createServer((request, response) => { + let body = ''; + request.on('data', (chunk) => { + body += chunk.toString(); + }); + request.on('end', () => { + setTimeout(() => { + deliveries.push(body); + response.writeHead(202).end(); + }, 50); + }); + }); + + await new Promise((resolve) => { + server.listen(0, '127.0.0.1', () => resolve()); + }); + + const address = server.address(); + if (!address || typeof address === 'string') { + throw new Error('Server address 
unavailable'); + } + + const queue = new JobQueue<{ url: string }>({ + dbPath: createDbPath(dir), + phases: ['run'], + concurrency: 1, + webhook: { + url: `http://127.0.0.1:${address.port}/hook`, + events: ['job:completed'], + }, + }); + let deliveredEvents = 0; + + queue.on('job:webhook:delivered', () => { + deliveredEvents += 1; + }); + queue.handle('run', async () => ({ ok: true })); + + try { + const jobId = await queue.enqueue({ url: 'https://example.com/video' }); + await waitFor(() => queue.getJob(jobId)?.status === 'completed'); + + const startedAt = Date.now(); + await queue.shutdown(); + + expect(Date.now() - startedAt).toBeGreaterThanOrEqual(40); + expect(deliveredEvents).toBe(1); + expect(deliveries).toHaveLength(1); + } finally { + await new Promise((resolve, reject) => { + server.close((error) => { + if (error) { + reject(error); + return; + } + + resolve(); + }); + }); + cleanupDir(dir); + } + }); + + it('cleans up listeners and storage when shutdown times out', async () => { + const dir = createTempDir(); + const queue = new JobQueue<{ url: string }>({ + dbPath: createDbPath(dir), + phases: ['run'], + concurrency: 1, + }); + const internalQueue = queue as JobQueue<{ url: string }> & { + storage: { close: () => void }; + events: { removeAllListeners: () => void }; + }; + const closeSpy = vi.spyOn(internalQueue.storage, 'close'); + const removeAllListenersSpy = vi.spyOn(internalQueue.events, 'removeAllListeners'); + + queue.handle( + 'run', + async (_job, ctx) => + await new Promise((resolve) => { + ctx.signal.addEventListener( + 'abort', + () => { + resolve({ ok: false }); + }, + { once: true }, + ); + }), + ); + + try { + const jobId = await queue.enqueue({ url: 'https://example.com/video' }); + await waitFor(() => queue.getJob(jobId)?.status === 'active'); + + await expect(queue.shutdown(10)).rejects.toThrow(/Timed out waiting for workers to drain/); + expect(removeAllListenersSpy).toHaveBeenCalledTimes(1); + 
expect(closeSpy).toHaveBeenCalledTimes(1); + } finally { + cleanupDir(dir); + } + }); + + it('preserves completed phase results when retrying from partial progress', async () => { + const dir = createTempDir(); + const queue = new JobQueue<{ url: string }>({ + dbPath: createDbPath(dir), + phases: ['download', 'process'], + concurrency: 1, + }); + let downloadRuns = 0; + let processRuns = 0; + let resumedFilePath: string | undefined; + + queue.handle('download', async () => { + downloadRuns += 1; + return { filePath: '/tmp/video.mp4' }; + }); + queue.handle('process', async (_job, ctx) => { + processRuns += 1; + const filePath = ctx.phaseResult<{ filePath: string }>('download')?.filePath; + + if (processRuns === 1) { + expect(filePath).toBe('/tmp/video.mp4'); + throw new Error('fatal'); + } + + resumedFilePath = filePath; + return { outputPath: `${filePath}.json` }; + }); + + try { + const jobId = await queue.enqueue({ url: 'https://example.com/video' }); + await waitFor(() => queue.getJob(jobId)?.status === 'failed'); + + await queue.retry(jobId, { fromStart: false }); + await waitFor(() => queue.getJob(jobId)?.status === 'completed'); + + expect(downloadRuns).toBe(1); + expect(processRuns).toBe(2); + expect(resumedFilePath).toBe('/tmp/video.mp4'); + expect(queue.getJob(jobId)?.phaseResults.download).toEqual({ filePath: '/tmp/video.mp4' }); + } finally { + await queue.shutdown(); + cleanupDir(dir); + } + }); + + it('restores interrupted active jobs as failed with phase context on restart', async () => { + const dir = createTempDir(); + const dbPath = createDbPath(dir); + const storage = new SqliteStorage<{ url: string }>(dbPath); + + try { + const job = storage.createJob( + 'job-1', + { url: 'https://example.com/video' }, + [ + { + name: 'run', + status: 'pending', + progress: 0, + message: null, + startedAt: null, + completedAt: null, + error: null, + }, + ], + {}, + 1, + ); + + expect(storage.claimPendingJob(job.id)).toBe(true); + storage.saveProgress( + job.id, 
+ 'run', + [ + { + name: 'run', + status: 'active', + progress: 25, + message: 'working', + startedAt: new Date().toISOString(), + completedAt: null, + error: null, + }, + ], + 25, + 'working', + ); + storage.close(); + + const restarted = new JobQueue<{ url: string }>({ + dbPath, + phases: ['run'], + concurrency: 1, + }); + + try { + const restartedJob = restarted.getJob(job.id); + expect(restartedJob?.status).toBe('failed'); + expect(restartedJob?.error?.phase).toBe('run'); + expect(restartedJob?.error?.attempt).toBe(1); + } finally { + await restarted.shutdown(); + } + } finally { + cleanupDir(dir); + } + }); + + it('executes scheduled jobs when their wakeup time arrives', async () => { + const dir = createTempDir(); + const queue = new JobQueue<{ url: string }>({ + dbPath: createDbPath(dir), + phases: ['run'], + concurrency: 1, + }); + const startedAt: number[] = []; + const scheduledAt = Date.now() + 50; + + queue.handle('run', async () => { + startedAt.push(Date.now()); + return { ok: true }; + }); + + try { + const jobId = await queue.enqueue( + { url: 'https://example.com/video' }, + { scheduledAt: new Date(scheduledAt) }, + ); + await waitFor(() => queue.getJob(jobId)?.status === 'completed', { timeoutMs: 4_000 }); + + expect(startedAt).toHaveLength(1); + expect(startedAt[0]).toBeGreaterThanOrEqual(scheduledAt - 10); + } finally { + await queue.shutdown(); + cleanupDir(dir); + } + }); }); diff --git a/tests/RetryStrategy.test.ts b/tests/RetryStrategy.test.ts index 8b3d04e..47aea14 100644 --- a/tests/RetryStrategy.test.ts +++ b/tests/RetryStrategy.test.ts @@ -1,5 +1,29 @@ import { RetryStrategy } from '../src/index.js'; +function createJob(retryCount = 0, maxAttempts = 4) { + return { + id: 'job-1', + status: 'active' as const, + data: {}, + currentPhase: 'run', + phases: [], + phaseResults: {}, + progress: 0, + progressMessage: null, + error: null, + retryCount, + maxAttempts, + webhookUrl: null, + webhookSent: false, + createdAt: new Date().toISOString(), 
+ startedAt: null, + completedAt: null, + updatedAt: new Date().toISOString(), + scheduledAt: null, + cancelledAt: null, + }; +} + describe('RetryStrategy', () => { it('uses exponential backoff for recoverable errors', async () => { const strategy = new RetryStrategy({ @@ -8,60 +32,64 @@ describe('RetryStrategy', () => { classifyError: async () => 'recoverable', }); - const decision = await strategy.shouldRetry(new Error('boom'), { - id: 'job-1', - status: 'active', - data: {}, - currentPhase: 'run', - phases: [], - phaseResults: {}, - progress: 0, - progressMessage: null, - error: null, - retryCount: 1, - maxAttempts: 4, - webhookUrl: null, - webhookSent: false, - createdAt: new Date().toISOString(), - startedAt: null, - completedAt: null, - updatedAt: new Date().toISOString(), - scheduledAt: null, - cancelledAt: null, - }); + const decision = await strategy.shouldRetry(new Error('boom'), createJob(1)); expect(decision.retry).toBe(true); expect(decision.delayMs).toBe(200); expect(decision.disposition).toBe('recoverable'); }); + it('uses fixed backoff when configured', async () => { + const strategy = new RetryStrategy({ + maxAttempts: 4, + strategy: 'fixed', + baseDelayMs: 75, + classifyError: async () => 'recoverable', + }); + + const decision = await strategy.shouldRetry(new Error('boom'), createJob(2)); + + expect(decision.retry).toBe(true); + expect(decision.delayMs).toBe(75); + }); + + it('uses linear backoff and caps to maxDelayMs', async () => { + const strategy = new RetryStrategy({ + maxAttempts: 6, + strategy: 'linear', + baseDelayMs: 100, + maxDelayMs: 250, + classifyError: async () => 'recoverable', + }); + + const decision = await strategy.shouldRetry(new Error('boom'), createJob(4, 6)); + + expect(decision.retry).toBe(true); + expect(decision.delayMs).toBe(250); + }); + + it('stops retrying when max attempts are exhausted', async () => { + const strategy = new RetryStrategy({ + maxAttempts: 3, + classifyError: async () => 'recoverable', + }); + + 
const decision = await strategy.shouldRetry(new Error('boom'), createJob(2, 3)); + + expect(decision).toEqual({ + retry: false, + delayMs: 0, + disposition: 'recoverable', + }); + }); + it('does not retry fatal errors', async () => { const strategy = new RetryStrategy({ maxAttempts: 4, classifyError: async () => 'fatal', }); - const decision = await strategy.shouldRetry(new Error('fatal'), { - id: 'job-1', - status: 'active', - data: {}, - currentPhase: 'run', - phases: [], - phaseResults: {}, - progress: 0, - progressMessage: null, - error: null, - retryCount: 0, - maxAttempts: 4, - webhookUrl: null, - webhookSent: false, - createdAt: new Date().toISOString(), - startedAt: null, - completedAt: null, - updatedAt: new Date().toISOString(), - scheduledAt: null, - cancelledAt: null, - }); + const decision = await strategy.shouldRetry(new Error('fatal'), createJob()); expect(decision).toEqual({ retry: false, diff --git a/tests/SqliteStorage.test.ts b/tests/SqliteStorage.test.ts index e2a3a06..1a74330 100644 --- a/tests/SqliteStorage.test.ts +++ b/tests/SqliteStorage.test.ts @@ -73,4 +73,126 @@ describe('SqliteStorage', () => { cleanupDir(dir); } }); + + it('preserves phase results when resetting for partial retry', () => { + const dir = createTempDir(); + const storage = new SqliteStorage<{ url: string }>(createDbPath(dir)); + + try { + const job = storage.createJob( + 'job-1', + { url: 'https://example.com' }, + [ + { + name: 'download', + status: 'pending', + progress: 0, + message: null, + startedAt: null, + completedAt: null, + error: null, + }, + { + name: 'process', + status: 'pending', + progress: 0, + message: null, + startedAt: null, + completedAt: null, + error: null, + }, + ], + {}, + 3, + ); + + const retried = storage.resetForRetry( + job.id, + [ + { + name: 'download', + status: 'completed', + progress: 100, + message: null, + startedAt: new Date().toISOString(), + completedAt: new Date().toISOString(), + error: null, + }, + { + name: 'process', + status: 
'pending', + progress: 0, + message: null, + startedAt: null, + completedAt: null, + error: null, + }, + ], + { download: { filePath: '/tmp/file' } }, + 3, + null, + ); + + expect(retried.status).toBe('pending'); + expect(retried.progress).toBe(0); + expect(retried.phaseResults.download).toEqual({ filePath: '/tmp/file' }); + } finally { + storage.close(); + cleanupDir(dir); + } + }); + + it('preserves interrupted phase when resetting active jobs on restart', () => { + const dir = createTempDir(); + const storage = new SqliteStorage<{ url: string }>(createDbPath(dir)); + + try { + const job = storage.createJob( + 'job-1', + { url: 'https://example.com' }, + [ + { + name: 'download', + status: 'pending', + progress: 0, + message: null, + startedAt: null, + completedAt: null, + error: null, + }, + ], + {}, + 3, + ); + + expect(storage.claimPendingJob(job.id)).toBe(true); + + storage.saveProgress( + job.id, + 'download', + [ + { + name: 'download', + status: 'active', + progress: 25, + message: 'working', + startedAt: new Date().toISOString(), + completedAt: null, + error: null, + }, + ], + 25, + 'working', + ); + + const reset = storage.resetActiveJobs('Interrupted by process restart'); + expect(reset).toHaveLength(1); + expect(reset[0]?.status).toBe('failed'); + expect(reset[0]?.error?.phase).toBe('download'); + expect(reset[0]?.error?.attempt).toBe(1); + } finally { + storage.close(); + cleanupDir(dir); + } + }); }); diff --git a/tests/WebhookDispatcher.test.ts b/tests/WebhookDispatcher.test.ts index cfe0dac..17debd1 100644 --- a/tests/WebhookDispatcher.test.ts +++ b/tests/WebhookDispatcher.test.ts @@ -72,4 +72,128 @@ describe('WebhookDispatcher', () => { }); } }); + + it('retries retryable webhook responses until success', async () => { + let attempts = 0; + const server = createServer((_request, response) => { + attempts += 1; + response.writeHead(attempts < 3 ? 
503 : 202).end(); + }); + + await new Promise((resolve) => { + server.listen(0, '127.0.0.1', () => resolve()); + }); + + const address = server.address(); + if (!address || typeof address === 'string') { + throw new Error('Server address unavailable'); + } + + const dispatcher = new WebhookDispatcher({ + url: `http://127.0.0.1:${address.port}/hook`, + events: ['job:completed'], + maxAttempts: 4, + baseDelayMs: 1, + maxDelayMs: 1, + }); + + try { + const result = await dispatcher.dispatch('job:completed', { + id: 'job-1', + status: 'completed', + data: { url: 'https://example.com' }, + currentPhase: null, + phases: [], + phaseResults: {}, + progress: 100, + progressMessage: null, + error: null, + retryCount: 0, + maxAttempts: 1, + webhookUrl: null, + webhookSent: false, + createdAt: new Date().toISOString(), + startedAt: null, + completedAt: new Date().toISOString(), + updatedAt: new Date().toISOString(), + scheduledAt: null, + cancelledAt: null, + }); + + expect(result.status).toBe(202); + expect(attempts).toBe(3); + } finally { + await new Promise((resolve, reject) => { + server.close((error) => { + if (error) { + reject(error); + return; + } + resolve(); + }); + }); + } + }); + + it('does not retry non-retryable webhook responses', async () => { + let attempts = 0; + const server = createServer((_request, response) => { + attempts += 1; + response.writeHead(400).end(); + }); + + await new Promise((resolve) => { + server.listen(0, '127.0.0.1', () => resolve()); + }); + + const address = server.address(); + if (!address || typeof address === 'string') { + throw new Error('Server address unavailable'); + } + + const dispatcher = new WebhookDispatcher({ + url: `http://127.0.0.1:${address.port}/hook`, + events: ['job:completed'], + maxAttempts: 4, + baseDelayMs: 1, + maxDelayMs: 1, + }); + + try { + await expect( + dispatcher.dispatch('job:completed', { + id: 'job-1', + status: 'completed', + data: { url: 'https://example.com' }, + currentPhase: null, + phases: [], + 
phaseResults: {}, + progress: 100, + progressMessage: null, + error: null, + retryCount: 0, + maxAttempts: 1, + webhookUrl: null, + webhookSent: false, + createdAt: new Date().toISOString(), + startedAt: null, + completedAt: new Date().toISOString(), + updatedAt: new Date().toISOString(), + scheduledAt: null, + cancelledAt: null, + }), + ).rejects.toThrow(/non-retryable status 400/); + expect(attempts).toBe(1); + } finally { + await new Promise((resolve, reject) => { + server.close((error) => { + if (error) { + reject(error); + return; + } + resolve(); + }); + }); + } + }); }); diff --git a/tests/WorkerPool.test.ts b/tests/WorkerPool.test.ts index e8d063d..d7f8dcf 100644 --- a/tests/WorkerPool.test.ts +++ b/tests/WorkerPool.test.ts @@ -23,4 +23,22 @@ describe('WorkerPool', () => { expect(maxActive).toBe(2); }); + + it('times out while draining slow tasks', async () => { + const pool = new WorkerPool(1); + let release!: () => void; + const blocked = new Promise((resolve) => { + release = resolve; + }); + + void pool.run(async () => { + await blocked; + }); + + await waitFor(() => pool.activeCount === 1); + await expect(pool.drain(10)).rejects.toThrow(/Timed out waiting for workers to drain/); + + release(); + await pool.drain(); + }); }); diff --git a/tests/e2e/JobQueueHarness.ts b/tests/e2e/JobQueueHarness.ts new file mode 100644 index 0000000..33342c3 --- /dev/null +++ b/tests/e2e/JobQueueHarness.ts @@ -0,0 +1,187 @@ +import { createServer } from 'node:http'; + +import { JobQueue, type JobData, type QueueConfig, type QueueStreamEvent, type StreamOptions } from '../../src/index.js'; +import { cleanupDir, createDbPath, createTempDir, waitFor } from '../helpers.js'; + +interface HarnessStream { + events: QueueStreamEvent[]; + eventNames: string[]; + stop: () => Promise; +} + +interface WebhookCapture { + event: string; + body: string; + payload: unknown; +} + +function parseSseChunk(chunk: string): Array<{ event: string; payload: QueueStreamEvent }> { + return chunk 
+ .split('\n\n') + .map((block) => block.trim()) + .filter(Boolean) + .map((block) => { + const lines = block.split('\n'); + const event = lines.find((line) => line.startsWith('event: '))?.slice(7); + const data = lines.find((line) => line.startsWith('data: '))?.slice(6); + + if (!event || !data) { + throw new Error(`Unable to parse SSE block: ${block}`); + } + + return { + event, + payload: JSON.parse(data) as QueueStreamEvent, + }; + }); +} + +export class JobQueueHarness { + public readonly dir = createTempDir('jobqueue-e2e-'); + public readonly webhooks: WebhookCapture[] = []; + + private queue: JobQueue | null = null; + private server: + | ReturnType + | null = null; + private webhookUrl: string | null = null; + private readonly streams: Array<{ stop: () => Promise }> = []; + + public async start( + config: Omit, 'dbPath'>, + options: { webhookDelayMs?: number } = {}, + ): Promise> { + if (config.webhook) { + await this.startWebhookServer(options.webhookDelayMs ?? 0); + } + + this.queue = new JobQueue({ + ...config, + dbPath: createDbPath(this.dir), + webhook: + config.webhook && this.webhookUrl + ? { + ...config.webhook, + url: config.webhook.url ?? 
this.webhookUrl, + } + : config.webhook, + }); + + return this.queue; + } + + public getQueue(): JobQueue { + if (!this.queue) { + throw new Error('Harness queue has not been started'); + } + + return this.queue; + } + + public async createStream(options: StreamOptions = {}): Promise> { + const events: QueueStreamEvent[] = []; + const eventNames: string[] = []; + const reader = this.getQueue().createEventStream(options).getReader(); + let active = true; + + const readLoop = (async () => { + while (active) { + const { value, done } = await reader.read(); + if (done) { + return; + } + + if (!value) { + continue; + } + + const chunk = new TextDecoder().decode(value); + for (const parsed of parseSseChunk(chunk)) { + eventNames.push(parsed.event); + events.push(parsed.payload); + } + } + })(); + + const stop = async () => { + active = false; + await reader.cancel(); + await readLoop; + }; + + this.streams.push({ stop }); + return { events, eventNames, stop }; + } + + public async waitForJobStatus(id: string, status: string, timeoutMs = 4_000): Promise { + await waitFor(() => this.getQueue().getJob(id)?.status === status, { timeoutMs }); + } + + public async waitForJobDeletion(id: string, timeoutMs = 4_000): Promise { + await waitFor(() => this.getQueue().getJob(id) === null, { timeoutMs }); + } + + public async cleanup(): Promise { + for (const stream of this.streams.splice(0)) { + await stream.stop(); + } + + if (this.queue) { + await this.queue.shutdown(); + this.queue = null; + } + + if (this.server) { + await new Promise((resolve, reject) => { + this.server?.close((error) => { + if (error) { + reject(error); + return; + } + + resolve(); + }); + }); + this.server = null; + } + + cleanupDir(this.dir); + } + + private async startWebhookServer(delayMs: number): Promise { + this.server = createServer((request, response) => { + let body = ''; + request.on('data', (chunk) => { + body += chunk.toString(); + }); + request.on('end', () => { + setTimeout(() => { + let payload: 
unknown = null; + try { + payload = JSON.parse(body); + } catch { + payload = body; + } + + this.webhooks.push({ + event: String(request.headers['x-jobqueue-event'] ?? ''), + body, + payload, + }); + response.writeHead(202).end(); + }, delayMs); + }); + }); + + await new Promise((resolve) => { + this.server?.listen(0, '127.0.0.1', () => resolve()); + }); + + const address = this.server.address(); + if (!address || typeof address === 'string') { + throw new Error('Webhook server address unavailable'); + } + + this.webhookUrl = `http://127.0.0.1:${address.port}/hook`; + } +}