Execution Plan: Async In-Memory Processing Queue
OUTCOME_NAME: AsyncInMemoryProcessingQueue
Created: 21 December 2025
Problem Statement: The current Share endpoint is synchronous and blocks the user's browser while processing Instagram URLs. Users must wait on the Share page until extraction, parsing, and Tandoor upload complete. This creates a poor user experience with no ability to track multiple requests or retry failed operations. The goal is an asynchronous, in-memory processing queue that decouples URL submission from processing, provides real-time status updates via Server-Sent Events (SSE), and displays queue items on the Homepage.
Current State Analysis
Existing Architecture
Share Flow:
- User shares Instagram URL → /share?url=...
- User clicks "Extract Recipe" button
- Frontend calls /api/extract-stream (SSE endpoint)
- Browser waits for complete processing pipeline:
- Extraction (browser automation)
- Parsing (LLM call)
- Tandoor upload (if enabled)
- Share page displays results in real-time
- User can manually trigger Tandoor import
Current Files:
- src/routes/share/+page.svelte - Share page UI
- src/routes/api/extract-stream/+server.ts - SSE streaming endpoint
- src/lib/server/extraction.ts - Instagram extraction logic
- src/lib/server/parser.ts - LLM recipe parsing
- src/lib/server/tandoor.ts - Tandoor upload logic
Current Status Reporting:
- SSE events: status, method, retry, error, thumbnail, complete
- Real-time logs displayed in Share page
- Thumbnail preview with progress states
- Recipe card with Tandoor import button
Problems with Current Approach
- Blocking UX: User must keep Share page open during processing
- No Multi-Request Support: Can't queue multiple URLs
- No Persistence: Refreshing page loses progress
- No Retry Capability: Failed extractions can't be retried without re-sharing
- Wrong Page for Progress: Share page should confirm submission, Homepage should show progress
Solution Architecture
Hexagonal Architecture Mapping
┌─────────────────────────────────────────────────┐
│  Primary Adapters (Inbound)                     │
│  - Share Page: Submit URL to queue              │
│  - Homepage: Display queue items                │
│  - SSE Stream: Consume queue updates            │
└─────────────────┬───────────────────────────────┘
                  │
┌─────────────────┴───────────────────────────────┐
│  Domain (Core)                                  │
│  ┌────────────────────────────────────────────┐ │
│  │  QueueManager (Port)                       │ │
│  │  - enqueue(url) → QueueItem                │ │
│  │  - dequeue() → QueueItem                   │ │
│  │  - updateStatus(id, status)                │ │
│  │  - remove(id)                              │ │
│  │  - retry(id)                               │ │
│  │  - getAll() → QueueItem[]                  │ │
│  │  - subscribe(callback)                     │ │
│  └────────────────────────────────────────────┘ │
│                                                 │
│  ┌────────────────────────────────────────────┐ │
│  │  QueueProcessor (Domain Logic)             │ │
│  │  - processItem(item): Promise<void>        │ │
│  │  - extractPhase()                          │ │
│  │  - parsePhase()                            │ │
│  │  - uploadPhase()                           │ │
│  └────────────────────────────────────────────┘ │
└─────────────────┬───────────────────────────────┘
                  │
┌─────────────────┴───────────────────────────────┐
│  Secondary Adapters (Outbound)                  │
│  - Browser Automation (extraction.ts)           │
│  - LLM Service (parser.ts)                      │
│  - Tandoor API (tandoor.ts)                     │
│  - Web Push API (notifications)                 │
└─────────────────────────────────────────────────┘
Queue Item State Machine
     enqueue()
         │
         ▼
    ┌─────────┐
    │ PENDING │──────┐
    └─────────┘      │
         │           │
     dequeue()    remove()
         │           │
         ▼           ▼
  ┌──────────────┐  EXIT
  │ IN_PROGRESS  │
  └──────────────┘
     │   │   │
     │   │   └──> UNHEALTHY ──> retry() ──┐
     │   │             │                  │
     │   │         remove()               │
     │   │             │                  │
     │   └──> ERROR ───┴──> EXIT          │
     │                                    │
     │                                    │
     └──> SUCCESS ──> (auto-remove) ──> EXIT
                        or manual
States:
- PENDING - Waiting in queue
- IN_PROGRESS - Currently being processed
- Sub-states: extracting, parsing, uploading
- SUCCESS - All phases completed successfully
- UNHEALTHY - Recoverable error (can retry)
- ERROR - Non-recoverable error
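The state machine above can be written down as a transition table, which makes illegal moves easy to reject in tests. The following is a minimal sketch that follows the diagram only: `ItemStatus`, `ALLOWED_TRANSITIONS` and `canTransition` are hypothetical names, and `remove()` (allowed from any state) is deliberately not modelled.

```typescript
// Status names follow the States list; the table is one reading of the
// diagram, not code taken from the plan.
type ItemStatus = 'pending' | 'in_progress' | 'success' | 'unhealthy' | 'error';

const ALLOWED_TRANSITIONS: Record<ItemStatus, readonly ItemStatus[]> = {
  pending: ['in_progress'],                        // dequeue()
  in_progress: ['success', 'unhealthy', 'error'],  // outcome of processing
  unhealthy: ['pending'],                          // retry() re-queues the item
  success: [],                                     // terminal until removed
  error: [],                                       // terminal until removed
};

// Returns true when the diagram permits moving from `from` to `to`.
function canTransition(from: ItemStatus, to: ItemStatus): boolean {
  return ALLOWED_TRANSITIONS[from].includes(to);
}
```

A guard like this could sit at the top of a status-update method so that, for example, a late progress callback cannot move a finished item back to in_progress.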
Technical Design
Queue Manager Design Pattern
Based on research into Node.js queue best practices, we'll use the fastq pattern:
- In-memory queue with concurrency control
- Promise-based API
- Event-driven status updates
- Minimal dependencies
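To illustrate the concurrency-control idea behind this pattern, here is a small hand-rolled limiter. It is a sketch of the concept only and is not fastq's API; `createLimiter` and its internals are hypothetical names introduced for this example.

```typescript
// Hypothetical helper: runs async tasks with at most `limit` in flight,
// starting queued tasks in FIFO order as slots free up.
function createLimiter(limit: number) {
  let active = 0;
  const waiting: Array<() => void> = [];

  // Start the next waiting task if a slot is free.
  const next = (): void => {
    if (active >= limit) return;
    const start = waiting.shift();
    if (start) start();
  };

  return function run<T>(task: () => Promise<T>): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const start = (): void => {
        active++;
        let result: Promise<T>;
        try {
          result = task();
        } catch (err) {
          // A synchronous throw is treated like a rejected promise
          result = Promise.reject(err);
        }
        result.then(resolve, reject).finally(() => {
          active--;
          next();
        });
      };
      waiting.push(start);
      next();
    });
  };
}
```

With `const run = createLimiter(2)`, calling `run(() => work(url))` three times starts the first two tasks immediately and holds the third until one of them settles (`work` stands in for any async function).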
Data Structures
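// src/lib/server/queue/types.ts
// ProgressEvent is the extraction progress type, not the DOM global of the same name
import type { ProgressEvent } from '$lib/server/extraction';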
export type QueueItemStatus =
| 'pending'
| 'in_progress'
| 'success'
| 'unhealthy'
| 'error';
export type ProcessingPhase =
| 'extraction'
| 'parsing'
| 'uploading';
export interface QueueItem {
id: string; // UUID
url: string; // Instagram URL
status: QueueItemStatus;
currentPhase?: ProcessingPhase;
// Timestamps
enqueuedAt: string; // ISO timestamp
startedAt?: string;
completedAt?: string;
// Results
extractedText?: string;
thumbnail?: string | null;
recipe?: any;
tandoorRecipeId?: number;
// Progress tracking
logs: string[]; // User-facing log messages
progressEvents: ProgressEvent[]; // All SSE events
// Error handling
error?: {
phase: ProcessingPhase;
message: string;
recoverable: boolean;
timestamp: string;
};
// Retry tracking
retryCount: number;
maxRetries: number;
}
export interface QueueStatusUpdate {
itemId: string;
status: QueueItemStatus;
phase?: ProcessingPhase;
data?: any;
error?: string;
timestamp: string;
}
export type QueueUpdateCallback = (update: QueueStatusUpdate) => void;
Queue Manager Implementation
// src/lib/server/queue/QueueManager.ts
import { v4 as uuidv4 } from 'uuid';
import type { QueueItem, QueueItemStatus, QueueStatusUpdate, QueueUpdateCallback } from './types';
export class QueueManager {
private items: Map<string, QueueItem> = new Map();
private subscribers: Set<QueueUpdateCallback> = new Set();
/**
* Add URL to processing queue
*/
enqueue(url: string): QueueItem {
const item: QueueItem = {
id: uuidv4(),
url,
status: 'pending',
enqueuedAt: new Date().toISOString(),
logs: [],
progressEvents: [],
retryCount: 0,
maxRetries: 3
};
this.items.set(item.id, item);
this.notifySubscribers({
itemId: item.id,
status: 'pending',
timestamp: new Date().toISOString()
});
return item;
}
/**
* Get next pending item for processing
*/
dequeue(): QueueItem | null {
for (const item of this.items.values()) {
if (item.status === 'pending') {
this.updateStatus(item.id, 'in_progress', { phase: 'extraction' });
return item;
}
}
return null;
}
/**
* Update item status
*/
updateStatus(
itemId: string,
status: QueueItemStatus,
data?: any
): void {
const item = this.items.get(itemId);
if (!item) return;
item.status = status;
if (status === 'in_progress' && data?.phase) {
item.currentPhase = data.phase;
if (!item.startedAt) {
item.startedAt = new Date().toISOString();
}
}
if (status === 'success' || status === 'error') {
item.completedAt = new Date().toISOString();
}
if (data?.error) {
item.error = data.error;
}
// Merge data into item
Object.assign(item, data);
this.notifySubscribers({
itemId,
status,
...data,
timestamp: new Date().toISOString()
});
}
/**
* Add progress event to item
*/
addProgressEvent(itemId: string, event: any): void {
const item = this.items.get(itemId);
if (!item) return;
item.progressEvents.push(event);
item.logs.push(event.message);
this.notifySubscribers({
itemId,
status: item.status,
data: { event },
timestamp: new Date().toISOString()
});
}
/**
* Remove item from queue
*/
remove(itemId: string): boolean {
const deleted = this.items.delete(itemId);
if (deleted) {
this.notifySubscribers({
itemId,
status: 'error', // Use error to signal removal
data: { removed: true },
timestamp: new Date().toISOString()
});
}
return deleted;
}
/**
* Retry a failed item
*/
retry(itemId: string): boolean {
const item = this.items.get(itemId);
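// Only failed items can be retried, and only while retries remain
if (!item || (item.status !== 'unhealthy' && item.status !== 'error')) return false;
if (item.retryCount >= item.maxRetries) return false;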
item.retryCount++;
item.status = 'pending';
item.currentPhase = undefined;
item.error = undefined;
item.startedAt = undefined;
item.completedAt = undefined;
this.notifySubscribers({
itemId,
status: 'pending',
data: { retryCount: item.retryCount },
timestamp: new Date().toISOString()
});
return true;
}
/**
* Get all queue items
*/
getAll(): QueueItem[] {
return Array.from(this.items.values());
}
/**
* Get single item by ID
*/
get(itemId: string): QueueItem | undefined {
return this.items.get(itemId);
}
/**
* Subscribe to queue updates
*/
subscribe(callback: QueueUpdateCallback): () => void {
this.subscribers.add(callback);
return () => this.subscribers.delete(callback);
}
/**
* Notify all subscribers of update
*/
private notifySubscribers(update: QueueStatusUpdate): void {
for (const callback of this.subscribers) {
try {
callback(update);
} catch (err) {
console.error('[QueueManager] Subscriber error:', err);
}
}
}
}
// Singleton instance
export const queueManager = new QueueManager();
Queue Processor
// src/lib/server/queue/QueueProcessor.ts
import { queueManager } from './QueueManager';
import { extractTextAndThumbnail } from '$lib/server/extraction';
import { extractRecipe } from '$lib/server/parser';
import { uploadRecipeWithIngredientsDTO, uploadRecipeImage } from '$lib/server/tandoor';
import type { ProgressEvent } from '$lib/server/extraction';
import type { QueueItem } from './types';
export class QueueProcessor {
private processing = false;
private concurrency = 2; // Process 2 items simultaneously
private activeWorkers = 0;
/**
* Start processing queue
*/
start(): void {
if (this.processing) return;
this.processing = true;
this.processNextBatch();
}
/**
* Stop processing queue
*/
stop(): void {
this.processing = false;
}
// Handle of the single pending poll timer (see processNextBatch)
private pollTimer: ReturnType<typeof setTimeout> | null = null;
/**
* Process items up to concurrency limit
*/
private async processNextBatch(): Promise<void> {
if (!this.processing) return;
// Start new workers up to concurrency limit
while (this.activeWorkers < this.concurrency) {
const item = queueManager.dequeue();
if (!item) break;
this.activeWorkers++;
this.processItem(item)
.finally(() => {
this.activeWorkers--;
// Try to process next item
setTimeout(() => this.processNextBatch(), 0);
});
}
// Keep exactly one 1-second poll alive. Clearing the previous timer stops
// every call from starting its own polling chain.
if (this.pollTimer) clearTimeout(this.pollTimer);
this.pollTimer = setTimeout(() => this.processNextBatch(), 1000);
}
/**
* Process a single queue item through all phases
*/
private async processItem(item: QueueItem): Promise<void> {
try {
// Phase 1: Extraction
await this.extractionPhase(item);
// Phase 2: Parsing
await this.parsingPhase(item);
// Phase 3: Tandoor Upload (if enabled)
await this.uploadPhase(item);
// Success
queueManager.updateStatus(item.id, 'success');
// Send push notification
await this.sendPushNotification(item, 'success');
} catch (error) {
const errorMsg = error instanceof Error ? error.message : 'Unknown error';
const recoverable = this.isRecoverableError(error);
queueManager.updateStatus(item.id, recoverable ? 'unhealthy' : 'error', {
error: {
phase: item.currentPhase,
message: errorMsg,
recoverable,
timestamp: new Date().toISOString()
}
});
// Send push notification
await this.sendPushNotification(item, recoverable ? 'unhealthy' : 'error');
}
}
/**
* Phase 1: Extract text and thumbnail from Instagram
*/
private async extractionPhase(item: QueueItem): Promise<void> {
queueManager.updateStatus(item.id, 'in_progress', {
phase: 'extraction'
});
const progressCallback = (event: ProgressEvent) => {
queueManager.addProgressEvent(item.id, event);
};
const extracted = await extractTextAndThumbnail(item.url, progressCallback);
queueManager.updateStatus(item.id, 'in_progress', {
phase: 'extraction',
extractedText: extracted.bodyText,
thumbnail: extracted.thumbnail
});
}
/**
* Phase 2: Parse recipe from extracted text
*/
private async parsingPhase(item: QueueItem): Promise<void> {
if (!item.extractedText) {
throw new Error('No extracted text available for parsing');
}
queueManager.updateStatus(item.id, 'in_progress', {
phase: 'parsing'
});
queueManager.addProgressEvent(item.id, {
type: 'status',
message: 'Parsing recipe with LLM...',
timestamp: new Date().toISOString()
});
const recipe = await extractRecipe(item.extractedText);
if (!recipe) {
throw new Error('Failed to parse recipe from extracted text');
}
// Enrich recipe with metadata
if (recipe.description) {
recipe.description += `\n\nLink: ${item.url}`;
} else {
recipe.description = `Link: ${item.url}`;
}
if (item.thumbnail) {
recipe.image = item.thumbnail;
}
queueManager.updateStatus(item.id, 'in_progress', {
phase: 'parsing',
recipe
});
}
/**
* Phase 3: Upload to Tandoor (automatic)
*/
private async uploadPhase(item: QueueItem): Promise<void> {
// Check if Tandoor is enabled
const tandoorToken = process.env.TANDOOR_TOKEN;
if (!tandoorToken) {
// Skip if Tandoor not configured
queueManager.addProgressEvent(item.id, {
type: 'status',
message: 'Tandoor not configured, skipping upload',
timestamp: new Date().toISOString()
});
return;
}
if (!item.recipe) {
throw new Error('No recipe available for upload');
}
queueManager.updateStatus(item.id, 'in_progress', {
phase: 'uploading'
});
queueManager.addProgressEvent(item.id, {
type: 'status',
message: 'Uploading recipe to Tandoor...',
timestamp: new Date().toISOString()
});
// Upload recipe
const result = await uploadRecipeWithIngredientsDTO(item.recipe);
if (!result.success) {
throw new Error(`Tandoor upload failed: ${result.error}`);
}
queueManager.updateStatus(item.id, 'in_progress', {
phase: 'uploading',
tandoorRecipeId: result.recipeId
});
// Upload image if available
if (result.recipeId && result.imageUrl) {
queueManager.addProgressEvent(item.id, {
type: 'status',
message: 'Uploading recipe image to Tandoor...',
timestamp: new Date().toISOString()
});
const imageResult = await uploadRecipeImage(result.recipeId, result.imageUrl);
if (!imageResult.success) {
// Image upload failure is recoverable
queueManager.addProgressEvent(item.id, {
type: 'status',
message: `Image upload failed: ${imageResult.error}`,
timestamp: new Date().toISOString()
});
}
}
queueManager.addProgressEvent(item.id, {
type: 'status',
message: 'Tandoor upload completed',
timestamp: new Date().toISOString()
});
}
/**
* Determine if error is recoverable
*/
private isRecoverableError(error: unknown): boolean {
if (!(error instanceof Error)) return false;
const message = error.message.toLowerCase();
// Recoverable errors
const recoverablePatterns = [
'timeout',
'network',
'econnrefused',
'enotfound',
'image upload failed',
'thumbnail'
];
return recoverablePatterns.some(pattern => message.includes(pattern));
}
/**
* Send Web Push notification for queue item completion
*/
private async sendPushNotification(
item: QueueItem,
status: 'success' | 'unhealthy' | 'error'
): Promise<void> {
// TODO: Implement Web Push in Story 7
console.log(`[QueueProcessor] Would send push notification: ${status} for ${item.id}`);
}
}
// Singleton instance
export const queueProcessor = new QueueProcessor();
// Auto-start processor
queueProcessor.start();
Story Breakdown
Story 1: Implement Queue Manager Core
Priority: Critical
Dependencies: None
Objective: Create the in-memory queue management system with CRUD operations and event subscriptions.
Tasks:
- Create src/lib/server/queue/types.ts with TypeScript definitions
- Create src/lib/server/queue/QueueManager.ts with queue logic
- Add uuid package for ID generation: npm install uuid @types/uuid
- Implement all QueueManager methods
- Write unit tests for QueueManager
Acceptance Criteria:
- ✅ Can enqueue items
- ✅ Can dequeue items (FIFO)
- ✅ Can update item status
- ✅ Can add progress events to items
- ✅ Can remove items
- ✅ Can retry items
- ✅ Can subscribe to updates
- ✅ All tests passing
Files:
- src/lib/server/queue/types.ts (new)
- src/lib/server/queue/QueueManager.ts (new)
- src/tests/queue-manager.spec.ts (new)
Story 2: Implement Queue Processor
Priority: Critical
Dependencies: Story 1
Objective: Create the queue processor that orchestrates extraction, parsing, and upload phases.
Tasks:
- Create src/lib/server/queue/QueueProcessor.ts
- Implement concurrency control (2 simultaneous workers)
- Implement extraction phase with progress callbacks
- Implement parsing phase
- Implement Tandoor upload phase
- Add error handling and recovery detection
- Write unit tests for QueueProcessor
Acceptance Criteria:
- ✅ Processes items from queue automatically
- ✅ Respects concurrency limit
- ✅ Updates queue item status through all phases
- ✅ Captures progress events
- ✅ Handles errors gracefully
- ✅ Distinguishes recoverable vs non-recoverable errors
- ✅ All tests passing
Files:
- src/lib/server/queue/QueueProcessor.ts (new)
- src/tests/queue-processor.spec.ts (new)
Story 3: Create Queue API Endpoints
Priority: Critical
Dependencies: Story 1, Story 2
Objective: Expose queue operations via REST API endpoints.
Tasks:
- Create src/routes/api/queue/enqueue/+server.ts - Add URL to queue
- Create src/routes/api/queue/list/+server.ts - Get all queue items
- Create src/routes/api/queue/[id]/+server.ts - Get/Delete specific item
- Create src/routes/api/queue/[id]/retry/+server.ts - Retry item
- Add request validation
- Add error handling
API Endpoints:
POST /api/queue/enqueue { url: string } → { itemId: string, item: QueueItem }
GET /api/queue/list → { items: QueueItem[] }
GET /api/queue/:id → { item: QueueItem }
DELETE /api/queue/:id → { success: boolean }
POST /api/queue/:id/retry → { success: boolean }
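As one possible shape for the request validation task, the enqueue body could be checked by a pure function before anything touches the queue. This is a sketch under assumptions: `validateEnqueueBody`, the `EnqueueValidation` type, the 400 status and the rule that only https instagram.com hosts are accepted are all illustrative choices, not requirements stated in this plan.

```typescript
type EnqueueValidation =
  | { ok: true; url: string }
  | { ok: false; status: 400; message: string };

// Hypothetical validator for the POST /api/queue/enqueue body.
// The allowed-host rule is an assumption made for this sketch.
function validateEnqueueBody(body: unknown): EnqueueValidation {
  if (typeof body !== 'object' || body === null) {
    return { ok: false, status: 400, message: 'Request body must be a JSON object' };
  }
  const candidate = (body as { url?: unknown }).url;
  if (typeof candidate !== 'string' || candidate.trim() === '') {
    return { ok: false, status: 400, message: 'Field "url" must be a non-empty string' };
  }
  let parsed: URL;
  try {
    parsed = new URL(candidate.trim());
  } catch {
    return { ok: false, status: 400, message: 'Field "url" is not a valid URL' };
  }
  const host = parsed.hostname.toLowerCase();
  const isInstagram = host === 'instagram.com' || host.endsWith('.instagram.com');
  if (parsed.protocol !== 'https:' || !isInstagram) {
    return { ok: false, status: 400, message: 'Only https Instagram URLs are accepted' };
  }
  return { ok: true, url: parsed.toString() };
}
```

Keeping the check free of framework imports means the handler only has to translate the result into a response, and the rules can be unit-tested without starting the server.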
Acceptance Criteria:
- ✅ All endpoints implemented
- ✅ Request validation working
- ✅ Error handling comprehensive
- ✅ Returns proper HTTP status codes
- ✅ All tests passing
Files:
- src/routes/api/queue/enqueue/+server.ts (new)
- src/routes/api/queue/list/+server.ts (new)
- src/routes/api/queue/[id]/+server.ts (new)
- src/routes/api/queue/[id]/retry/+server.ts (new)
- src/tests/queue-api.spec.ts (new)
Story 4: Create Queue Status Stream Endpoint
Priority: Critical
Dependencies: Story 1
Objective: Create SSE endpoint that streams queue updates to subscribed clients.
Tasks:
- Create src/routes/api/queue/stream/+server.ts
- Implement SSE connection with keep-alive
- Subscribe to QueueManager updates
- Send initial state (all items)
- Stream real-time updates
- Handle client disconnection
- Support filtering by item ID (optional query param)
Implementation:
// src/routes/api/queue/stream/+server.ts
import { queueManager } from '$lib/server/queue/QueueManager';
import type { RequestHandler } from '@sveltejs/kit';
export const GET: RequestHandler = async ({ url }) => {
const itemId = url.searchParams.get('itemId'); // Optional filter
// Assigned in start(); called from cancel() when the client disconnects
let cleanup: () => void = () => {};
const stream = new ReadableStream({
start(controller) {
const encoder = new TextEncoder();
// Send initial state
const items = itemId
? [queueManager.get(itemId)].filter(Boolean)
: queueManager.getAll();
const initMessage = `event: init\ndata: ${JSON.stringify(items)}\n\n`;
controller.enqueue(encoder.encode(initMessage));
// Subscribe to updates
const unsubscribe = queueManager.subscribe((update) => {
// Filter by itemId if specified
if (itemId && update.itemId !== itemId) return;
const message = `event: update\ndata: ${JSON.stringify(update)}\n\n`;
controller.enqueue(encoder.encode(message));
});
// Keep-alive ping every 30 seconds
const keepAlive = setInterval(() => {
try {
controller.enqueue(encoder.encode(': keep-alive\n\n'));
} catch {
clearInterval(keepAlive);
}
}, 30000);
// Release the timer and the subscription. The stream is already closing
// when cancel() runs, so controller.close() is not called here.
cleanup = () => {
clearInterval(keepAlive);
unsubscribe();
};
},
// Handle client disconnect: cancel() belongs to the underlying source,
// not to the controller
cancel() {
cleanup();
}
});
return new Response(stream, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
}
});
};
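The endpoint above builds its SSE frames inline with template strings. A small pair of helpers can keep the wire format in one place; this is a sketch, and `formatSseEvent` and `formatSseComment` are hypothetical names that do not appear elsewhere in this plan.

```typescript
// Serializes one named SSE event. JSON.stringify without an indent argument
// never emits raw newlines, so a single "data:" line is enough.
function formatSseEvent(event: string, data: unknown): string {
  const payload = JSON.stringify(data) ?? 'null';
  return `event: ${event}\ndata: ${payload}\n\n`;
}

// Serializes an SSE comment line, which clients ignore (used for keep-alive).
function formatSseComment(text: string): string {
  return `: ${text}\n\n`;
}
```

With these helpers, the init, update and keep-alive frames become `formatSseEvent('init', items)`, `formatSseEvent('update', update)` and `formatSseComment('keep-alive')`.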
Acceptance Criteria:
- ✅ SSE connection established
- ✅ Sends initial queue state
- ✅ Streams real-time updates
- ✅ Keep-alive prevents timeout
- ✅ Handles client disconnect gracefully
- ✅ Optional item filtering works
- ✅ All tests passing
Files:
- src/routes/api/queue/stream/+server.ts (new)
- src/tests/queue-stream.spec.ts (new)
Story 5: Refactor Share Page to Fire-and-Forget
Priority: High
Dependencies: Story 3
Objective: Modify Share page to only enqueue URLs and show success confirmation, removing all processing UI.
Tasks:
- Update src/routes/share/+page.svelte
- Change process() function to call /api/queue/enqueue
- Remove SSE connection code
- Remove progress indicators
- Remove log viewer
- Remove thumbnail preview
- Remove recipe card
- Show simple success message with link to homepage
- Optionally redirect to homepage after 2 seconds
New Share Page Flow:
<script lang="ts">
import { page } from '$app/stores';
import { goto } from '$app/navigation';
let status = $state<'idle' | 'submitting' | 'success' | 'error'>('idle');
let errorMessage = $state<string | null>(null);
let itemId = $state<string | null>(null);
let targetUrl = $derived(/* ... extract from params ... */);
async function submit() {
if (!targetUrl) return;
status = 'submitting';
errorMessage = null;
try {
const response = await fetch('/api/queue/enqueue', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ url: targetUrl })
});
if (!response.ok) {
throw new Error('Failed to enqueue URL');
}
const result = await response.json();
itemId = result.itemId;
status = 'success';
// Redirect to homepage after 2 seconds
setTimeout(() => {
goto('/');
}, 2000);
} catch (error) {
status = 'error';
errorMessage = error instanceof Error ? error.message : 'Unknown error';
}
}
</script>
{#if status === 'idle'}
<h1>Share Recipe</h1>
{#if targetUrl}
<p class="url-display">{targetUrl}</p>
<button onclick={submit}>Add to Queue</button>
{:else}
<p>No URL detected. Please share from Instagram.</p>
{/if}
{:else if status === 'submitting'}
<div class="loading">
<p>Adding to queue...</p>
</div>
{:else if status === 'success'}
<div class="success">
<h2>✓ Added to Queue</h2>
<p>Your recipe is being processed.</p>
<p>Redirecting to homepage...</p>
<a href="/">View Queue Status</a>
</div>
{:else if status === 'error'}
<div class="error">
<h2>Error</h2>
<p>{errorMessage}</p>
<button onclick={() => status = 'idle'}>Try Again</button>
</div>
{/if}
Acceptance Criteria:
- ✅ Share page only handles URL submission
- ✅ No processing happens on Share page
- ✅ Success message shows queue confirmation
- ✅ Auto-redirects to homepage
- ✅ Error handling works
- ✅ All Share page components removed (except URL input)
Files:
- src/routes/share/+page.svelte (major refactor)
- src/routes/share/components/ (delete most components, keep minimal)
Story 6: Create Homepage Queue View
Priority: Critical
Dependencies: Story 4
Objective: Transform homepage to display queue items as cards with real-time status updates.
Tasks:
- Update src/routes/+page.svelte to be queue dashboard
- Create src/routes/components/QueueItemCard.svelte
- Create src/routes/components/QueueItemDetail.svelte
- Connect to /api/queue/stream SSE endpoint
- Display queue items sorted by status (in_progress → unhealthy → pending → success)
- Implement expand/collapse for item details
- Add remove button per item
- Add retry button for unhealthy/error items
- Show phase progress for in_progress items
- Reuse existing components (ProgressIndicator, ThumbnailPreview, RecipeCard, LogViewer)
Homepage Structure:
<script lang="ts">
import QueueItemCard from './components/QueueItemCard.svelte';
import type { QueueItem } from '$lib/server/queue/types';
let queueItems = $state<QueueItem[]>([]);
let expandedItemId = $state<string | null>(null);
$effect(() => {
// Connect to SSE stream
const eventSource = new EventSource('/api/queue/stream');
eventSource.addEventListener('init', (e) => {
const items = JSON.parse(e.data);
queueItems = sortItems(items);
});
eventSource.addEventListener('update', (e) => {
const update = JSON.parse(e.data);
updateItem(update);
});
return () => eventSource.close();
});
function sortItems(items: QueueItem[]): QueueItem[] {
const order = {
in_progress: 0,
unhealthy: 1,
pending: 2,
success: 3,
error: 4
};
return items.sort((a, b) => order[a.status] - order[b.status]);
}
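async function updateItem(update: any) {
// Removal is signalled as status 'error' with data.removed (see QueueManager.remove)
if (update.data?.removed) {
queueItems = queueItems.filter(i => i.id !== update.itemId);
return;
}
const index = queueItems.findIndex(i => i.id === update.itemId);
if (index >= 0) {
queueItems[index] = { ...queueItems[index], ...update };
queueItems = sortItems([...queueItems]);
} else {
// Newly enqueued item: the update carries no URL, so fetch the full item
const response = await fetch(`/api/queue/${update.itemId}`);
if (!response.ok) return;
const { item } = await response.json();
queueItems = sortItems([...queueItems.filter(i => i.id !== item.id), item]);
}
}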
async function removeItem(id: string) {
await fetch(`/api/queue/${id}`, { method: 'DELETE' });
queueItems = queueItems.filter(i => i.id !== id);
}
async function retryItem(id: string) {
await fetch(`/api/queue/${id}/retry`, { method: 'POST' });
}
</script>
<h1>Recipe Queue</h1>
{#if queueItems.length === 0}
<p>No recipes in queue. Share an Instagram recipe to get started!</p>
{:else}
<div class="queue-grid">
{#each queueItems as item (item.id)}
<QueueItemCard
{item}
expanded={expandedItemId === item.id}
onToggle={() => expandedItemId = expandedItemId === item.id ? null : item.id}
onRemove={() => removeItem(item.id)}
onRetry={() => retryItem(item.id)}
/>
{/each}
</div>
{/if}
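The merge-and-sort step of the dashboard can also be expressed as a pure function, which makes it testable without a browser or an SSE connection. The sketch below is illustrative: `DashboardItem`, `DashboardUpdate`, `STATUS_ORDER` and `applyUpdate` are hypothetical names, the item shape is reduced to the fields the function needs, and the oldest-first tie-break within a status is an assumption.

```typescript
type DashboardStatus = 'pending' | 'in_progress' | 'success' | 'unhealthy' | 'error';

interface DashboardItem {
  id: string;
  status: DashboardStatus;
  enqueuedAt: string; // ISO timestamp
}

interface DashboardUpdate {
  itemId: string;
  status: DashboardStatus;
  data?: { removed?: boolean };
}

// Same priority order as sortItems in the homepage script.
const STATUS_ORDER: Record<DashboardStatus, number> = {
  in_progress: 0,
  unhealthy: 1,
  pending: 2,
  success: 3,
  error: 4,
};

// Returns a new, sorted array and leaves the input untouched.
function applyUpdate(items: readonly DashboardItem[], update: DashboardUpdate): DashboardItem[] {
  if (update.data?.removed) {
    return items.filter((i) => i.id !== update.itemId);
  }
  const next = items.map((i) => (i.id === update.itemId ? { ...i, status: update.status } : i));
  return next.sort((a, b) => {
    const byStatus = STATUS_ORDER[a.status] - STATUS_ORDER[b.status];
    if (byStatus !== 0) return byStatus;
    // Assumed tie-break: oldest first (ISO strings compare chronologically)
    return a.enqueuedAt < b.enqueuedAt ? -1 : a.enqueuedAt > b.enqueuedAt ? 1 : 0;
  });
}
```

In the page, the SSE handler would reduce to `queueItems = applyUpdate(queueItems, update)` for items that are already known.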
QueueItemCard Component:
<script lang="ts">
import type { QueueItem } from '$lib/server/queue/types';
import QueueItemDetail from './QueueItemDetail.svelte';
let { item, expanded, onToggle, onRemove, onRetry }: {
item: QueueItem;
expanded: boolean;
onToggle: () => void;
onRemove: () => void;
onRetry: () => void;
} = $props();
function getStatusBadge(status: string) {
const badges = {
pending: { color: 'gray', icon: '⏳', text: 'Pending' },
in_progress: { color: 'blue', icon: '⚙️', text: 'Processing' },
success: { color: 'green', icon: '✓', text: 'Success' },
unhealthy: { color: 'yellow', icon: '⚠️', text: 'Warning' },
error: { color: 'red', icon: '✗', text: 'Error' }
};
return badges[status] || badges.pending;
}
</script>
<div class="queue-item-card" class:expanded>
<!-- Header -->
<div class="header" onclick={onToggle}>
<div class="status-badge bg-{getStatusBadge(item.status).color}">
{getStatusBadge(item.status).icon} {getStatusBadge(item.status).text}
</div>
{#if item.currentPhase}
<div class="phase-badge">
{item.currentPhase}
</div>
{/if}
<div class="url-preview">{item.url.substring(0, 50)}...</div>
<div class="timestamp">
{new Date(item.enqueuedAt).toLocaleTimeString()}
</div>
</div>
<!-- Actions -->
<div class="actions">
{#if item.status === 'unhealthy' || item.status === 'error'}
<button onclick={onRetry}>🔄 Retry</button>
{/if}
<button onclick={onRemove}>🗑️ Remove</button>
</div>
<!-- Expanded Details -->
{#if expanded}
<QueueItemDetail {item} />
{/if}
</div>
QueueItemDetail Component: Reuses existing Share page components:
- ThumbnailPreview - Show thumbnail if extracted
- LogViewer - Show progress logs
- RecipeCard - Show parsed recipe
- ErrorState - Show error details
Acceptance Criteria:
- ✅ Homepage displays all queue items
- ✅ Real-time updates via SSE
- ✅ Items sorted by status priority
- ✅ Can expand/collapse item details
- ✅ Can remove items
- ✅ Can retry failed items
- ✅ Shows current phase for in-progress items
- ✅ Reuses existing UI components
- ✅ All tests passing
Files:
- src/routes/+page.svelte (major refactor)
- src/routes/components/QueueItemCard.svelte (new)
- src/routes/components/QueueItemDetail.svelte (new)
- Move src/routes/share/components/* → src/lib/components/ (shared)
Story 7: Implement Web Push Notifications
Priority: Medium
Dependencies: Story 2
Objective: Send push notifications when queue items complete (success, unhealthy, or error).
Tasks:
- Research Web Push API and service worker integration
- Add push notification permission request to homepage
- Store push subscriptions (in-memory for now)
- Implement sendPushNotification() in QueueProcessor
- Send notification on item completion
- Include item status and URL in notification
- Handle notification click to navigate to homepage
Reference: https://whatpwacando.today/declarative-web-push
Implementation Notes:
- Use Vite PWA plugin's existing service worker
- Store push subscriptions in QueueManager
- Send notification with item ID in data payload
- On click: focus app window and expand relevant queue item
Acceptance Criteria:
- ✅ User can grant push notification permission
- ✅ Notifications sent on item completion
- ✅ Notification includes status and URL
- ✅ Clicking notification opens homepage
- ✅ Works even when app not in focus
- ✅ All tests passing
Files:
- src/lib/server/queue/PushManager.ts (new)
- src/routes/api/push/subscribe/+server.ts (new)
- src/routes/+page.svelte (add permission request)
- Update service worker configuration
Story 8: Remove Legacy Status APIs
Priority: Low
Dependencies: Story 5, Story 6
Objective: Clean up old endpoints and code that are no longer needed.
Tasks:
- Keep src/routes/api/extract-stream/+server.ts for now instead of deleting it (might be useful for manual testing)
- Remove unused imports from Share page
- Delete unused Share page components if not reused
- Update README documentation
- Clean up any obsolete tests
Acceptance Criteria:
- ✅ No dead code remaining
- ✅ All tests passing
- ✅ Documentation updated
- ✅ No console warnings/errors
Files:
- Various cleanup
Testing Strategy
Unit Tests
QueueManager (queue-manager.spec.ts):
describe('QueueManager', () => {
it('should enqueue items with unique IDs');
it('should dequeue oldest pending item first (FIFO)');
it('should update item status');
it('should add progress events to items');
it('should remove items by ID');
it('should retry failed items');
it('should notify subscribers of updates');
it('should handle subscriber errors gracefully');
});
QueueProcessor (queue-processor.spec.ts):
describe('QueueProcessor', () => {
it('should process items up to concurrency limit');
it('should go through all phases: extraction → parsing → upload');
it('should mark item as success when all phases complete');
it('should mark item as unhealthy on recoverable error');
it('should mark item as error on non-recoverable error');
it('should capture progress events');
it('should skip Tandoor upload if not configured');
});
Integration Tests
Queue API (queue-api.spec.ts):
describe('Queue API', () => {
it('POST /api/queue/enqueue should add item to queue');
it('GET /api/queue/list should return all items');
it('GET /api/queue/:id should return specific item');
it('DELETE /api/queue/:id should remove item');
it('POST /api/queue/:id/retry should retry item');
it('should validate request bodies');
it('should return proper error responses');
});
Queue Stream (queue-stream.spec.ts):
describe('Queue Stream SSE', () => {
it('should send initial queue state on connect');
it('should stream updates in real-time');
it('should send keep-alive pings');
it('should filter by itemId if specified');
it('should handle client disconnect');
});
Manual Testing Checklist
Share Page:
- Share Instagram URL from mobile
- See success confirmation
- Auto-redirect to homepage
- Error handling works
Homepage:
- See queue items appear
- Real-time status updates work
- Can expand/collapse items
- Can remove items
- Can retry failed items
- Items sorted correctly
End-to-End Flow:
- Share URL → Homepage shows pending
- Item progresses: extraction → parsing → uploading
- Success state shows recipe
- Tandoor recipe created (if enabled)
- Push notification received
Deployment Considerations
Environment Variables
# Existing
TANDOOR_TOKEN=your-token
TANDOOR_SERVER_URL=https://tandoor.example.com
OPENAI_API_KEY=your-key
# New (optional)
QUEUE_CONCURRENCY=2 # Number of simultaneous workers
QUEUE_MAX_RETRIES=3 # Max retry attempts
PUSH_VAPID_PUBLIC_KEY=... # For Web Push
PUSH_VAPID_PRIVATE_KEY=... # For Web Push
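The two new queue settings are optional, so the server needs defaults and some tolerance for malformed values. A minimal sketch of such a loader follows; `loadQueueConfig` and `QueueConfig` are hypothetical names, the defaults mirror the sample values above, and falling back to the default on invalid input is an assumption rather than a stated requirement.

```typescript
interface QueueConfig {
  concurrency: number;
  maxRetries: number;
}

// Hypothetical loader. Pass process.env (or any key/value map) as `env`.
function loadQueueConfig(env: Record<string, string | undefined>): QueueConfig {
  // Reads an integer setting, falling back when it is missing or invalid.
  const readInt = (key: string, fallback: number, min: number): number => {
    const raw = env[key];
    if (raw === undefined || raw.trim() === '') return fallback;
    const value = Number(raw);
    return Number.isInteger(value) && value >= min ? value : fallback;
  };
  return {
    concurrency: readInt('QUEUE_CONCURRENCY', 2, 1), // at least one worker
    maxRetries: readInt('QUEUE_MAX_RETRIES', 3, 0),  // zero disables retries
  };
}
```

The resulting values could replace the hard-coded `concurrency = 2` and `maxRetries: 3` in the processor and manager.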
Monitoring & Observability
Add logging for:
- Queue size
- Processing rate
- Error rate by phase
- Average processing time
- Concurrency utilization
Metrics to track:
{
queueSize: number,
pendingCount: number,
inProgressCount: number,
successCount: number,
errorCount: number,
averageProcessingTime: number,
concurrencyUtilization: number // activeWorkers / concurrency
}
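These metrics can be derived from the queue contents on demand instead of being tracked separately. The sketch below shows one way to compute them; `computeQueueMetrics`, `MetricItem` and `QueueMetrics` are hypothetical names, and two choices are assumptions: `errorCount` counts only items in the error status (not unhealthy ones), and `averageProcessingTime` is in milliseconds over items that have both timestamps.

```typescript
type MetricStatus = 'pending' | 'in_progress' | 'success' | 'unhealthy' | 'error';

interface MetricItem {
  status: MetricStatus;
  startedAt?: string;   // ISO timestamp
  completedAt?: string; // ISO timestamp
}

interface QueueMetrics {
  queueSize: number;
  pendingCount: number;
  inProgressCount: number;
  successCount: number;
  errorCount: number;
  averageProcessingTime: number;  // milliseconds, 0 when nothing has finished
  concurrencyUtilization: number; // activeWorkers / concurrency
}

function computeQueueMetrics(
  items: readonly MetricItem[],
  activeWorkers: number,
  concurrency: number
): QueueMetrics {
  const countOf = (status: MetricStatus): number =>
    items.filter((i) => i.status === status).length;

  // Sum durations of items that have both a start and a completion time
  let totalMs = 0;
  let timed = 0;
  for (const item of items) {
    if (item.startedAt && item.completedAt) {
      totalMs += Date.parse(item.completedAt) - Date.parse(item.startedAt);
      timed++;
    }
  }

  return {
    queueSize: items.length,
    pendingCount: countOf('pending'),
    inProgressCount: countOf('in_progress'),
    successCount: countOf('success'),
    errorCount: countOf('error'),
    averageProcessingTime: timed > 0 ? totalMs / timed : 0,
    concurrencyUtilization: concurrency > 0 ? activeWorkers / concurrency : 0,
  };
}
```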
Future Enhancements (Out of Scope)
Persistence Layer
- Store queue in Redis or SQLite
- Survive server restarts
- Distributed queue across multiple instances
Advanced Features
- Priority queue (urgent items first)
- Scheduled processing (process at specific time)
- Bulk operations (add multiple URLs at once)
- Queue statistics dashboard
- Export queue history
Performance Optimizations
- Dynamic concurrency based on system load
- Rate limiting for Instagram requests
- Caching extraction results
Technical Decisions & Rationale
Why In-Memory Queue?
- Simplicity: No external dependencies (Redis, database)
- Performance: Fastest possible queue operations
- Sufficient: PWA typically serves single user
- Extensible: Easy to swap for persistent queue later
Why fastq Pattern?
- Proven: Battle-tested in production
- Lightweight: Minimal dependencies
- Promise-based: Modern async/await API
- Concurrency: Built-in worker pooling
Why SSE over WebSocket?
- One-way: Only server→client needed
- Simpler: Plain HTTP with no protocol upgrade, automatic reconnect
- Native: EventSource API in browser
- Compatible: Works with SvelteKit ReadableStream
Why Automatic Tandoor Upload?
- Consistency: Every recipe uploaded immediately
- Simplicity: No manual step to forget
- Recoverable: Image upload failures don't block success
Risk Assessment
High Risk
- Browser automation failures: Instagram changes → extraction breaks
- Mitigation: Multi-strategy extraction already in place
Medium Risk
- Memory usage: Large queue could consume RAM
- Mitigation: Concurrency limit + eventual auto-removal of success items
- Race conditions: Multiple updates to same item
- Mitigation: Synchronous queue operations, no async writes
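The auto-removal mitigation is not specified further in this plan. One way it might work is a periodic sweep that selects successful items older than a cutoff. The sketch below covers only the selection step; `findExpiredSuccessIds`, `PrunableItem` and the age-based policy are assumptions made for illustration.

```typescript
interface PrunableItem {
  id: string;
  status: string;
  completedAt?: string; // ISO timestamp
}

// Hypothetical helper: returns the IDs of successful items that completed
// at least `maxAgeMs` milliseconds before `nowMs`.
function findExpiredSuccessIds(
  items: readonly PrunableItem[],
  nowMs: number,
  maxAgeMs: number
): string[] {
  return items
    .filter((item) => item.status === 'success' && item.completedAt !== undefined)
    .filter((item) => nowMs - Date.parse(item.completedAt as string) >= maxAgeMs)
    .map((item) => item.id);
}
```

A timer on the server could pass each returned ID to `queueManager.remove()`, which keeps memory bounded while leaving failed items visible for retry.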
Low Risk
- SSE connection stability: Client disconnect/reconnect
- Mitigation: Keep-alive + automatic reconnection
- Lost progress on server restart: In-memory queue cleared
- Mitigation: Acceptable for MVP, persistence in future
Success Metrics
| Metric | Target |
|---|---|
| Share page load time | < 500ms |
| Time to enqueue | < 100ms |
| Average processing time | < 30s per item |
| Concurrent processing | 2 items simultaneously |
| Error recovery rate | > 80% of unhealthy items succeed on retry |
| Push notification delivery | > 95% success rate |
Documentation Requirements
README Updates:
- Queue architecture overview
- How to use the queue
- Environment variables
- Troubleshooting guide
Code Documentation:
- JSDoc for all public methods
- Inline comments for complex logic
- Type definitions exported
Checklist for Completion
Backend
- QueueManager implemented and tested
- QueueProcessor implemented and tested
- Queue API endpoints created
- Queue stream SSE endpoint created
- Push notifications working
- All unit tests passing
- All integration tests passing
Frontend
- Share page refactored to fire-and-forget
- Homepage queue view implemented
- QueueItemCard component created
- QueueItemDetail component created
- SSE connection to queue stream working
- Real-time updates working
- Remove/retry actions working
Documentation
- README updated
- Code fully documented
- Manual testing completed
Cleanup
- Legacy code removed
- No console warnings
- No dead code
Notes
This is a large feature that fundamentally changes the application architecture. Implement stories sequentially and verify each before proceeding. The queue system is extensible and can be enhanced with persistence, distributed processing, and advanced features in future iterations.
Estimated Implementation Time: 3-5 days for full implementation and testing.
References
- Hexagonal Architecture: .system/abstract_architecture.md
- fastq Documentation: https://github.com/mcollina/fastq
- Web Push Guide: https://whatpwacando.today/declarative-web-push
- SSE MDN Docs: https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events
- Existing Plans: docs/plans/IntegrateExtractionProgressFrontend.md