import { createJob, updateJob, setJobStatus, getJob, resetJob } from './db.js';
import { downloadYouTube, saveUploadedFile, cleanupJobTmp } from './downloader.js';
import { prepareAudio, cleanup as cleanupFiles } from './audio.js';
import { submitJob, streamJob } from './whisper.js';
import { ensureWhisperRunning } from './docker.js';
import type { AudioMode, Segment } from '$lib/types.js';

/** Base URL whisper calls back on; falls back to http://localhost:3000 when WEBHOOK_BASE_URL is unset. */
const WEBHOOK_BASE_URL = process.env.WEBHOOK_BASE_URL ?? 'http://localhost:3000';

/** Progress listeners: jobId → set of callbacks receiving JSON-serialised payloads. */
const progressListeners = new Map<string, Set<(data: string) => void>>();

/**
 * Register `cb` to receive progress payloads (as JSON strings) for `jobId`.
 *
 * @param jobId Job whose progress events should be delivered.
 * @param cb    Callback invoked with each serialised payload.
 * @returns An unsubscribe function. When the last listener for a job is removed the
 *          job's entry is deleted from the registry, so jobs that are no longer
 *          watched do not leave empty sets behind.
 */
export function subscribeProgress(jobId: string, cb: (data: string) => void): () => void {
  let listeners = progressListeners.get(jobId);
  if (!listeners) {
    listeners = new Set();
    progressListeners.set(jobId, listeners);
  }
  listeners.add(cb);

  return () => {
    const current = progressListeners.get(jobId);
    if (!current) return;
    current.delete(cb);
    if (current.size === 0) progressListeners.delete(jobId);
  };
}

/**
 * Serialise `payload` once and deliver it to every listener subscribed to `jobId`.
 * Does nothing when no one is listening.
 *
 * A listener that throws (for example one writing to a client stream that has
 * already closed) is logged and skipped. It cannot stop delivery to the remaining
 * listeners, and it cannot propagate into the caller, whose try/catch would
 * otherwise treat it as a pipeline failure.
 *
 * @param jobId   Job the event belongs to.
 * @param payload Event object; serialised with `JSON.stringify`.
 */
export function emitProgress(jobId: string, payload: object): void {
  const listeners = progressListeners.get(jobId);
  if (!listeners) return;
  const data = JSON.stringify(payload);
  for (const cb of listeners) {
    try {
      cb(data);
    } catch (err: unknown) {
      console.warn(`[pipeline] progress listener for job ${jobId} threw:`, err);
    }
  }
}

/**
 * Start a transcription job for a YouTube URL. Runs async — returns immediately.
 *
 * @param url       YouTube URL to transcribe.
 * @param audioMode Audio preparation mode handed to the pipeline (default `'auto'`).
 * @param language  Optional language hint, forwarded to whisper when audio is transcribed.
 * @returns The id of the newly created job. Pipeline failures are recorded on the job
 *          and logged, not surfaced through this promise.
 */
export async function startYouTubeJob(
  url: string,
  audioMode: AudioMode = 'auto',
  language?: string
): Promise<string> {
  const job = createJob(url, 'Downloading…', audioMode);
  void runJob(job.id, { type: 'youtube', url }, audioMode, language).catch((err: unknown) => {
    console.error(`[pipeline] job ${job.id} failed:`, err);
  });
  return job.id;
}

/**
 * Start a transcription job for an uploaded file. Runs async — returns immediately.
 *
 * @param buffer    Raw bytes of the uploaded media file.
 * @param filename  Original filename; used as the job source and initial title.
 * @param audioMode Audio preparation mode handed to the pipeline (default `'auto'`).
 * @param language  Optional language hint, forwarded to whisper.
 * @returns The id of the newly created job. Pipeline failures are recorded on the job
 *          and logged, not surfaced through this promise.
 */
export async function startUploadJob(
  buffer: Buffer,
  filename: string,
  audioMode: AudioMode = 'auto',
  language?: string
): Promise<string> {
  const job = createJob(filename, filename, audioMode);
  void runJob(job.id, { type: 'upload', buffer, filename }, audioMode, language).catch(
    (err: unknown) => {
      console.error(`[pipeline] job ${job.id} failed:`, err);
    }
  );
  return job.id;
}

/**
 * Retry a failed/cancelled YouTube job by resetting and re-running the pipeline.
 *
 * Looks the job up, resets it with `resetJob`, then re-runs the pipeline as a YouTube
 * job. It uses the stored `source` as the URL and the stored `audioMode`. Returns once
 * the pipeline has been started; later pipeline failures are recorded on the job and
 * logged, not thrown to the caller.
 *
 * NOTE(review): nothing here checks that the job really is a YouTube job. Upload jobs
 * are created with their filename as `source`, and this module does not keep the
 * uploaded bytes. Retrying one would therefore try to download the filename as a URL,
 * so only offer retry for YouTube jobs. The original language hint is also not carried
 * over (it is never passed to `createJob`), so a retry runs without one.
 *
 * @param jobId Id of the job to retry.
 * @throws Error `'Job not found'` when no job has this id.
*/
export async function retryJob(jobId: string): Promise<void> {
  const job = getJob(jobId);
  if (!job) throw new Error('Job not found');
  resetJob(jobId);
  // Language is not passed: it is not stored with the job, so the retry runs without a hint.
  void runJob(jobId, { type: 'youtube', url: job.source }, job.audioMode as AudioMode).catch(
    (err: unknown) => {
      console.error(`[pipeline] retry job ${jobId} failed:`, err);
    }
  );
}

/** Pipeline input: a YouTube URL to download, or the bytes and name of an uploaded file. */
type JobInput =
  | { type: 'youtube'; url: string }
  | { type: 'upload'; buffer: Buffer; filename: string };

/**
 * Await `task`, logging instead of rethrowing any failure.
 *
 * Used for housekeeping after a job has reached a state that must not be overwritten.
 * Examples are notifications and temp-file cleanup, which run after `done` or after
 * whisper has accepted the job.
 *
 * @param jobId Job the task belongs to (for the log line).
 * @param what  Short description of the task (for the log line).
 * @param task  Synchronous or async work to run.
 */
async function bestEffort(jobId: string, what: string, task: () => unknown): Promise<void> {
  try {
    await task();
  } catch (err: unknown) {
    console.warn(`[pipeline] job ${jobId}: ${what} failed:`, err);
  }
}

/**
 * Caption fast path: build the outputs straight from YouTube captions and mark the job
 * done without involving whisper.
 *
 * Errors before the job is marked done propagate to `runJob`, which fails the job.
 * The steps after it are best-effort, so they cannot flip a finished job to `failed`:
 * the `done` event, the push notification and temp cleanup.
 *
 * @param jobId           Job being completed.
 * @param title           Title used for the output files and the notification.
 * @param captionSegments Segments taken from the downloaded captions.
 */
async function finishFromCaptions(
  jobId: string,
  title: string,
  captionSegments: Segment[]
): Promise<void> {
  const { deduplicateSegments } = await import('./postprocess.js');
  const { writeOutputs } = await import('./formatter.js');
  const segments = deduplicateSegments(captionSegments);
  const paths = await writeOutputs(segments, title, jobId);

  updateJob({
    id: jobId,
    status: 'done',
    progress: 100,
    segmentsJson: JSON.stringify(segments),
    // Directory holding the written files: the SRT path with its last component removed.
    outputDir: paths.srt.replace(/\/[^/]+$/, '')
  });

  // The job is complete from here on.
  await bestEffort(jobId, 'done event', () => emitProgress(jobId, { type: 'done' }));
  await bestEffort(jobId, 'push notification', async () => {
    const { sendNotification } = await import('./push.js');
    await sendNotification(jobId, '✅ Transcript ready', title);
  });
  await bestEffort(jobId, 'temp cleanup', () => cleanupJobTmp(jobId));
}

/**
 * Run the transcription pipeline for an existing job.
 *
 * Stages:
 *   1. Download the YouTube input, or save the uploaded file.
 *   2. If YouTube returned captions, finish via the caption fast path (no whisper).
 *   3. Otherwise prepare a wav, make sure whisper is running, and submit the wav with
 *      a webhook URL.
 *   4. Relay whisper's SSE progress to progress listeners without blocking.
 *
 * On the whisper path, returning does not mean the transcript is ready. Completion is
 * left to the webhook handler.
 *
 * Any error before that point:
 *   - marks the job `failed` with the error message;
 *   - emits an `error` event;
 *   - removes the job's temporary files.
 * Cleanup after a successful submission is best-effort. A failure there is logged
 * rather than marking a job that whisper is still processing as failed.
 *
 * @param jobId     Job to run (must already exist).
 * @param input     What to transcribe.
 * @param audioMode Audio preparation mode passed to `prepareAudio`.
 * @param language  Optional language hint passed to whisper's `submitJob`.
 */
async function runJob(
  jobId: string,
  input: JobInput,
  audioMode: AudioMode,
  language?: string
): Promise<void> {
  // Tracked outside the try so the catch block can remove whatever was created.
  let rawAudioPath: string | null = null;
  let wavPath: string | null = null;

  try {
    // ── 1. Download / save input ──────────────────────────────────────────
    setJobStatus(jobId, 'downloading', 0);
    emitProgress(jobId, { type: 'status', status: 'downloading' });

    let title = 'Untitled';
    let captionSegments: Segment[] | null = null;

    if (input.type === 'youtube') {
      const result = await downloadYouTube(input.url, jobId);
      title = result.title;
      if (result.type === 'captions') {
        // Fast path — use captions directly
        captionSegments = result.segments;
      } else {
        rawAudioPath = result.audioPath;
      }
    } else {
      rawAudioPath = await saveUploadedFile(input.buffer, input.filename, jobId);
      // Drop the extension; keep the full name if nothing would remain (e.g. ".mp3").
      title = input.filename.replace(/\.[^.]+$/, '') || input.filename;
    }
    updateJob({ id: jobId, title });

    if (captionSegments) {
      // Caption fast path — skip whisper
      await finishFromCaptions(jobId, title, captionSegments);
      return;
    }

    if (rawAudioPath === null) {
      // Every non-caption branch above sets a path; guard instead of asserting with `!`.
      throw new Error('No audio file to transcribe');
    }

    // ── 2. Prepare audio ──────────────────────────────────────────────────
    setJobStatus(jobId, 'preparing', 5);
    emitProgress(jobId, { type: 'status', status: 'preparing' });
    const { wavPath: preparedWav, analysis } = await prepareAudio(rawAudioPath, jobId, audioMode);
    wavPath = preparedWav;
    updateJob({ id: jobId, meanVolume: analysis.meanVolume });

    // ── 3. Ensure whisper is running ──────────────────────────────────────
    await ensureWhisperRunning();

    // ── 4. Submit to whisper with webhook ─────────────────────────────────
    setJobStatus(jobId, 'transcribing', 10);
    emitProgress(jobId, { type: 'status', status: 'transcribing' });
    const webhookUrl = `${WEBHOOK_BASE_URL}/api/webhook/${jobId}`;
    const whisperJobId = await submitJob(preparedWav, webhookUrl, language);
    updateJob({ id: jobId, whisperJobId });

    // ── 5. Open SSE for live progress (non-blocking relay) ────────────────
    void streamJob(
      whisperJobId,
      (percent, chunk, total) => {
        // Overall progress = the 10 set at submission + 80% of whisper's percent
        // (10–90 assuming percent runs 0–100).
        const progress = 10 + Math.round(percent * 0.8);
        setJobStatus(jobId, 'transcribing', progress);
        emitProgress(jobId, { type: 'progress', percent, chunk, total, progress });
      },
      () => {
        /* webhook will handle completion */
      },
      (msg) => {
        setJobStatus(jobId, 'failed', 0);
        updateJob({ id: jobId, error: msg });
        emitProgress(jobId, { type: 'error', message: msg });
      }
    ).catch((err: unknown) => console.warn('[pipeline] SSE relay error:', err));

    // The wav has been submitted (webhook handles the rest). Removing it is
    // housekeeping and must not mark an in-flight job as failed.
    wavPath = null;
    await bestEffort(jobId, 'wav cleanup', () => cleanupFiles(preparedWav));
  } catch (err: unknown) {
    const message = err instanceof Error ? err.message : String(err);
    updateJob({ id: jobId, status: 'failed', error: message });
    emitProgress(jobId, { type: 'error', message });
    if (rawAudioPath) await cleanupFiles(rawAudioPath).catch(() => {});
    if (wavPath) await cleanupFiles(wavPath).catch(() => {});
    await bestEffort(jobId, 'temp cleanup', () => cleanupJobTmp(jobId));
  }
}