/** * Queue SSE Stream API Endpoint * * Provides Server-Sent Events stream for real-time queue updates: * - GET /api/queue/stream - Stream queue status updates */ import { queueManager } from '$lib/server/queue/QueueManager'; import type { RequestHandler } from './$types'; import type { QueueStatusUpdate } from '$lib/server/queue/types'; /** * GET /api/queue/stream - Server-Sent Events stream for queue updates * * Returns a continuous stream of queue status updates in SSE format. * Supports optional query parameters: * - ?id={queue-item-id} - Stream updates only for specific item * - ?status={status} - Stream updates only for items with specific status * * SSE Event Format: * - event: queue-update * - data: JSON string with QueueStatusUpdate object * * Connection is kept alive until client disconnects. */ export const GET: RequestHandler = async ({ url, request }) => { const searchParams = url.searchParams; const itemIdFilter = searchParams.get('id'); const statusFilter = searchParams.get('status'); // Validate status filter if provided const validStatuses = ['pending', 'in_progress', 'success', 'unhealthy', 'error']; if (statusFilter && !validStatuses.includes(statusFilter)) { return new Response(`Invalid status filter. Must be one of: ${validStatuses.join(', ')}`, { status: 400, headers: { 'Content-Type': 'text/plain' } }); } // Validate item ID filter if provided if (itemIdFilter) { const uuidPattern = /^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i; if (!uuidPattern.test(itemIdFilter)) { return new Response('Invalid queue item ID format', { status: 400, headers: { 'Content-Type': 'text/plain' } }); } } // Track stream state to prevent "Controller already closed" errors let isClosed = false; let unsubscribe: (() => void) | null = null; let keepAliveInterval: NodeJS.Timeout | null = null; // Unified cleanup function - prevents double cleanup const cleanup = () => { if (isClosed) return; // Already cleaned up isClosed = true; console.log('[SSE] Cleaning up stream connection'); // Unsubscribe from queue updates if (unsubscribe) { unsubscribe(); unsubscribe = null; } // Clear keep-alive interval if (keepAliveInterval) { clearInterval(keepAliveInterval); keepAliveInterval = null; } }; // Safe enqueue helper - checks stream state before enqueueing const safeEnqueue = (controller: ReadableStreamDefaultController, message: string): boolean => { if (isClosed) { return false; // Stream already closed, don't attempt to enqueue } try { controller.enqueue(new TextEncoder().encode(message)); return true; } catch (error) { // Controller closed or errored - clean up and mark as closed console.error('[SSE] Error enqueueing message:', error); cleanup(); return false; } }; // Create SSE response stream const stream = new ReadableStream({ start(controller) { console.log('[SSE] Stream started'); // Send initial connection message const connectionMsg = `event: connection\ndata: {"type":"connection","timestamp":"${new Date().toISOString()}","message":"Connected to queue stream"}\n\n`; if (!safeEnqueue(controller, connectionMsg)) { return; } // Send current queue state as initial data try { const currentItems = queueManager.getAll(); let filteredItems = currentItems; // Apply filters if (itemIdFilter) { filteredItems = currentItems.filter(item => item.id === itemIdFilter); } if (statusFilter) { filteredItems = filteredItems.filter(item => item.status === statusFilter); } // Send initial state for each matching item for (const item of filteredItems) { if (isClosed) break; // Stop if stream was closed const update: QueueStatusUpdate = { type: 'status_change', itemId: item.id, status: item.status, timestamp: new Date().toISOString(), url: item.url, progress: item.phases, results: item.results, error: item.error }; const sseMessage = `event: queue-update\ndata: ${JSON.stringify(update)}\n\n`; if (!safeEnqueue(controller, sseMessage)) { break; // Stop if enqueue failed } } } catch (error) { console.error('[SSE] Error sending initial queue state:', error); } // Subscribe to queue updates unsubscribe = queueManager.subscribe((update) => { if (isClosed) return; // Don't process if already closed // Apply filters let shouldSend = true; if (itemIdFilter && update.itemId !== itemIdFilter) { shouldSend = false; } if (statusFilter && update.status !== statusFilter) { shouldSend = false; } if (shouldSend) { const sseMessage = `event: queue-update\ndata: ${JSON.stringify(update)}\n\n`; safeEnqueue(controller, sseMessage); } }); // Keep-alive ping every 30 seconds keepAliveInterval = setInterval(() => { if (isClosed) { // Stop pinging if closed if (keepAliveInterval) { clearInterval(keepAliveInterval); keepAliveInterval = null; } return; } const pingMsg = `event: ping\ndata: {"type":"ping","timestamp":"${new Date().toISOString()}"}\n\n`; if (!safeEnqueue(controller, pingMsg)) { // Failed to send ping, clear interval if (keepAliveInterval) { clearInterval(keepAliveInterval); keepAliveInterval = null; } } }, 30000); // Handle client disconnect request.signal.addEventListener('abort', () => { console.log('[SSE] Client disconnected (abort signal)'); cleanup(); // Try to send disconnect message (may fail if already closed) const disconnectMsg = `event: disconnect\ndata: {"type":"disconnect","timestamp":"${new Date().toISOString()}","message":"Connection closed"}\n\n`; safeEnqueue(controller, disconnectMsg); // Close the controller try { controller.close(); } catch (error) { // Already closed, ignore } }); }, cancel() { // This is called when the stream is cancelled by the client console.log('[SSE] Stream cancelled by client'); cleanup(); } }); return new Response(stream, { status: 200, headers: { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', // Connection header omitted - Node.js handles connection management automatically 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Headers': 'Cache-Control', 'Access-Control-Expose-Headers': 'Content-Type' } }); };