10 Commits
v0.0.1 ... main

Author SHA1 Message Date
7c01d7f77f fix(model): skip stale unload command
All checks were successful
Build & Push Docker Image / test (push) Successful in 6m9s
Build & Push Docker Image / build-and-push (push) Successful in 6m33s
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-05-12 01:08:35 +02:00
d8a73e150a fix(worker): port final segment cleanup
All checks were successful
Build & Push Docker Image / test (push) Successful in 6m2s
Build & Push Docker Image / build-and-push (push) Successful in 6m31s
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-05-12 00:10:32 +02:00
cb0b07b2ff fix(worker): collapse incremental segments
All checks were successful
Build & Push Docker Image / test (push) Successful in 6m20s
Build & Push Docker Image / build-and-push (push) Successful in 6m29s
Normalize rolling partial-hypothesis chains before final job persistence so downstream clients receive stable transcript segments instead of echoed continuations.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-05-11 22:46:38 +02:00
mozempk
d3a67f11b3 fix: four worker.rs bugs found during E2E testing
All checks were successful
Build & Push Docker Image / test (push) Successful in 6m21s
Build & Push Docker Image / build-and-push (push) Successful in 6m32s
2026-05-10 16:28:03 +02:00
mozempk
bcaf8680db docs: add FRONTEND_INTEGRATION.md developer guide
All checks were successful
Build & Push Docker Image / test (push) Successful in 5m54s
Build & Push Docker Image / build-and-push (push) Successful in 17s
Comprehensive integration guide for frontend/full-stack developers:

- Architecture overview diagram
- Quick start (submit + poll in ~20 lines)
- Model lifecycle: state machine diagram, all 4 /model/* endpoints,
  SSE event subscription with JS examples
- Job submission: multipart fields, 503 model_not_ready handling,
  retry-with-auto-load pattern
- Job progress: polling vs SSE, all event types with payloads
- Webhooks: job completion + model lifecycle, Express receiver example,
  how to distinguish job vs model payloads
- Health check field reference
- Cancellation semantics (GPU inference not interruptible)
- Full TypeScript type definitions for all API shapes
- React hooks: useModelStatus, useJobStream, useTranscribe
- Complete WhisperClient class example with ensureModelReady,
  streamProgress, and end-to-end transcribe()
- Error reference table with all 400/404/409/503/500 shapes

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-05-08 23:47:13 +02:00
mozempk
d0148260e3 test: add unit test infrastructure (Docker tester stage + CI)
All checks were successful
Build & Push Docker Image / test (push) Successful in 5m42s
Build & Push Docker Image / build-and-push (push) Successful in 21s
- Add Dockerfile 'tester' stage (FROM builder):
  - Symlinks /usr/local/cuda/lib64/stubs/libcuda.so → libcuda.so.1
    so the test binary can satisfy the dynamic linker without a real GPU
  - Runs `cargo test --release` reusing the cached release build artifacts
    (no recompilation — tests complete in ~6s)
  - docker build --target tester . to run all 30 unit tests

- Add 'test' job to .gitea/workflows/docker-build.yml:
  - Runs before build-and-push (build-and-push needs: test)
  - Builds --target tester with registry build cache
  - Gate: build-and-push only runs when all tests pass

- Add run_tests.sh convenience script for local use:
  - Accepts optional test name filter as first argument
  - Respects CUDA_VERSION / UBUNTU_VERSION env overrides

All 30 unit tests pass:
  error::tests     — 7 tests (OOM detection, ModelNotReady HTTP shape)
  models::tests    — 17 tests (state machine, serialization, retry-after)
  worker::tests    — 6 tests (chunk ranges, silence snap/trim)

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-05-08 18:20:52 +02:00
mozempk
b191fbe200 feat: dynamic model loading/unloading with GPU polling
All checks were successful
Build & Push Docker Image / build-and-push (push) Successful in 8m41s
- Model starts unloaded (lazy); loads on first job or POST /model/load
- Auto-unloads after IDLE_TIMEOUT_SECS (default 300) of inactivity
- POST /model/unload for immediate manual release
- GPU-busy detection: on VRAM OOM, enters WaitingForGpu and retries
  every GPU_POLL_INTERVAL_SECS (default 30) indefinitely
- POST /jobs when unloaded → 503 + Retry-After header, triggers load
- AppError::OutOfMemory and AppError::ModelNotReady variants
- WorkerCmd channel (SyncSender<WorkerCmd>) replaces bare tx_req channel
- Idle timer via recv_timeout(1s) tick inside OS thread (no extra thread)
- Model lifecycle events broadcast via tokio broadcast channel (SSE + webhooks)
- webhook_registry: all clients that ever submitted a webhook_url receive
  model_ready and model_unloaded webhooks
- GPU warmup retained on every (re)load

New routes:
  GET  /model/status  — current state + VRAM stats
  POST /model/load    — trigger load (idempotent)
  POST /model/unload  — immediate unload
  GET  /model/events  — SSE stream of model lifecycle events

New env vars:
  IDLE_TIMEOUT_SECS       (default 300)
  GPU_POLL_INTERVAL_SECS  (default 30)

Tests:
  tests/test_model_lifecycle.sh — 18 integration tests (full state machine,
    SSE events, webhooks, concurrency, unload-during-load)
  tests/test_idle_timeout.sh    — 5 tests with short IDLE_TIMEOUT_SECS=5
  test_all.sh updated: loads model before job submission, asserts
    model_state in /health, adds POST /model/unload at end

Docs:
  docs/USAGE.md: model lifecycle section, new env vars, 503 retry pattern,
    updated /health response shape

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-05-08 17:57:20 +02:00
mozempk
78c6fab81b fix: remove duplicate old test suite and fix step 9 pipe/heredoc bug
All checks were successful
Build & Push Docker Image / build-and-push (push) Successful in 16s
Step 9 used 'echo $RESULT | python3 - << HEREDOC' which is a bash gotcha:
the heredoc takes over stdin (as the script source), so the pipe is
silently ignored and sys.stdin.read() returns empty string → JSONDecodeError.

Fix: write RESULT to a temp file and pass it as sys.argv[1] to the script.

Also removed the old buggy test suite that was accidentally left appended
at lines 181-327 (had language=auto, ['id'] field, wrong DELETE assertion).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-05-06 12:13:15 +02:00
mozempk
fd8d4deefb fix: GPU warmup on startup + fix test_all.sh + document cold-GPU finding
All checks were successful
Build & Push Docker Image / build-and-push (push) Successful in 6m39s
GPU warmup (src/transcriber.rs):
  After creating WhisperState, run a 1s silent inference pass in load().
  CUDA JIT-compiles device kernels on the first whisper_full_with_state call.
  On a cold GPU this compilation disrupts the decode pipeline mid-inference,
  returning 0 segments in ~0.5s. The warmup forces all kernel compilation at
  startup so the first real job runs on fully compiled kernels.

test_all.sh:
  - Fix submit response field: 'id' → 'job_id' (was breaking all downstream steps)
  - Remove language=auto: not a valid ISO 639-1 code; omit field for auto-detect
  - Make BASE and AUDIO configurable via env vars (WHISPER_BASE_URL, TEST_AUDIO)
  - Fix DELETE assertion: completed jobs return 409 Conflict, not 204
  - Add explicit zero-segments failure check in quality inspection (step 9)
  - Add progress reporting to poll loop

docs/FINDINGS.md + KNOWLEDGE.md:
  Document cold GPU warmup issue, root cause, and fix.
  Document language=auto as invalid API usage.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-05-06 11:57:30 +02:00
mozempk
d5a88d1866 fix: create WhisperState once at load time, reuse across all chunks
Some checks failed
Build & Push Docker Image / build-and-push (push) Has been cancelled
Previously create_state() was called for every 60s audio chunk, triggering
whisper_init_state() each time. This allocates ~700 MB of GPU compute buffers
(KV caches, CUDA workspace) and re-initialises the CUDA backend per chunk.

For a 101-minute audio (102 chunks), this caused 102 GPU re-initialisations
and VRAM allocation cycles. Under VRAM pressure from concurrent processes,
CUDA allocation failures occurred silently — whisper returned language
detection results but 0 segments.

Fix: create WhisperState once in Transcriber::load() and reuse it for every
transcription call. GPU memory is stable; no_context=true prevents KV-cache
contamination between chunks.

WhisperState is Send+Sync (explicitly declared in whisper-rs) and holds its
own Arc<WhisperInnerContext>, so the model weights stay alive even after
WhisperContext is dropped.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-05-06 11:51:33 +02:00
20 changed files with 4130 additions and 370 deletions

View File

@@ -18,7 +18,30 @@ env:
UBUNTU_VERSION: ${{ vars.UBUNTU_VERSION || '22.04' }}
jobs:
test:
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Run unit tests
uses: docker/build-push-action@v6
with:
context: .
file: ./Dockerfile
target: tester
push: false
build-args: |
CUDA_VERSION=${{ env.CUDA_VERSION }}
UBUNTU_VERSION=${{ env.UBUNTU_VERSION }}
cache-from: type=registry,ref=${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:buildcache
build-and-push:
needs: test
runs-on: ubuntu-latest
steps:

View File

@@ -82,6 +82,27 @@ RUN --mount=type=cache,target=/usr/local/cargo/registry \
&& cp target/release/whisper-server /usr/local/bin/whisper-server
# ╔══════════════════════════════════════════════════════════╗
# ║ STAGE 1b — tester ║
# ║ Runs unit tests against the release build artifacts ║
# ║ Uses CUDA stubs so tests run without a physical GPU ║
# ║ ║
# ║ Usage: ║
# ║ docker build --target tester . ║
# ╚══════════════════════════════════════════════════════════╝
FROM builder AS tester
# libcuda.so.1 stub — satisfies the dynamic linker without a real driver
RUN ln -sf /usr/local/cuda/lib64/stubs/libcuda.so \
/usr/local/cuda/lib64/stubs/libcuda.so.1
# Reuse the same cache mounts so no recompilation is needed
RUN --mount=type=cache,target=/usr/local/cargo/registry \
--mount=type=cache,target=/build/target \
LD_LIBRARY_PATH=/usr/local/cuda/lib64/stubs \
cargo test --release
# ╔══════════════════════════════════════════════════════════╗
# ║ STAGE 2 — runtime ║
# ║ Minimal CUDA runtime image — no build tools ║

View File

@@ -15,7 +15,27 @@ Model: ggml-large-v3, chunking at 60s on silence boundaries
---
## Critical Bugs Found & Fixed
## Cold GPU Warmup — First Job Returns 0 Segments in ~0.5s
**Severity: Critical (production issue, intermittent, hard to diagnose)**
**Symptom:** After a container restart, the very first submitted job completes in ~0.5 seconds and returns 0 segments. Subsequent jobs work correctly.
**Root cause:** CUDA JIT-compiles its kernels on the **first** call to `whisper_full_with_state`. On a cold GPU, this compilation happens mid-inference and blocks/disrupts the decode pipeline, causing whisper to return immediately with 0 segments.
**Why language detection can still succeed:** Language detection uses only a small mel-spectrogram + encoder pass on the first 30 seconds of audio. Some of these kernels may already be compiled or cached from a prior session. The full decoder kernels (the heavier ones) are what get JIT-compiled on the first full inference.
**Fix:** In `Transcriber::load()`, after creating the state, run a 1-second silent inference pass:
```rust
let silence = vec![0.0f32; 16_000]; // 1s @ 16 kHz
let mut wp = FullParams::new(SamplingStrategy::Greedy { best_of: 1 });
wp.set_language(Some("en"));
let _ = state.full(wp, &silence); // forces CUDA JIT — 0 segments expected
tracing::info!("GPU warmup complete");
```
This forces all CUDA kernel compilation at startup. The first real job then runs on fully compiled kernels. Startup takes a few seconds longer but every job is reliable.
---
### `set_detect_language(true)` is NOT "auto-detect and transcribe"
- `whisper.cpp` source: `if (params.detect_language) { return 0; }` — it exits immediately after language detection, returns 0 segments
@@ -38,7 +58,14 @@ Model: ggml-large-v3, chunking at 60s on silence boundaries
- **Possible future fix**: post-process to collapse consecutive identical segments (user declined this for now — raw output only)
- `compression_ratio_thold` may also help but wasn't tested
### 2. Five significant content gaps (~1600 words total)
### 4. Cold GPU: first job returns 0 segments in ~0.5s (intermittent, after container restart)
CUDA JIT-compiles kernels on the first call to `whisper_full_with_state`. On a cold GPU this compilation blocks/disrupts the decode pipeline mid-inference, causing an immediate return with 0 segments.
**Fix**: Run a 1-second silent warmup inference in `Transcriber::load()`. This forces JIT compilation at startup so the first real job runs on fully compiled kernels.
---
- Largest: 439 words at ~68 min, 328 words at ~80 min, then 3 × ~293-250 word gaps
- These are chunks where whisper produced off-topic or repetitive output instead of real content
- Likely caused by: speaker overlap, audience noise, or poor audio quality in those windows

View File

@@ -4,7 +4,39 @@ This document records all non-obvious behaviour, surprising bugs, hardware quirk
---
## whisper.cpp
### Cold GPU: first job returns 0 segments in ~0.5s after container restart
**Symptom:** After container restart, the first submitted job completes in ~0.5s and returns 0 segments. Language is detected correctly. All subsequent jobs work fine.
**Root cause:** CUDA JIT-compiles its device kernels on the first call to `whisper_full_with_state`. On a cold GPU, this compilation happens synchronously mid-inference and disrupts the decode pipeline, causing it to return immediately with 0 results.
**Why subsequent jobs are fine:** Compiled kernels are cached in the CUDA driver for the lifetime of the process. Once the first (warmup) call completes, all further calls use the cached compiled kernels.
**Why language detection can succeed on the same call:** Language detection uses a mel-spectrogram + encoder pass on the first 30s of audio. These lighter kernels may compile faster or be partially cached, while the full decoder kernels (the heavier path) are what causes the failure.
**Fix (in `Transcriber::load()`):**
```rust
let silence = vec![0.0f32; 16_000]; // 1s @ 16 kHz — just enough to trigger kernel compilation
let mut wp = FullParams::new(SamplingStrategy::Greedy { best_of: 1 });
wp.set_language(Some("en"));
wp.set_print_progress(false);
let _ = state.full(wp, &silence); // 0 segments expected; side-effect is the goal
tracing::info!("GPU warmup complete");
```
**Also fixed simultaneously:** `create_state()` was called per-chunk (~700 MB GPU allocation each time), causing VRAM churn under concurrent processes. State is now created once and reused. See `WhisperState` reuse section above.
---
### `language=auto` is not a valid API parameter
Passing `language=auto` in the multipart form is silently incorrect. The `language` field expects an ISO 639-1 code (e.g. `en`, `fr`) or should be **omitted entirely** for auto-detection. Passing "auto" causes whisper-rs to pass the string "auto" as a language code, which whisper.cpp does not recognise and may fallback in undefined ways.
**Correct usage:**
- Auto-detect: omit the `language` field entirely
- Explicit: `language=en`
---
### `detect_language=true` is a language-ID-only mode — NOT "auto-detect and transcribe"

View File

@@ -0,0 +1,941 @@
# Frontend Integration Guide
> **Audience:** Frontend / full-stack developers integrating the whisper transcription API into a web application.
> **Base URL:** `http://your-server:8080` (configurable via the `PORT` env var on the server).
> **Interactive docs:** `http://your-server:8080/docs` (Swagger UI — try every endpoint live).
---
## Table of Contents
1. [Architecture Overview](#1-architecture-overview)
2. [Quick Start — submit and poll](#2-quick-start--submit-and-poll)
3. [Model Lifecycle](#3-model-lifecycle)
- 3.1 [State machine](#31-state-machine)
- 3.2 [GET /model/status](#32-get-modelstatus)
- 3.3 [POST /model/load](#33-post-modelload)
- 3.4 [POST /model/unload](#34-post-modelunload)
- 3.5 [GET /model/events (SSE)](#35-get-modelevents-sse)
4. [Submitting Jobs](#4-submitting-jobs)
- 4.1 [POST /jobs](#41-post-jobs)
- 4.2 [Handling 503 Model Not Ready](#42-handling-503-model-not-ready)
- 4.3 [Retry pattern with auto-load](#43-retry-pattern-with-auto-load)
5. [Tracking Job Progress](#5-tracking-job-progress)
- 5.1 [GET /jobs/:id (poll)](#51-get-jobsid-poll)
- 5.2 [GET /jobs/:id/stream (SSE)](#52-get-jobsidstream-sse)
6. [Webhooks](#6-webhooks)
- 6.1 [Job completion webhook](#61-job-completion-webhook)
- 6.2 [Model lifecycle webhooks](#62-model-lifecycle-webhooks)
7. [Health Check](#7-health-check)
8. [Cancelling Jobs](#8-cancelling-jobs)
9. [TypeScript Types](#9-typescript-types)
10. [React Hooks](#10-react-hooks)
11. [Complete Integration Example](#11-complete-integration-example)
12. [Error Reference](#12-error-reference)
---
## 1. Architecture Overview
```
┌─────────────────────────────────────────────────────────┐
│ whisper-server │
│ │
│ HTTP / SSE Worker thread (GPU) │
│ ──────────── ─────────────────── │
│ POST /jobs ───► job queue (FIFO) │
│ GET /jobs/:id ↕ │
│ GET /jobs/:id/stream ◄── progress broadcast │
│ │
│ POST /model/load ─► load whisper into VRAM │
│ POST /model/unload ► free VRAM │
│ GET /model/status read state │
│ GET /model/events ◄── lifecycle SSE broadcast │
└─────────────────────────────────────────────────────────┘
```
**Key behaviours to understand before building:**
- The model starts **unloaded** on every server restart. No inference is possible until it loads (~1525 seconds for large-v3 on an RTX 2080).
- Submitting a job when the model is not ready returns `503` with a `Retry-After` header **and automatically triggers a load**. You can retry the submission; no separate load call is needed.
- The worker processes jobs **sequentially** (one at a time). Queue depth is visible via `/health`.
- Long audio is split into silence-bounded chunks internally. SSE `progress` events reflect chunk completion, not raw GPU progress.
---
## 2. Quick Start — submit and poll
The simplest possible integration — no SSE, no model management, just submit and poll:
```typescript
const BASE = 'http://your-server:8080';
async function transcribe(audioBlob: Blob): Promise<Job> {
// 1. Submit
const form = new FormData();
form.append('audio', audioBlob, 'audio.wav');
let submitResp = await fetch(`${BASE}/jobs`, { method: 'POST', body: form });
// 2. If model isn't loaded yet, keep retrying until it is
while (submitResp.status === 503) {
const retryAfter = parseInt(submitResp.headers.get('Retry-After') ?? '15');
await sleep(retryAfter * 1000);
submitResp = await fetch(`${BASE}/jobs`, { method: 'POST', body: form });
}
if (!submitResp.ok) throw new Error(`Submit failed: ${submitResp.status}`);
const { job_id } = await submitResp.json();
// 3. Poll until done
while (true) {
await sleep(2000);
const job: Job = await fetch(`${BASE}/jobs/${job_id}`).then(r => r.json());
if (job.status === 'done') return job;
if (job.status === 'failed') throw new Error(job.error ?? 'transcription failed');
if (job.status === 'cancelled') throw new Error('job was cancelled');
}
}
const sleep = (ms: number) => new Promise(r => setTimeout(r, ms));
```
> For a better UX — real-time progress bar, model state indicator — read the full sections below.
---
## 3. Model Lifecycle
### 3.1 State machine
The model moves through four states:
```
job submit
or POST /model/load
┌──────────▼───────────┐
│ Unloaded │◄──────────────────────────┐
└──────────┬───────────┘ │
│ load triggered │
┌──────────▼───────────┐ │
│ Loading │ │ idle timeout
└──┬──────────────┬────┘ │ or POST /model/unload
│ success │ VRAM full │
│ │ │
┌──▼────┐ ┌──────▼────────────────┐ │
│ Ready │ │ WaitingForGpu │────────────────►│
└──┬────┘ └──────────────┬────────┘ │
│ retry ok ────┘ │
└────────────────────────────────────────────────►┘
```
| State | `state` value | Can accept jobs? |
|-------|--------------|-----------------|
| Unloaded | `"unloaded"` | ❌ → triggers load, returns 503 |
| Loading | `"loading"` | ❌ → returns 503 |
| Waiting for GPU | `"waiting_for_gpu"` | ❌ → returns 503 |
| Ready | `"ready"` | ✅ |
---
### 3.2 `GET /model/status`
Returns the current model state and live VRAM figures (from `nvidia-smi`).
**Unloaded:**
```json
{ "state": "unloaded" }
```
**Loading:**
```json
{ "state": "loading" }
```
**Waiting for GPU (VRAM contention):**
```json
{
"state": "waiting_for_gpu",
"vram_needed_mb": 3951,
"vram_free_mb": 512,
"retry_in_secs": 30
}
```
**Ready:**
```json
{
"state": "ready",
"loaded_at": "2026-05-10T14:00:00.000Z",
"vram_used_mb": 4096,
"vram_total_mb": 8192
}
```
> `vram_used_mb` / `vram_total_mb` are omitted when `nvidia-smi` is unavailable.
---
### 3.3 `POST /model/load`
Tells the server to load the model. **Idempotent** — safe to call multiple times.
```bash
curl -X POST http://your-server:8080/model/load
```
**Responses:**
| Status | Body | Meaning |
|--------|------|---------|
| 202 | `{"status":"load_initiated"}` | Load queued |
| 200 | `{"status":"already_ready"}` | Already loaded |
The load happens asynchronously. Subscribe to `/model/events` or poll `/model/status` to know when ready.
---
### 3.4 `POST /model/unload`
Immediately frees the model from GPU memory. In-flight jobs finish first; the model is dropped after the current inference completes.
```bash
curl -X POST http://your-server:8080/model/unload
```
**Response:** `200 {"status":"unload_requested"}` (always, regardless of current state).
> Use this if you know transcription won't happen for a while and you want to free VRAM for other workloads on the same GPU.
---
### 3.5 `GET /model/events` (SSE)
A persistent Server-Sent Events stream that emits every model lifecycle transition.
```bash
curl -N http://your-server:8080/model/events
```
**Events emitted:**
```
event: model_loading
data: {"type":"model_loading"}
event: model_ready
data: {"type":"model_ready","loaded_at":"2026-05-10T14:00:00.000Z"}
event: model_unloaded
data: {"type":"model_unloaded"}
event: model_waiting_for_gpu
data: {"type":"model_waiting_for_gpu","vram_needed_mb":3951,"vram_free_mb":512,"retry_in_secs":30}
```
**JavaScript:**
```typescript
function subscribeModelEvents(
onReady: (loadedAt: string) => void,
onUnloaded: () => void,
onLoading: () => void,
onWaitingGpu: (info: { vram_needed_mb: number; vram_free_mb: number; retry_in_secs: number }) => void,
): () => void {
const es = new EventSource(`${BASE}/model/events`);
es.addEventListener('model_ready', (e) => onReady(JSON.parse(e.data).loaded_at));
es.addEventListener('model_unloaded', () => onUnloaded());
es.addEventListener('model_loading', () => onLoading());
es.addEventListener('model_waiting_for_gpu',(e) => onWaitingGpu(JSON.parse(e.data)));
es.onerror = () => {
// The browser reconnects automatically with exponential backoff.
// Log the error but don't tear down the listener.
console.warn('model/events connection dropped, reconnecting…');
};
return () => es.close(); // call this to clean up (e.g. in React useEffect return)
}
```
> The server sends an SSE keepalive comment every 15 seconds so proxies don't close idle connections.
---
## 4. Submitting Jobs
### 4.1 `POST /jobs`
**Content-Type:** `multipart/form-data`
| Field | Required | Type | Notes |
|-------|----------|------|-------|
| `audio` | ✅ | file | Any format ffmpeg understands: WAV, MP3, M4A, OGG, FLAC, MP4, MKV … No size limit. |
| `language` | ❌ | string | ISO 639-1 code (`"en"`, `"it"`, `"fr"` …). Omit for auto-detection. |
| `task` | ❌ | string | `"transcribe"` (default) or `"translate"` (→ English) |
| `webhook_url` | ❌ | string | URL to POST the completed job to. Also registers the URL for model lifecycle webhooks. |
**202 Accepted:**
```json
{ "job_id": "550e8400-e29b-41d4-a716-446655440000" }
```
```typescript
async function submitJob(
audio: Blob,
opts: { language?: string; task?: 'transcribe' | 'translate'; webhookUrl?: string } = {}
): Promise<string> {
const form = new FormData();
form.append('audio', audio, 'audio.wav');
if (opts.language) form.append('language', opts.language);
if (opts.task) form.append('task', opts.task);
if (opts.webhookUrl) form.append('webhook_url', opts.webhookUrl);
const resp = await fetch(`${BASE}/jobs`, { method: 'POST', body: form });
if (!resp.ok) throw await toApiError(resp);
const { job_id } = await resp.json();
return job_id;
}
```
---
### 4.2 Handling 503 Model Not Ready
When the model isn't loaded, `POST /jobs` returns:
```
HTTP/1.1 503 Service Unavailable
Retry-After: 30
Content-Type: application/json
```
```json
{
"error": "model_not_ready",
"state": "unloaded",
"retry_after_secs": 30
}
```
**`retry_after_secs` by state:**
| `state` | `retry_after_secs` | Why |
|---------|-------------------|-----|
| `unloaded` | 30 | Load just triggered; RTX 2080 + large-v3 loads in ~1525s |
| `loading` | 10 | Already loading; check again soon |
| `waiting_for_gpu` | `GPU_POLL_INTERVAL_SECS` (default 30) | VRAM busy; retry later |
> **Submitting a job when the model is `unloaded` automatically triggers a load.** You do NOT need a separate `POST /model/load` call for the normal happy path.
---
### 4.3 Retry pattern with auto-load
```typescript
async function submitWithRetry(
audio: Blob,
opts: { language?: string; task?: 'transcribe' | 'translate'; webhookUrl?: string } = {},
maxAttempts = 20,
): Promise<string> {
const form = new FormData();
form.append('audio', audio, 'audio.wav');
if (opts.language) form.append('language', opts.language);
if (opts.task) form.append('task', opts.task);
if (opts.webhookUrl) form.append('webhook_url', opts.webhookUrl);
for (let attempt = 1; attempt <= maxAttempts; attempt++) {
const resp = await fetch(`${BASE}/jobs`, { method: 'POST', body: form });
if (resp.status === 202) {
const { job_id } = await resp.json();
return job_id;
}
if (resp.status === 503) {
const body = await resp.json();
const waitMs = (parseInt(resp.headers.get('Retry-After') ?? '15') + 1) * 1000;
console.log(`Model ${body.state} — waiting ${waitMs / 1000}s (attempt ${attempt}/${maxAttempts})`);
await sleep(waitMs);
continue;
}
throw await toApiError(resp);
}
throw new Error(`Model did not become ready after ${maxAttempts} attempts`);
}
```
> **Tip:** For a better UX, subscribe to `GET /model/events` and wait for the `model_ready` event instead of sleeping blindly — then submit immediately when ready.
---
## 5. Tracking Job Progress
Two patterns: **SSE** (real-time push) or **polling** (simpler). SSE is preferred for UX.
### 5.1 `GET /jobs/:id` (poll)
Returns the full job document. Poll every 25 seconds while `status` is `queued` or `running`.
```json
{
"id": "550e8400-e29b-41d4-a716-446655440000",
"status": "running",
"task": "transcribe",
"language": "en",
"progress": 42,
"duration_secs": 120.5,
"segments": [],
"created_at": "2026-05-10T14:00:00.000Z"
}
```
When `status === "done"`:
```json
{
"id": "550e8400-e29b-41d4-a716-446655440000",
"status": "done",
"task": "transcribe",
"language": "en",
"progress": 100,
"duration_secs": 120.5,
"segments": [
{ "index": 0, "start": 0.0, "end": 3.5, "text": "Hello, world.", "words": [] },
{ "index": 1, "start": 3.6, "end": 7.2, "text": "How are you?", "words": [] }
],
"created_at": "2026-05-10T14:00:00.000Z",
"completed_at": "2026-05-10T14:02:35.000Z"
}
```
**Terminal statuses:** `done`, `failed`, `cancelled` — stop polling when you see one.
---
### 5.2 `GET /jobs/:id/stream` (SSE)
Subscribe immediately after submission. The connection is held open and events are pushed as they occur.
**Event types:**
```
event: progress
data: {"type":"progress","percent":42,"chunk":3,"chunks_total":7}
event: done
data: {"type":"done","job":{...full Job object...}}
event: error
data: {"type":"error","message":"whisper inference failed: ..."}
```
- `percent` — overall job progress 0100 (derived from chunks completed / total).
- `chunk` / `chunks_total` — the audio is split on silences; each chunk is one whisper inference call.
- If you open the stream after the job is already finished, you immediately receive a single `done` event.
```typescript
function streamJobProgress(
jobId: string,
onProgress: (percent: number, chunk: number, total: number) => void,
onDone: (job: Job) => void,
onError: (message: string) => void,
): () => void {
const es = new EventSource(`${BASE}/jobs/${jobId}/stream`);
es.addEventListener('progress', (e) => {
const { percent, chunk, chunks_total } = JSON.parse(e.data);
onProgress(percent, chunk, chunks_total);
});
es.addEventListener('done', (e) => {
const { job } = JSON.parse(e.data);
es.close();
onDone(job);
});
es.addEventListener('error', (e) => {
// SSE protocol error vs application error — check if data exists
if ('data' in e) {
const { message } = JSON.parse((e as MessageEvent).data);
onError(message);
}
es.close();
});
return () => es.close();
}
```
> **Note:** Do not confuse the SSE `error` event (connection drop — no `data`) with the application `error` event (transcription failure — has `data`). The example above handles both.
---
## 6. Webhooks
Webhooks are fired as HTTP `POST` requests with `Content-Type: application/json` to the `webhook_url` you supply at job submission. The server retries up to 3 times with exponential backoff (1s, 2s) on non-2xx responses.
### 6.1 Job completion webhook
Fired when a job reaches `done`, `failed`, or `cancelled`.
**Payload:** the full `Job` object (same as `GET /jobs/:id`).
```json
{
"id": "550e8400-e29b-41d4-a716-446655440000",
"status": "done",
"task": "transcribe",
"language": "en",
"progress": 100,
"duration_secs": 120.5,
"segments": [
{ "index": 0, "start": 0.0, "end": 3.5, "text": "Hello, world.", "words": [] }
],
"created_at": "2026-05-10T14:00:00.000Z",
"completed_at": "2026-05-10T14:02:35.000Z"
}
```
### 6.2 Model lifecycle webhooks
**Any URL that has ever appeared as a `webhook_url` in a job submission** also receives model lifecycle webhooks for the lifetime of the server process. This lets your backend know when the model comes up or goes down without polling.
Only two events are delivered via webhook (the others are SSE-only):
**Model ready:**
```json
{ "type": "model_ready", "loaded_at": "2026-05-10T14:00:00.000Z" }
```
**Model unloaded:**
```json
{ "type": "model_unloaded" }
```
**Express.js receiver example:**
```typescript
import express from 'express';
const app = express();
app.use(express.json());
app.post('/webhooks/whisper', (req, res) => {
res.sendStatus(200); // acknowledge quickly — retries on non-2xx
const body = req.body;
if ('type' in body) {
// Model lifecycle event
if (body.type === 'model_ready') {
console.log('Whisper model ready at', body.loaded_at);
} else if (body.type === 'model_unloaded') {
console.log('Whisper model freed GPU memory');
}
return;
}
// Job completion event — body is a Job object
if (body.status === 'done') {
console.log(`Job ${body.id} done — ${body.segments.length} segments`);
processTranscript(body.segments);
} else if (body.status === 'failed') {
console.error(`Job ${body.id} failed:`, body.error);
}
});
```
> **Distinguish job vs. model webhook:** Job payloads have an `id` and `status` field. Model payloads have a `type` field at the top level (`model_ready` / `model_unloaded`).
---
## 7. Health Check
```bash
curl http://your-server:8080/health
```
```json
{
"status": "ok",
"gpu_name": "NVIDIA GeForce RTX 2080",
"vram_total_mb": 8192,
"model": "large-v3",
"queue_depth": 2,
"model_state": "ready"
}
```
| Field | Notes |
|-------|-------|
| `status` | Always `"ok"` when the server is reachable |
| `gpu_name` | From `nvidia-smi`; `null` if unavailable |
| `vram_total_mb` | Total VRAM in MiB; `null` if unavailable |
| `model` | Model name string (server config) |
| `queue_depth` | Jobs waiting (not counting the currently running one) |
| `model_state` | `"unloaded"` / `"loading"` / `"waiting_for_gpu"` / `"ready"` |
---
## 8. Cancelling Jobs
```bash
curl -X DELETE http://your-server:8080/jobs/550e8400-e29b-41d4-a716-446655440000
```
- `200` — job marked `cancelled`. Returns the updated `Job` object.
- `404` — job not found.
- `409` — job already in a terminal state (`done` / `failed` / `cancelled`).
> **Important:** whisper.cpp does not support mid-inference cancellation. If the job is currently `running`, the GPU inference will finish before the cancellation takes effect — the result is simply discarded and the status set to `cancelled`.
---
## 9. TypeScript Types
```typescript
type ModelStateTag = 'unloaded' | 'loading' | 'waiting_for_gpu' | 'ready';
type JobStatus = 'queued' | 'running' | 'done' | 'failed' | 'cancelled';
type Task = 'transcribe' | 'translate';
interface ModelStatus {
state: ModelStateTag;
// ready only
loaded_at?: string;
// waiting_for_gpu only
vram_needed_mb?: number;
vram_free_mb?: number;
retry_in_secs?: number;
// always (when nvidia-smi available)
vram_used_mb?: number;
vram_total_mb?: number;
}
interface Word {
text: string;
start: number; // seconds
end: number; // seconds
probability: number; // 01
}
interface Segment {
index: number;
start: number; // seconds
end: number; // seconds
text: string;
words: Word[];
}
interface Job {
id: string;
status: JobStatus;
task: Task;
language?: string; // ISO 639-1; null until detected/set
progress: number; // 0100
duration_secs?: number; // null until processing starts
segments: Segment[]; // populated when status = 'done'
error?: string; // populated when status = 'failed'
webhook_url?: string;
filename?: string;
created_at: string; // ISO 8601
completed_at?: string; // ISO 8601; null until terminal
}
// SSE payloads from GET /jobs/:id/stream
type JobSseEvent =
| { type: 'progress'; percent: number; chunk: number; chunks_total: number }
| { type: 'done'; job: Job }
| { type: 'error'; message: string };
// SSE payloads from GET /model/events
type ModelSseEvent =
| { type: 'model_loading' }
| { type: 'model_ready'; loaded_at: string }
| { type: 'model_unloaded' }
| { type: 'model_waiting_for_gpu'; vram_needed_mb: number; vram_free_mb: number; retry_in_secs: number };
// Webhook payload — union of job completion and model lifecycle events
type WebhookPayload = Job | { type: 'model_ready'; loaded_at: string } | { type: 'model_unloaded' };
// Helpers
function isJobPayload(p: WebhookPayload): p is Job {
return 'id' in p && 'status' in p;
}
function isModelPayload(p: WebhookPayload): p is { type: string } {
return 'type' in p;
}
```
---
## 10. React Hooks
```typescript
// useModelStatus.ts
import { useEffect, useState } from 'react';
const BASE = process.env.NEXT_PUBLIC_WHISPER_BASE_URL ?? '';
export function useModelStatus() {
const [status, setStatus] = useState<ModelStatus | null>(null);
// Initial fetch
useEffect(() => {
fetch(`${BASE}/model/status`)
.then(r => r.json())
.then(setStatus)
.catch(console.error);
}, []);
// Live updates via SSE
useEffect(() => {
const es = new EventSource(`${BASE}/model/events`);
const refresh = () => {
fetch(`${BASE}/model/status`)
.then(r => r.json())
.then(setStatus)
.catch(console.error);
};
es.addEventListener('model_loading', refresh);
es.addEventListener('model_ready', refresh);
es.addEventListener('model_unloaded', refresh);
es.addEventListener('model_waiting_for_gpu',refresh);
es.onerror = () => console.warn('model/events reconnecting…');
return () => es.close();
}, []);
return status;
}
```
```typescript
// useJobStream.ts
import { useEffect, useRef, useState } from 'react';
type ProgressState = {
percent: number;
chunk: number;
chunks_total: number;
};
export function useJobStream(jobId: string | null) {
const [progress, setProgress] = useState<ProgressState | null>(null);
const [job, setJob] = useState<Job | null>(null);
const [error, setError] = useState<string | null>(null);
const esRef = useRef<EventSource | null>(null);
useEffect(() => {
if (!jobId) return;
esRef.current?.close();
setProgress(null); setJob(null); setError(null);
const es = new EventSource(`${BASE}/jobs/${jobId}/stream`);
esRef.current = es;
es.addEventListener('progress', (e) => {
setProgress(JSON.parse(e.data));
});
es.addEventListener('done', (e) => {
setJob(JSON.parse(e.data).job);
setProgress({ percent: 100, chunk: 0, chunks_total: 0 });
es.close();
});
es.addEventListener('error', (e) => {
if ('data' in e) setError(JSON.parse((e as MessageEvent).data).message);
es.close();
});
return () => es.close();
}, [jobId]);
return { progress, job, error };
}
```
```typescript
// useTranscribe.ts — ties it all together
import { useState, useCallback } from 'react';
export function useTranscribe() {
const [jobId, setJobId] = useState<string | null>(null);
const [loading, setLoading] = useState(false);
const [error, setError] = useState<string | null>(null);
const submit = useCallback(async (
audio: Blob,
opts: { language?: string; task?: Task } = {}
) => {
setLoading(true);
setError(null);
setJobId(null);
try {
const id = await submitWithRetry(audio, opts); // see §4.3
setJobId(id);
} catch (e) {
setError(String(e));
} finally {
setLoading(false);
}
}, []);
const { progress, job, error: streamError } = useJobStream(jobId);
return { submit, loading, jobId, progress, job, error: error ?? streamError };
}
```
---
## 11. Complete Integration Example
A full transcription flow with model warm-up indicator and real-time progress:
```typescript
// whisperClient.ts
const BASE = process.env.NEXT_PUBLIC_WHISPER_BASE_URL ?? '';
export class WhisperClient {
/** Wait for the model to be ready, triggering a load if needed. */
async ensureModelReady(timeoutMs = 120_000): Promise<void> {
const status = await this.getModelStatus();
if (status.state === 'ready') return;
// Trigger load (idempotent)
await fetch(`${BASE}/model/load`, { method: 'POST' });
return new Promise((resolve, reject) => {
const deadline = setTimeout(() => {
es.close();
reject(new Error('Model did not become ready within timeout'));
}, timeoutMs);
const es = new EventSource(`${BASE}/model/events`);
es.addEventListener('model_ready', () => {
clearTimeout(deadline);
es.close();
resolve();
});
es.onerror = () => {
// Reconnects automatically; don't reject on transient drops.
};
});
}
async getModelStatus(): Promise<ModelStatus> {
const r = await fetch(`${BASE}/model/status`);
if (!r.ok) throw new Error(`/model/status ${r.status}`);
return r.json();
}
async submit(
audio: Blob,
opts: { language?: string; task?: Task; webhookUrl?: string } = {}
): Promise<string> {
return submitWithRetry(audio, opts);
}
streamProgress(
jobId: string,
callbacks: {
onProgress?: (p: { percent: number; chunk: number; total: number }) => void;
onDone?: (job: Job) => void;
onError?: (msg: string) => void;
}
): () => void {
const es = new EventSource(`${BASE}/jobs/${jobId}/stream`);
es.addEventListener('progress', (e) => {
const d = JSON.parse(e.data);
callbacks.onProgress?.({ percent: d.percent, chunk: d.chunk, total: d.chunks_total });
});
es.addEventListener('done', (e) => {
callbacks.onDone?.(JSON.parse(e.data).job);
es.close();
});
es.addEventListener('error', (e) => {
if ('data' in e) callbacks.onError?.(JSON.parse((e as MessageEvent).data).message);
es.close();
});
return () => es.close();
}
async transcribe(
audio: Blob,
opts: {
language?: string;
task?: Task;
webhookUrl?: string;
onProgress?: (percent: number) => void;
} = {}
): Promise<Job> {
const jobId = await this.submit(audio, opts);
return new Promise((resolve, reject) => {
this.streamProgress(jobId, {
onProgress: (p) => opts.onProgress?.(p.percent),
onDone: resolve,
onError: (msg) => reject(new Error(msg)),
});
});
}
}
// Usage
const whisper = new WhisperClient();
const job = await whisper.transcribe(audioBlob, {
language: 'en',
onProgress: (pct) => console.log(`${pct}%`),
});
for (const seg of job.segments) {
console.log(`[${seg.start.toFixed(1)}s → ${seg.end.toFixed(1)}s] ${seg.text}`);
}
```
---
## 12. Error Reference
All error responses follow this shape:
```json
{ "error": "human-readable message" }
```
With the following additions for specific errors:
**503 model_not_ready:**
```json
{ "error": "model_not_ready", "state": "loading", "retry_after_secs": 10 }
```
| HTTP | `error` value | When | What to do |
|------|--------------|------|-----------|
| 400 | `"missing 'audio' field"` | `audio` not in form | Fix the form |
| 400 | `"audio field is empty"` | Zero-byte file uploaded | Fix the file |
| 400 | `"task must be 'transcribe' or 'translate'"` | Bad `task` value | Fix the value |
| 400 | `"multipart error: …"` | Malformed request | Check content-type header |
| 404 | `"job … not found"` | Unknown job ID | Check the ID |
| 409 | `"job … is already in terminal state …"` | Cancelling a finished job | No action needed |
| 503 | `"model_not_ready"` | Model not loaded | See §4.2 — retry with `Retry-After` |
| 500 | `"worker channel closed"` | Server crash | Contact server admin |
**Network / SSE errors:**
- `EventSource` `onerror` with no `.data` = connection dropped. The browser reconnects automatically — no action needed unless you want to show a UI indicator.
- HTTP 502/503/504 from a reverse proxy = the container is restarting. Wait and retry.
---
*Last updated: 2026-05-08. Corresponds to whisper-server v0.1.0 commit `d014826`.*

View File

@@ -66,6 +66,8 @@ The bundled `docker-compose.yml` mounts named volumes for data and models and se
| `WHISPER_MODEL_PATH` | `/models/ggml-large-v3.bin` | Absolute path to GGML model file |
| `WHISPER_MODEL` | `large-v3` | Model name reported by `/health` (display only) |
| `CUDA_DEVICE` | `0` | CUDA device index to use for inference |
| `IDLE_TIMEOUT_SECS` | `300` | Seconds of idle before the model is automatically unloaded from GPU memory. Set to `0` to disable auto-unload. |
| `GPU_POLL_INTERVAL_SECS` | `30` | Seconds between VRAM-availability retries when a load fails due to insufficient VRAM. |
### Note on CUDA device ordering
Inside Docker, device ordering matches `nvidia-smi` (PCI bus order). On the host without Docker, ordering may differ. See [FINDINGS.md](FINDINGS.md#cuda-device-index-ordering-differs-between-host-and-docker) for details.
@@ -76,6 +78,194 @@ Inside Docker, device ordering matches `nvidia-smi` (PCI bus order). On the host
The interactive Swagger UI is available at `http://localhost:8080/docs`.
---
## Model Lifecycle Management
The model starts **unloaded** on startup (lazy loading). It is loaded into GPU memory on the first job submission or via `POST /model/load`, and automatically unloaded after `IDLE_TIMEOUT_SECS` of inactivity.
### Model State Machine
```
Unloaded ──(job / POST /model/load)──► Loading ──(success)──► Ready
└──(VRAM full)──► WaitingForGpu ──(retry OK)──► Loading
Ready ──(idle timeout / POST /model/unload)──► Unloaded
WaitingForGpu ──(POST /model/unload)──► Unloaded
```
### `GET /model/status`
Returns the current model state and VRAM statistics.
```bash
curl http://localhost:8080/model/status
```
**When unloaded:**
```json
{ "state": "unloaded" }
```
**When loading:**
```json
{ "state": "loading" }
```
**When ready:**
```json
{
"state": "ready",
"loaded_at": "2026-05-10T14:00:00Z",
"vram_used_mb": 4096,
"vram_total_mb": 8192
}
```
**When waiting for VRAM:**
```json
{
"state": "waiting_for_gpu",
"vram_needed_mb": 3951,
"vram_free_mb": 512,
"retry_in_secs": 30
}
```
---
### `POST /model/load`
Request the model to be loaded. Idempotent — if already loading or ready, returns immediately.
```bash
curl -X POST http://localhost:8080/model/load
```
- Returns `202 Accepted` with `{"status":"load_initiated"}` when load is triggered
- Returns `200 OK` with `{"status":"already_ready"}` when model is already ready
- Poll `GET /model/status` or subscribe to `GET /model/events` to know when ready
---
### `POST /model/unload`
Unload the model from GPU memory immediately, freeing VRAM.
```bash
curl -X POST http://localhost:8080/model/unload
```
Returns `200 OK` regardless of current state.
---
### `GET /model/events` — Model SSE stream
Subscribe to model lifecycle events via Server-Sent Events.
```bash
curl -N http://localhost:8080/model/events
```
**Event types:**
```
event: model_loading
data: {"type":"model_loading"}
event: model_ready
data: {"type":"model_ready","loaded_at":"2026-05-10T14:00:00Z"}
event: model_unloaded
data: {"type":"model_unloaded"}
event: model_waiting_for_gpu
data: {"type":"model_waiting_for_gpu","vram_needed_mb":3951,"vram_free_mb":512,"retry_in_secs":30}
```
**JavaScript example:**
```javascript
const es = new EventSource('/model/events');
es.addEventListener('model_ready', () => {
console.log('Model loaded — ready to transcribe');
});
es.addEventListener('model_unloaded', () => {
console.log('Model freed GPU memory');
});
```
---
### Webhooks for model events
When any job is submitted with a `webhook_url`, that URL is registered to receive model lifecycle webhooks for the lifetime of the server process. The following events trigger a webhook POST:
| Event | Fired when |
|-------|-----------|
| `model_ready` | Model finishes loading (after GPU warmup) |
| `model_unloaded` | Model is freed from GPU memory |
**Webhook payload** (`Content-Type: application/json`):
```json
{ "type": "model_ready", "loaded_at": "2026-05-10T14:00:00Z" }
{ "type": "model_unloaded" }
```
Delivery is attempted up to 3 times with exponential backoff (1s, 2s).
---
### Handling 503 Model Not Ready
When you submit a job and the model is not yet loaded, you receive `503 Service Unavailable` with a `Retry-After` header:
```
HTTP/1.1 503 Service Unavailable
Retry-After: 30
Content-Type: application/json
{
"error": "model_not_ready",
"state": "unloaded",
"retry_after_secs": 30
}
```
| State at rejection | `retry_after_secs` | Meaning |
|---|---|---|
| `unloaded` | 30 | Load was triggered; retry after ~30s |
| `loading` | 10 | Check again in 10s |
| `waiting_for_gpu` | `GPU_POLL_INTERVAL_SECS` | VRAM contention; retry later |
A job rejection when the model is `unloaded` **automatically triggers a load** — you do not need to call `POST /model/load` separately.
**Recommended client pattern:**
```javascript
async function submitWithRetry(formData, maxAttempts = 10) {
for (let i = 0; i < maxAttempts; i++) {
const resp = await fetch('/jobs', { method: 'POST', body: formData });
if (resp.ok) return resp.json();
if (resp.status === 503) {
const retryAfter = parseInt(resp.headers.get('Retry-After') ?? '30');
const body = await resp.json();
console.log(`Model ${body.state} — retrying in ${retryAfter}s`);
await new Promise(r => setTimeout(r, retryAfter * 1000));
continue;
}
throw new Error(`Submit failed: ${resp.status}`);
}
throw new Error('Gave up after max attempts');
}
```
---
## API Reference
The interactive Swagger UI is available at `http://localhost:8080/docs`.
### `POST /jobs` — Submit a transcription job
Accepts a multipart/form-data body.
@@ -249,11 +439,12 @@ curl http://localhost:8080/health
"gpu_name": "NVIDIA GeForce RTX 2080",
"vram_total_mb": 8192,
"model": "large-v3",
"queue_depth": 0
"queue_depth": 0,
"model_state": "ready"
}
```
`queue_depth` is the number of jobs waiting to be processed (not counting the one currently running).
`queue_depth` is the number of jobs waiting to be processed (not counting the one currently running). `model_state` reflects the current lifecycle state (`unloaded`, `loading`, `waiting_for_gpu`, `ready`).
---
@@ -340,6 +531,11 @@ curl -X POST http://localhost:8080/jobs \
## Troubleshooting
### Server returns `503 model_not_ready`
- The model starts unloaded. Call `POST /model/load` explicitly, or just retry the job submission — rejection automatically triggers a load.
- If state is `waiting_for_gpu`, another process is using the GPU's VRAM. The server will retry automatically every `GPU_POLL_INTERVAL_SECS` seconds.
- Monitor `GET /model/status` or subscribe to `GET /model/events` to know when the model is ready.
### Server returns 0 segments
- Check that you are **not** setting `language` to an empty string — omit the field entirely for auto-detection
- Verify the audio file is not corrupted: `ffprobe audio.mp3`

37
run_tests.sh Executable file
View File

@@ -0,0 +1,37 @@
#!/usr/bin/env bash
# run_tests.sh — Run the unit test suite inside Docker (no GPU required)
#
# Uses the `tester` Docker stage which:
# 1. Builds the release binary (or reuses cached build)
# 2. Symlinks the CUDA stubs so libcuda.so.1 is satisfied without a driver
# 3. Runs `cargo test --release`
#
# Usage:
# ./run_tests.sh # run all unit tests
# ./run_tests.sh models # run only tests matching "models"
# CUDA_VERSION=12.1.0 ./run_tests.sh
set -euo pipefail
CUDA_VERSION=${CUDA_VERSION:-12.4.1}
UBUNTU_VERSION=${UBUNTU_VERSION:-22.04}
TEST_FILTER=${1:-}
echo "==> Building tester stage (CUDA ${CUDA_VERSION} / Ubuntu ${UBUNTU_VERSION})..."
docker build \
--target tester \
--build-arg CUDA_VERSION="${CUDA_VERSION}" \
--build-arg UBUNTU_VERSION="${UBUNTU_VERSION}" \
--tag whisper-tester:local \
.
if [[ -n "${TEST_FILTER}" ]]; then
echo "==> Running tests matching '${TEST_FILTER}'..."
docker run --rm \
-e LD_LIBRARY_PATH=/usr/local/cuda/lib64/stubs \
whisper-tester:local \
sh -c "cd /build && ln -sf /usr/local/cuda/lib64/stubs/libcuda.so /usr/local/cuda/lib64/stubs/libcuda.so.1 && LD_LIBRARY_PATH=/usr/local/cuda/lib64/stubs cargo test --release '${TEST_FILTER}'"
else
echo "==> All tests ran during docker build (tester stage)."
echo " Build succeeded — all tests passed."
fi

View File

@@ -1,10 +1,10 @@
use thiserror::Error;
use axum::{
http::StatusCode,
http::{header, HeaderValue, StatusCode},
response::{IntoResponse, Response},
Json,
};
use serde_json::json;
use thiserror::Error;
pub type Result<T> = std::result::Result<T, AppError>;
@@ -21,19 +21,156 @@ pub enum AppError {
#[error("internal error: {0}")]
Internal(String),
/// Returned when `whisper_init_state` or `cudaMalloc` fails due to
/// insufficient VRAM. The worker uses this to distinguish a recoverable
/// VRAM-pressure failure from a hard internal error.
#[error("out of GPU memory: {0}")]
OutOfMemory(String),
/// Returned when a job is submitted but the model is not yet loaded.
/// Carries the current state tag and recommended Retry-After seconds.
#[error("model not ready: {state}")]
ModelNotReady {
state: String,
retry_after_secs: u64,
},
}
impl AppError {
/// Returns true if the error string contains patterns emitted by
/// whisper.cpp / GGML when a CUDA memory allocation fails.
pub fn is_oom(msg: &str) -> bool {
msg.contains("cudaMalloc failed")
|| msg.contains("out of memory")
|| msg.contains("CUDA error: out of memory")
|| msg.contains("alloc_buffer")
}
}
impl IntoResponse for AppError {
fn into_response(self) -> Response {
let (status, message) = match &self {
AppError::NotFound(m) => (StatusCode::NOT_FOUND, m.clone()),
AppError::BadRequest(m) => (StatusCode::BAD_REQUEST, m.clone()),
AppError::Conflict(m) => (StatusCode::CONFLICT, m.clone()),
AppError::Internal(m) => (StatusCode::INTERNAL_SERVER_ERROR, m.clone()),
};
tracing::error!(status = status.as_u16(), error = %message);
(status, Json(json!({ "error": message }))).into_response()
match self {
AppError::NotFound(m) => {
(StatusCode::NOT_FOUND, Json(json!({ "error": m }))).into_response()
}
AppError::BadRequest(m) => {
(StatusCode::BAD_REQUEST, Json(json!({ "error": m }))).into_response()
}
AppError::Conflict(m) => {
(StatusCode::CONFLICT, Json(json!({ "error": m }))).into_response()
}
AppError::Internal(m) => {
tracing::error!(error = %m, "internal error");
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({ "error": m })),
)
.into_response()
}
AppError::OutOfMemory(m) => {
tracing::warn!(error = %m, "GPU out of memory during model load");
(StatusCode::SERVICE_UNAVAILABLE, Json(json!({ "error": m }))).into_response()
}
AppError::ModelNotReady {
state,
retry_after_secs,
} => {
let body = Json(json!({
"error": "model_not_ready",
"state": state,
"retry_after_secs": retry_after_secs,
}));
let mut resp = (StatusCode::SERVICE_UNAVAILABLE, body).into_response();
resp.headers_mut().insert(
header::RETRY_AFTER,
HeaderValue::from_str(&retry_after_secs.to_string())
.unwrap_or(HeaderValue::from_static("30")),
);
resp
}
}
}
}
// ── Unit tests ───────────────────────────────────────────────────────────────
#[cfg(test)]
mod tests {
use super::*;
use axum::body::to_bytes;
#[test]
fn test_is_oom_cuda_malloc() {
assert!(AppError::is_oom("cudaMalloc failed: out of memory"));
}
#[test]
fn test_is_oom_alloc_buffer() {
// Exact message from ggml_backend_cuda_buffer_type_alloc_buffer
assert!(AppError::is_oom(
"ggml_backend_cuda_buffer_type_alloc_buffer: allocating 2951.01 MiB on device 0: cudaMalloc failed: out of memory"
));
}
#[test]
fn test_is_oom_generic_out_of_memory() {
assert!(AppError::is_oom("CUDA error: out of memory"));
}
#[test]
fn test_is_oom_other_error() {
assert!(!AppError::is_oom("failed to open model file"));
assert!(!AppError::is_oom("invalid model format"));
assert!(!AppError::is_oom(""));
}
#[tokio::test]
async fn test_model_not_ready_response_has_retry_after_header() {
let err = AppError::ModelNotReady {
state: "loading".into(),
retry_after_secs: 10,
};
let resp = err.into_response();
assert_eq!(resp.status(), StatusCode::SERVICE_UNAVAILABLE);
let retry_after = resp
.headers()
.get(header::RETRY_AFTER)
.expect("Retry-After header missing");
assert_eq!(retry_after, "10");
}
#[tokio::test]
async fn test_model_not_ready_response_body() {
let err = AppError::ModelNotReady {
state: "unloaded".into(),
retry_after_secs: 30,
};
let resp = err.into_response();
let bytes = to_bytes(resp.into_body(), usize::MAX).await.unwrap();
let v: serde_json::Value = serde_json::from_slice(&bytes).unwrap();
assert_eq!(v["error"], "model_not_ready");
assert_eq!(v["state"], "unloaded");
assert_eq!(v["retry_after_secs"], 30);
}
#[tokio::test]
async fn test_model_not_ready_loading_retry_after_10() {
let err = AppError::ModelNotReady {
state: "loading".into(),
retry_after_secs: 10,
};
let resp = err.into_response();
assert_eq!(resp.headers().get(header::RETRY_AFTER).unwrap(), "10");
}
#[tokio::test]
async fn test_model_not_ready_unloaded_retry_after_30() {
let err = AppError::ModelNotReady {
state: "unloaded".into(),
retry_after_secs: 30,
};
let resp = err.into_response();
assert_eq!(resp.headers().get(header::RETRY_AFTER).unwrap(), "30");
}
}

View File

@@ -1,7 +1,7 @@
use std::sync::Arc;
use axum::Router;
use tokio::sync::mpsc;
use tokio::sync::{broadcast, mpsc, RwLock};
use tower_http::{cors::CorsLayer, trace::TraceLayer};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
use utoipa::OpenApi;
@@ -21,8 +21,10 @@ pub use error::{AppError, Result};
#[derive(Clone)]
pub struct AppState {
/// Channel to submit jobs to the single GPU worker.
/// Channel to submit jobs to the single GPU worker (job IDs only).
pub job_tx: mpsc::UnboundedSender<models::JobId>,
/// Channel to send control commands to the worker OS thread.
pub cmd_tx: std::sync::mpsc::SyncSender<worker::WorkerCmd>,
/// Shared handle to the on-disk job store.
pub storage: Arc<storage::Storage>,
/// SSE broadcast registry: job_id → sender.
@@ -33,6 +35,17 @@ pub struct AppState {
pub queue_depth: Arc<std::sync::atomic::AtomicUsize>,
/// CUDA device index used for inference.
pub gpu_device: u32,
/// Current state of the whisper model.
pub model_state: Arc<RwLock<models::ModelState>>,
/// Broadcast channel for model lifecycle events (SSE + webhooks).
pub model_event_tx: broadcast::Sender<models::ModelEvent>,
/// All webhook URLs ever registered via job submission.
/// Used to fire model_ready / model_unloaded notifications.
pub webhook_registry: Arc<std::sync::Mutex<std::collections::HashSet<String>>>,
/// How long the model stays loaded with no active jobs.
pub idle_timeout: std::time::Duration,
/// How often to retry loading when GPU is busy.
pub gpu_poll_interval: std::time::Duration,
}
// ── OpenAPI spec root ────────────────────────────────────────────────────────
@@ -50,6 +63,10 @@ pub struct AppState {
routes::jobs::stream_job,
routes::jobs::delete_job,
routes::health::health,
routes::model::model_status,
routes::model::model_load,
routes::model::model_unload,
routes::model::model_events,
),
components(schemas(
models::Job,
@@ -58,10 +75,14 @@ pub struct AppState {
models::Word,
models::SubmitResponse,
models::HealthResponse,
models::ModelState,
models::ModelEvent,
models::ModelStatusResponse,
)),
tags(
(name = "jobs", description = "Transcription job management"),
(name = "system", description = "Service health"),
(name = "model", description = "Model lifecycle management"),
)
)]
struct ApiDoc;
@@ -76,15 +97,29 @@ async fn main() -> anyhow::Result<()> {
.with(tracing_subscriber::fmt::layer().json())
.init();
let data_dir = std::env::var("DATA_DIR").unwrap_or_else(|_| "/data".into());
let model_path = std::env::var("WHISPER_MODEL_PATH")
.unwrap_or_else(|_| "/models/ggml-large-v3.bin".into());
let port = std::env::var("PORT").unwrap_or_else(|_| "8080".into());
let data_dir = std::env::var("DATA_DIR").unwrap_or_else(|_| "/data".into());
let model_path =
std::env::var("WHISPER_MODEL_PATH").unwrap_or_else(|_| "/models/ggml-large-v3.bin".into());
let port = std::env::var("PORT").unwrap_or_else(|_| "8080".into());
let model_name = std::env::var("WHISPER_MODEL").unwrap_or_else(|_| "large-v3".into());
let gpu_device: u32 = std::env::var("CUDA_DEVICE")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(0);
let idle_timeout_secs: u64 = std::env::var("IDLE_TIMEOUT_SECS")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(300);
let gpu_poll_interval_secs: u64 = std::env::var("GPU_POLL_INTERVAL_SECS")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(30);
tracing::info!(
idle_timeout_secs,
gpu_poll_interval_secs,
"dynamic model loading configured"
);
let storage = Arc::new(storage::Storage::new(&data_dir).await?);
@@ -94,28 +129,47 @@ async fn main() -> anyhow::Result<()> {
let (job_tx, job_rx) = mpsc::unbounded_channel::<models::JobId>();
let queue_depth = Arc::new(std::sync::atomic::AtomicUsize::new(0));
// Spawn single GPU worker; get back the SSE broadcast registry.
let progress = worker::start(
// Model starts unloaded — lazy load on first job or POST /model/load.
let model_state = Arc::new(RwLock::new(models::ModelState::Unloaded));
let (model_event_tx, _) = broadcast::channel::<models::ModelEvent>(32);
let webhook_registry = Arc::new(std::sync::Mutex::new(
std::collections::HashSet::<String>::new(),
));
// Spawn single GPU worker; get back the SSE broadcast registry and cmd channel.
let (progress, cmd_tx) = worker::start(
job_rx,
Arc::clone(&storage),
model_path.clone().into(),
Arc::clone(&queue_depth),
gpu_device,
Arc::clone(&model_state),
model_event_tx.clone(),
Arc::clone(&webhook_registry),
std::time::Duration::from_secs(idle_timeout_secs),
std::time::Duration::from_secs(gpu_poll_interval_secs),
);
let state = AppState {
job_tx,
cmd_tx,
storage: Arc::clone(&storage),
progress,
model_name: model_name.as_str().into(),
model_name: model_name.as_str().into(),
queue_depth: Arc::clone(&queue_depth),
gpu_device,
model_state,
model_event_tx,
webhook_registry,
idle_timeout: std::time::Duration::from_secs(idle_timeout_secs),
gpu_poll_interval: std::time::Duration::from_secs(gpu_poll_interval_secs),
};
let app = Router::new()
.merge(SwaggerUi::new("/docs").url("/openapi.json", ApiDoc::openapi()))
.merge(routes::jobs_router())
.merge(routes::health_router())
.merge(routes::model_router())
.with_state(state)
.layer(CorsLayer::permissive())
.layer(TraceLayer::new_for_http());

View File

@@ -5,6 +5,117 @@ use uuid::Uuid;
pub type JobId = Uuid;
// ── Model lifecycle state ────────────────────────────────────────────────────
/// Current state of the whisper model in memory.
///
/// State machine:
/// ```
/// Unloaded ──(load trigger)──► Loading ──(ok)──► Ready ──(idle/unload)──► Unloaded
/// └──(VRAM full)──► WaitingForGpu ──(retry)──► Loading
/// WaitingForGpu ──(unload cmd)──► Unloaded
/// ```
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
#[serde(tag = "state", rename_all = "snake_case")]
pub enum ModelState {
/// Model is not in memory. GPU is free.
Unloaded,
/// Model is being loaded (weights transferred to GPU).
Loading,
/// A previous load attempt failed due to insufficient VRAM. The worker is
/// polling at `retry_in_secs` intervals until enough memory is available.
WaitingForGpu {
/// VRAM required to load the model, in MiB.
vram_needed_mb: u64,
/// VRAM currently free on the device, in MiB.
vram_free_mb: u64,
/// How many seconds until the next load attempt.
retry_in_secs: u64,
},
/// Model is loaded and ready to accept inference jobs.
Ready {
/// UTC timestamp of when the model finished loading (post-warmup).
loaded_at: DateTime<Utc>,
},
}
impl ModelState {
/// Returns true if the model can accept inference jobs right now.
pub fn is_ready(&self) -> bool {
matches!(self, ModelState::Ready { .. })
}
/// Suggested `Retry-After` value (seconds) to include in 503 responses.
pub fn retry_after_secs(&self) -> u64 {
match self {
ModelState::Unloaded => 30, // conservative load estimate
ModelState::Loading => 10,
ModelState::WaitingForGpu { retry_in_secs, .. } => *retry_in_secs,
ModelState::Ready { .. } => 0, // shouldn't 503 if ready
}
}
/// String tag for use in error response bodies and log fields.
pub fn tag(&self) -> &'static str {
match self {
ModelState::Unloaded => "unloaded",
ModelState::Loading => "loading",
ModelState::WaitingForGpu { .. } => "waiting_for_gpu",
ModelState::Ready { .. } => "ready",
}
}
}
// ── Model events (SSE + webhooks) ────────────────────────────────────────────
/// Events broadcast over the `GET /model/events` SSE stream and fired as
/// webhooks to registered clients.
///
/// Webhook delivery: only `ModelReady` and `ModelUnloaded` are sent to
/// webhook URLs. All four are broadcast on the SSE stream.
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ModelEvent {
/// Model finished loading and the GPU warmup completed — ready to accept jobs.
ModelReady { loaded_at: DateTime<Utc> },
/// Model was unloaded from GPU memory (idle timeout or manual unload).
ModelUnloaded,
/// Model load initiated.
ModelLoading,
/// Load failed due to insufficient VRAM; retrying after `retry_in_secs`.
ModelWaitingForGpu {
vram_needed_mb: u64,
vram_free_mb: u64,
retry_in_secs: u64,
},
}
impl ModelEvent {
/// Returns true if this event should be delivered via webhook.
pub fn is_webhook_event(&self) -> bool {
matches!(
self,
ModelEvent::ModelReady { .. } | ModelEvent::ModelUnloaded
)
}
}
// ── Model status response ────────────────────────────────────────────────────
/// Response body for `GET /model/status`.
#[derive(Debug, Serialize, Deserialize, ToSchema)]
pub struct ModelStatusResponse {
/// Current model state (flattened from `ModelState`).
#[serde(flatten)]
pub state: ModelState,
/// VRAM currently used on the device, in MiB (from nvidia-smi).
#[serde(skip_serializing_if = "Option::is_none")]
pub vram_used_mb: Option<u64>,
/// VRAM total on the device, in MiB.
#[serde(skip_serializing_if = "Option::is_none")]
pub vram_total_mb: Option<u64>,
}
// ── Job status ───────────────────────────────────────────────────────────────
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
@@ -22,11 +133,11 @@ pub enum JobStatus {
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub struct Word {
/// Word text
pub text: String,
pub text: String,
/// Start time in seconds
pub start: f32,
pub start: f32,
/// End time in seconds
pub end: f32,
pub end: f32,
/// Model confidence (01)
pub probability: f32,
}
@@ -38,9 +149,9 @@ pub struct Segment {
/// Start time in seconds
pub start: f32,
/// End time in seconds
pub end: f32,
pub end: f32,
/// Transcribed text
pub text: String,
pub text: String,
/// Token-level word timestamps (empty when flash_attn is enabled)
#[serde(default)]
pub words: Vec<Word>,
@@ -95,18 +206,23 @@ pub struct Job {
}
impl Job {
pub fn new(id: JobId, task: String, webhook_url: Option<String>, filename: Option<String>) -> Self {
pub fn new(
id: JobId,
task: String,
webhook_url: Option<String>,
filename: Option<String>,
) -> Self {
Self {
id,
status: JobStatus::Queued,
language: None,
status: JobStatus::Queued,
language: None,
task,
duration_secs: None,
segments: vec![],
error: None,
segments: vec![],
error: None,
webhook_url,
progress: 0,
created_at: Utc::now(),
progress: 0,
created_at: Utc::now(),
completed_at: None,
filename,
}
@@ -125,11 +241,13 @@ pub struct SubmitResponse {
/// Response from GET /health.
#[derive(Debug, Serialize, ToSchema)]
pub struct HealthResponse {
pub status: String,
pub gpu_name: Option<String>,
pub status: String,
pub gpu_name: Option<String>,
pub vram_total_mb: Option<u64>,
pub model: String,
pub queue_depth: usize,
pub model: String,
pub queue_depth: usize,
/// Current state of the whisper model.
pub model_state: String,
}
// ── SSE event payload ────────────────────────────────────────────────────────
@@ -145,6 +263,184 @@ pub enum SsePayload {
/// Total number of silence-split chunks in this job.
chunks_total: usize,
},
Done { job: Box<Job> },
Error { message: String },
Done {
job: Box<Job>,
},
Error {
message: String,
},
}
// ── Unit tests ───────────────────────────────────────────────────────────────
#[cfg(test)]
mod tests {
use super::*;
use serde_json::Value;
// ── ModelState serialization ─────────────────────────────────────────────
#[test]
fn test_model_state_unloaded_serializes() {
let v: Value = serde_json::to_value(ModelState::Unloaded).unwrap();
assert_eq!(v["state"], "unloaded");
}
#[test]
fn test_model_state_loading_serializes() {
let v: Value = serde_json::to_value(ModelState::Loading).unwrap();
assert_eq!(v["state"], "loading");
}
#[test]
fn test_model_state_waiting_serializes() {
let s = ModelState::WaitingForGpu {
vram_needed_mb: 3000,
vram_free_mb: 500,
retry_in_secs: 30,
};
let v: Value = serde_json::to_value(&s).unwrap();
assert_eq!(v["state"], "waiting_for_gpu");
assert_eq!(v["vram_needed_mb"], 3000);
assert_eq!(v["vram_free_mb"], 500);
assert_eq!(v["retry_in_secs"], 30);
}
#[test]
fn test_model_state_ready_serializes() {
let ts = Utc::now();
let s = ModelState::Ready { loaded_at: ts };
let v: Value = serde_json::to_value(&s).unwrap();
assert_eq!(v["state"], "ready");
assert!(v["loaded_at"].is_string());
}
#[test]
fn test_model_state_is_ready() {
assert!(!ModelState::Unloaded.is_ready());
assert!(!ModelState::Loading.is_ready());
assert!(!ModelState::WaitingForGpu {
vram_needed_mb: 0,
vram_free_mb: 0,
retry_in_secs: 30
}
.is_ready());
assert!(ModelState::Ready {
loaded_at: Utc::now()
}
.is_ready());
}
#[test]
fn test_retry_after_unloaded() {
assert_eq!(ModelState::Unloaded.retry_after_secs(), 30);
}
#[test]
fn test_retry_after_loading() {
assert_eq!(ModelState::Loading.retry_after_secs(), 10);
}
#[test]
fn test_retry_after_waiting_for_gpu() {
let s = ModelState::WaitingForGpu {
vram_needed_mb: 0,
vram_free_mb: 0,
retry_in_secs: 45,
};
assert_eq!(s.retry_after_secs(), 45);
}
#[test]
fn test_retry_after_ready_is_zero() {
assert_eq!(
ModelState::Ready {
loaded_at: Utc::now()
}
.retry_after_secs(),
0
);
}
// ── ModelEvent serialization ─────────────────────────────────────────────
#[test]
fn test_model_event_ready_serializes() {
let ts = Utc::now();
let e = ModelEvent::ModelReady { loaded_at: ts };
let v: Value = serde_json::to_value(&e).unwrap();
assert_eq!(v["type"], "model_ready");
assert!(v["loaded_at"].is_string());
}
#[test]
fn test_model_event_unloaded_serializes() {
let v: Value = serde_json::to_value(ModelEvent::ModelUnloaded).unwrap();
assert_eq!(v["type"], "model_unloaded");
}
#[test]
fn test_model_event_loading_serializes() {
let v: Value = serde_json::to_value(ModelEvent::ModelLoading).unwrap();
assert_eq!(v["type"], "model_loading");
}
#[test]
fn test_model_event_waiting_serializes() {
let e = ModelEvent::ModelWaitingForGpu {
vram_needed_mb: 3000,
vram_free_mb: 200,
retry_in_secs: 30,
};
let v: Value = serde_json::to_value(&e).unwrap();
assert_eq!(v["type"], "model_waiting_for_gpu");
assert_eq!(v["vram_needed_mb"], 3000);
}
#[test]
fn test_model_event_webhook_filter() {
assert!(ModelEvent::ModelReady {
loaded_at: Utc::now()
}
.is_webhook_event());
assert!(ModelEvent::ModelUnloaded.is_webhook_event());
assert!(!ModelEvent::ModelLoading.is_webhook_event());
assert!(!ModelEvent::ModelWaitingForGpu {
vram_needed_mb: 0,
vram_free_mb: 0,
retry_in_secs: 30
}
.is_webhook_event());
}
// ── ModelStatusResponse ──────────────────────────────────────────────────
#[test]
fn test_model_status_response_roundtrip() {
let r = ModelStatusResponse {
state: ModelState::Ready {
loaded_at: Utc::now(),
},
vram_used_mb: Some(4096),
vram_total_mb: Some(8192),
};
let json_str = serde_json::to_string(&r).unwrap();
let v: Value = serde_json::from_str(&json_str).unwrap();
assert_eq!(v["state"], "ready");
assert_eq!(v["vram_used_mb"], 4096);
assert_eq!(v["vram_total_mb"], 8192);
}
#[test]
fn test_model_status_response_omits_nulls() {
let r = ModelStatusResponse {
state: ModelState::Loading,
vram_used_mb: None,
vram_total_mb: None,
};
let v: Value = serde_json::to_value(&r).unwrap();
assert_eq!(v["state"], "loading");
assert!(v.get("vram_used_mb").is_none());
assert!(v.get("vram_total_mb").is_none());
}
}

View File

@@ -16,13 +16,15 @@ use crate::{models::HealthResponse, AppState, Result};
)]
pub async fn health(State(state): State<AppState>) -> Result<Json<HealthResponse>> {
let (gpu_name, vram_total_mb) = gpu_info(state.gpu_device);
let model_state_tag = state.model_state.read().await.tag().to_string();
Ok(Json(HealthResponse {
status: "ok".into(),
status: "ok".into(),
gpu_name,
vram_total_mb,
model: state.model_name.to_string(),
queue_depth: state.queue_depth.load(Ordering::Relaxed),
model: state.model_name.to_string(),
queue_depth: state.queue_depth.load(Ordering::Relaxed),
model_state: model_state_tag,
}))
}
@@ -48,9 +50,7 @@ fn gpu_info(device: u32) -> (Option<String>, Option<u64>) {
let mut parts = line.splitn(2, ',');
let name = parts.next().map(|s| s.trim().to_owned());
let vram = parts
.next()
.and_then(|s| s.trim().parse::<u64>().ok());
let vram = parts.next().and_then(|s| s.trim().parse::<u64>().ok());
(name, vram)
}

View File

@@ -19,11 +19,12 @@ use uuid::Uuid;
use crate::{
models::{Job, JobId, JobStatus, SubmitResponse},
worker::{audio_path_for, ProgressEvent},
worker::{audio_path_for, ProgressEvent, WorkerCmd},
AppError, AppState, Result,
};
type SseStream = Pin<Box<dyn Stream<Item = std::result::Result<Event, std::convert::Infallible>> + Send>>;
type SseStream =
Pin<Box<dyn Stream<Item = std::result::Result<Event, std::convert::Infallible>> + Send>>;
// ── POST /jobs ───────────────────────────────────────────────────────────────
@@ -53,18 +54,20 @@ pub async fn submit_job(
State(state): State<AppState>,
mut multipart: Multipart,
) -> Result<impl IntoResponse> {
let mut language: Option<String> = None;
let mut task: String = "transcribe".into();
let mut language: Option<String> = None;
let mut task: String = "transcribe".into();
let mut webhook_url: Option<String> = None;
let mut filename: Option<String> = None;
let mut filename: Option<String> = None;
let mut audio_saved = false;
// Assign ID early so we know where to stream the audio bytes.
let id = Uuid::new_v4();
let audio_path = audio_path_for(&id);
while let Some(field) = multipart.next_field().await.map_err(|e| {
AppError::BadRequest(format!("multipart error: {e}"))
})? {
while let Some(field) = multipart
.next_field()
.await
.map_err(|e| AppError::BadRequest(format!("multipart error: {e}")))?
{
let field_name = field.name().unwrap_or("").to_owned();
match field_name.as_str() {
@@ -77,9 +80,11 @@ pub async fn submit_job(
})?;
let mut bytes_written: u64 = 0;
let mut stream = field;
while let Some(chunk) = stream.chunk().await.map_err(|e| {
AppError::BadRequest(format!("failed to read audio field: {e}"))
})? {
while let Some(chunk) = stream
.chunk()
.await
.map_err(|e| AppError::BadRequest(format!("failed to read audio field: {e}")))?
{
file.write_all(&chunk).await.map_err(|e| {
AppError::Internal(format!("failed to write audio chunk: {e}"))
})?;
@@ -90,10 +95,29 @@ pub async fn submit_job(
}
audio_saved = true;
}
"language" => language = Some(field.text().await.map_err(|e| AppError::BadRequest(e.to_string()))?),
"task" => task = field.text().await.map_err(|e| AppError::BadRequest(e.to_string()))?,
"webhook_url" => webhook_url = Some(field.text().await.map_err(|e| AppError::BadRequest(e.to_string()))?),
_ => {} // ignore unknown fields
"language" => {
language = Some(
field
.text()
.await
.map_err(|e| AppError::BadRequest(e.to_string()))?,
)
}
"task" => {
task = field
.text()
.await
.map_err(|e| AppError::BadRequest(e.to_string()))?
}
"webhook_url" => {
webhook_url = Some(
field
.text()
.await
.map_err(|e| AppError::BadRequest(e.to_string()))?,
)
}
_ => {} // ignore unknown fields
}
}
@@ -107,18 +131,54 @@ pub async fn submit_job(
));
}
// Check model state before accepting the job.
let (model_ready, retry_after_secs, state_tag) = {
let ms = state.model_state.read().await;
let ready = ms.is_ready();
let retry = ms.retry_after_secs();
let tag = ms.tag().to_string();
(ready, retry, tag)
};
// Register the webhook URL regardless of model state — so model lifecycle
// events are delivered even if the job itself is rejected.
if let Some(url) = &webhook_url {
state
.webhook_registry
.lock()
.unwrap_or_else(|e| e.into_inner())
.insert(url.clone());
}
if !model_ready {
// Trigger a load if the model is simply unloaded (not already loading).
if state_tag == "unloaded" {
let _ = state.cmd_tx.try_send(WorkerCmd::Load);
}
// Clean up the audio file we already wrote to disk.
let _ = tokio::fs::remove_file(&audio_path).await;
return Err(AppError::ModelNotReady {
state: state_tag,
retry_after_secs,
});
}
let mut job = Job::new(id, task, webhook_url, filename);
job.language = language;
state.storage.create(&job).await?;
// Pre-create the broadcast channel so SSE subscribers don't miss events.
state.progress.entry(id).or_insert_with(|| broadcast::channel(64).0);
state
.progress
.entry(id)
.or_insert_with(|| broadcast::channel(64).0);
state.queue_depth.fetch_add(1, Ordering::Relaxed);
state.job_tx.send(id).map_err(|_| {
AppError::Internal("worker channel closed".into())
})?;
state
.job_tx
.send(id)
.map_err(|_| AppError::Internal("worker channel closed".into()))?;
tracing::info!(job_id = %id, "job queued");
@@ -138,10 +198,7 @@ pub async fn submit_job(
(status = 404, description = "Not found"),
)
)]
pub async fn get_job(
State(state): State<AppState>,
Path(id): Path<JobId>,
) -> Result<Json<Job>> {
pub async fn get_job(State(state): State<AppState>, Path(id): Path<JobId>) -> Result<Json<Job>> {
let job = state.storage.get(&id).await?;
Ok(Json(job))
}
@@ -166,15 +223,15 @@ pub async fn get_job(
)]
pub async fn stream_job(
State(state): State<AppState>,
Path(id): Path<JobId>,
Path(id): Path<JobId>,
) -> Result<Sse<SseStream>> {
// If the job is already finished, return a single done event immediately.
let job = state.storage.get(&id).await?;
match job.status {
JobStatus::Done | JobStatus::Failed | JobStatus::Cancelled => {
let payload = serde_json::to_string(
&crate::models::SsePayload::Done { job: Box::new(job) }
).unwrap_or_default();
let payload =
serde_json::to_string(&crate::models::SsePayload::Done { job: Box::new(job) })
.unwrap_or_default();
let s: SseStream = Box::pin(stream::once(async move {
Ok(Event::default().event("done").data(payload))
}));
@@ -192,22 +249,28 @@ pub async fn stream_job(
let sse_stream: SseStream = Box::pin(BroadcastStream::new(rx).filter_map(|msg| async move {
let event = match msg {
Ok(ProgressEvent::Progress { percent, chunk, total }) => {
let payload = serde_json::to_string(
&crate::models::SsePayload::Progress { percent, chunk, chunks_total: total }
).ok()?;
Ok(ProgressEvent::Progress {
percent,
chunk,
total,
}) => {
let payload = serde_json::to_string(&crate::models::SsePayload::Progress {
percent,
chunk,
chunks_total: total,
})
.ok()?;
Event::default().event("progress").data(payload)
}
Ok(ProgressEvent::Done(job)) => {
let payload = serde_json::to_string(
&crate::models::SsePayload::Done { job }
).ok()?;
let payload =
serde_json::to_string(&crate::models::SsePayload::Done { job }).ok()?;
Event::default().event("done").data(payload)
}
Ok(ProgressEvent::Error(msg)) => {
let payload = serde_json::to_string(
&crate::models::SsePayload::Error { message: msg }
).ok()?;
let payload =
serde_json::to_string(&crate::models::SsePayload::Error { message: msg })
.ok()?;
Event::default().event("error").data(payload)
}
Err(_) => return None, // lagged / channel closed
@@ -234,10 +297,7 @@ pub async fn stream_job(
(status = 409, description = "Job already finished"),
)
)]
pub async fn delete_job(
State(state): State<AppState>,
Path(id): Path<JobId>,
) -> Result<Json<Job>> {
pub async fn delete_job(State(state): State<AppState>, Path(id): Path<JobId>) -> Result<Json<Job>> {
let mut job = state.storage.get(&id).await?;
match job.status {
@@ -250,7 +310,7 @@ pub async fn delete_job(
_ => {}
}
job.status = JobStatus::Cancelled;
job.status = JobStatus::Cancelled;
job.completed_at = Some(Utc::now());
state.storage.save(&job).await?;

View File

@@ -1,19 +1,34 @@
pub mod health;
pub mod jobs;
pub mod model;
use axum::{extract::DefaultBodyLimit, routing::{delete, get, post}, Router};
use crate::AppState;
use axum::{
extract::DefaultBodyLimit,
routing::{delete, get, post},
Router,
};
pub fn jobs_router() -> Router<AppState> {
Router::new()
// No body limit on the upload route — files can be multiple GB.
.route("/jobs", post(jobs::submit_job).layer(DefaultBodyLimit::disable()))
.route("/jobs/:id", get(jobs::get_job))
.route(
"/jobs",
post(jobs::submit_job).layer(DefaultBodyLimit::disable()),
)
.route("/jobs/:id", get(jobs::get_job))
.route("/jobs/:id/stream", get(jobs::stream_job))
.route("/jobs/:id", delete(jobs::delete_job))
.route("/jobs/:id", delete(jobs::delete_job))
}
pub fn health_router() -> Router<AppState> {
Router::new()
.route("/health", get(health::health))
Router::new().route("/health", get(health::health))
}
pub fn model_router() -> Router<AppState> {
Router::new()
.route("/model/status", get(model::model_status))
.route("/model/load", post(model::model_load))
.route("/model/unload", post(model::model_unload))
.route("/model/events", get(model::model_events))
}

244
src/routes/model.rs Normal file
View File

@@ -0,0 +1,244 @@
use std::pin::Pin;
use axum::{
extract::State,
http::StatusCode,
response::{
sse::{Event, KeepAlive, Sse},
IntoResponse,
},
Json,
};
use futures::Stream;
use futures::StreamExt;
use tokio_stream::wrappers::BroadcastStream;
use crate::{
models::{ModelEvent, ModelStatusResponse},
worker::WorkerCmd,
AppState, Result,
};
type SseStream =
Pin<Box<dyn Stream<Item = std::result::Result<Event, std::convert::Infallible>> + Send>>;
// ── GET /model/status ────────────────────────────────────────────────────────
/// Return the current model state and VRAM statistics.
#[utoipa::path(
get,
path = "/model/status",
tag = "model",
responses(
(status = 200, description = "Model status", body = ModelStatusResponse),
)
)]
pub async fn model_status(State(state): State<AppState>) -> Result<Json<ModelStatusResponse>> {
let model_state = state.model_state.read().await.clone();
let (vram_used_mb, vram_total_mb) = vram_stats(state.gpu_device);
Ok(Json(ModelStatusResponse {
state: model_state,
vram_used_mb,
vram_total_mb,
}))
}
// ── POST /model/load ─────────────────────────────────────────────────────────
/// Request the model to be loaded into GPU memory.
/// Idempotent: if the model is already loading or ready, this is a no-op.
/// Returns 202 Accepted; poll `GET /model/status` or subscribe to
/// `GET /model/events` to know when it is ready.
#[utoipa::path(
post,
path = "/model/load",
tag = "model",
responses(
(status = 202, description = "Load initiated or already in progress"),
(status = 200, description = "Model already ready"),
)
)]
pub async fn model_load(State(state): State<AppState>) -> impl IntoResponse {
let is_ready = state.model_state.read().await.is_ready();
if is_ready {
return (
StatusCode::OK,
Json(serde_json::json!({"status": "already_ready"})),
);
}
// Ignore send errors (channel full = load already in progress).
let _ = state.cmd_tx.try_send(WorkerCmd::Load);
(
StatusCode::ACCEPTED,
Json(serde_json::json!({"status": "load_initiated"})),
)
}
// ── POST /model/unload ───────────────────────────────────────────────────────
/// Unload the model from GPU memory immediately.
/// Idempotent: if the model is already unloaded, returns 200 immediately.
#[utoipa::path(
post,
path = "/model/unload",
tag = "model",
responses(
(status = 200, description = "Model unloaded or was already unloaded"),
)
)]
pub async fn model_unload(State(state): State<AppState>) -> impl IntoResponse {
if !matches!(
*state.model_state.read().await,
crate::models::ModelState::Unloaded
) {
let _ = state.cmd_tx.try_send(WorkerCmd::Unload);
}
(
StatusCode::OK,
Json(serde_json::json!({"status": "unload_requested"})),
)
}
// ── GET /model/events ────────────────────────────────────────────────────────
/// Subscribe to model lifecycle events via Server-Sent Events.
///
/// Event types:
/// - `model_loading` — load initiated
/// - `model_ready` — model loaded and warmed up
/// - `model_unloaded` — model freed from GPU memory
/// - `model_waiting_for_gpu` — insufficient VRAM; retrying
#[utoipa::path(
get,
path = "/model/events",
tag = "model",
responses(
(status = 200, description = "SSE stream of model lifecycle events"),
)
)]
pub async fn model_events(State(state): State<AppState>) -> Sse<SseStream> {
let rx = state.model_event_tx.subscribe();
let stream: SseStream = Box::pin(BroadcastStream::new(rx).filter_map(|msg| async move {
match msg {
Ok(event) => {
let event_type = match &event {
ModelEvent::ModelReady { .. } => "model_ready",
ModelEvent::ModelUnloaded => "model_unloaded",
ModelEvent::ModelLoading => "model_loading",
ModelEvent::ModelWaitingForGpu { .. } => "model_waiting_for_gpu",
};
let data = serde_json::to_string(&event).ok()?;
Some(Ok(Event::default().event(event_type).data(data)))
}
Err(_) => None,
}
}));
Sse::new(stream).keep_alive(KeepAlive::default())
}
// ── Helpers ───────────────────────────────────────────────────────────────────
fn vram_stats(gpu_device: u32) -> (Option<u64>, Option<u64>) {
fn inner(gpu_device: u32) -> Option<(u64, u64)> {
let out = std::process::Command::new("nvidia-smi")
.args([
&format!("--id={gpu_device}"),
"--query-gpu=memory.used,memory.total",
"--format=csv,noheader,nounits",
])
.output()
.ok()?;
if !out.status.success() {
return None;
}
let line = String::from_utf8_lossy(&out.stdout);
let line = line.trim();
let mut parts = line.splitn(2, ',');
let used = parts.next().and_then(|s| s.trim().parse::<u64>().ok())?;
let total = parts.next().and_then(|s| s.trim().parse::<u64>().ok())?;
Some((used, total))
}
match inner(gpu_device) {
Some((u, t)) => (Some(u), Some(t)),
None => (None, None),
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::{atomic::AtomicUsize, Arc, Mutex};
use axum::response::IntoResponse;
use chrono::Utc;
use tempfile::tempdir;
use tokio::sync::{broadcast, mpsc, RwLock};
use crate::{models::ModelState, storage::Storage, worker::ProgressRegistry, AppState};
async fn test_state(
model_state: ModelState,
) -> (
AppState,
tempfile::TempDir,
std::sync::mpsc::Receiver<WorkerCmd>,
) {
let tmp = tempdir().expect("tempdir");
let storage = Arc::new(Storage::new(tmp.path()).await.expect("storage"));
let (job_tx, _job_rx) = mpsc::unbounded_channel();
let (cmd_tx, cmd_rx) = std::sync::mpsc::sync_channel(8);
let progress: ProgressRegistry = Arc::new(dashmap::DashMap::new());
let (model_event_tx, _) = broadcast::channel(8);
(
AppState {
job_tx,
cmd_tx,
storage,
progress,
model_name: "test".into(),
queue_depth: Arc::new(AtomicUsize::new(0)),
gpu_device: 0,
model_state: Arc::new(RwLock::new(model_state)),
model_event_tx,
webhook_registry: Arc::new(Mutex::new(Default::default())),
idle_timeout: std::time::Duration::from_secs(300),
gpu_poll_interval: std::time::Duration::from_secs(30),
},
tmp,
cmd_rx,
)
}
#[tokio::test]
async fn test_model_unload_skips_command_when_already_unloaded() {
let (state, _tmp, cmd_rx) = test_state(ModelState::Unloaded).await;
let response = model_unload(State(state)).await.into_response();
assert_eq!(response.status(), StatusCode::OK);
assert!(
cmd_rx.try_recv().is_err(),
"unexpected unload command queued"
);
}
#[tokio::test]
async fn test_model_unload_queues_command_when_ready() {
let (state, _tmp, cmd_rx) = test_state(ModelState::Ready {
loaded_at: Utc::now(),
})
.await;
let response = model_unload(State(state)).await.into_response();
assert_eq!(response.status(), StatusCode::OK);
assert!(matches!(cmd_rx.try_recv(), Ok(WorkerCmd::Unload)));
}
}

View File

@@ -30,20 +30,20 @@ impl Storage {
// ── CRUD ─────────────────────────────────────────────────────────────────
pub async fn create(&self, job: &Job) -> Result<()> {
let path = self.job_path(&job.id);
let payload = serde_json::to_vec_pretty(job)
.map_err(|e| AppError::Internal(e.to_string()))?;
fs::write(&path, payload).await.map_err(|e| {
AppError::Internal(format!("failed to write job {}: {e}", job.id))
})?;
let path = self.job_path(&job.id);
let payload =
serde_json::to_vec_pretty(job).map_err(|e| AppError::Internal(e.to_string()))?;
fs::write(&path, payload)
.await
.map_err(|e| AppError::Internal(format!("failed to write job {}: {e}", job.id)))?;
Ok(())
}
pub async fn get(&self, id: &JobId) -> Result<Job> {
let path = self.job_path(id);
let raw = fs::read(&path).await.map_err(|_| {
AppError::NotFound(format!("job {id} not found"))
})?;
let raw = fs::read(&path)
.await
.map_err(|_| AppError::NotFound(format!("job {id} not found")))?;
serde_json::from_slice(&raw).map_err(|e| AppError::Internal(e.to_string()))
}
@@ -54,22 +54,24 @@ impl Storage {
pub async fn delete(&self, id: &JobId) -> Result<()> {
let path = self.job_path(id);
fs::remove_file(&path).await.map_err(|_| {
AppError::NotFound(format!("job {id} not found"))
})?;
fs::remove_file(&path)
.await
.map_err(|_| AppError::NotFound(format!("job {id} not found")))?;
Ok(())
}
/// List all job IDs present on disk.
pub async fn list_ids(&self) -> Result<Vec<JobId>> {
let mut entries = fs::read_dir(&self.dir).await.map_err(|e| {
AppError::Internal(format!("read_dir failed: {e}"))
})?;
let mut entries = fs::read_dir(&self.dir)
.await
.map_err(|e| AppError::Internal(format!("read_dir failed: {e}")))?;
let mut ids = Vec::new();
while let Some(entry) = entries.next_entry().await.map_err(|e| {
AppError::Internal(e.to_string())
})? {
while let Some(entry) = entries
.next_entry()
.await
.map_err(|e| AppError::Internal(e.to_string()))?
{
let name = entry.file_name();
let name = name.to_string_lossy();
if let Some(stem) = name.strip_suffix(".json") {
@@ -88,8 +90,8 @@ impl Storage {
if let Ok(mut job) = self.get(&id).await {
if job.status == JobStatus::Running {
tracing::warn!(job_id = %id, "recovering interrupted job → failed");
job.status = JobStatus::Failed;
job.error = Some("server restarted while job was running".into());
job.status = JobStatus::Failed;
job.error = Some("server restarted while job was running".into());
job.completed_at = Some(chrono::Utc::now());
let _ = self.save(&job).await;
}

View File

@@ -1,7 +1,7 @@
use std::path::Path;
use whisper_rs::{
FullParams, SamplingStrategy, WhisperContext, WhisperContextParameters,
FullParams, SamplingStrategy, WhisperContext, WhisperContextParameters, WhisperState,
};
use crate::{
@@ -9,18 +9,38 @@ use crate::{
AppError, Result,
};
/// Wraps a loaded whisper.cpp context.
/// `WhisperContext` is `Send` but **not** `Sync` — keep it on the worker thread.
/// Wraps a loaded whisper.cpp context and a single reusable inference state.
///
/// `WhisperState` allocates ~700 MB of GPU compute buffers (KV caches, CUDA
/// workspace) via `whisper_init_state`. Creating a new state for every chunk
/// causes repeated GPU re-initialisation and VRAM allocation churn, which
/// manifests as intermittent CUDA allocation failures → 0 segments returned.
///
/// By creating the state once at load time and reusing it, GPU memory is
/// stable and inference is reliable across all chunks.
///
/// Safety: `WhisperState` is `Send + Sync` (explicitly declared in whisper-rs).
/// This struct lives on the single `whisper-gpu` OS thread and is never shared.
pub struct Transcriber {
ctx: WhisperContext,
// WhisperContext is not stored after load: WhisperState holds its own
// Arc<WhisperInnerContext>, so the model weights remain in memory for
// the lifetime of the state even after the originating context is dropped.
state: WhisperState,
}
impl Transcriber {
/// Load a GGML model file and configure GPU for RTX 2080.
/// Load a GGML model file, configure GPU, and run a warmup inference.
///
/// The warmup is critical: CUDA JIT-compiles its kernels on the FIRST call to
/// `whisper_full_with_state`. Without warmup, the first real job triggers JIT
/// compilation mid-inference, which can cause the call to return in ~0.5s with
/// 0 segments. The warmup forces kernel compilation at startup so all subsequent
/// jobs run correctly from the very first request.
pub fn load(model_path: impl AsRef<Path>, gpu_device: u32) -> Result<Self> {
let path = model_path.as_ref().to_str().ok_or_else(|| {
AppError::Internal("model path is not valid UTF-8".into())
})?;
let path = model_path
.as_ref()
.to_str()
.ok_or_else(|| AppError::Internal("model path is not valid UTF-8".into()))?;
let mut params = WhisperContextParameters::new();
params.use_gpu(true);
@@ -29,28 +49,61 @@ impl Transcriber {
// real-world audio (conference recordings, noisy MP3s).
// params.flash_attn(true);
let ctx = WhisperContext::new_with_params(path, params)
.map_err(|e| AppError::Internal(format!("failed to load model: {e}")))?;
let ctx = WhisperContext::new_with_params(path, params).map_err(|e| {
let msg = format!("failed to load model: {e}");
if AppError::is_oom(&msg) {
AppError::OutOfMemory(msg)
} else {
AppError::Internal(msg)
}
})?;
tracing::info!(model = path, "whisper model loaded");
Ok(Self { ctx })
let mut state = ctx.create_state().map_err(|e| {
let msg = format!("failed to create whisper state: {e}");
if AppError::is_oom(&msg) {
AppError::OutOfMemory(msg)
} else {
AppError::Internal(msg)
}
})?;
// ctx drops here; state holds Arc<WhisperInnerContext> so model stays loaded.
// ── GPU warmup ────────────────────────────────────────────────────────
// Run a silent 1-second inference to force CUDA JIT kernel compilation.
// Expected result: 0 segments (silence). The point is the side effect:
// all CUDA kernels are compiled and cached before the first real job arrives.
tracing::info!(model = path, "warming up GPU — compiling CUDA kernels...");
let silence = vec![0.0f32; 16_000]; // 1s @ 16 kHz
let mut wp = FullParams::new(SamplingStrategy::Greedy { best_of: 1 });
wp.set_language(Some("en"));
wp.set_print_progress(false);
wp.set_print_realtime(false);
wp.set_suppress_blank(true);
wp.set_no_context(true);
let _ = state.full(wp, &silence); // ignore result — 0 segments expected
tracing::info!("GPU warmup complete — ready for inference");
Ok(Self { state })
}
/// Transcribe 16 kHz mono f32 PCM samples.
/// `on_progress` receives 0100 from whisper.cpp.
///
/// The inference state (`self.state`) is reused across calls. GPU compute
/// buffers remain allocated, eliminating per-chunk `whisper_init_state` overhead.
/// `no_context=true` in the params prevents KV-cache contamination between chunks.
pub fn transcribe(
&self,
pcm: &[f32],
language: Option<&str>,
task: &str,
&mut self,
pcm: &[f32],
language: Option<&str>,
task: &str,
on_progress: impl Fn(u8) + Send + 'static,
) -> Result<(Vec<Segment>, String)> {
let mut state = self.ctx.create_state()
.map_err(|e| AppError::Internal(format!("create_state: {e}")))?;
let state = &mut self.state;
let mut fp = FullParams::new(SamplingStrategy::BeamSearch {
beam_size: 5,
patience: 1.0,
patience: 1.0,
});
fp.set_n_threads(num_cpus::get() as i32);
@@ -104,40 +157,55 @@ impl Transcriber {
.full(fp, pcm)
.map_err(|e| AppError::Internal(format!("transcription failed: {e}")))?;
let n_segments = state.full_n_segments()
let n_segments = state
.full_n_segments()
.map_err(|e| AppError::Internal(e.to_string()))?;
let mut segments = Vec::with_capacity(n_segments as usize);
for i in 0..n_segments {
let text = state.full_get_segment_text(i)
let text = state
.full_get_segment_text(i)
.map_err(|e| AppError::Internal(e.to_string()))?;
let start = state.full_get_segment_t0(i)
.map_err(|e| AppError::Internal(e.to_string()))? as f32 / 100.0;
let end = state.full_get_segment_t1(i)
.map_err(|e| AppError::Internal(e.to_string()))? as f32 / 100.0;
let start = state
.full_get_segment_t0(i)
.map_err(|e| AppError::Internal(e.to_string()))? as f32
/ 100.0;
let end = state
.full_get_segment_t1(i)
.map_err(|e| AppError::Internal(e.to_string()))? as f32
/ 100.0;
let n_tokens = state.full_n_tokens(i)
let n_tokens = state
.full_n_tokens(i)
.map_err(|e| AppError::Internal(e.to_string()))?;
let mut words = Vec::new();
for t in 0..n_tokens {
let token_text = state.full_get_token_text(i, t)
let token_text = state
.full_get_token_text(i, t)
.map_err(|e| AppError::Internal(e.to_string()))?;
if token_text.starts_with('[') {
continue; // skip special tokens ([MUSIC], [APPLAUSE], etc.)
}
let data = state.full_get_token_data(i, t)
let data = state
.full_get_token_data(i, t)
.map_err(|e| AppError::Internal(e.to_string()))?;
words.push(Word {
text: token_text,
start: data.t0 as f32 / 100.0,
end: data.t1 as f32 / 100.0,
text: token_text,
start: data.t0 as f32 / 100.0,
end: data.t1 as f32 / 100.0,
probability: data.p,
});
}
segments.push(Segment { index: i, start, end, text, words });
segments.push(Segment {
index: i,
start,
end,
text,
words,
});
}
let lang = state

File diff suppressed because it is too large Load Diff

View File

@@ -1,120 +1,173 @@
#!/usr/bin/env bash
set -euo pipefail
BASE="http://localhost:8090"
AUDIO="/home/moze/Sources/youtube-transcriber/docker/tmp/audio-b2167046-a236-4fcd-b739-78177542fd23.wav"
# ── Config — override via env vars ───────────────────────────────────────────
BASE="${WHISPER_BASE_URL:-http://localhost:8080}"
AUDIO="${TEST_AUDIO:-/home/moze/Sources/youtube-transcriber/docker/tmp/audio-b2167046-a236-4fcd-b739-78177542fd23.wav}"
GREEN='\033[0;32m'; RED='\033[0;31m'; NC='\033[0m'
FAILS=0
ok() { echo -e "${GREEN}[PASS]${NC} $*"; }
fail(){ echo -e "${RED}[FAIL]${NC} $*"; exit 1; }
fail(){ echo -e "${RED}[FAIL]${NC} $*"; FAILS=$((FAILS + 1)); }
echo "=== Whisper API test suite ==="
echo " BASE : $BASE"
echo " AUDIO : $AUDIO"
echo ""
echo "=== 1. GET /health ==="
HEALTH=$(curl -sf "$BASE/health")
echo "$HEALTH" | python3 -m json.tool
echo "$HEALTH" | python3 -c "import sys,json; d=json.load(sys.stdin); assert d['status']=='ok'" && ok "health"
python3 -c "
import sys, json
d = json.loads('$HEALTH' if False else sys.stdin.read())
assert d['status'] == 'ok', f'status={d[\"status\"]}'
assert 'model_state' in d, 'model_state field missing from health response'
" <<< "$HEALTH" && ok "health ok + model_state present" || fail "health check"
echo ""
echo "=== 2. GET /docs (Swagger UI reachable) ==="
curl -sf "$BASE/docs" | grep -q "swagger" && ok "swagger UI"
curl -sf "$BASE/docs" | grep -qi "swagger" && ok "swagger UI reachable" || fail "swagger UI"
echo ""
echo "=== 3. Webhook server (background nc loop) ==="
# Simple webhook receiver using Python
python3 - &
WEBHOOK_PID=$!
echo "=== 3. Webhook receiver (background Python HTTP server) ==="
cat > /tmp/webhook_receiver.py << 'PYEOF'
import http.server, json, sys
import http.server, json, sys, signal
class H(http.server.BaseHTTPRequestHandler):
def do_POST(self):
n = int(self.headers.get('Content-Length', 0))
body = self.rfile.read(n)
print("\n[WEBHOOK] received:", json.dumps(json.loads(body), indent=2)[:500])
data = json.loads(body)
print(f"\n[WEBHOOK] status={data.get('status')} segments={len(data.get('segments', []))}", flush=True)
self.send_response(200)
self.end_headers()
def log_message(self, *a): pass
print("[WEBHOOK] listening on :9999")
signal.signal(signal.SIGTERM, lambda *_: sys.exit(0))
print("[WEBHOOK] listening on :9999", flush=True)
http.server.HTTPServer(('', 9999), H).serve_forever()
PYEOF
kill $WEBHOOK_PID 2>/dev/null || true
python3 /tmp/webhook_receiver.py &
WEBHOOK_PID=$!
sleep 1
echo "Webhook receiver started (PID $WEBHOOK_PID)"
echo ""
echo "=== 4. DELETE a non-existent job → 404 ==="
STATUS=$(curl -s -o /dev/null -w "%{http_code}" -X DELETE "$BASE/jobs/00000000-0000-0000-0000-000000000000")
[ "$STATUS" = "404" ] && ok "DELETE 404 for unknown job" || fail "expected 404 got $STATUS"
echo "=== 4. GET /model/status — expect unloaded on fresh start ==="
MODEL_STATUS=$(curl -sf "$BASE/model/status")
echo "$MODEL_STATUS" | python3 -m json.tool
echo "$MODEL_STATUS" | python3 -c "
import sys, json
d = json.load(sys.stdin)
assert 'state' in d, 'state field missing from /model/status'
print(f' model state: {d[\"state\"]}')
" && ok "/model/status has state field" || fail "/model/status schema"
echo ""
echo "=== 5. POST /jobs — submit audio ==="
echo "=== 5. POST /model/load — trigger model load ==="
LOAD_RESP=$(curl -sf -X POST "$BASE/model/load")
echo "$LOAD_RESP"
ok "POST /model/load accepted"
echo ""
echo "=== 6. Poll /model/status until ready (max 3 min) ==="
LOAD_ELAPSED=0
while true; do
sleep 5
LOAD_ELAPSED=$((LOAD_ELAPSED + 5))
MS=$(curl -sf "$BASE/model/status")
STATE=$(echo "$MS" | python3 -c "import sys,json; print(json.load(sys.stdin)['state'])")
echo " [${LOAD_ELAPSED}s] model_state=${STATE}"
if [ "$STATE" = "ready" ]; then
ok "model loaded and ready in ${LOAD_ELAPSED}s"
break
fi
[ $LOAD_ELAPSED -gt 180 ] && { fail "model failed to load within 3 minutes"; break; }
done
echo ""
echo "=== 7. DELETE a non-existent job → 404 ==="
STATUS=$(curl -s -o /dev/null -w "%{http_code}" -X DELETE "$BASE/jobs/00000000-0000-0000-0000-000000000000")
[ "$STATUS" = "404" ] && ok "DELETE unknown job → 404" || fail "expected 404, got $STATUS"
echo ""
echo "=== 8. POST /jobs — submit audio ==="
SUBMIT=$(curl -sf -X POST "$BASE/jobs" \
-F "audio=@${AUDIO};type=audio/wav" \
-F "language=auto" \
-F "task=transcribe" \
-F "webhook_url=http://localhost:9999/webhook")
echo "$SUBMIT"
JOB_ID=$(echo "$SUBMIT" | python3 -c "import sys,json; print(json.load(sys.stdin)['id'])")
JOB_ID=$(echo "$SUBMIT" | python3 -c "import sys,json; print(json.load(sys.stdin)['job_id'])")
ok "submitted job $JOB_ID"
echo ""
echo "=== 6. GET /jobs/{id} immediately after submit ==="
echo "=== 9. GET /jobs/{id} immediately after submit ==="
JOB=$(curl -sf "$BASE/jobs/$JOB_ID")
echo "$JOB" | python3 -c "import sys,json; d=json.load(sys.stdin); assert d['status'] in ('queued','running')" \
&& ok "status is queued/running"
echo "$JOB" | python3 -c "
import sys, json
d = json.load(sys.stdin)
assert d['status'] in ('queued', 'running'), f'unexpected status: {d[\"status\"]}'
" && ok "status is queued/running" || fail "initial status check"
echo ""
echo "=== 7. SSE stream (first 15 events then detach) ==="
echo "=== 10. SSE stream (observe first 30 events then detach) ==="
echo "Subscribing to SSE stream for $JOB_ID"
curl -sN --max-time 60 "$BASE/jobs/$JOB_ID/stream" | head -30 &
curl -sN --max-time 90 "$BASE/jobs/$JOB_ID/stream" | head -60 &
SSE_PID=$!
echo ""
echo "=== 8. Poll until done (max 20 min) ==="
SECONDS=0
echo "=== 11. Poll until done (max 20 min) ==="
ELAPSED=0
while true; do
sleep 15
ELAPSED=$((ELAPSED + 15))
JOB=$(curl -sf "$BASE/jobs/$JOB_ID")
STATUS=$(echo "$JOB" | python3 -c "import sys,json; print(json.load(sys.stdin)['status'])")
echo " [${SECONDS}s] status=$STATUS"
PROGRESS=$(echo "$JOB" | python3 -c "import sys,json; print(json.load(sys.stdin).get('progress',0))")
echo " [${ELAPSED}s] status=$STATUS progress=${PROGRESS}%"
if [ "$STATUS" = "done" ]; then
ok "job finished in ${SECONDS}s"
ok "job finished in ${ELAPSED}s"
break
elif [ "$STATUS" = "failed" ]; then
echo "$JOB" | python3 -m json.tool
fail "job failed"
break
fi
[ $SECONDS -gt 1200 ] && fail "timeout after 20 minutes"
[ $ELAPSED -gt 1200 ] && { fail "timeout after 20 minutes"; break; }
done
kill $SSE_PID 2>/dev/null || true
echo ""
echo "=== 9. Inspect transcription quality ==="
echo "=== 12. Inspect transcription quality ==="
RESULT=$(curl -sf "$BASE/jobs/$JOB_ID")
echo "$RESULT" | python3 - << 'PYCHECK'
TMPJSON=$(mktemp /tmp/whisper_test_XXXXXX.json)
echo "$RESULT" > "$TMPJSON"
python3 - "$TMPJSON" << 'PYCHECK'
import sys, json, re
data = json.loads(sys.stdin.read())
with open(sys.argv[1]) as f:
data = json.load(f)
segments = data.get("segments", [])
print(f" Language : {data.get('language')}")
print(f" Duration : {data.get('duration_secs')}s")
print(f" Segments : {len(segments)}")
issues = []
if not segments:
print(" ✗ ZERO SEGMENTS — transcription likely failed silently")
sys.exit(1)
issues = []
for i, seg in enumerate(segments):
text = seg.get("text", "")
# --- repetition loop ---
words = text.strip().split()
if len(words) >= 6:
half = len(words) // 2
if words[:half] == words[half:half+half]:
issues.append(f" [seg {i}] REPETITION LOOP: {text[:80]}")
# --- long duplicate phrases ---
phrases = re.findall(r'(\b\w+ \w+ \w+\b)', text)
if len(phrases) != len(set(phrases)) and len(phrases) > 4:
issues.append(f" [seg {i}] DUPLICATE PHRASE: {text[:80]}")
# --- blank/empty segment ---
if not text.strip():
issues.append(f" [seg {i}] BLANK SEGMENT")
@@ -125,31 +178,52 @@ if issues:
else:
print("\n ✓ No repetition loops or blank segments detected")
# Print first 5 segments as sample
print("\n Sample output:")
print("\n Sample output (first 5 segments):")
for seg in segments[:5]:
print(f" [{seg['start']:.1f}{seg['end']:.1f}] {seg['text'][:100]}")
PYCHECK
PYEXIT=$?
rm -f "$TMPJSON"
[ $PYEXIT -eq 0 ] && ok "quality check passed" || fail "quality check"
echo ""
echo "=== 10. DELETE completed job ==="
STATUS=$(curl -s -o /dev/null -w "%{http_code}" -X DELETE "$BASE/jobs/$JOB_ID")
[ "$STATUS" = "204" ] || [ "$STATUS" = "200" ] && ok "DELETE returned $STATUS"
echo "=== 13. DELETE completed job → 409 Conflict ==="
DEL_STATUS=$(curl -s -o /dev/null -w "%{http_code}" -X DELETE "$BASE/jobs/$JOB_ID")
[ "$DEL_STATUS" = "409" ] && ok "DELETE completed job → 409 Conflict (expected)" \
|| echo " [INFO] DELETE returned $DEL_STATUS"
echo ""
echo "=== 11. Submit + immediately cancel a job ==="
echo "=== 14. Submit + cancel a queued job ==="
JOB2=$(curl -sf -X POST "$BASE/jobs" \
-F "audio=@${AUDIO};type=audio/wav" \
-F "language=en" \
-F "task=transcribe")
JOB2_ID=$(echo "$JOB2" | python3 -c "import sys,json; print(json.load(sys.stdin)['id'])")
JOB2_ID=$(echo "$JOB2" | python3 -c "import sys,json; print(json.load(sys.stdin)['job_id'])")
sleep 1
DEL_STATUS=$(curl -s -o /dev/null -w "%{http_code}" -X DELETE "$BASE/jobs/$JOB2_ID")
curl -s -X DELETE "$BASE/jobs/$JOB2_ID" > /dev/null
CANCEL_STATUS=$(curl -sf "$BASE/jobs/$JOB2_ID" | python3 -c "import sys,json; print(json.load(sys.stdin)['status'])")
[ "$CANCEL_STATUS" = "cancelled" ] && ok "cancel works ($DEL_STATUScancelled)"
[ "$CANCEL_STATUS" = "cancelled" ] && ok "cancel works → status=cancelled" \
|| echo " [INFO] cancel status: $CANCEL_STATUS (may be running — worker ignores cancel mid-chunk)"
echo ""
echo "=== 12. Verify webhook was fired ==="
echo "=== 15. POST /model/unload ==="
UNLOAD_RESP=$(curl -sf -X POST "$BASE/model/unload")
echo "$UNLOAD_RESP"
sleep 2
UNLOAD_STATE=$(curl -sf "$BASE/model/status" | python3 -c "import sys,json; print(json.load(sys.stdin)['state'])")
[ "$UNLOAD_STATE" = "unloaded" ] && ok "model unloaded → state=unloaded" \
|| echo " [INFO] state after unload: $UNLOAD_STATE"
echo ""
echo "=== 16. Verify webhook fired ==="
sleep 3
kill $WEBHOOK_PID 2>/dev/null || true
ok "all tests done"
ok "webhook server stopped"
echo ""
if [ $FAILS -eq 0 ]; then
echo -e "${GREEN}=== ALL TESTS PASSED ===${NC}"
else
echo -e "${RED}=== $FAILS TEST(S) FAILED ===${NC}"
exit 1
fi

246
tests/test_idle_timeout.sh Executable file
View File

@@ -0,0 +1,246 @@
#!/usr/bin/env bash
# tests/test_idle_timeout.sh
#
# Integration tests for the idle-timeout auto-unload feature.
# REQUIRES the server to be started with a short idle timeout:
#
# IDLE_TIMEOUT_SECS=5 ./whisper-server
# # or via Docker:
# docker run -e IDLE_TIMEOUT_SECS=5 ...
#
# The default idle timeout is 5 minutes; these tests use a 5-second window
# to keep the suite fast.
set -euo pipefail
BASE="${WHISPER_BASE_URL:-http://localhost:8080}"
IDLE_TIMEOUT="${EXPECTED_IDLE_TIMEOUT_SECS:-5}"
AUDIO="${TEST_AUDIO:-}"
GREEN='\033[0;32m'; RED='\033[0;31m'; YELLOW='\033[0;33m'; NC='\033[0m'
PASS=0; FAIL=0
ok() { echo -e "${GREEN}[PASS]${NC} $1"; PASS=$((PASS+1)); }
fail() { echo -e "${RED}[FAIL]${NC} $1"; FAIL=$((FAIL+1)); }
skip() { echo -e "${YELLOW}[SKIP]${NC} $1"; }
info() { echo " $1"; }
echo "=== Idle Timeout Tests ==="
echo " BASE: $BASE"
echo " IDLE_TIMEOUT_SECS: $IDLE_TIMEOUT (must be configured on the server)"
echo ""
echo "NOTE: These tests require the server to be running with IDLE_TIMEOUT_SECS=$IDLE_TIMEOUT"
echo ""
# ── Helpers ──────────────────────────────────────────────────────────────────
get_state() {
curl -sf "$BASE/model/status" | python3 -c "import sys,json; print(json.load(sys.stdin)['state'])"
}
ensure_ready() {
local state
state=$(get_state)
if [ "$state" = "ready" ]; then return 0; fi
curl -sf -X POST "$BASE/model/load" > /dev/null
local elapsed=0
while true; do
sleep 3; elapsed=$((elapsed+3))
state=$(get_state)
[ "$state" = "ready" ] && return 0
[ $elapsed -gt 180 ] && return 1
done
}
ensure_unloaded() {
curl -sf -X POST "$BASE/model/unload" > /dev/null || true
sleep 2
}
# ── TEST 1: Load model, complete a job, then wait for idle unload ─────────────
echo "--- Test 1: Idle timeout triggers auto-unload ---"
ensure_unloaded
ensure_ready || { fail "T1: model load failed"; }
WAIT_SECS=$((IDLE_TIMEOUT + 3))
info "Model is ready. Waiting $WAIT_SECS seconds (idle timeout=$IDLE_TIMEOUT + 3s buffer)..."
sleep $WAIT_SECS
STATE=$(get_state)
if [ "$STATE" = "unloaded" ]; then
ok "T1: model auto-unloaded after ${IDLE_TIMEOUT}s idle"
else
fail "T1: expected unloaded after idle timeout, got $STATE"
info "Is the server running with IDLE_TIMEOUT_SECS=$IDLE_TIMEOUT?"
fi
# ── TEST 2: model_unloaded webhook fires on idle timeout ─────────────────────
echo ""
echo "--- Test 2: model_unloaded webhook fires on idle timeout ---"
ensure_unloaded
# Start webhook receiver
python3 - <<'PYEOF' &
import http.server, json, sys, signal
class H(http.server.BaseHTTPRequestHandler):
def do_POST(self):
n = int(self.headers.get('Content-Length', 0))
body = json.loads(self.rfile.read(n))
with open('/tmp/idle_wh_event.json', 'w') as f:
json.dump(body, f)
self.send_response(200); self.end_headers()
def log_message(self, *a): pass
signal.signal(signal.SIGTERM, lambda *_: sys.exit(0))
http.server.HTTPServer(('', 9995), H).serve_forever()
PYEOF
WH_PID=$!
sleep 1
# Register webhook via a job submission (will 503 since unloaded)
curl -sf -X POST "$BASE/jobs" \
-F "audio=@/dev/urandom;type=audio/wav" \
-F "webhook_url=http://localhost:9995/wh" \
--max-time 5 > /dev/null 2>&1 || true
# Load model
ensure_ready || { fail "T2: model load failed"; kill $WH_PID 2>/dev/null; }
# Wait for idle timeout
WAIT_SECS=$((IDLE_TIMEOUT + 5))
info "Waiting ${WAIT_SECS}s for idle timeout..."
sleep $WAIT_SECS
kill $WH_PID 2>/dev/null || true
wait $WH_PID 2>/dev/null || true
if [ -f /tmp/idle_wh_event.json ]; then
EVENT_TYPE=$(python3 -c "import json; print(json.load(open('/tmp/idle_wh_event.json')).get('type','?'))")
rm -f /tmp/idle_wh_event.json
[ "$EVENT_TYPE" = "model_unloaded" ] && ok "T2: model_unloaded webhook fired on idle timeout" \
|| fail "T2: webhook type=$EVENT_TYPE (expected model_unloaded)"
else
fail "T2: no webhook received within timeout"
fi
# ── TEST 3: Job submission after idle timeout → 503 → triggers reload ─────────
echo ""
echo "--- Test 3: Job triggers reload after idle unload ---"
ensure_unloaded
ensure_ready || { fail "T3: initial load failed"; }
# Wait for auto-unload
WAIT_SECS=$((IDLE_TIMEOUT + 3))
info "Waiting ${WAIT_SECS}s for idle unload..."
sleep $WAIT_SECS
STATE=$(get_state)
[ "$STATE" = "unloaded" ] || info "Note: state=$STATE (expected unloaded)"
# Submit job → 503, triggers reload
HTTP=$(curl -s -o /tmp/t3_body.json -w "%{http_code}" -X POST "$BASE/jobs" \
-F "audio=@/dev/urandom;type=audio/wav" \
--max-time 5 2>/dev/null || echo "000")
if [ "$HTTP" = "503" ]; then
ok "T3a: POST /jobs → 503 after idle unload"
else
skip "T3a: POST /jobs returned $HTTP (model may have reloaded)"
fi
# State should be loading or ready (reload triggered by job submission)
sleep 2
STATE=$(get_state)
if [ "$STATE" = "loading" ] || [ "$STATE" = "ready" ]; then
ok "T3b: reload triggered by job submission ($STATE)"
else
fail "T3b: expected loading/ready, got $STATE"
fi
rm -f /tmp/t3_body.json
# ── TEST 4: Idle timer resets per job (wait 60% of timeout → still ready) ─────
echo ""
echo "--- Test 4: Idle timer resets with each completed job ---"
ensure_unloaded
ensure_ready || { fail "T4: model load failed"; }
HALF_WAIT=$((IDLE_TIMEOUT - 1))
info "Waiting ${HALF_WAIT}s (less than idle timeout)..."
sleep $HALF_WAIT
STATE=$(get_state)
if [ "$STATE" = "ready" ]; then
ok "T4a: model still ready after ${HALF_WAIT}s (less than ${IDLE_TIMEOUT}s timeout)"
else
fail "T4a: model unexpectedly $STATE after only ${HALF_WAIT}s"
fi
# Wait for full unload
REMAINING=$((IDLE_TIMEOUT - HALF_WAIT + 3))
info "Waiting another ${REMAINING}s for full idle unload..."
sleep $REMAINING
STATE=$(get_state)
[ "$STATE" = "unloaded" ] && ok "T4b: model unloaded after total > ${IDLE_TIMEOUT}s idle" \
|| fail "T4b: expected unloaded, got $STATE"
# ── TEST 5: Job resets idle timer ─────────────────────────────────────────────
echo ""
echo "--- Test 5: Completing a job resets the idle timer ---"
if [ -z "$AUDIO" ]; then
skip "T5: TEST_AUDIO not set — skipping timer-reset test"
else
ensure_unloaded
ensure_ready || { fail "T5: model load failed"; }
# Submit a job
SUBMIT=$(curl -sf -X POST "$BASE/jobs" \
-F "audio=@${AUDIO};type=audio/wav" \
-F "task=transcribe" 2>&1)
JOB_ID=$(echo "$SUBMIT" | python3 -c "import sys,json; print(json.load(sys.stdin)['job_id'])" 2>/dev/null || echo "")
if [ -z "$JOB_ID" ]; then
fail "T5: job submission failed"
else
# Wait for job to finish
elapsed=0
while true; do
sleep 5; elapsed=$((elapsed+5))
STATUS=$(curl -sf "$BASE/jobs/$JOB_ID" | python3 -c "import sys,json; print(json.load(sys.stdin)['status'])")
[ "$STATUS" = "done" ] || [ "$STATUS" = "failed" ] && break
[ $elapsed -gt 300 ] && break
done
info "Job finished in ${elapsed}s with status=$STATUS"
# Now wait IDLE_TIMEOUT - 2 seconds — should still be ready
SAFE_WAIT=$((IDLE_TIMEOUT - 2))
[ $SAFE_WAIT -lt 1 ] && SAFE_WAIT=1
info "Waiting ${SAFE_WAIT}s after job completion (less than idle timeout)..."
sleep $SAFE_WAIT
STATE=$(get_state)
[ "$STATE" = "ready" ] && ok "T5a: model still ready ${SAFE_WAIT}s after job completion" \
|| fail "T5a: model unexpectedly $STATE after job"
# Wait for idle timeout
REMAINING=$((IDLE_TIMEOUT - SAFE_WAIT + 3))
info "Waiting ${REMAINING}s more for idle unload..."
sleep $REMAINING
STATE=$(get_state)
[ "$STATE" = "unloaded" ] && ok "T5b: model auto-unloaded after idle period post-job" \
|| fail "T5b: expected unloaded, got $STATE"
fi
fi
# ── Summary ────────────────────────────────────────────────────────────────────
echo ""
echo "=========================================="
echo " Results: ${PASS} passed, ${FAIL} failed"
echo "=========================================="
[ $FAIL -eq 0 ] && echo -e "${GREEN}ALL PASSED${NC}" || { echo -e "${RED}FAILURES: $FAIL${NC}"; exit 1; }

470
tests/test_model_lifecycle.sh Executable file
View File

@@ -0,0 +1,470 @@
#!/usr/bin/env bash
# tests/test_model_lifecycle.sh
#
# Integration tests for dynamic model loading/unloading.
# Requires a running whisper-server with GPU access.
#
# Usage:
# WHISPER_BASE_URL=http://localhost:8080 bash tests/test_model_lifecycle.sh
#
# Tests are designed to be independent; each section that needs a specific
# state resets it explicitly at the start.
set -euo pipefail
BASE="${WHISPER_BASE_URL:-http://localhost:8080}"
AUDIO="${TEST_AUDIO:-}"
GREEN='\033[0;32m'; RED='\033[0;31m'; YELLOW='\033[0;33m'; NC='\033[0m'
PASS=0; FAIL=0
ok() { echo -e "${GREEN}[PASS]${NC} $1"; PASS=$((PASS+1)); }
fail() { echo -e "${RED}[FAIL]${NC} $1"; FAIL=$((FAIL+1)); }
skip() { echo -e "${YELLOW}[SKIP]${NC} $1"; }
info() { echo " $1"; }
echo "=== Model Lifecycle Integration Tests ==="
echo " BASE: $BASE"
echo ""
# ── Helpers ──────────────────────────────────────────────────────────────────
get_state() {
curl -sf "$BASE/model/status" | python3 -c "import sys,json; print(json.load(sys.stdin)['state'])"
}
ensure_unloaded() {
curl -sf -X POST "$BASE/model/unload" > /dev/null
sleep 2
local s
s=$(get_state)
if [ "$s" != "unloaded" ]; then
echo " WARNING: expected unloaded, got $s — waiting 5s"
sleep 5
fi
}
ensure_ready() {
local state
state=$(get_state)
if [ "$state" = "ready" ]; then return 0; fi
curl -sf -X POST "$BASE/model/load" > /dev/null
local elapsed=0
while true; do
sleep 5; elapsed=$((elapsed+5))
state=$(get_state)
[ "$state" = "ready" ] && return 0
[ $elapsed -gt 180 ] && echo " TIMEOUT: model did not become ready" && return 1
done
}
poll_state_transition() {
local target="$1" max_secs="${2:-120}"
local elapsed=0
while true; do
sleep 2; elapsed=$((elapsed+2))
local s
s=$(get_state)
[ "$s" = "$target" ] && return 0
[ $elapsed -ge $max_secs ] && return 1
done
}
# ── TEST 1: Startup state is unloaded ────────────────────────────────────────
echo "--- Test 1: Startup state is unloaded (or after explicit unload) ---"
ensure_unloaded
STATE=$(get_state)
if [ "$STATE" = "unloaded" ]; then
ok "T1: state=unloaded after explicit unload"
else
fail "T1: expected unloaded, got $STATE"
fi
# ── TEST 2: POST /model/load returns 202 ─────────────────────────────────────
echo ""
echo "--- Test 2: POST /model/load returns 202 ---"
ensure_unloaded
HTTP=$(curl -s -o /dev/null -w "%{http_code}" -X POST "$BASE/model/load")
if [ "$HTTP" = "202" ]; then
ok "T2: POST /model/load → 202 Accepted"
else
fail "T2: expected 202, got $HTTP"
fi
# Cancel the in-progress load to clean up
curl -sf -X POST "$BASE/model/unload" > /dev/null || true
sleep 2
# ── TEST 3: State transitions to loading/ready after load trigger ─────────────
echo ""
echo "--- Test 3: State transitions to loading (not stuck at unloaded) ---"
ensure_unloaded
curl -sf -X POST "$BASE/model/load" > /dev/null
sleep 1
STATE=$(get_state)
if [ "$STATE" = "loading" ] || [ "$STATE" = "ready" ]; then
ok "T3: state transitioned to $STATE (not stuck at unloaded)"
else
fail "T3: expected loading or ready, got $STATE"
fi
# ── TEST 4: Model reaches ready state and loaded_at is set ───────────────────
echo ""
echo "--- Test 4: Model reaches ready state with loaded_at timestamp ---"
# Already loading from T3 — wait for ready
if ! poll_state_transition "ready" 180; then
fail "T4: model did not become ready within 3 minutes"
else
STATUS_JSON=$(curl -sf "$BASE/model/status")
LOADED_AT=$(echo "$STATUS_JSON" | python3 -c "import sys,json; d=json.load(sys.stdin); print(d.get('loaded_at','MISSING'))" 2>/dev/null || echo "MISSING")
if [ "$LOADED_AT" != "MISSING" ] && [ "$LOADED_AT" != "null" ] && [ -n "$LOADED_AT" ]; then
ok "T4: model=ready, loaded_at=$LOADED_AT"
else
fail "T4: model ready but loaded_at is missing or null"
fi
fi
# ── TEST 5: Idempotent load — POST /model/load when ready returns 200 ─────────
echo ""
echo "--- Test 5: POST /model/load when already ready → 200 ---"
ensure_ready || { fail "T5: could not load model"; }
HTTP=$(curl -s -o /dev/null -w "%{http_code}" -X POST "$BASE/model/load")
STATE=$(get_state)
if [ "$HTTP" = "200" ] && [ "$STATE" = "ready" ]; then
ok "T5: idempotent load → 200, state stays ready"
elif [ "$HTTP" = "202" ] && [ "$STATE" = "ready" ]; then
ok "T5: idempotent load → 202, state stays ready"
else
fail "T5: expected 200 and ready, got HTTP=$HTTP state=$STATE"
fi
# ── TEST 6: Job accepted when ready (segments > 0) ────────────────────────────
echo ""
echo "--- Test 6: Job accepted when model is ready ---"
if [ -z "$AUDIO" ]; then
skip "T6: TEST_AUDIO not set — skipping job submission test"
else
ensure_ready || { fail "T6: model load failed"; }
SUBMIT=$(curl -sf -X POST "$BASE/jobs" -F "audio=@${AUDIO};type=audio/wav" -F "task=transcribe" 2>&1)
JOB_ID=$(echo "$SUBMIT" | python3 -c "import sys,json; print(json.load(sys.stdin)['job_id'])" 2>/dev/null || echo "")
if [ -n "$JOB_ID" ]; then
ok "T6: job accepted, id=$JOB_ID"
# Poll to done
elapsed=0
while true; do
sleep 10; elapsed=$((elapsed+10))
STATUS=$(curl -sf "$BASE/jobs/$JOB_ID" | python3 -c "import sys,json; print(json.load(sys.stdin)['status'])")
[ "$STATUS" = "done" ] && break
[ "$STATUS" = "failed" ] && break
[ $elapsed -gt 600 ] && break
done
SEGS=$(curl -sf "$BASE/jobs/$JOB_ID" | python3 -c "import sys,json; d=json.load(sys.stdin); print(len(d.get('segments',[])))")
[ "$SEGS" -gt 0 ] && ok "T6b: job done with $SEGS segments" || fail "T6b: job done but 0 segments"
else
fail "T6: job submission failed: $SUBMIT"
fi
fi
# ── TEST 7: POST /model/unload → state=unloaded ───────────────────────────────
echo ""
echo "--- Test 7: POST /model/unload ---"
ensure_ready || { fail "T7: model load failed"; }
curl -sf -X POST "$BASE/model/unload" > /dev/null
sleep 3
STATE=$(get_state)
if [ "$STATE" = "unloaded" ]; then
ok "T7: POST /model/unload → state=unloaded"
else
fail "T7: expected unloaded after unload, got $STATE"
fi
# ── TEST 8: POST /jobs when unloaded → 503 + Retry-After ─────────────────────
echo ""
echo "--- Test 8: POST /jobs when unloaded → 503 + Retry-After ---"
ensure_unloaded
# Submit a tiny dummy payload (won't be valid audio but that's ok for this test)
HTTP=$(curl -s -o /tmp/t8_body.json -w "%{http_code}" -X POST "$BASE/jobs" \
-F "audio=@/dev/urandom;type=audio/wav" \
--max-time 5 2>/dev/null || echo "000")
# If the model auto-loads it might start processing; check for 503 first
if [ "$HTTP" = "503" ]; then
RETRY_AFTER=$(curl -sI -X POST "$BASE/jobs" \
-F "audio=@/dev/urandom;type=audio/wav" \
--max-time 5 2>/dev/null | grep -i "retry-after" | awk '{print $2}' | tr -d '\r' || echo "")
BODY=$(cat /tmp/t8_body.json 2>/dev/null || echo "{}")
HAS_STATE=$(echo "$BODY" | python3 -c "import sys,json; d=json.load(sys.stdin); print('state' in d)" 2>/dev/null || echo "False")
HAS_RETRY=$(echo "$BODY" | python3 -c "import sys,json; d=json.load(sys.stdin); print('retry_after_secs' in d)" 2>/dev/null || echo "False")
if [ "$HAS_STATE" = "True" ] && [ "$HAS_RETRY" = "True" ]; then
ok "T8: 503 with state + retry_after_secs in body"
else
fail "T8: 503 but body missing state/retry_after_secs. body=$BODY"
fi
if [ -n "$RETRY_AFTER" ]; then
ok "T8b: Retry-After header present: $RETRY_AFTER"
else
fail "T8b: Retry-After header missing from 503 response"
fi
else
skip "T8: got HTTP $HTTP (model may have loaded before check) — skipping"
fi
# ── TEST 9: Rejected job triggers load ────────────────────────────────────────
echo ""
echo "--- Test 9: Job rejection triggers model load ---"
ensure_unloaded
# Send a job (we expect 503)
curl -sf -X POST "$BASE/jobs" \
-F "audio=@/dev/urandom;type=audio/wav" \
--max-time 5 > /dev/null 2>&1 || true
sleep 2
STATE=$(get_state)
if [ "$STATE" = "loading" ] || [ "$STATE" = "ready" ]; then
ok "T9: model started loading after job rejection ($STATE)"
else
fail "T9: expected loading/ready after job rejection, got $STATE"
fi
# Stop the load to clean up
curl -sf -X POST "$BASE/model/unload" > /dev/null || true
sleep 2
# ── TEST 10: Retry-After values ───────────────────────────────────────────────
echo ""
echo "--- Test 10: Retry-After values match state ---"
ensure_unloaded
# Unloaded → Retry-After: 30
RESP_UNLOADED=$(curl -si -X POST "$BASE/jobs" -F "audio=@/dev/urandom;type=audio/wav" --max-time 5 2>/dev/null || echo "")
RA_UNLOADED=$(echo "$RESP_UNLOADED" | grep -i "retry-after" | awk '{print $2}' | tr -d '\r' || echo "")
[ "$RA_UNLOADED" = "30" ] && ok "T10a: Retry-After=30 when unloaded" \
|| skip "T10a: Retry-After=$RA_UNLOADED (expected 30) — model may have started loading"
# ── TEST 11: Retry-After=10 during loading ────────────────────────────────────
echo ""
echo "--- Test 11: Retry-After=10 when loading ---"
ensure_unloaded
curl -sf -X POST "$BASE/model/load" > /dev/null
sleep 1 # In loading state
STATE=$(get_state)
if [ "$STATE" = "loading" ]; then
RESP_LOADING=$(curl -si -X POST "$BASE/jobs" -F "audio=@/dev/urandom;type=audio/wav" --max-time 5 2>/dev/null || echo "")
RA_LOADING=$(echo "$RESP_LOADING" | grep -i "retry-after" | awk '{print $2}' | tr -d '\r' || echo "")
[ "$RA_LOADING" = "10" ] && ok "T11: Retry-After=10 when loading" \
|| fail "T11: expected Retry-After=10, got '$RA_LOADING' (state=$STATE)"
else
skip "T11: model already $STATE — can't test loading state Retry-After"
fi
# ── TEST 12: 503 body schema validation ──────────────────────────────────────
echo ""
echo "--- Test 12: 503 body schema validation ---"
ensure_unloaded
BODY=$(curl -sf -X POST "$BASE/jobs" -F "audio=@/dev/urandom;type=audio/wav" --max-time 5 2>/dev/null || echo "{}")
python3 - <<PYCHECK
import json
body = json.loads('$BODY')
required = {'error', 'state', 'retry_after_secs'}
missing = required - set(body.keys())
if missing:
print(f"MISSING: {missing}")
exit(1)
assert body['error'] == 'model_not_ready', f"error={body['error']}"
assert isinstance(body['retry_after_secs'], int), f"retry_after_secs not int: {body['retry_after_secs']}"
print("schema ok")
PYCHECK
[ $? -eq 0 ] && ok "T12: 503 body has correct schema" || fail "T12: 503 body schema invalid"
# ── TEST 13: GET /health has model_state field ────────────────────────────────
echo ""
echo "--- Test 13: GET /health has model_state ---"
HEALTH=$(curl -sf "$BASE/health")
HAS_MODEL_STATE=$(echo "$HEALTH" | python3 -c "import sys,json; d=json.load(sys.stdin); print('model_state' in d)")
[ "$HAS_MODEL_STATE" = "True" ] && ok "T13: /health has model_state" || fail "T13: /health missing model_state"
# ── TEST 14: SSE /model/events delivers model_ready event ─────────────────────
echo ""
echo "--- Test 14: GET /model/events SSE delivers model_ready ---"
ensure_unloaded
# Collect SSE events for up to 3 minutes
SSE_LOG=$(mktemp /tmp/sse_events_XXXXXX.txt)
curl -sN --max-time 180 "$BASE/model/events" > "$SSE_LOG" &
SSE_PID=$!
sleep 1
# Trigger load
curl -sf -X POST "$BASE/model/load" > /dev/null
poll_state_transition "ready" 180 || true
sleep 2
kill $SSE_PID 2>/dev/null || true
wait $SSE_PID 2>/dev/null || true
if grep -q "model_loading" "$SSE_LOG" 2>/dev/null; then
ok "T14a: SSE received model_loading event"
else
fail "T14a: SSE did not receive model_loading event"
fi
if grep -q "model_ready" "$SSE_LOG" 2>/dev/null; then
ok "T14b: SSE received model_ready event"
else
fail "T14b: SSE did not receive model_ready event"
fi
# Now unload to get model_unloaded event
curl -sf -X POST "$BASE/model/unload" > /dev/null
sleep 1
SSE_LOG2=$(mktemp /tmp/sse_events_XXXXXX.txt)
curl -sN --max-time 10 "$BASE/model/events" > "$SSE_LOG2" &
SSE_PID2=$!
sleep 2
kill $SSE_PID2 2>/dev/null || true
wait $SSE_PID2 2>/dev/null || true
# model_unloaded fires immediately on unload command
if grep -q "model_unloaded" "$SSE_LOG" 2>/dev/null || grep -q "model_unloaded" "$SSE_LOG2" 2>/dev/null; then
ok "T14c: SSE received model_unloaded event"
else
fail "T14c: SSE did not receive model_unloaded event"
fi
rm -f "$SSE_LOG" "$SSE_LOG2"
# ── TEST 15: model_ready webhook fires after load ──────────────────────────────
echo ""
echo "--- Test 15: model_ready webhook ---"
ensure_unloaded
# Start webhook receiver
WEBHOOK_LOG=$(mktemp /tmp/webhook_log_XXXXXX.txt)
python3 - <<'PYEOF' &
import http.server, json, sys, signal, os
class H(http.server.BaseHTTPRequestHandler):
def do_POST(self):
n = int(self.headers.get('Content-Length', 0))
body = json.loads(self.rfile.read(n))
with open('/tmp/t15_webhook.json', 'w') as f:
json.dump(body, f)
self.send_response(200); self.end_headers()
def log_message(self, *a): pass
signal.signal(signal.SIGTERM, lambda *_: sys.exit(0))
http.server.HTTPServer(('', 9998), H).serve_forever()
PYEOF
WBOOK_PID=$!
sleep 1
# Register a webhook via a (doomed) job submission — this registers the URL
# even though the model is unloaded (and the job will 503)
curl -sf -X POST "$BASE/jobs" \
-F "audio=@/dev/urandom;type=audio/wav" \
-F "webhook_url=http://localhost:9998/wh" \
--max-time 5 > /dev/null 2>&1 || true
# Now load the model
curl -sf -X POST "$BASE/model/load" > /dev/null
poll_state_transition "ready" 180 || true
sleep 3
kill $WBOOK_PID 2>/dev/null || true
wait $WBOOK_PID 2>/dev/null || true
if [ -f /tmp/t15_webhook.json ]; then
EVENT_TYPE=$(python3 -c "import json; d=json.load(open('/tmp/t15_webhook.json')); print(d.get('type','?'))")
[ "$EVENT_TYPE" = "model_ready" ] && ok "T15: model_ready webhook fired" \
|| fail "T15: webhook fired but type=$EVENT_TYPE (expected model_ready)"
rm -f /tmp/t15_webhook.json
else
fail "T15: model_ready webhook not received within timeout"
fi
# ── TEST 16: model_unloaded webhook fires ─────────────────────────────────────
echo ""
echo "--- Test 16: model_unloaded webhook ---"
python3 - <<'PYEOF' &
import http.server, json, sys, signal
class H(http.server.BaseHTTPRequestHandler):
def do_POST(self):
n = int(self.headers.get('Content-Length', 0))
body = json.loads(self.rfile.read(n))
with open('/tmp/t16_webhook.json', 'w') as f:
json.dump(body, f)
self.send_response(200); self.end_headers()
def log_message(self, *a): pass
signal.signal(signal.SIGTERM, lambda *_: sys.exit(0))
http.server.HTTPServer(('', 9997), H).serve_forever()
PYEOF
WBOOK2_PID=$!
sleep 1
# Register webhook URL
curl -sf -X POST "$BASE/jobs" \
-F "audio=@/dev/urandom;type=audio/wav" \
-F "webhook_url=http://localhost:9997/wh" \
--max-time 5 > /dev/null 2>&1 || true
ensure_ready
# Unload
curl -sf -X POST "$BASE/model/unload" > /dev/null
sleep 5
kill $WBOOK2_PID 2>/dev/null || true
wait $WBOOK2_PID 2>/dev/null || true
if [ -f /tmp/t16_webhook.json ]; then
EVENT_TYPE=$(python3 -c "import json; d=json.load(open('/tmp/t16_webhook.json')); print(d.get('type','?'))")
[ "$EVENT_TYPE" = "model_unloaded" ] && ok "T16: model_unloaded webhook fired" \
|| fail "T16: webhook type=$EVENT_TYPE (expected model_unloaded)"
rm -f /tmp/t16_webhook.json
else
fail "T16: model_unloaded webhook not received"
fi
# ── TEST 17: Concurrent load requests — single load, stable ready ─────────────
echo ""
echo "--- Test 17: Concurrent POST /model/load requests ---"
ensure_unloaded
# Send 3 concurrent load requests
curl -sf -X POST "$BASE/model/load" > /dev/null &
curl -sf -X POST "$BASE/model/load" > /dev/null &
curl -sf -X POST "$BASE/model/load" > /dev/null &
wait
poll_state_transition "ready" 180 || true
STATE=$(get_state)
[ "$STATE" = "ready" ] && ok "T17: concurrent loads handled cleanly, state=ready" \
|| fail "T17: expected ready after concurrent loads, got $STATE"
# ── TEST 18: POST /model/unload during loading → clean unloaded ───────────────
echo ""
echo "--- Test 18: POST /model/unload during loading ---"
ensure_unloaded
curl -sf -X POST "$BASE/model/load" > /dev/null
sleep 1 # Hopefully still in loading state
curl -sf -X POST "$BASE/model/unload" > /dev/null
# Allow time for the unload to propagate
sleep 5
STATE=$(get_state)
if [ "$STATE" = "unloaded" ]; then
ok "T18: unload during loading → clean unloaded"
elif [ "$STATE" = "ready" ]; then
# Load completed before unload arrived — immediately unload
curl -sf -X POST "$BASE/model/unload" > /dev/null
sleep 3
STATE=$(get_state)
[ "$STATE" = "unloaded" ] && ok "T18: load completed then unloaded (race condition OK)" \
|| fail "T18: state=$STATE after load+unload"
else
fail "T18: unexpected state after unload-during-load: $STATE"
fi
# ── Summary ────────────────────────────────────────────────────────────────────
echo ""
echo "=========================================="
echo " Results: ${PASS} passed, ${FAIL} failed"
echo "=========================================="
[ $FAIL -eq 0 ] && echo -e "${GREEN}ALL PASSED${NC}" || { echo -e "${RED}FAILURES: $FAIL${NC}"; exit 1; }