fix: four worker.rs bugs found during E2E testing
This commit is contained in:
@@ -294,6 +294,8 @@ fn try_load_with_polling(
|
|||||||
Err(e) => {
|
Err(e) => {
|
||||||
tracing::error!(error = %e, "model load failed with non-recoverable error");
|
tracing::error!(error = %e, "model load failed with non-recoverable error");
|
||||||
set_state(model_state, ModelState::Unloaded);
|
set_state(model_state, ModelState::Unloaded);
|
||||||
|
broadcast_event(model_event_tx, ModelEvent::ModelUnloaded);
|
||||||
|
fire_webhooks(webhook_registry, ModelEvent::ModelUnloaded, rt);
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -427,6 +429,7 @@ async fn run(
|
|||||||
};
|
};
|
||||||
|
|
||||||
if job.status == JobStatus::Cancelled {
|
if job.status == JobStatus::Cancelled {
|
||||||
|
let _ = tokio::fs::remove_file(&audio_path_for(&job_id)).await;
|
||||||
registry.remove(&job_id);
|
registry.remove(&job_id);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
@@ -447,6 +450,15 @@ async fn run(
|
|||||||
|
|
||||||
let _ = tokio::fs::remove_file(&audio_path).await;
|
let _ = tokio::fs::remove_file(&audio_path).await;
|
||||||
|
|
||||||
|
// Re-read from storage: the job may have been cancelled via DELETE /jobs/:id
|
||||||
|
// while process_job() was running. If so, discard the result entirely.
|
||||||
|
let current_status = storage.get(&job_id).await.map(|j| j.status).ok();
|
||||||
|
if current_status == Some(JobStatus::Cancelled) {
|
||||||
|
tracing::info!(job_id = %job_id, "job cancelled during inference — discarding result");
|
||||||
|
registry.remove(&job_id);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
match result {
|
match result {
|
||||||
Ok((segments, language, duration_secs)) => {
|
Ok((segments, language, duration_secs)) => {
|
||||||
job.status = JobStatus::Done;
|
job.status = JobStatus::Done;
|
||||||
@@ -615,17 +627,20 @@ async fn process_job(
|
|||||||
let base = (ci * 100 / n) as u8;
|
let base = (ci * 100 / n) as u8;
|
||||||
let span = (100usize / n).max(1) as u8;
|
let span = (100usize / n).max(1) as u8;
|
||||||
|
|
||||||
let _ = progress_tx.send(ProgressEvent::Progress {
|
// Save progress to disk before emitting SSE — polling clients who respond
|
||||||
percent: base,
|
// immediately to the SSE event will then see consistent state.
|
||||||
chunk: ci + 1,
|
|
||||||
total: n,
|
|
||||||
});
|
|
||||||
let mut snapshot = job.clone();
|
let mut snapshot = job.clone();
|
||||||
snapshot.progress = base;
|
snapshot.progress = base;
|
||||||
if let Err(e) = storage.save(&snapshot).await {
|
if let Err(e) = storage.save(&snapshot).await {
|
||||||
tracing::warn!(error = %e, "failed to persist mid-job progress");
|
tracing::warn!(error = %e, "failed to persist mid-job progress");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let _ = progress_tx.send(ProgressEvent::Progress {
|
||||||
|
percent: base,
|
||||||
|
chunk: ci + 1,
|
||||||
|
total: n,
|
||||||
|
});
|
||||||
|
|
||||||
let tx = progress_tx.clone();
|
let tx = progress_tx.clone();
|
||||||
let chunk_num = ci + 1;
|
let chunk_num = ci + 1;
|
||||||
let on_progress = Box::new(move |p: u8| {
|
let on_progress = Box::new(move |p: u8| {
|
||||||
|
|||||||
Reference in New Issue
Block a user