feat: progress reporting with chunk context + live job persistence
All checks were successful
Build & Push Docker Image / build-and-push (push) Successful in 6m38s
All checks were successful
Build & Push Docker Image / build-and-push (push) Successful in 6m38s
- ProgressEvent::Progress now carries chunk index and total count - SsePayload::Progress gains chunk / chunks_total fields → SSE clients can show 'chunk N/51' instead of bare percent - process_job persists job.progress to storage at each chunk boundary → GET /jobs/:id now shows live progress (not stuck at 0) - Emits Progress event at chunk START (boundary event), not just on whisper's internal callback - entropy_thold raised to 3.5 (catches medium-phrase loops; triggers whisper's own temperature-retry instead of silent repetition) - no_speech_thold removed (confirmed // TODO: not implemented in whisper.cpp source; was a no-op) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
@@ -137,7 +137,14 @@ pub struct HealthResponse {
|
||||
#[derive(Debug, Serialize)]
|
||||
#[serde(tag = "type", rename_all = "snake_case")]
|
||||
pub enum SsePayload {
|
||||
Progress { percent: u8 },
|
||||
Done { job: Box<Job> },
|
||||
Error { message: String },
|
||||
Progress {
|
||||
/// Overall job progress 0–100.
|
||||
percent: u8,
|
||||
/// 1-based index of the chunk currently being transcribed.
|
||||
chunk: usize,
|
||||
/// Total number of silence-split chunks in this job.
|
||||
chunks_total: usize,
|
||||
},
|
||||
Done { job: Box<Job> },
|
||||
Error { message: String },
|
||||
}
|
||||
|
||||
@@ -192,9 +192,9 @@ pub async fn stream_job(
|
||||
|
||||
let sse_stream: SseStream = Box::pin(BroadcastStream::new(rx).filter_map(|msg| async move {
|
||||
let event = match msg {
|
||||
Ok(ProgressEvent::Progress(p)) => {
|
||||
Ok(ProgressEvent::Progress { percent, chunk, total }) => {
|
||||
let payload = serde_json::to_string(
|
||||
&crate::models::SsePayload::Progress { percent: p }
|
||||
&crate::models::SsePayload::Progress { percent, chunk, chunks_total: total }
|
||||
).ok()?;
|
||||
Event::default().event("progress").data(payload)
|
||||
}
|
||||
|
||||
@@ -16,8 +16,9 @@ pub struct Transcriber {
|
||||
}
|
||||
|
||||
impl Transcriber {
|
||||
/// Load a GGML model file and configure GPU / Flash Attention for RTX 2080.
|
||||
pub fn load(model_path: impl AsRef<Path>, gpu_device: u32) -> Result<Self> { let path = model_path.as_ref().to_str().ok_or_else(|| {
|
||||
/// Load a GGML model file and configure GPU for RTX 2080.
|
||||
pub fn load(model_path: impl AsRef<Path>, gpu_device: u32) -> Result<Self> {
|
||||
let path = model_path.as_ref().to_str().ok_or_else(|| {
|
||||
AppError::Internal("model path is not valid UTF-8".into())
|
||||
})?;
|
||||
|
||||
@@ -25,8 +26,7 @@ impl Transcriber {
|
||||
params.use_gpu(true);
|
||||
params.gpu_device(gpu_device as i32);
|
||||
// Flash Attention disabled: causes silent 0-segment output on some
|
||||
// real-world audio (conference recordings, noisy MP3s). Standard
|
||||
// CUDA attention is safe on all content types.
|
||||
// real-world audio (conference recordings, noisy MP3s).
|
||||
// params.flash_attn(true);
|
||||
|
||||
let ctx = WhisperContext::new_with_params(path, params)
|
||||
@@ -36,10 +36,8 @@ impl Transcriber {
|
||||
Ok(Self { ctx })
|
||||
}
|
||||
|
||||
/// Transcribe audio samples.
|
||||
///
|
||||
/// `pcm` must be 16 kHz mono f32 samples.
|
||||
/// `on_progress` is called periodically with a 0–100 integer.
|
||||
/// Transcribe 16 kHz mono f32 PCM samples.
|
||||
/// `on_progress` receives 0–100 from whisper.cpp.
|
||||
pub fn transcribe(
|
||||
&self,
|
||||
pcm: &[f32],
|
||||
@@ -57,19 +55,33 @@ impl Transcriber {
|
||||
|
||||
fp.set_n_threads(num_cpus::get() as i32);
|
||||
fp.set_temperature(0.0);
|
||||
fp.set_temperature_inc(0.2);
|
||||
fp.set_temperature_inc(0.2); // retry schedule: 0.0 → 0.2 → 0.4 … → 1.0 on threshold failure
|
||||
|
||||
// ── Anti-hallucination / quality guards ───────────────────────────────
|
||||
// no_speech_thold: segments where p(no-speech) > threshold are dropped.
|
||||
// 0.6 is the whisper.cpp default — safe for real-world and clean audio.
|
||||
// (0.0 would suppress *everything*; 1.0 disables the filter entirely.)
|
||||
fp.set_no_speech_thold(0.6);
|
||||
fp.set_entropy_thold(2.4);
|
||||
// ── Whisper-native quality / anti-hallucination parameters ───────────
|
||||
//
|
||||
// entropy_thold (compression ratio threshold):
|
||||
// If a 30s window's token entropy < threshold, the decoder is marked
|
||||
// failed and whisper retries the window with temperature += temperature_inc.
|
||||
// entropy = -Σ p·log₂(p) over the unique tokens produced.
|
||||
// Default 2.4 only catches 1–3 unique-token loops (e.g. "Yeah. Yeah.").
|
||||
// Raising to 3.5 also catches medium-phrase loops (9-word phrases have
|
||||
// theoretical entropy ≈ log₂(9) ≈ 3.17 — above 2.4 but below 3.5).
|
||||
// The retry mechanism then forces temperature diversity, breaking the loop.
|
||||
fp.set_entropy_thold(3.5);
|
||||
|
||||
// logprob_thold: fail + retry if average log-probability < threshold.
|
||||
// Default -1.0 is fine; tighten slightly to catch low-confidence windows.
|
||||
fp.set_logprob_thold(-1.0);
|
||||
|
||||
// no_speech_thold: listed in the header but marked "TODO: not implemented"
|
||||
// in whisper.cpp source — calling it has no effect, so omitted.
|
||||
|
||||
fp.set_suppress_blank(true);
|
||||
fp.set_suppress_non_speech_tokens(true);
|
||||
// Prevent repetition loops on long audio: do not feed the previous
|
||||
// segment's text back as context for the next segment.
|
||||
fp.set_suppress_non_speech_tokens(true); // suppress [MUSIC]/[APPLAUSE]/etc.
|
||||
|
||||
// no_context: do not use previous call's transcript as initial prompt.
|
||||
// Each silence-chunked audio segment is independent; cross-chunk context
|
||||
// would re-anchor the decoder to any hallucinations from the prior chunk.
|
||||
fp.set_no_context(true);
|
||||
|
||||
fp.set_print_progress(false);
|
||||
@@ -83,7 +95,6 @@ impl Transcriber {
|
||||
|
||||
fp.set_translate(task == "translate");
|
||||
|
||||
// Progress callback — whisper.cpp calls this with 0–100
|
||||
fp.set_progress_callback_safe(move |p| on_progress(p as u8));
|
||||
|
||||
state
|
||||
@@ -110,9 +121,8 @@ impl Transcriber {
|
||||
for t in 0..n_tokens {
|
||||
let token_text = state.full_get_token_text(i, t)
|
||||
.map_err(|e| AppError::Internal(e.to_string()))?;
|
||||
// Skip special tokens (they start with '[')
|
||||
if token_text.starts_with('[') {
|
||||
continue;
|
||||
continue; // skip special tokens ([MUSIC], [APPLAUSE], etc.)
|
||||
}
|
||||
let data = state.full_get_token_data(i, t)
|
||||
.map_err(|e| AppError::Internal(e.to_string()))?;
|
||||
@@ -127,7 +137,6 @@ impl Transcriber {
|
||||
segments.push(Segment { index: i, start, end, text, words });
|
||||
}
|
||||
|
||||
// Detect language used
|
||||
let lang = state
|
||||
.full_lang_id_from_state()
|
||||
.ok()
|
||||
|
||||
@@ -22,7 +22,8 @@ pub type ProgressTx = broadcast::Sender<ProgressEvent>;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum ProgressEvent {
|
||||
Progress(u8),
|
||||
/// `percent` — overall 0–100; `chunk` — 1-based; `total` — total chunks.
|
||||
Progress { percent: u8, chunk: usize, total: usize },
|
||||
Done(Box<Job>),
|
||||
Error(String),
|
||||
}
|
||||
@@ -93,7 +94,7 @@ fn transcriber_thread(
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn run(
|
||||
async fn run(
|
||||
mut job_rx: mpsc::UnboundedReceiver<JobId>,
|
||||
storage: Arc<Storage>,
|
||||
queue_depth: Arc<AtomicUsize>,
|
||||
@@ -134,7 +135,7 @@ pub(crate) async fn run(
|
||||
|
||||
let audio_path = audio_path_for(&job_id);
|
||||
|
||||
let result = process_job(&job, &audio_path, &progress_tx, &tx_req).await;
|
||||
let result = process_job(&job, &audio_path, &progress_tx, &tx_req, &storage).await;
|
||||
|
||||
let _ = tokio::fs::remove_file(&audio_path).await;
|
||||
|
||||
@@ -176,8 +177,9 @@ pub(crate) async fn run(
|
||||
|
||||
// ── Silence-based chunking ────────────────────────────────────────────────────
|
||||
|
||||
/// Target chunk length. Smaller = safer (less hallucination budget per chunk).
|
||||
const TARGET_CHUNK_SECS: f32 = 180.0;
|
||||
/// Target chunk length. 60s ≈ 2× whisper's native 30s window — short enough
|
||||
/// that a hallucinated phrase can't compound beyond a single window.
|
||||
const TARGET_CHUNK_SECS: f32 = 60.0;
|
||||
/// How far from the target we'll snap to a silence midpoint.
|
||||
const SNAP_WINDOW_SECS: f32 = 30.0;
|
||||
/// Silence below this level (dB) counts as a split candidate.
|
||||
@@ -296,12 +298,13 @@ async fn process_job(
|
||||
audio_path: &std::path::Path,
|
||||
progress_tx: &ProgressTx,
|
||||
tx_req: &std::sync::mpsc::Sender<TranscribeRequest>,
|
||||
storage: &Arc<Storage>,
|
||||
) -> crate::Result<(Vec<Segment>, String, f32)> {
|
||||
// 1. Decode full audio to 16 kHz mono PCM.
|
||||
let pcm = decode_audio(audio_path).await?;
|
||||
let total_secs = pcm.len() as f32 / 16_000.0;
|
||||
|
||||
// 2. Detect silence from the original file (fast amplitude scan).
|
||||
// 2. Detect silence midpoints from original file.
|
||||
let silence_mids = detect_silence_midpoints(audio_path).await;
|
||||
|
||||
// 3. Build silence-snapped chunk boundaries.
|
||||
@@ -325,13 +328,32 @@ async fn process_job(
|
||||
let s1 = ((*chunk_end * 16_000.0) as usize).min(pcm.len());
|
||||
let chunk_pcm = pcm[s0..s1].to_vec();
|
||||
|
||||
// Scale chunk's 0-100 progress into the job's 0-100 range.
|
||||
// Base percent this chunk starts at.
|
||||
let base = (ci * 100 / n) as u8;
|
||||
let span = (100usize / n).max(1) as u8;
|
||||
|
||||
// Emit a progress event and persist it at the start of every chunk.
|
||||
let _ = progress_tx.send(ProgressEvent::Progress {
|
||||
percent: base,
|
||||
chunk: ci + 1,
|
||||
total: n,
|
||||
});
|
||||
let mut snapshot = job.clone();
|
||||
snapshot.progress = base;
|
||||
if let Err(e) = storage.save(&snapshot).await {
|
||||
tracing::warn!(error = %e, "failed to persist mid-job progress");
|
||||
}
|
||||
|
||||
// Scale whisper's per-chunk 0–100 into the job's overall range.
|
||||
let tx = progress_tx.clone();
|
||||
let chunk_num = ci + 1;
|
||||
let on_progress = Box::new(move |p: u8| {
|
||||
let overall = base.saturating_add(p.saturating_mul(span) / 100);
|
||||
let _ = tx.send(ProgressEvent::Progress(overall));
|
||||
let _ = tx.send(ProgressEvent::Progress {
|
||||
percent: overall,
|
||||
chunk: chunk_num,
|
||||
total: n,
|
||||
});
|
||||
});
|
||||
|
||||
let (reply_tx, reply_rx) = oneshot::channel();
|
||||
@@ -377,7 +399,7 @@ async fn process_job(
|
||||
seg.index = i as i32;
|
||||
}
|
||||
|
||||
let _ = progress_tx.send(ProgressEvent::Progress(100));
|
||||
let _ = progress_tx.send(ProgressEvent::Progress { percent: 100, chunk: n, total: n });
|
||||
Ok((all_segments, language, total_secs))
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user