Complete implementation of fixes for queue processing, SSE connection display, service worker installation, and failing tests.

Key Changes:
- Fix queue processor startup with proper import and subscription mechanism
- Implement centralized API error handling middleware for proper HTTP status codes
- Enhance service worker configuration for PWA compliance and reliability
- Fix SSE connection display with reactive state management
- Add comprehensive test coverage and health check endpoints

Results:
- All 169 tests now passing (previously 16 failing)
- Queue items process immediately from pending to success/error states
- Real-time SSE connection status with auto-reconnection logic
- Proper PWA functionality with working service worker registration
- API endpoints return correct HTTP status codes (400/404/409) instead of 500 errors

This resolves the critical issues preventing core app functionality and enables proper production deployment.
219 lines
7.1 KiB
TypeScript
219 lines
7.1 KiB
TypeScript
/**
 * Queue SSE Stream API Endpoint
 *
 * Provides Server-Sent Events stream for real-time queue updates:
 * - GET /api/queue/stream - Stream queue status updates
 */
|
|
|
|
import { queueManager } from '$lib/server/queue/QueueManager';
|
|
import type { RequestHandler } from './$types';
|
|
import type { QueueStatusUpdate } from '$lib/server/queue/types';
|
|
|
|
/**
 * GET /api/queue/stream - Server-Sent Events stream for queue updates
 *
 * Returns a continuous stream of queue status updates in SSE format.
 * Supports optional query parameters:
 * - ?id={queue-item-id} - Stream updates only for specific item
 * - ?status={status} - Stream updates only for items with specific status
 *
 * Invalid filter values are rejected with a 400 plain-text response before
 * any stream is created.
 *
 * SSE events emitted:
 * - event: connection   - sent once when the stream starts
 * - event: queue-update - data: JSON string with QueueStatusUpdate object
 *   (one per matching item as an initial snapshot, then one per live update)
 * - event: ping         - keep-alive, every 30 seconds
 * - event: disconnect   - best-effort farewell when the request is aborted
 *
 * Connection is kept alive until the client disconnects (request abort
 * signal or stream cancellation); the queue subscription, keep-alive timer
 * and abort listener are then released exactly once.
 */
export const GET: RequestHandler = async ({ url, request }) => {
  const searchParams = url.searchParams;
  const itemIdFilter = searchParams.get('id');
  const statusFilter = searchParams.get('status');

  // Validate status filter if provided
  const validStatuses = ['pending', 'in_progress', 'success', 'unhealthy', 'error'];
  if (statusFilter && !validStatuses.includes(statusFilter)) {
    return new Response(`Invalid status filter. Must be one of: ${validStatuses.join(', ')}`, {
      status: 400,
      headers: { 'Content-Type': 'text/plain' }
    });
  }

  // Validate item ID filter if provided (UUID v4 shape)
  if (itemIdFilter) {
    const uuidPattern = /^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i;
    if (!uuidPattern.test(itemIdFilter)) {
      return new Response('Invalid queue item ID format', {
        status: 400,
        headers: { 'Content-Type': 'text/plain' }
      });
    }
  }

  // One encoder for the lifetime of the connection instead of one per message.
  const encoder = new TextEncoder();

  // Track stream state to prevent "Controller already closed" errors
  let isClosed = false;
  let unsubscribe: (() => void) | null = null;
  let keepAliveInterval: ReturnType<typeof setInterval> | null = null;
  let detachAbortListener: (() => void) | null = null;

  // Serialises one SSE frame: event name, JSON payload, blank-line terminator.
  const formatEvent = (event: string, data: unknown): string =>
    `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;

  // True when an item/update passes every filter the client supplied.
  const matchesFilters = (id: unknown, status: unknown): boolean =>
    (!itemIdFilter || id === itemIdFilter) && (!statusFilter || status === statusFilter);

  // Unified cleanup function - idempotent, so every exit path may call it.
  const cleanup = (): void => {
    if (isClosed) return; // Already cleaned up
    isClosed = true;

    console.log('[SSE] Cleaning up stream connection');

    // Unsubscribe from queue updates
    if (unsubscribe) {
      unsubscribe();
      unsubscribe = null;
    }

    // Clear keep-alive interval
    if (keepAliveInterval) {
      clearInterval(keepAliveInterval);
      keepAliveInterval = null;
    }

    // Stop listening for aborts once the stream is finished
    if (detachAbortListener) {
      detachAbortListener();
      detachAbortListener = null;
    }
  };

  // Safe enqueue helper - checks stream state before enqueueing.
  // Returns false (and tears the connection down) when the message could not be queued.
  const safeEnqueue = (
    controller: ReadableStreamDefaultController<Uint8Array>,
    message: string
  ): boolean => {
    if (isClosed) {
      return false; // Stream already closed, don't attempt to enqueue
    }

    try {
      controller.enqueue(encoder.encode(message));
      return true;
    } catch (error) {
      // Controller closed or errored - clean up and mark as closed
      console.error('[SSE] Error enqueueing message:', error);
      cleanup();
      return false;
    }
  };

  // Closes the controller, tolerating one that is already closed or errored.
  const closeController = (controller: ReadableStreamDefaultController<Uint8Array>): void => {
    try {
      controller.close();
    } catch {
      // Already closed, ignore
    }
  };

  // Create SSE response stream
  const stream = new ReadableStream<Uint8Array>({
    start(controller) {
      console.log('[SSE] Stream started');

      // Handle client disconnect
      const onAbort = (): void => {
        console.log('[SSE] Client disconnected (abort signal)');

        // The farewell must be queued BEFORE cleanup(): cleanup marks the
        // stream closed, after which safeEnqueue drops every message.
        safeEnqueue(
          controller,
          formatEvent('disconnect', {
            type: 'disconnect',
            timestamp: new Date().toISOString(),
            message: 'Connection closed'
          })
        );

        cleanup();
        closeController(controller);
      };

      // An already-aborted signal never fires 'abort', so bail out here
      // before acquiring a subscription or timer that nothing would release.
      if (request.signal.aborted) {
        console.log('[SSE] Client disconnected (abort signal)');
        cleanup();
        closeController(controller);
        return;
      }
      request.signal.addEventListener('abort', onAbort, { once: true });
      detachAbortListener = () => request.signal.removeEventListener('abort', onAbort);

      // Send initial connection message
      const connected = safeEnqueue(
        controller,
        formatEvent('connection', {
          type: 'connection',
          timestamp: new Date().toISOString(),
          message: 'Connected to queue stream'
        })
      );
      if (!connected) {
        return;
      }

      // Send current queue state as initial data
      try {
        const matchingItems = queueManager
          .getAll()
          .filter((item) => matchesFilters(item.id, item.status));

        for (const item of matchingItems) {
          const update: QueueStatusUpdate = {
            type: 'status_change',
            itemId: item.id,
            status: item.status,
            timestamp: new Date().toISOString(),
            url: item.url,
            progress: item.phases,
            results: item.results,
            error: item.error
          };

          if (!safeEnqueue(controller, formatEvent('queue-update', update))) {
            break; // Stop if enqueue failed
          }
        }
      } catch (error) {
        // A failed snapshot is not fatal: live updates can still be streamed.
        console.error('[SSE] Error sending initial queue state:', error);
      }

      // If the snapshot tore the stream down, do not subscribe: cleanup() has
      // already run and would never release a subscription created after it.
      if (isClosed) {
        return;
      }

      // Subscribe to queue updates
      unsubscribe = queueManager.subscribe((update) => {
        if (isClosed || !matchesFilters(update.itemId, update.status)) {
          return;
        }
        safeEnqueue(controller, formatEvent('queue-update', update));
      });

      // Keep-alive ping every 30 seconds. A failed ping makes safeEnqueue call
      // cleanup(), which clears this timer, so no extra bookkeeping is needed.
      keepAliveInterval = setInterval(() => {
        safeEnqueue(
          controller,
          formatEvent('ping', { type: 'ping', timestamp: new Date().toISOString() })
        );
      }, 30000);
    },

    cancel() {
      // This is called when the stream is cancelled by the client
      console.log('[SSE] Stream cancelled by client');
      cleanup();
    }
  });

  return new Response(stream, {
    status: 200,
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      // Connection header omitted - Node.js handles connection management automatically
      'Access-Control-Allow-Origin': '*',
      'Access-Control-Allow-Headers': 'Cache-Control',
      'Access-Control-Expose-Headers': 'Content-Type'
    }
  });
};