412 lines
11 KiB
Markdown
412 lines
11 KiB
Markdown
# Migration Guide: Synchronous to Async Queue System
|
|
|
|
This document outlines the migration from InstaRecipe's original synchronous extraction system to the new async queue-based architecture.
|
|
|
|
## Overview
|
|
|
|
The migration transformed InstaRecipe from a blocking, synchronous extraction system to a modern, async queue-based system that provides better user experience, reliability, and scalability.
|
|
|
|
## What Changed
|
|
|
|
### Architecture Transformation
|
|
|
|
**Before: Synchronous System**
|
|
|
|
```
|
|
User Request → Direct Processing → Response (wait 30-60s)
|
|
↓ ↓ ↓
|
|
Share Page → Extract Function → Success/Error Page
|
|
```
|
|
|
|
**After: Async Queue System**
|
|
|
|
```
|
|
User Request → Queue Item Created → Immediate Response
|
|
↓ ↓ ↓
|
|
Share Page → Queue Manager → Dashboard (with real-time updates)
|
|
↓
|
|
Background Processing
|
|
(Extraction → Parsing → Upload)
|
|
↓
|
|
Push Notifications + SSE Updates
|
|
```
|
|
|
|
### Key Improvements
|
|
|
|
1. **User Experience**
|
|
- **Instant Response**: No more waiting 30-60 seconds for processing
|
|
- **Real-time Updates**: Live progress tracking via Server-Sent Events
|
|
- **Multi-tasking**: Users can submit multiple URLs simultaneously
|
|
- **Error Recovery**: Retry failed extractions with one click
|
|
|
|
2. **Reliability**
|
|
- **Error Classification**: Distinguishes recoverable from permanent failures
|
|
- **Automatic Retries**: Configurable retry logic for transient issues
|
|
- **Progress Persistence**: Queue state survives server restarts
|
|
- **Timeout Handling**: Proper cleanup of stuck processes
|
|
|
|
3. **Performance**
|
|
- **Concurrent Processing**: Process multiple recipes simultaneously
|
|
- **Resource Management**: Configurable concurrency limits
|
|
- **Memory Efficiency**: In-memory queue with event-driven updates
|
|
- **Background Processing**: Doesn't block user interface
|
|
|
|
4. **Observability**
|
|
- **Detailed Logging**: Comprehensive logging throughout processing pipeline
|
|
- **Progress Tracking**: Phase-by-phase progress reporting
|
|
- **Error Details**: Specific error messages and recovery suggestions
|
|
- **Analytics**: Processing metrics and success rates
|
|
|
|
## API Changes
|
|
|
|
### New Endpoints
|
|
|
|
#### Queue Management
|
|
|
|
```typescript
|
|
// Enqueue URL for processing
|
|
POST /api/queue
|
|
Body: { url: "https://instagram.com/p/abc123" }
|
|
Response: { id: "uuid", status: "pending", url: "...", createdAt: "..." }
|
|
|
|
// List queue items with filtering
|
|
GET /api/queue?status=error&limit=10&offset=0
|
|
Response: { items: [...], total: 42, hasMore: true }
|
|
|
|
// Get specific queue item
|
|
GET /api/queue/{id}
|
|
Response: { id: "...", status: "success", phases: [...], results: {...} }
|
|
|
|
// Retry failed item
|
|
POST /api/queue/{id}/retry
|
|
Response: { success: true, message: "Item queued for retry" }
|
|
|
|
// Real-time updates (Server-Sent Events)
|
|
GET /api/queue/stream?itemId={id}
|
|
Events: connection, queue-update, ping
|
|
```
|
|
|
|
#### Push Notifications
|
|
|
|
```typescript
|
|
// Subscribe to push notifications
|
|
POST /api/notifications/subscribe
|
|
Body: { subscription: {...}, clientId: "..." }
|
|
Response: { success: true, subscriptionCount: 5 }
|
|
|
|
// Get VAPID public key
|
|
GET /api/notifications/vapid-key
|
|
Response: { publicKey: "BDummyPublicKey..." }
|
|
```
|
|
|
|
### Deprecated Endpoints
|
|
|
|
These endpoints are marked for removal and should not be used in new code:
|
|
|
|
```typescript
|
|
// ❌ DEPRECATED: Synchronous extraction
|
|
POST / api / extract;
|
|
// 👉 Use: POST /api/queue
|
|
|
|
// ❌ DEPRECATED: Long-polling progress
|
|
GET / api / extract - stream;
|
|
// 👉 Use: GET /api/queue/stream
|
|
```
|
|
|
|
## Data Structure Changes
|
|
|
|
### Queue Items
|
|
|
|
New queue items follow this structure:
|
|
|
|
```typescript
|
|
interface QueueItem {
|
|
id: string; // UUID v4
|
|
url: string; // Instagram URL
|
|
status: 'pending' | 'in_progress' | 'success' | 'error' | 'unhealthy';
|
|
|
|
// Processing phases with individual progress
|
|
phases: Array<{
|
|
name: 'extraction' | 'parsing' | 'uploading';
|
|
status: 'pending' | 'in_progress' | 'completed' | 'error';
|
|
startedAt?: string;
|
|
completedAt?: string;
|
|
progress?: number; // 0-100
|
|
}>;
|
|
|
|
// Results (populated on success)
|
|
results?: {
|
|
recipe?: Recipe; // Extracted recipe data
|
|
tandoorUrl?: string; // Link to uploaded recipe
|
|
extractedText?: string; // Raw extracted text
|
|
thumbnail?: string; // Image URL
|
|
};
|
|
|
|
// Error information
|
|
error?: string;
|
|
|
|
// Timestamps
|
|
createdAt: string;
|
|
updatedAt: string;
|
|
}
|
|
```
|
|
|
|
### Progress Events
|
|
|
|
Real-time updates are sent via Server-Sent Events:
|
|
|
|
```typescript
|
|
interface QueueStatusUpdate {
|
|
itemId: string;
|
|
status: QueueItem['status'];
|
|
timestamp: string;
|
|
progress?: Array<{...}>; // Phase updates
|
|
results?: {...}; // Final results
|
|
error?: string; // Error message
|
|
}
|
|
```
|
|
|
|
## Migration Steps
|
|
|
|
### For Frontend Applications
|
|
|
|
1. **Replace Synchronous Calls**
|
|
|
|
```typescript
|
|
// ❌ Old synchronous approach
|
|
const response = await fetch('/api/extract', {
|
|
method: 'POST',
|
|
body: JSON.stringify({ url })
|
|
});
|
|
const result = await response.json(); // Wait 30-60 seconds
|
|
|
|
// ✅ New async queue approach
|
|
const response = await fetch('/api/queue', {
|
|
method: 'POST',
|
|
body: JSON.stringify({ url })
|
|
});
|
|
const queueItem = await response.json(); // Immediate response
|
|
|
|
// Navigate to dashboard for real-time updates
|
|
window.location.href = `/?highlight=${queueItem.id}`;
|
|
```
|
|
|
|
2. **Add Real-time Updates**
|
|
|
|
```typescript
|
|
// Setup Server-Sent Events for progress tracking
|
|
const eventSource = new EventSource(`/api/queue/stream?itemId=${itemId}`);
|
|
|
|
eventSource.addEventListener('queue-update', (event) => {
|
|
const update = JSON.parse(event.data);
|
|
updateUI(update);
|
|
});
|
|
```
|
|
|
|
3. **Handle New Error States**
|
|
```typescript
|
|
// Handle different queue statuses
|
|
switch (item.status) {
|
|
case 'pending':
|
|
showPendingState();
|
|
break;
|
|
case 'in_progress':
|
|
showProgressBar(item.phases);
|
|
break;
|
|
case 'success':
|
|
showResults(item.results);
|
|
break;
|
|
case 'error':
|
|
showErrorWithRetry(item.error, item.id);
|
|
break;
|
|
case 'unhealthy':
|
|
showRetryableError(item.error, item.id);
|
|
break;
|
|
}
|
|
```
|
|
|
|
### For Backend Integrations
|
|
|
|
1. **Update API Calls**
|
|
|
|
```python
|
|
# ❌ Old synchronous API
|
|
response = requests.post('/api/extract', json={'url': url})
|
|
# This would block for 30-60 seconds
|
|
|
|
# ✅ New async queue API
|
|
response = requests.post('/api/queue', json={'url': url})
|
|
queue_item = response.json()
|
|
|
|
# Poll or use SSE for updates
|
|
while True:
|
|
item = requests.get(f'/api/queue/{queue_item["id"]}').json()
|
|
if item['status'] in ['success', 'error']:
|
|
break
|
|
time.sleep(5) # Poll every 5 seconds
|
|
```
|
|
|
|
2. **Implement SSE Client** (Python example)
|
|
|
|
```python
|
|
import sseclient
|
|
|
|
def listen_to_queue_updates(item_id):
|
|
messages = sseclient.SSEClient(f'/api/queue/stream?itemId={item_id}')
|
|
for msg in messages:
|
|
if msg.event == 'queue-update':
|
|
update = json.loads(msg.data)
|
|
handle_update(update)
|
|
if update['status'] in ['success', 'error']:
|
|
break
|
|
```
|
|
|
|
## Configuration Changes
|
|
|
|
### Environment Variables
|
|
|
|
New configuration options for the queue system:
|
|
|
|
```env
|
|
# Queue processing settings
|
|
QUEUE_CONCURRENCY=2 # Number of concurrent items
|
|
QUEUE_TIMEOUT_MS=30000 # Processing timeout
|
|
QUEUE_RETRY_ATTEMPTS=3 # Maximum retry attempts
|
|
|
|
# Push notification settings (optional)
|
|
VAPID_PUBLIC_KEY=BDummyPublicKey...
|
|
VAPID_PRIVATE_KEY=DummyPrivateKey...
|
|
|
|
# Existing LLM and Tandoor settings remain the same
|
|
LLM_API_BASE_URL=https://api.openai.com/v1
|
|
TANDOOR_BASE_URL=https://your-tandoor.com
|
|
```
|
|
|
|
## Testing Changes
|
|
|
|
### New Test Categories
|
|
|
|
1. **Queue Manager Tests** (28 tests)
|
|
- CRUD operations
|
|
- Event subscriptions
|
|
- Filtering and pagination
|
|
|
|
2. **Queue Processor Tests** (5 integration tests)
|
|
- Three-phase processing pipeline
|
|
- Error handling and recovery
|
|
- Concurrency management
|
|
|
|
3. **API Endpoint Tests** (17 tests)
|
|
- Queue management operations
|
|
- Input validation
|
|
- Error responses
|
|
|
|
4. **SSE Stream Tests** (6 tests)
|
|
- Real-time event delivery
|
|
- Connection management
|
|
- Event filtering
|
|
|
|
### Testing the Migration
|
|
|
|
```bash
|
|
# Run full test suite
|
|
npm test
|
|
|
|
# Test specific components
|
|
npm test queue-manager
|
|
npm test queue-processor
|
|
npm test queue-api
|
|
npm test queue-sse
|
|
```
|
|
|
|
## Performance Considerations
|
|
|
|
### Before Migration
|
|
|
|
- **Blocking Operations**: Each request blocked a server thread
|
|
- **Single Processing**: One extraction at a time
|
|
- **No Progress**: Users waited without feedback
|
|
- **Memory Usage**: High memory usage during long operations
|
|
|
|
### After Migration
|
|
|
|
- **Non-blocking**: Requests return immediately
|
|
- **Concurrent Processing**: Multiple extractions in parallel
|
|
- **Real-time Feedback**: Live progress updates
|
|
- **Efficient Memory**: Event-driven, minimal memory footprint
|
|
|
|
### Performance Metrics
|
|
|
|
- **Response Time**: 50ms (queue) vs 30-60s (synchronous)
|
|
- **Throughput**: 2x concurrent processing vs 1x sequential
|
|
- **User Experience**: Immediate feedback vs long waiting
|
|
- **Error Recovery**: Automatic retries vs manual restart
|
|
|
|
## Rollback Plan
|
|
|
|
If issues arise, the system can be rolled back by:
|
|
|
|
1. **Disable Queue Processing**
|
|
|
|
```env
|
|
QUEUE_PROCESSING_ENABLED=false
|
|
```
|
|
|
|
2. **Re-enable Legacy Endpoints** (if preserved)
|
|
|
|
```typescript
|
|
// Temporary fallback to synchronous processing
|
|
app.post('/api/extract', legacyExtractHandler);
|
|
```
|
|
|
|
3. **Database Migration** (if applicable)
|
|
- Queue data is in-memory and will be lost on restart
|
|
- No persistent data migration needed for rollback
|
|
|
|
## Troubleshooting
|
|
|
|
### Common Issues
|
|
|
|
1. **Queue Items Stuck in 'pending'**
|
|
- **Cause**: Queue processor not started
|
|
- **Solution**: Check logs for processor initialization errors
|
|
|
|
2. **SSE Connection Failures**
|
|
- **Cause**: HTTPS certificate issues or browser security
|
|
- **Solution**: Verify SSL setup and CORS configuration
|
|
|
|
3. **Push Notifications Not Working**
|
|
- **Cause**: Missing VAPID keys or HTTPS requirement
|
|
- **Solution**: Generate VAPID keys and ensure HTTPS
|
|
|
|
4. **High Memory Usage**
|
|
- **Cause**: Too many concurrent queue items
|
|
- **Solution**: Reduce `QUEUE_CONCURRENCY` setting
|
|
|
|
### Debugging Tools
|
|
|
|
```bash
|
|
# Check queue status
|
|
curl https://localhost:5173/api/queue
|
|
|
|
# Monitor SSE stream
|
|
curl -N -H "Accept: text/event-stream" \
|
|
https://localhost:5173/api/queue/stream
|
|
|
|
# Test push notification subscription
|
|
curl -X POST https://localhost:5173/api/notifications/vapid-key
|
|
```
|
|
|
|
## Conclusion
|
|
|
|
The migration to an async queue system represents a significant architectural improvement that provides:
|
|
|
|
- **Better User Experience**: Immediate responses and real-time progress
|
|
- **Improved Reliability**: Error recovery and retry mechanisms
|
|
- **Enhanced Performance**: Concurrent processing and resource efficiency
|
|
- **Modern Features**: Push notifications and PWA capabilities
|
|
|
|
The new system maintains backward compatibility during the transition period while providing a clear migration path for all integrations.
|
|
|
|
For questions or issues during migration, refer to the comprehensive test suite and documentation, or open an issue in the project repository.
|