use std::sync::atomic::Ordering; use std::pin::Pin; use axum::{ extract::{Multipart, Path, State}, http::StatusCode, response::{ sse::{Event, KeepAlive, Sse}, IntoResponse, }, Json, }; use chrono::Utc; use futures::stream::{self, Stream, StreamExt}; use tokio::sync::broadcast; use tokio_stream::wrappers::BroadcastStream; use uuid::Uuid; use crate::{ models::{Job, JobId, JobStatus, SubmitResponse}, worker::{audio_path_for, ProgressEvent, WorkerCmd}, AppError, AppState, Result, }; type SseStream = Pin> + Send>>; // ── POST /jobs ─────────────────────────────────────────────────────────────── /// Submit an audio file for transcription. /// /// Multipart fields: /// - `audio` (required) – audio file; any format ffmpeg understands; no size limit /// - `language` (optional) – ISO 639-1 code, e.g. "en". Auto-detected when absent. /// - `task` (optional) – "transcribe" (default) or "translate" (→ English) /// - `webhook_url` (optional) – URL to POST the completed job JSON to #[utoipa::path( post, path = "/jobs", tag = "jobs", request_body( content = String, content_type = "multipart/form-data", description = "Multipart form: audio (file), language (opt), task (opt), webhook_url (opt)" ), responses( (status = 202, description = "Job queued", body = SubmitResponse), (status = 400, description = "Bad request"), (status = 500, description = "Server error"), ) )] pub async fn submit_job( State(state): State, mut multipart: Multipart, ) -> Result { let mut language: Option = None; let mut task: String = "transcribe".into(); let mut webhook_url: Option = None; let mut filename: Option = None; let mut audio_saved = false; // Assign ID early so we know where to stream the audio bytes. let id = Uuid::new_v4(); let audio_path = audio_path_for(&id); while let Some(field) = multipart.next_field().await.map_err(|e| { AppError::BadRequest(format!("multipart error: {e}")) })? { let field_name = field.name().unwrap_or("").to_owned(); match field_name.as_str() { "audio" => { use tokio::io::AsyncWriteExt; filename = field.file_name().map(str::to_owned); // Stream directly to disk — avoids holding GB in RAM. let mut file = tokio::fs::File::create(&audio_path).await.map_err(|e| { AppError::Internal(format!("cannot create audio temp file: {e}")) })?; let mut bytes_written: u64 = 0; let mut stream = field; while let Some(chunk) = stream.chunk().await.map_err(|e| { AppError::BadRequest(format!("failed to read audio field: {e}")) })? { file.write_all(&chunk).await.map_err(|e| { AppError::Internal(format!("failed to write audio chunk: {e}")) })?; bytes_written += chunk.len() as u64; } if bytes_written == 0 { return Err(AppError::BadRequest("audio field is empty".into())); } audio_saved = true; } "language" => language = Some(field.text().await.map_err(|e| AppError::BadRequest(e.to_string()))?), "task" => task = field.text().await.map_err(|e| AppError::BadRequest(e.to_string()))?, "webhook_url" => webhook_url = Some(field.text().await.map_err(|e| AppError::BadRequest(e.to_string()))?), _ => {} // ignore unknown fields } } if !audio_saved { return Err(AppError::BadRequest("missing 'audio' field".into())); } if !matches!(task.as_str(), "transcribe" | "translate") { return Err(AppError::BadRequest( "task must be 'transcribe' or 'translate'".into(), )); } // Check model state before accepting the job. let (model_ready, retry_after_secs, state_tag) = { let ms = state.model_state.read().await; let ready = ms.is_ready(); let retry = ms.retry_after_secs(); let tag = ms.tag().to_string(); (ready, retry, tag) }; // Register the webhook URL regardless of model state — so model lifecycle // events are delivered even if the job itself is rejected. if let Some(url) = &webhook_url { state.webhook_registry.lock() .unwrap_or_else(|e| e.into_inner()) .insert(url.clone()); } if !model_ready { // Trigger a load if the model is simply unloaded (not already loading). if state_tag == "unloaded" { let _ = state.cmd_tx.try_send(WorkerCmd::Load); } // Clean up the audio file we already wrote to disk. let _ = tokio::fs::remove_file(&audio_path).await; return Err(AppError::ModelNotReady { state: state_tag, retry_after_secs, }); } let mut job = Job::new(id, task, webhook_url, filename); job.language = language; state.storage.create(&job).await?; // Pre-create the broadcast channel so SSE subscribers don't miss events. state.progress.entry(id).or_insert_with(|| broadcast::channel(64).0); state.queue_depth.fetch_add(1, Ordering::Relaxed); state.job_tx.send(id).map_err(|_| { AppError::Internal("worker channel closed".into()) })?; tracing::info!(job_id = %id, "job queued"); Ok((StatusCode::ACCEPTED, Json(SubmitResponse { job_id: id }))) } // ── GET /jobs/{id} ─────────────────────────────────────────────────────────── /// Poll the status and result of a transcription job. #[utoipa::path( get, path = "/jobs/:id", tag = "jobs", params(("id" = Uuid, Path, description = "Job ID")), responses( (status = 200, description = "Job details", body = Job), (status = 404, description = "Not found"), ) )] pub async fn get_job( State(state): State, Path(id): Path, ) -> Result> { let job = state.storage.get(&id).await?; Ok(Json(job)) } // ── GET /jobs/{id}/stream ──────────────────────────────────────────────────── /// Subscribe to real-time transcription progress via Server-Sent Events. /// /// Events: /// - `progress` — `{ "type": "progress", "percent": 0..100 }` emitted periodically /// - `done` — `{ "type": "done", "job": {...} }` emitted on completion /// - `error` — `{ "type": "error", "message": "..." }` emitted on failure #[utoipa::path( get, path = "/jobs/:id/stream", tag = "jobs", params(("id" = Uuid, Path, description = "Job ID")), responses( (status = 200, description = "SSE stream"), (status = 404, description = "Not found"), ) )] pub async fn stream_job( State(state): State, Path(id): Path, ) -> Result> { // If the job is already finished, return a single done event immediately. let job = state.storage.get(&id).await?; match job.status { JobStatus::Done | JobStatus::Failed | JobStatus::Cancelled => { let payload = serde_json::to_string( &crate::models::SsePayload::Done { job: Box::new(job) } ).unwrap_or_default(); let s: SseStream = Box::pin(stream::once(async move { Ok(Event::default().event("done").data(payload)) })); return Ok(Sse::new(s).keep_alive(KeepAlive::default())); } _ => {} } // Subscribe to live broadcast channel. let rx = state .progress .entry(id) .or_insert_with(|| broadcast::channel(64).0) .subscribe(); let sse_stream: SseStream = Box::pin(BroadcastStream::new(rx).filter_map(|msg| async move { let event = match msg { Ok(ProgressEvent::Progress { percent, chunk, total }) => { let payload = serde_json::to_string( &crate::models::SsePayload::Progress { percent, chunk, chunks_total: total } ).ok()?; Event::default().event("progress").data(payload) } Ok(ProgressEvent::Done(job)) => { let payload = serde_json::to_string( &crate::models::SsePayload::Done { job } ).ok()?; Event::default().event("done").data(payload) } Ok(ProgressEvent::Error(msg)) => { let payload = serde_json::to_string( &crate::models::SsePayload::Error { message: msg } ).ok()?; Event::default().event("error").data(payload) } Err(_) => return None, // lagged / channel closed }; Some(Ok(event)) })); Ok(Sse::new(sse_stream).keep_alive(KeepAlive::default())) } // ── DELETE /jobs/{id} ──────────────────────────────────────────────────────── /// Cancel a queued or running job. /// Running jobs are marked cancelled; the worker discards them after the current /// transcription call returns (whisper.cpp does not support mid-inference abort). #[utoipa::path( delete, path = "/jobs/:id", tag = "jobs", params(("id" = Uuid, Path, description = "Job ID")), responses( (status = 200, description = "Job cancelled", body = Job), (status = 404, description = "Not found"), (status = 409, description = "Job already finished"), ) )] pub async fn delete_job( State(state): State, Path(id): Path, ) -> Result> { let mut job = state.storage.get(&id).await?; match job.status { JobStatus::Done | JobStatus::Failed | JobStatus::Cancelled => { return Err(AppError::Conflict(format!( "job {id} is already in terminal state {:?}", job.status ))); } _ => {} } job.status = JobStatus::Cancelled; job.completed_at = Some(Utc::now()); state.storage.save(&job).await?; Ok(Json(job)) }