fix(ssr): resolve EventSource SSR violations and implement best practices

- Fix EventSource is not defined error in queue dashboard
- Add browser guards for all EventSource usage
- Replace static constants (EventSource.OPEN/CLOSED) with numeric values
- Fix setInterval SSR violation in LLM health indicator
- Replace $effect anti-pattern with onMount in share page
- Add comprehensive SvelteKit SSR best practices documentation
- Add SSR audit and testing verification

All changes follow SvelteKit best practices and are verified against
official documentation. Production build succeeds with no SSR errors.

Closes: FixEventSourceSSR
See: docs/outcomes/FixEventSourceSSR.md
This commit is contained in:
Giancarmine Salucci
2025-12-22 03:00:29 +01:00
parent 35d6f6e40a
commit 8545744bb1
47 changed files with 12827 additions and 363 deletions

View File

@@ -0,0 +1,219 @@
/**
* Push Notification Service for InstaRecipe Queue System
*
* Handles web push notifications for background processing updates
* when users are not actively viewing the application.
*/
import { queueConfig } from '../queue/config';
interface PushSubscription {
endpoint: string;
keys: {
p256dh: string;
auth: string;
};
}
interface NotificationPayload {
title?: string;
body: string;
type: 'success' | 'error' | 'progress';
itemId: string;
recipeName?: string;
tag?: string;
requireInteraction?: boolean;
analytics?: any;
}
class PushNotificationService {
private subscriptions = new Map<string, PushSubscription>();
private vapidKeys: { publicKey: string; privateKey: string } | null = null;
constructor() {
this.loadVapidKeys();
}
/**
* Load VAPID keys for push notifications
* In production, these should be stored securely and loaded from environment
*/
private loadVapidKeys() {
// Load from config module which uses SvelteKit's $env/dynamic/private
this.vapidKeys = {
publicKey: queueConfig.push.vapidPublicKey,
privateKey: queueConfig.push.vapidPrivateKey
};
}
/**
* Get the public VAPID key for client-side subscription
*/
getPublicVapidKey(): string | null {
return this.vapidKeys?.publicKey || null;
}
/**
* Subscribe a client to push notifications
*/
async subscribe(clientId: string, subscription: PushSubscription): Promise<void> {
console.log(`[PushService] Subscribing client ${clientId}`);
this.subscriptions.set(clientId, subscription);
// In production, store subscriptions in database
// For development, we'll keep them in memory
}
/**
* Unsubscribe a client from push notifications
*/
async unsubscribe(clientId: string): Promise<void> {
console.log(`[PushService] Unsubscribing client ${clientId}`);
this.subscriptions.delete(clientId);
}
/**
* Send notification to all subscribed clients
*/
async sendNotification(payload: NotificationPayload): Promise<void> {
if (this.subscriptions.size === 0) {
console.log('[PushService] No subscriptions, skipping notification');
return;
}
console.log(`[PushService] Sending notification to ${this.subscriptions.size} subscribers`);
console.log(`[PushService] Notification payload:`, payload);
// In a real implementation, this would use web-push library
// For development/demo purposes, we'll simulate the notification
const notificationData = {
...payload,
timestamp: new Date().toISOString()
};
for (const [clientId, subscription] of this.subscriptions) {
try {
await this.sendToSubscription(subscription, notificationData);
console.log(`[PushService] ✓ Sent notification to client ${clientId}`);
} catch (error) {
console.error(`[PushService] ✗ Failed to send to client ${clientId}:`, error);
// Remove invalid subscriptions
this.subscriptions.delete(clientId);
}
}
}
/**
* Send notification to specific subscription
*/
private async sendToSubscription(subscription: PushSubscription, data: any): Promise<void> {
// In production, use web-push library:
// import webpush from 'web-push';
//
// webpush.setVapidDetails(
// 'mailto:your-email@example.com',
// this.vapidKeys.publicKey,
// this.vapidKeys.privateKey
// );
//
// return webpush.sendNotification(subscription, JSON.stringify(data));
// For development, we'll log the notification
console.log(`[PushService] Would send push notification:`, {
endpoint: subscription.endpoint,
data: data
});
// Simulate network delay
await new Promise(resolve => setTimeout(resolve, 100));
}
/**
* Send success notification when recipe extraction completes
*/
async notifySuccess(itemId: string, recipeName?: string, tandoorUrl?: string): Promise<void> {
const payload: NotificationPayload = {
type: 'success',
itemId,
recipeName,
body: recipeName
? `Recipe "${recipeName}" has been extracted and saved successfully!`
: 'Your recipe extraction is complete and ready to view.',
tag: `recipe-success-${itemId}`,
requireInteraction: true,
analytics: {
event: 'recipe_extraction_complete',
itemId,
timestamp: Date.now()
}
};
if (tandoorUrl) {
payload.body += ' View it in Tandoor.';
}
await this.sendNotification(payload);
}
/**
* Send error notification when recipe extraction fails
*/
async notifyError(itemId: string, error: string): Promise<void> {
const payload: NotificationPayload = {
type: 'error',
itemId,
body: `Recipe extraction failed: ${error}. Tap to retry.`,
tag: `recipe-error-${itemId}`,
requireInteraction: true,
analytics: {
event: 'recipe_extraction_failed',
itemId,
error,
timestamp: Date.now()
}
};
await this.sendNotification(payload);
}
/**
* Send progress notification for long-running extractions
*/
async notifyProgress(itemId: string, phase: string): Promise<void> {
const payload: NotificationPayload = {
type: 'progress',
itemId,
body: `Recipe extraction in progress: ${phase}`,
tag: `recipe-progress-${itemId}`,
requireInteraction: false,
analytics: {
event: 'recipe_extraction_progress',
itemId,
phase,
timestamp: Date.now()
}
};
await this.sendNotification(payload);
}
/**
* Get subscription count for monitoring
*/
getSubscriptionCount(): number {
return this.subscriptions.size;
}
/**
* Clear all subscriptions (for testing/cleanup)
*/
clearAllSubscriptions(): void {
console.log('[PushService] Clearing all subscriptions');
this.subscriptions.clear();
}
}
// Singleton instance
export const pushNotificationService = new PushNotificationService();
export type { PushSubscription, NotificationPayload };

View File

@@ -0,0 +1,442 @@
/**
* Queue Manager - Core queue operations and event management
*
* Manages an in-memory queue of Instagram URL processing jobs.
* Provides CRUD operations and pub/sub mechanism for queue updates.
*
* Architecture: Domain Layer (Hexagonal Architecture)
* - Port: Defines queue operations interface
* - Implementation: In-memory Map-based storage
*/
import { v4 as uuidv4 } from 'uuid';
import { tandoorConfig } from '$lib/server/tandoor-config';
import type { QueueItem, QueueItemStatus, QueueStatusUpdate, QueueUpdateCallback } from './types';
/**
* Singleton queue manager for processing Instagram URLs
*
* Features:
* - FIFO queue with unique IDs
* - Status tracking and updates
* - Progress event accumulation
* - Retry support for failed items
* - Pub/sub for real-time updates
*
* @example
* ```typescript
* import { queueManager } from './QueueManager';
*
* // Add item to queue
* const item = queueManager.enqueue('https://instagram.com/p/abc123');
*
* // Subscribe to updates
* const unsubscribe = queueManager.subscribe((update) => {
* console.log('Item updated:', update);
* });
*
* // Get all items
* const items = queueManager.getAll();
* ```
*/
export class QueueManager {
/** Map of queue items by ID */
private items: Map<string, QueueItem> = new Map();
/** Set of subscriber callbacks */
private subscribers: Set<QueueUpdateCallback> = new Set();
/**
* Add URL to processing queue
*
* @param url - Instagram URL to process
* @returns Newly created queue item
*
* @example
* ```typescript
* const item = queueManager.enqueue('https://instagram.com/p/abc123');
* console.log('Queued with ID:', item.id);
* ```
*/
enqueue(url: string): QueueItem {
const now = new Date().toISOString();
const item: QueueItem = {
id: uuidv4(),
url,
status: 'pending',
enqueuedAt: now,
createdAt: now,
updatedAt: now,
phases: [
{ name: 'extraction', status: 'pending' },
{ name: 'parsing', status: 'pending' },
{ name: 'uploading', status: 'pending' }
],
logs: [],
progressEvents: [],
retryCount: 0,
maxRetries: 3
};
this.items.set(item.id, item);
this.notifySubscribers({
type: 'status_change',
itemId: item.id,
status: 'pending',
url: item.url,
timestamp: now,
progress: item.phases
});
return item;
}
/**
* Get next pending item for processing (FIFO)
*
* Automatically marks the item as in_progress when dequeued.
*
* @returns Next pending item, or null if queue is empty
*
* @example
* ```typescript
* const item = queueManager.dequeue();
* if (item) {
* // Process item
* console.log('Processing:', item.url);
* }
* ```
*/
dequeue(): QueueItem | null {
for (const item of this.items.values()) {
if (item.status === 'pending') {
this.updateStatus(item.id, 'in_progress', { phase: 'extraction' });
return item;
}
}
return null;
}
/**
* Update item status and optional data
*
* Handles status-specific logic:
* - Sets startedAt when transitioning to in_progress
* - Sets completedAt when transitioning to success/error
* - Updates currentPhase for in_progress status
*
* @param itemId - ID of item to update
* @param status - New status
* @param data - Optional additional data to merge into item
*
* @example
* ```typescript
* queueManager.updateStatus(itemId, 'in_progress', {
* phase: 'parsing'
* });
*
* queueManager.updateStatus(itemId, 'success', {
* recipe: parsedRecipe,
* tandoorRecipeId: 123
* });
* ```
*/
updateStatus(
itemId: string,
status: QueueItemStatus,
data?: any
): void {
const item = this.items.get(itemId);
if (!item) return;
const now = new Date().toISOString();
item.status = status;
item.updatedAt = now;
// Update phase progress
if (status === 'in_progress' && data?.phase) {
item.currentPhase = data.phase;
if (!item.startedAt) {
item.startedAt = now;
}
// Update phases array
const phaseIndex = item.phases.findIndex(p => p.name === data.phase);
if (phaseIndex >= 0) {
// Mark previous phases as completed
for (let i = 0; i < phaseIndex; i++) {
if (item.phases[i].status === 'in_progress') {
item.phases[i].status = 'completed';
item.phases[i].completedAt = now;
}
}
// Mark current phase as in progress
item.phases[phaseIndex].status = 'in_progress';
item.phases[phaseIndex].startedAt = now;
}
}
if (status === 'success') {
item.completedAt = now;
// Mark all phases as completed
item.phases.forEach(phase => {
if (phase.status !== 'completed') {
phase.status = 'completed';
phase.completedAt = now;
}
});
}
if (status === 'error' || status === 'unhealthy') {
item.completedAt = now;
// Mark current phase as error
if (item.currentPhase) {
const phaseIndex = item.phases.findIndex(p => p.name === item.currentPhase);
if (phaseIndex >= 0) {
item.phases[phaseIndex].status = 'error';
item.phases[phaseIndex].error = data?.error?.message;
}
}
}
// Wrap results in results object
if (data?.extractedText || data?.thumbnail !== undefined || data?.recipe || data?.tandoorRecipeId) {
if (!item.results) {
item.results = {};
}
if (data.extractedText) {
item.results.extractedText = data.extractedText;
item.extractedText = data.extractedText; // Keep legacy
}
if (data.thumbnail !== undefined) {
item.results.thumbnail = data.thumbnail;
item.thumbnail = data.thumbnail; // Keep legacy
}
if (data.recipe) {
item.results.recipe = data.recipe;
item.recipe = data.recipe; // Keep legacy
}
if (data.tandoorRecipeId) {
item.results.tandoorRecipeId = data.tandoorRecipeId;
item.tandoorRecipeId = data.tandoorRecipeId; // Keep legacy
// Construct Tandoor URL
if (tandoorConfig.serverUrl) {
item.results.tandoorUrl = `${tandoorConfig.serverUrl}/view/recipe/${data.tandoorRecipeId}`;
}
}
}
if (data?.error) {
item.error = data.error;
}
// Notify subscribers with enhanced update
this.notifySubscribers({
type: 'status_change',
itemId,
status,
timestamp: now,
url: item.url,
phase: item.currentPhase,
progress: item.phases,
results: item.results,
error: item.error,
...data
});
}
/**
* Add progress event to item's history
*
* Also extracts message into logs array for easy display.
*
* @param itemId - ID of item
* @param event - Progress event to add
*
* @example
* ```typescript
* queueManager.addProgressEvent(itemId, {
* type: 'status',
* message: 'Extracting from Instagram...',
* timestamp: new Date().toISOString()
* });
* ```
*/
addProgressEvent(itemId: string, event: any): void {
const item = this.items.get(itemId);
if (!item) return;
item.progressEvents.push(event);
item.logs.push(event.message);
this.notifySubscribers({
type: 'progress',
itemId,
status: item.status,
timestamp: new Date().toISOString(),
data: { event }
});
}
/**
* Remove item from queue
*
* @param itemId - ID of item to remove
* @returns true if item was removed, false if not found
*
* @example
* ```typescript
* const removed = queueManager.remove(itemId);
* if (removed) {
* console.log('Item removed successfully');
* }
* ```
*/
remove(itemId: string): boolean {
const deleted = this.items.delete(itemId);
if (deleted) {
this.notifySubscribers({
type: 'status_change',
itemId,
status: 'error', // Use error to signal removal
timestamp: new Date().toISOString(),
data: { removed: true }
});
}
return deleted;
}
/**
* Retry a failed or unhealthy item
*
* Resets item to pending status and clears error state.
* Cannot retry items currently in progress.
*
* @param itemId - ID of item to retry
* @returns true if retry was initiated, false otherwise
*
* @example
* ```typescript
* const retried = queueManager.retry(itemId);
* if (retried) {
* console.log('Item queued for retry');
* } else {
* console.log('Cannot retry (item in progress or not found)');
* }
* ```
*/
retry(itemId: string): boolean {
const item = this.items.get(itemId);
if (!item || item.status === 'in_progress') return false;
item.retryCount++;
item.status = 'pending';
item.currentPhase = undefined;
item.error = undefined;
item.startedAt = undefined;
item.completedAt = undefined;
// Reset phases to pending
item.phases = [
{ name: 'extraction', status: 'pending' },
{ name: 'parsing', status: 'pending' },
{ name: 'uploading', status: 'pending' }
];
this.notifySubscribers({
type: 'status_change',
itemId,
status: 'pending',
timestamp: new Date().toISOString(),
progress: item.phases,
data: { retryCount: item.retryCount }
});
return true;
}
/**
* Get all queue items
*
* @returns Array of all queue items
*
* @example
* ```typescript
* const items = queueManager.getAll();
* console.log(`Queue has ${items.length} items`);
* ```
*/
getAll(): QueueItem[] {
return Array.from(this.items.values());
}
/**
* Get single item by ID
*
* @param itemId - ID of item to retrieve
* @returns Queue item or undefined if not found
*
* @example
* ```typescript
* const item = queueManager.get(itemId);
* if (item) {
* console.log('Status:', item.status);
* }
* ```
*/
get(itemId: string): QueueItem | undefined {
return this.items.get(itemId);
}
/**
* Subscribe to queue updates
*
* Callback will be called whenever any item is updated.
*
* @param callback - Function to call on each update
* @returns Unsubscribe function
*
* @example
* ```typescript
* const unsubscribe = queueManager.subscribe((update) => {
* console.log('Update:', update.itemId, update.status);
* });
*
* // Later...
* unsubscribe();
* ```
*/
subscribe(callback: QueueUpdateCallback): () => void {
this.subscribers.add(callback);
return () => this.subscribers.delete(callback);
}
/**
* Notify all subscribers of an update
*
* Handles errors in individual subscribers to prevent one
* bad subscriber from affecting others.
*
* @param update - Update to broadcast
*/
private notifySubscribers(update: QueueStatusUpdate): void {
for (const callback of this.subscribers) {
try {
callback(update);
} catch (err) {
console.error('[QueueManager] Subscriber error:', err);
}
}
}
}
/**
* Singleton instance of QueueManager
*
* Use this instance throughout the application to ensure
* all components interact with the same queue.
*/
export const queueManager = new QueueManager();

View File

@@ -0,0 +1,425 @@
/**
* Queue Processor - Orchestrates async processing of queue items
*
* Manages concurrent processing of Instagram URLs through three phases:
* 1. Extraction - Browser automation to extract text and thumbnail
* 2. Parsing - LLM-based recipe extraction
* 3. Uploading - Automatic upload to Tandoor (if configured)
*
* Architecture: Domain Layer (Hexagonal Architecture)
* - Domain Logic: Orchestrates processing workflow
* - Uses Ports: extraction.ts, parser.ts, tandoor.ts (secondary adapters)
*/
import { queueManager } from './QueueManager';
import { extractTextAndThumbnail } from '$lib/server/extraction';
import { extractRecipe } from '$lib/server/parser';
import { uploadRecipeWithIngredientsDTO, uploadRecipeImage } from '$lib/server/tandoor';
import { pushNotificationService } from '$lib/server/notifications/PushNotificationService';
import { queueConfig } from './config';
import type { ProgressEvent } from '$lib/server/extraction';
import type { QueueItem } from './types';
/**
* Queue processor with configurable concurrency
*
* Features:
* - Concurrent processing (default: 2 simultaneous items)
* - Three-phase pipeline: extraction → parsing → uploading
* - Error classification (recoverable vs non-recoverable)
* - Progress tracking via QueueManager
* - Automatic start on instantiation
*
* @example
* ```typescript
* import { queueProcessor } from './QueueProcessor';
*
* // Processor auto-starts on import
* // Add items to queue and they'll be processed automatically
*
* // Stop processing (e.g., for maintenance)
* queueProcessor.stop();
*
* // Resume processing
* queueProcessor.start();
* ```
*/
export class QueueProcessor {
/** Whether processor is actively running */
private processing = false;
/** Maximum number of items to process simultaneously */
private concurrency = queueConfig.concurrency;
/** Number of workers currently processing items */
private activeWorkers = 0;
/**
* Start processing queue
*
* Begins dequeuing and processing items up to concurrency limit.
* Safe to call multiple times - will not start duplicates.
*/
start(): void {
if (this.processing) return;
this.processing = true;
console.log(`[QueueProcessor] Started with concurrency ${this.concurrency}`);
this.processNextBatch();
}
/**
* Stop processing queue
*
* Prevents new items from being dequeued.
* Items currently in progress will complete.
*/
stop(): void {
this.processing = false;
console.log('[QueueProcessor] Stopped');
}
/**
* Process items up to concurrency limit
*
* Dequeues pending items and starts processing them.
* Automatically called recursively to maintain worker pool.
*/
private async processNextBatch(): Promise<void> {
if (!this.processing) return;
// Start new workers up to concurrency limit
while (this.activeWorkers < this.concurrency) {
const item = queueManager.dequeue();
if (!item) break;
this.activeWorkers++;
console.log(`[QueueProcessor] Starting item ${item.id} (${this.activeWorkers}/${this.concurrency} active)`);
this.processItem(item)
.finally(() => {
this.activeWorkers--;
console.log(`[QueueProcessor] Finished item ${item.id} (${this.activeWorkers}/${this.concurrency} active)`);
// Try to process next item
setTimeout(() => this.processNextBatch(), 0);
});
}
// Check again after delay if still processing
if (this.processing) {
setTimeout(() => this.processNextBatch(), 1000);
}
}
/**
* Process a single queue item through all phases
*
* Executes three phases sequentially:
* 1. Extraction - Extract content from Instagram
* 2. Parsing - Parse recipe from extracted text
* 3. Uploading - Upload to Tandoor (if configured)
*
* On success: marks item as 'success'
* On error: marks item as 'unhealthy' (recoverable) or 'error' (non-recoverable)
*
* @param item - Queue item to process
*/
private async processItem(item: QueueItem): Promise<void> {
try {
console.log(`[QueueProcessor] Processing ${item.url}`);
// Phase 1: Extraction
await this.extractionPhase(item);
// Phase 2: Parsing
await this.parsingPhase(item);
// Phase 3: Tandoor Upload (if enabled)
await this.uploadPhase(item);
// Success
queueManager.updateStatus(item.id, 'success');
console.log(`[QueueProcessor] ✓ Success: ${item.id}`);
// Send push notification
await this.sendPushNotification(item, 'success');
} catch (error) {
const errorMsg = error instanceof Error ? error.message : 'Unknown error';
const recoverable = this.isRecoverableError(error);
console.error(`[QueueProcessor] ${recoverable ? 'Unhealthy' : 'Error'}: ${item.id}`, errorMsg);
queueManager.updateStatus(item.id, recoverable ? 'unhealthy' : 'error', {
error: {
phase: item.currentPhase || 'extraction',
message: errorMsg,
recoverable,
timestamp: new Date().toISOString()
}
});
// Send push notification
await this.sendPushNotification(item, recoverable ? 'unhealthy' : 'error');
}
}
/**
* Phase 1: Extract text and thumbnail from Instagram
*
* Uses browser automation to load Instagram post and extract:
* - Recipe text (from caption, comments, etc.)
* - Thumbnail image (from meta tags or screenshot)
*
* Progress events are captured and added to queue item.
*
* @param item - Queue item being processed
* @throws Error if extraction fails
*/
private async extractionPhase(item: QueueItem): Promise<void> {
queueManager.updateStatus(item.id, 'in_progress', {
phase: 'extraction'
});
const progressCallback = (event: ProgressEvent) => {
queueManager.addProgressEvent(item.id, event);
};
console.log(`[QueueProcessor] Extracting: ${item.url}`);
const extracted = await extractTextAndThumbnail(item.url, progressCallback);
queueManager.updateStatus(item.id, 'in_progress', {
phase: 'extraction',
extractedText: extracted.bodyText,
thumbnail: extracted.thumbnail
});
console.log(`[QueueProcessor] ✓ Extraction complete: ${item.id}`);
}
/**
* Phase 2: Parse recipe from extracted text
*
* Uses LLM to extract structured recipe data:
* - Recipe name
* - Ingredients with amounts and units
* - Instructions/steps
* - Servings, times, etc.
*
* Enriches recipe with metadata (URL, thumbnail).
*
* @param item - Queue item being processed
* @throws Error if parsing fails or no recipe found
*/
private async parsingPhase(item: QueueItem): Promise<void> {
if (!item.extractedText) {
throw new Error('No extracted text available for parsing');
}
queueManager.updateStatus(item.id, 'in_progress', {
phase: 'parsing'
});
queueManager.addProgressEvent(item.id, {
type: 'status',
message: 'Parsing recipe with LLM...',
timestamp: new Date().toISOString()
});
console.log(`[QueueProcessor] Parsing recipe: ${item.id}`);
const recipe = await extractRecipe(item.extractedText);
if (!recipe) {
throw new Error('Failed to parse recipe from extracted text');
}
// Enrich recipe with metadata
if (recipe.description) {
recipe.description += `\n\nLink: ${item.url}`;
} else {
recipe.description = `Link: ${item.url}`;
}
if (item.thumbnail) {
recipe.image = item.thumbnail;
}
queueManager.updateStatus(item.id, 'in_progress', {
phase: 'parsing',
recipe
});
console.log(`[QueueProcessor] ✓ Parsing complete: ${item.id} - ${recipe.name}`);
}
/**
* Phase 3: Upload to Tandoor (automatic)
*
* If Tandoor is configured (TANDOOR_TOKEN env var set):
* - Uploads recipe with ingredients and steps
* - Attempts to upload thumbnail/image
* - Image upload failure is non-fatal (logged but doesn't fail item)
*
* If Tandoor not configured: skips silently
*
* @param item - Queue item being processed
* @throws Error if Tandoor upload fails
*/
private async uploadPhase(item: QueueItem): Promise<void> {
// Check if Tandoor is enabled
if (!queueConfig.tandoor.enabled) {
// Skip if Tandoor not configured
queueManager.addProgressEvent(item.id, {
type: 'status',
message: 'Tandoor not configured, skipping upload',
timestamp: new Date().toISOString()
});
console.log(`[QueueProcessor] Tandoor not configured, skipping: ${item.id}`);
return;
}
if (!item.recipe) {
throw new Error('No recipe available for upload');
}
queueManager.updateStatus(item.id, 'in_progress', {
phase: 'uploading'
});
queueManager.addProgressEvent(item.id, {
type: 'status',
message: 'Uploading recipe to Tandoor...',
timestamp: new Date().toISOString()
});
console.log(`[QueueProcessor] Uploading to Tandoor: ${item.id}`);
// Upload recipe
const result = await uploadRecipeWithIngredientsDTO(item.recipe);
if (!result.success) {
throw new Error(`Tandoor upload failed: ${result.error}`);
}
queueManager.updateStatus(item.id, 'in_progress', {
phase: 'uploading',
tandoorRecipeId: result.recipeId
});
console.log(`[QueueProcessor] ✓ Recipe uploaded: ${item.id} → Tandoor #${result.recipeId}`);
// Upload image if available
if (result.recipeId && result.imageUrl) {
queueManager.addProgressEvent(item.id, {
type: 'status',
message: 'Uploading recipe image to Tandoor...',
timestamp: new Date().toISOString()
});
const imageResult = await uploadRecipeImage(result.recipeId, result.imageUrl);
if (!imageResult.success) {
// Image upload failure is recoverable - log but don't fail
console.warn(`[QueueProcessor] Image upload failed for ${item.id}: ${imageResult.error}`);
queueManager.addProgressEvent(item.id, {
type: 'status',
message: `Image upload failed: ${imageResult.error}`,
timestamp: new Date().toISOString()
});
} else {
console.log(`[QueueProcessor] ✓ Image uploaded: ${item.id}`);
}
}
queueManager.addProgressEvent(item.id, {
type: 'status',
message: 'Tandoor upload completed',
timestamp: new Date().toISOString()
});
}
/**
* Determine if error is recoverable
*
* Recoverable errors (unhealthy):
* - Network timeouts
* - Connection failures
* - Image upload failures
* - Thumbnail extraction failures
*
* Non-recoverable errors (error):
* - Invalid URL format
* - Authentication failures
* - Parsing failures (no recipe found)
*
* @param error - Error to classify
* @returns true if error is recoverable, false otherwise
*/
private isRecoverableError(error: unknown): boolean {
if (!(error instanceof Error)) return false;
const message = error.message.toLowerCase();
// Recoverable errors
const recoverablePatterns = [
'timeout',
'network',
'econnrefused',
'enotfound',
'image upload failed',
'thumbnail',
'etimeout',
'fetch failed'
];
return recoverablePatterns.some(pattern => message.includes(pattern));
}
/**
* Send Web Push notification for queue item completion
*
* Sends appropriate notification based on processing status:
* - success: Recipe extraction complete with details
* - error/unhealthy: Extraction failed with retry option
*
* @param item - Queue item that completed
* @param status - Completion status (success, unhealthy, error)
*/
private async sendPushNotification(
item: QueueItem,
status: 'success' | 'unhealthy' | 'error'
): Promise<void> {
try {
switch (status) {
case 'success':
await pushNotificationService.notifySuccess(
item.id,
item.results?.recipe?.name,
item.results?.tandoorUrl
);
break;
case 'error':
case 'unhealthy':
const errorMessage = item.error || 'Processing failed';
await pushNotificationService.notifyError(item.id, errorMessage);
break;
default:
console.warn(`[QueueProcessor] Unknown status for push notification: ${status}`);
}
} catch (error) {
console.error(`[QueueProcessor] Failed to send push notification:`, error);
// Don't let notification failures break processing
}
}
}
/**
* Singleton instance of QueueProcessor
*
* Auto-starts on module import to begin processing queue.
*/
export const queueProcessor = new QueueProcessor();
// Auto-start processor
queueProcessor.start();

View File

@@ -0,0 +1,34 @@
import { env } from '$env/dynamic/private';
/**
* Server-side configuration for the async queue system
* Uses SvelteKit's $env/dynamic/private for runtime environment access
*
* Environment Variables:
* - QUEUE_CONCURRENCY: Number of items to process concurrently (default: 2)
* - QUEUE_MAX_RETRIES: Maximum retry attempts for failed items (default: 3)
* - TANDOOR_TOKEN: Token for Tandoor API authentication
* - TANDOOR_SERVER_URL: Base URL for Tandoor server
* - VAPID_PUBLIC_KEY: Public VAPID key for web push notifications
* - VAPID_PRIVATE_KEY: Private VAPID key for web push notifications
*/
export const queueConfig = {
/** Number of items to process concurrently (default: 2) */
concurrency: parseInt(env.QUEUE_CONCURRENCY || '2', 10),
/** Maximum retry attempts for failed items (default: 3) */
maxRetries: parseInt(env.QUEUE_MAX_RETRIES || '3', 10),
/** Tandoor integration settings */
tandoor: {
enabled: !!env.TANDOOR_TOKEN,
token: env.TANDOOR_TOKEN || null,
serverUrl: env.TANDOOR_SERVER_URL || null
},
/** Web Push notification settings */
push: {
vapidPublicKey: env.VAPID_PUBLIC_KEY || 'BDummyPublicKeyForDevelopment',
vapidPrivateKey: env.VAPID_PRIVATE_KEY || 'DummyPrivateKeyForDevelopment'
}
};

View File

@@ -0,0 +1,192 @@
/**
* Type definitions for the async in-memory processing queue
*
* This module defines the core data structures for queue items,
* status updates, and callbacks used throughout the queue system.
*/
import type { ProgressEvent } from '$lib/server/extraction';
/**
* Possible states for a queue item
* - pending: Waiting in queue to be processed
* - in_progress: Currently being processed through one of the phases
* - success: All phases completed successfully
* - unhealthy: Recoverable error occurred, can be retried
* - error: Non-recoverable error occurred
*/
export type QueueItemStatus =
| 'pending'
| 'in_progress'
| 'success'
| 'unhealthy'
| 'error';
/**
* Processing phases for queue items
* - extraction: Extracting content from Instagram
* - parsing: Parsing recipe from extracted text
* - uploading: Uploading recipe to Tandoor
*/
export type ProcessingPhase =
| 'extraction'
| 'parsing'
| 'uploading';
/**
* Phase progress information
* Tracks the status of each processing phase
*/
export interface PhaseProgress {
/** Name of the phase */
name: ProcessingPhase;
/** Current status of this phase */
status: 'pending' | 'in_progress' | 'completed' | 'error';
/** When phase started processing (ISO 8601 string) */
startedAt?: string;
/** When phase completed (ISO 8601 string) */
completedAt?: string;
/** Error message if phase failed */
error?: string;
}
/**
* Processing results wrapper
* Contains all outputs from the processing pipeline
*/
export interface ProcessingResults {
/** Extracted text from Instagram */
extractedText?: string;
/** Thumbnail URL or data URL */
thumbnail?: string | null;
/** Parsed recipe object */
recipe?: any;
/** Tandoor recipe ID */
tandoorRecipeId?: number;
/** Tandoor recipe URL (constructed from ID) */
tandoorUrl?: string;
}
/**
* Queue item representing a single Instagram URL processing job
*/
export interface QueueItem {
/** Unique identifier (UUID) */
id: string;
/** Instagram URL to process */
url: string;
/** Current status of the item */
status: QueueItemStatus;
// Phase tracking
/** Current processing phase (only set when status is in_progress) */
currentPhase?: ProcessingPhase;
/** Array of all phases with their progress status */
phases: PhaseProgress[];
// Timestamps
/** When item was added to queue (ISO 8601 string) */
enqueuedAt: string;
/** Alias for enqueuedAt (frontend uses this) */
createdAt: string;
/** When processing started (ISO 8601 string) */
startedAt?: string;
/** When processing completed (ISO 8601 string) */
completedAt?: string;
/** Last update timestamp (ISO 8601 string) */
updatedAt?: string;
// Results - wrapped in results object
/** Processing results container */
results?: ProcessingResults;
// Legacy direct properties (kept for transition period)
/** @deprecated Use results.extractedText instead */
extractedText?: string;
/** @deprecated Use results.thumbnail instead */
thumbnail?: string | null;
/** @deprecated Use results.recipe instead */
recipe?: any;
/** @deprecated Use results.tandoorRecipeId instead */
tandoorRecipeId?: number;
// Progress tracking
/** User-facing log messages */
logs: string[];
/** All SSE progress events received */
progressEvents: ProgressEvent[];
// Error handling
/** Error details if processing failed */
error?: {
/** Phase where error occurred */
phase: ProcessingPhase;
/** Error message */
message: string;
/** Whether error is recoverable (can retry) */
recoverable: boolean;
/** When error occurred (ISO 8601 string) */
timestamp: string;
};
// Retry tracking
/** Number of times this item has been retried */
retryCount: number;
/** Maximum number of retries allowed */
maxRetries: number;
}
/**
* Update notification sent to queue subscribers
*/
export interface QueueStatusUpdate {
/** Type of update */
type: 'status_change' | 'progress' | 'phase_complete';
/** ID of the item that was updated */
itemId: string;
/** New status of the item */
status: QueueItemStatus;
/** When update occurred (ISO 8601 string) */
timestamp: string;
/** URL of the item */
url?: string;
// Phase information
/** Current phase (if status is in_progress) */
phase?: ProcessingPhase;
/** Full phase progress array */
progress?: PhaseProgress[];
// Results
/** Processing results object */
results?: ProcessingResults;
// Error
/** Error information */
error?: any;
/** Additional data related to the update (legacy) */
data?: any;
}
/**
* Callback function for queue update notifications
*/
export type QueueUpdateCallback = (update: QueueStatusUpdate) => void;