From 35e7ea8d286cadda1059a77748b2a974e4f7b5df Mon Sep 17 00:00:00 2001 From: mozempk Date: Wed, 6 May 2026 02:00:46 +0200 Subject: [PATCH] feat: progress reporting with chunk context + live job persistence MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - ProgressEvent::Progress now carries chunk index and total count - SsePayload::Progress gains chunk / chunks_total fields → SSE clients can show 'chunk N/51' instead of bare percent - process_job persists job.progress to storage at each chunk boundary → GET /jobs/:id now shows live progress (not stuck at 0) - Emits Progress event at chunk START (boundary event), not just on whisper's internal callback - entropy_thold raised to 3.5 (catches medium-phrase loops; triggers whisper's own temperature-retry instead of silent repetition) - no_speech_thold removed (confirmed // TODO: not implemented in whisper.cpp source; was a no-op) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- src/models.rs | 13 +++++++++--- src/routes/jobs.rs | 4 ++-- src/transcriber.rs | 53 +++++++++++++++++++++++++++------------------- src/worker.rs | 40 ++++++++++++++++++++++++++-------- 4 files changed, 74 insertions(+), 36 deletions(-) diff --git a/src/models.rs b/src/models.rs index 109b9c2..7f776ee 100644 --- a/src/models.rs +++ b/src/models.rs @@ -137,7 +137,14 @@ pub struct HealthResponse { #[derive(Debug, Serialize)] #[serde(tag = "type", rename_all = "snake_case")] pub enum SsePayload { - Progress { percent: u8 }, - Done { job: Box }, - Error { message: String }, + Progress { + /// Overall job progress 0–100. + percent: u8, + /// 1-based index of the chunk currently being transcribed. + chunk: usize, + /// Total number of silence-split chunks in this job. 
+ chunks_total: usize, + }, + Done { job: Box }, + Error { message: String }, } diff --git a/src/routes/jobs.rs b/src/routes/jobs.rs index 34ff0dd..1cdb9c0 100644 --- a/src/routes/jobs.rs +++ b/src/routes/jobs.rs @@ -192,9 +192,9 @@ pub async fn stream_job( let sse_stream: SseStream = Box::pin(BroadcastStream::new(rx).filter_map(|msg| async move { let event = match msg { - Ok(ProgressEvent::Progress(p)) => { + Ok(ProgressEvent::Progress { percent, chunk, total }) => { let payload = serde_json::to_string( - &crate::models::SsePayload::Progress { percent: p } + &crate::models::SsePayload::Progress { percent, chunk, chunks_total: total } ).ok()?; Event::default().event("progress").data(payload) } diff --git a/src/transcriber.rs b/src/transcriber.rs index 8240365..95bdc48 100644 --- a/src/transcriber.rs +++ b/src/transcriber.rs @@ -16,8 +16,9 @@ pub struct Transcriber { } impl Transcriber { - /// Load a GGML model file and configure GPU / Flash Attention for RTX 2080. - pub fn load(model_path: impl AsRef, gpu_device: u32) -> Result { let path = model_path.as_ref().to_str().ok_or_else(|| { + /// Load a GGML model file and configure GPU for RTX 2080. + pub fn load(model_path: impl AsRef, gpu_device: u32) -> Result { + let path = model_path.as_ref().to_str().ok_or_else(|| { AppError::Internal("model path is not valid UTF-8".into()) })?; @@ -25,8 +26,7 @@ impl Transcriber { params.use_gpu(true); params.gpu_device(gpu_device as i32); // Flash Attention disabled: causes silent 0-segment output on some - // real-world audio (conference recordings, noisy MP3s). Standard - // CUDA attention is safe on all content types. + // real-world audio (conference recordings, noisy MP3s). // params.flash_attn(true); let ctx = WhisperContext::new_with_params(path, params) @@ -36,10 +36,8 @@ impl Transcriber { Ok(Self { ctx }) } - /// Transcribe audio samples. - /// - /// `pcm` must be 16 kHz mono f32 samples. - /// `on_progress` is called periodically with a 0–100 integer. 
+ /// Transcribe 16 kHz mono f32 PCM samples. + /// `on_progress` receives 0–100 from whisper.cpp. pub fn transcribe( &self, pcm: &[f32], @@ -57,19 +55,33 @@ impl Transcriber { fp.set_n_threads(num_cpus::get() as i32); fp.set_temperature(0.0); - fp.set_temperature_inc(0.2); + fp.set_temperature_inc(0.2); // retry schedule: 0.0 → 0.2 → 0.4 … → 1.0 on threshold failure - // ── Anti-hallucination / quality guards ─────────────────────────────── - // no_speech_thold: segments where p(no-speech) > threshold are dropped. - // 0.6 is the whisper.cpp default — safe for real-world and clean audio. - // (0.0 would suppress *everything*; 1.0 disables the filter entirely.) - fp.set_no_speech_thold(0.6); - fp.set_entropy_thold(2.4); + // ── Whisper-native quality / anti-hallucination parameters ─────────── + // + // entropy_thold (whisper.cpp's analogue of OpenAI's compression-ratio threshold): + // If a 30s window's token entropy < threshold, the decoder is marked + // failed and whisper retries the window with temperature += temperature_inc. + // entropy = -Σ p·log₂(p) over the unique tokens produced (NOTE(review): whisper.cpp appears to use the natural log over only the last 32 tokens, which caps entropy near ln 32 ≈ 3.47 — confirm that 3.5 does not mark every window as failed). + // Default 2.4 only catches 1–3 unique-token loops (e.g. "Yeah. Yeah."). + // Raising to 3.5 also catches medium-phrase loops (9-word phrases have + // theoretical entropy ≈ log₂(9) ≈ 3.17 — above 2.4 but below 3.5). + // The retry mechanism then forces temperature diversity, breaking the loop. + fp.set_entropy_thold(3.5); + + // logprob_thold: fail + retry if average log-probability < threshold. + // The default (-1.0) is kept as-is; raise it toward 0 to catch more low-confidence windows. fp.set_logprob_thold(-1.0); + + // no_speech_thold: listed in the header but marked "TODO: not implemented" + // in whisper.cpp source — calling it has no effect, so omitted. + fp.set_suppress_blank(true); - fp.set_suppress_non_speech_tokens(true); - // Prevent repetition loops on long audio: do not feed the previous - // segment's text back as context for the next segment. 
+ fp.set_suppress_non_speech_tokens(true); // suppress [MUSIC]/[APPLAUSE]/etc. + + // no_context: do not use previous call's transcript as initial prompt. + // Each silence-chunked audio segment is independent; cross-chunk context + // would re-anchor the decoder to any hallucinations from the prior chunk. fp.set_no_context(true); fp.set_print_progress(false); @@ -83,7 +95,6 @@ impl Transcriber { fp.set_translate(task == "translate"); - // Progress callback — whisper.cpp calls this with 0–100 fp.set_progress_callback_safe(move |p| on_progress(p as u8)); state @@ -110,9 +121,8 @@ impl Transcriber { for t in 0..n_tokens { let token_text = state.full_get_token_text(i, t) .map_err(|e| AppError::Internal(e.to_string()))?; - // Skip special tokens (they start with '[') if token_text.starts_with('[') { - continue; + continue; // skip special tokens ([MUSIC], [APPLAUSE], etc.) } let data = state.full_get_token_data(i, t) .map_err(|e| AppError::Internal(e.to_string()))?; @@ -127,7 +137,6 @@ impl Transcriber { segments.push(Segment { index: i, start, end, text, words }); } - // Detect language used let lang = state .full_lang_id_from_state() .ok() diff --git a/src/worker.rs b/src/worker.rs index c5993c1..3649859 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -22,7 +22,8 @@ pub type ProgressTx = broadcast::Sender; #[derive(Debug, Clone)] pub enum ProgressEvent { - Progress(u8), + /// `percent` — overall 0–100; `chunk` — 1-based; `total` — total chunks. 
+ Progress { percent: u8, chunk: usize, total: usize }, Done(Box), Error(String), } @@ -93,7 +94,7 @@ fn transcriber_thread( } } -pub(crate) async fn run( +async fn run( mut job_rx: mpsc::UnboundedReceiver, storage: Arc, queue_depth: Arc, @@ -134,7 +135,7 @@ pub(crate) async fn run( let audio_path = audio_path_for(&job_id); - let result = process_job(&job, &audio_path, &progress_tx, &tx_req).await; + let result = process_job(&job, &audio_path, &progress_tx, &tx_req, &storage).await; let _ = tokio::fs::remove_file(&audio_path).await; @@ -176,8 +177,9 @@ pub(crate) async fn run( // ── Silence-based chunking ──────────────────────────────────────────────────── -/// Target chunk length. Smaller = safer (less hallucination budget per chunk). -const TARGET_CHUNK_SECS: f32 = 180.0; +/// Target chunk length. 60s ≈ 2× whisper's native 30s window — short enough +/// that a hallucinated phrase can't compound beyond a single window. +const TARGET_CHUNK_SECS: f32 = 60.0; /// How far from the target we'll snap to a silence midpoint. const SNAP_WINDOW_SECS: f32 = 30.0; /// Silence below this level (dB) counts as a split candidate. @@ -296,12 +298,13 @@ async fn process_job( audio_path: &std::path::Path, progress_tx: &ProgressTx, tx_req: &std::sync::mpsc::Sender, + storage: &Arc, ) -> crate::Result<(Vec, String, f32)> { // 1. Decode full audio to 16 kHz mono PCM. let pcm = decode_audio(audio_path).await?; let total_secs = pcm.len() as f32 / 16_000.0; - // 2. Detect silence from the original file (fast amplitude scan). + // 2. Detect silence midpoints from original file. let silence_mids = detect_silence_midpoints(audio_path).await; // 3. Build silence-snapped chunk boundaries. @@ -325,13 +328,32 @@ async fn process_job( let s1 = ((*chunk_end * 16_000.0) as usize).min(pcm.len()); let chunk_pcm = pcm[s0..s1].to_vec(); - // Scale chunk's 0-100 progress into the job's 0-100 range. + // Base percent this chunk starts at. 
let base = (ci * 100 / n) as u8; let span = (100usize / n).max(1) as u8; + + // Emit a progress event and persist it at the start of every chunk. + let _ = progress_tx.send(ProgressEvent::Progress { + percent: base, + chunk: ci + 1, + total: n, + }); + let mut snapshot = job.clone(); + snapshot.progress = base; + if let Err(e) = storage.save(&snapshot).await { + tracing::warn!(error = %e, "failed to persist mid-job progress"); + } + + // Scale whisper's per-chunk 0–100 into the job's overall range. let tx = progress_tx.clone(); + let chunk_num = ci + 1; let on_progress = Box::new(move |p: u8| { let overall = base.saturating_add(p.saturating_mul(span) / 100); - let _ = tx.send(ProgressEvent::Progress(overall)); + let _ = tx.send(ProgressEvent::Progress { + percent: overall, + chunk: chunk_num, + total: n, + }); }); let (reply_tx, reply_rx) = oneshot::channel(); @@ -377,7 +399,7 @@ async fn process_job( seg.index = i as i32; } - let _ = progress_tx.send(ProgressEvent::Progress(100)); + let _ = progress_tx.send(ProgressEvent::Progress { percent: 100, chunk: n, total: n }); Ok((all_segments, language, total_secs)) }