diff --git a/src/worker.rs b/src/worker.rs index 7215227..6b2a5ce 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -294,6 +294,8 @@ fn try_load_with_polling( Err(e) => { tracing::error!(error = %e, "model load failed with non-recoverable error"); set_state(model_state, ModelState::Unloaded); + broadcast_event(model_event_tx, ModelEvent::ModelUnloaded); + fire_webhooks(webhook_registry, ModelEvent::ModelUnloaded, rt); return None; } } @@ -427,6 +429,7 @@ async fn run( }; if job.status == JobStatus::Cancelled { + let _ = tokio::fs::remove_file(&audio_path_for(&job_id)).await; registry.remove(&job_id); continue; } @@ -447,6 +450,15 @@ async fn run( let _ = tokio::fs::remove_file(&audio_path).await; + // Re-read from storage: the job may have been cancelled via DELETE /jobs/:id + // while process_job() was running. If so, discard the result entirely. + let current_status = storage.get(&job_id).await.map(|j| j.status).ok(); + if current_status == Some(JobStatus::Cancelled) { + tracing::info!(job_id = %job_id, "job cancelled during inference — discarding result"); + registry.remove(&job_id); + continue; + } + match result { Ok((segments, language, duration_secs)) => { job.status = JobStatus::Done; @@ -615,17 +627,20 @@ async fn process_job( let base = (ci * 100 / n) as u8; let span = (100usize / n).max(1) as u8; - let _ = progress_tx.send(ProgressEvent::Progress { - percent: base, - chunk: ci + 1, - total: n, - }); + // Save progress to disk before emitting SSE — polling clients who respond + // immediately to the SSE event will then see consistent state. let mut snapshot = job.clone(); snapshot.progress = base; if let Err(e) = storage.save(&snapshot).await { tracing::warn!(error = %e, "failed to persist mid-job progress"); } + let _ = progress_tx.send(ProgressEvent::Progress { + percent: base, + chunk: ci + 1, + total: n, + }); + let tx = progress_tx.clone(); let chunk_num = ci + 1; let on_progress = Box::new(move |p: u8| {