Task 4.8: WebSocket & Real-Time APIs
Overview
This task implements WebSocket support for the SmartRAG system, enabling real-time bidirectional communication between the server and clients. The implementation provides real-time updates for query processing, system notifications, data ingestion progress, and schema changes.
Objectives
- ✅ Implement WebSocket connection management for multiple concurrent clients
- ✅ Create WebSocket endpoints for real-time query and system updates
- ✅ Build subscription-based notification system
- ✅ Support user-specific and broadcast messaging
- ✅ Implement robust error handling and connection lifecycle management
- ✅ Create comprehensive tests for WebSocket functionality
- ✅ Document WebSocket API usage and integration patterns
Architecture
Connection Management
graph TB
subgraph "WebSocket Architecture"
Client1[Client 1] -->|WS Connection| WSEndpoint[WebSocket Endpoint]
Client2[Client 2] -->|WS Connection| WSEndpoint
Client3[Client 3] -->|WS Connection| WSEndpoint
WSEndpoint --> CM[Connection Manager]
CM --> CS[Connection Store]
CM --> SS[Subscription Store]
CM --> US[User Store]
subgraph "Message Broadcasting"
NS[Notification Service] --> CM
QP[Query Processor] --> NS
IS[Ingestion Service] --> NS
SM[Schema Manager] --> NS
end
end
Message Flow
sequenceDiagram
participant C as Client
participant WS as WebSocket Endpoint
participant CM as Connection Manager
participant NS as Notification Service
participant QP as Query Processor
C->>WS: Connect
WS->>CM: Register Connection
CM->>C: Connection Established
C->>WS: Subscribe to Updates
WS->>CM: Update Subscriptions
CM->>C: Subscription Confirmed
QP->>NS: Query Progress Update
NS->>CM: Broadcast Update
CM->>C: Query Update Message
C->>WS: Disconnect
WS->>CM: Unregister Connection
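The register, subscribe, and unregister steps in the sequence diagram map onto a per-connection receive loop. The sketch below is a minimal illustration, not the project's endpoint. It assumes a websocket object with async accept(), receive_json() and send_json() methods (Starlette's WebSocket provides these). It uses a hypothetical ClientDisconnected exception in place of the framework's disconnect exception, and it keeps subscriptions in a plain dict keyed by a hypothetical connection id.

```python
from typing import Any, Dict, Set


class ClientDisconnected(Exception):
    """Hypothetical stand-in for the framework's disconnect exception
    (Starlette raises WebSocketDisconnect)."""


async def updates_endpoint(websocket: Any, registry: Dict[int, Set[str]]) -> None:
    """Illustrative /ws/updates handler: register, serve, always unregister."""
    await websocket.accept()
    connection_id = id(websocket)            # hypothetical id scheme, fine for a sketch
    registry[connection_id] = set()          # Register Connection
    try:
        # Connection Established (payload contents are an assumption)
        await websocket.send_json({"type": "connection_established", "data": {}})
        while True:
            message = await websocket.receive_json()
            if message.get("action") == "subscribe":
                data = message.get("data") or {}
                types = set(data.get("subscription_types", []))
                registry[connection_id] |= types        # Update Subscriptions
                await websocket.send_json({
                    "type": "subscription_created",
                    "data": {"subscription_types": sorted(types)},
                })
            else:
                await websocket.send_json({
                    "type": "error",
                    "data": {"detail": f"unsupported action: {message.get('action')!r}"},
                })
    except ClientDisconnected:
        pass                                  # normal client-initiated close
    finally:
        registry.pop(connection_id, None)     # Unregister Connection, even after errors
```

The try/finally is the key design choice. The connection is unregistered whether the client disconnects cleanly or the handler raises, which is what keeps connections from leaking.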
Implementation Details
1. Connection Manager
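The ConnectionManager class handles all WebSocket connections. Its public interface (method bodies elided) is:
from __future__ import annotations  # postpone evaluation of the project-specific annotations below
from typing import Optional
from uuid import UUID
from fastapi import WebSocket  # assumes FastAPI/Starlette, as the TestClient-based tests suggest
# SubscriptionRequest, QueryUpdateMessage and WebSocketMessage are the project's message models.
class ConnectionManager:
    """Manages WebSocket connections and message broadcasting."""
    async def connect(self, websocket: WebSocket, user_id: Optional[str] = None) -> UUID: ...
    async def disconnect(self, connection_id: UUID): ...
    async def subscribe(self, connection_id: UUID, request: SubscriptionRequest): ...
    async def broadcast_query_update(self, update: QueryUpdateMessage): ...
    async def send_to_user(self, user_id: str, message: WebSocketMessage): ...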
Key features:
- Concurrency-safe connection management using asyncio locks (safe across coroutines on one event loop, not across threads)
- User-based connection tracking for targeted messaging
- Subscription-based message filtering
- Automatic cleanup of stale connections
- Connection statistics and monitoring
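A minimal single-process sketch of these responsibilities follows. It illustrates the ideas under stated assumptions and is not the project's ConnectionManager. InMemoryConnectionManager, touch, cleanup_stale and _send_all are hypothetical names. Subscriptions are plain strings rather than SubscriptionRequest objects, a websocket is any object with an async send_json(dict) method, and the connection_established payload is assumed.

```python
import asyncio
import time
import uuid
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Set, Tuple


@dataclass
class _Connection:
    websocket: Any                      # anything with `async send_json(dict)`
    user_id: Optional[str]
    subscriptions: Set[str] = field(default_factory=set)
    last_seen: float = field(default_factory=time.monotonic)


class InMemoryConnectionManager:
    """Hypothetical single-process registry; not the project's ConnectionManager."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()     # serializes coroutines, not threads
        self._connections: Dict[uuid.UUID, _Connection] = {}
        self._by_user: Dict[str, Set[uuid.UUID]] = {}

    async def connect(self, websocket: Any, user_id: Optional[str] = None) -> uuid.UUID:
        connection_id = uuid.uuid4()
        async with self._lock:
            self._connections[connection_id] = _Connection(websocket, user_id)
            if user_id is not None:
                self._by_user.setdefault(user_id, set()).add(connection_id)
        # Assumed payload; a failed handshake send unregisters the socket immediately.
        await self._send_all([(connection_id, websocket)], {
            "type": "connection_established",
            "data": {"connection_id": str(connection_id)},
        })
        return connection_id

    async def disconnect(self, connection_id: uuid.UUID) -> None:
        async with self._lock:
            conn = self._connections.pop(connection_id, None)
            if conn is not None and conn.user_id is not None:
                ids = self._by_user.get(conn.user_id)
                if ids is not None:
                    ids.discard(connection_id)
                    if not ids:
                        del self._by_user[conn.user_id]

    async def subscribe(self, connection_id: uuid.UUID, subscription_types: Set[str]) -> None:
        async with self._lock:
            conn = self._connections[connection_id]
            conn.subscriptions |= set(subscription_types)
            conn.last_seen = time.monotonic()

    async def touch(self, connection_id: uuid.UUID) -> None:
        """Record client activity (e.g. any inbound frame or ping)."""
        async with self._lock:
            conn = self._connections.get(connection_id)
            if conn is not None:
                conn.last_seen = time.monotonic()

    async def broadcast(self, subscription_type: str, message: dict) -> int:
        async with self._lock:          # snapshot recipients, then send outside the lock
            targets = [(cid, c.websocket) for cid, c in self._connections.items()
                       if subscription_type in c.subscriptions
                       or "all_updates" in c.subscriptions]
        return await self._send_all(targets, message)

    async def send_to_user(self, user_id: str, message: dict) -> int:
        async with self._lock:
            targets = [(cid, self._connections[cid].websocket)
                       for cid in self._by_user.get(user_id, set())]
        return await self._send_all(targets, message)

    async def cleanup_stale(self, max_idle_seconds: float) -> int:
        cutoff = time.monotonic() - max_idle_seconds
        async with self._lock:
            stale = [cid for cid, c in self._connections.items() if c.last_seen < cutoff]
        for cid in stale:
            await self.disconnect(cid)
        return len(stale)

    async def _send_all(self, targets: List[Tuple[uuid.UUID, Any]], message: dict) -> int:
        delivered = 0
        for cid, ws in targets:
            try:
                await ws.send_json(message)
                delivered += 1
            except Exception:           # broken socket: unregister it instead of leaking it
                await self.disconnect(cid)
        return delivered
```

Recipients are snapshotted under the lock and messages are sent outside it, so one slow client cannot hold up registration of others. A send that raises unregisters that socket. In this sketch no locked section awaits anything, so the lock mostly guards against future awaits inside those sections. It does not make the manager callable from other threads.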
2. WebSocket Endpoints
Two endpoints, mounted under the /api/v1 prefix, provide real-time functionality:
/ws/query - Query Updates
- Real-time query processing updates
- Partial result streaming
- Progress tracking
- Error notifications
/ws/updates - System Updates
- System-wide notifications
- Data ingestion progress
- Schema change alerts
- Maintenance announcements
3. Message Types
The system supports various message types:
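from enum import Enum
class MessageType(str, Enum):
    CONNECTION_ESTABLISHED = "connection_established"
    QUERY_UPDATE = "query_update"
    SYSTEM_UPDATE = "system_update"
    INGESTION_PROGRESS = "ingestion_progress"
    SCHEMA_CHANGE = "schema_change"
    ERROR = "error"
    # Not exhaustive: the subscribe reply checked in the integration tests is "subscription_created".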
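Every client handler switches on message.type and reads message.data, so a small envelope type can keep the wire format in one place. This is a minimal sketch, assuming the frame is exactly {"type": ..., "data": ...}. Envelope is a hypothetical name, and only the six types listed above are modelled.

```python
import json
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict


class MessageType(str, Enum):
    CONNECTION_ESTABLISHED = "connection_established"
    QUERY_UPDATE = "query_update"
    SYSTEM_UPDATE = "system_update"
    INGESTION_PROGRESS = "ingestion_progress"
    SCHEMA_CHANGE = "schema_change"
    ERROR = "error"


@dataclass
class Envelope:
    """Hypothetical wire format: {"type": <MessageType value>, "data": {...}}."""
    type: MessageType
    data: Dict[str, Any] = field(default_factory=dict)

    def to_json(self) -> str:
        return json.dumps({"type": self.type.value, "data": self.data})

    @classmethod
    def from_json(cls, raw: str) -> "Envelope":
        obj = json.loads(raw)
        # MessageType(...) raises ValueError for an unrecognised type string
        return cls(type=MessageType(obj["type"]), data=obj.get("data", {}))
```

Because MessageType mixes in str, members compare equal to their raw values (MessageType.ERROR == "error"). An unrecognised type string fails loudly with ValueError instead of being passed through silently.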
4. Subscription System
Clients can subscribe to specific update types:
from enum import Enum
class SubscriptionType(str, Enum):
    QUERY_UPDATES = "query_updates"
    SYSTEM_UPDATES = "system_updates"
    INGESTION_PROGRESS = "ingestion_progress"
    SCHEMA_CHANGES = "schema_changes"
    ALL_UPDATES = "all_updates"
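Deciding whether a connection should receive a message amounts to a lookup from message type to subscription type, with all_updates acting as a wildcard. The mapping below is an assumption inferred from the names in the two enums (for instance, that schema_change messages go to schema_changes subscribers). wants and MESSAGE_TO_SUBSCRIPTION are hypothetical names.

```python
from enum import Enum
from typing import Iterable


class SubscriptionType(str, Enum):
    QUERY_UPDATES = "query_updates"
    SYSTEM_UPDATES = "system_updates"
    INGESTION_PROGRESS = "ingestion_progress"
    SCHEMA_CHANGES = "schema_changes"
    ALL_UPDATES = "all_updates"


# Assumed routing table. connection_established and error are absent on purpose:
# they are addressed to one specific connection, not fanned out to subscribers.
MESSAGE_TO_SUBSCRIPTION = {
    "query_update": SubscriptionType.QUERY_UPDATES,
    "system_update": SubscriptionType.SYSTEM_UPDATES,
    "ingestion_progress": SubscriptionType.INGESTION_PROGRESS,
    "schema_change": SubscriptionType.SCHEMA_CHANGES,
}


def wants(subscriptions: Iterable[str], message_type: str) -> bool:
    """True if a connection holding these subscriptions should get the message."""
    # Normalise raw strings and enum members alike; unknown strings raise ValueError.
    subs = {SubscriptionType(s) for s in subscriptions}
    if SubscriptionType.ALL_UPDATES in subs:
        return True
    target = MESSAGE_TO_SUBSCRIPTION.get(message_type)
    return target is not None and target in subs
```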
API Usage
Client Connection
// Connect to WebSocket
const ws = new WebSocket('ws://localhost:8000/api/v1/ws/updates?token=<jwt_token>');
ws.onopen = (event) => {
  console.log('Connected to WebSocket');
};
ws.onmessage = (event) => {
  const message = JSON.parse(event.data);
  console.log('Received:', message);
};
Subscribing to Updates
// Subscribe to specific update types
ws.send(JSON.stringify({
  action: "subscribe",
  data: {
    subscription_types: ["query_updates", "system_updates"],
    filters: { user_id: "user123" }
  }
}));
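On the server, a subscribe frame like this one should be validated before it touches the subscription store (see Input Validation under Security Considerations). The project presumably does this with its own request models; the standard-library sketch below just makes the checks explicit. parse_subscribe and InvalidMessage are hypothetical names, and the accepted type strings are the SubscriptionType values listed earlier.

```python
import json
from typing import Any, Dict

VALID_SUBSCRIPTION_TYPES = {
    "query_updates", "system_updates", "ingestion_progress",
    "schema_changes", "all_updates",
}


class InvalidMessage(ValueError):
    """Raised when an incoming client frame fails validation (hypothetical)."""


def parse_subscribe(raw: str) -> Dict[str, Any]:
    """Validate a raw 'subscribe' frame and return its normalised data section."""
    try:
        msg = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise InvalidMessage(f"not valid JSON: {exc.msg}") from exc
    if not isinstance(msg, dict) or msg.get("action") != "subscribe":
        raise InvalidMessage("expected an object with action='subscribe'")
    data = msg.get("data")
    if not isinstance(data, dict):
        raise InvalidMessage("'data' must be an object")
    types = data.get("subscription_types")
    if not isinstance(types, list) or not types:
        raise InvalidMessage("'subscription_types' must be a non-empty list")
    # isinstance check first: unhashable items (e.g. {}) cannot be set members
    unknown = [t for t in types if not isinstance(t, str) or t not in VALID_SUBSCRIPTION_TYPES]
    if unknown:
        raise InvalidMessage(f"unknown subscription types: {unknown}")
    filters = data.get("filters", {})
    if not isinstance(filters, dict):
        raise InvalidMessage("'filters' must be an object")
    # dict.fromkeys drops duplicates while keeping the client's order
    return {"subscription_types": list(dict.fromkeys(types)), "filters": filters}
```

A server would normally take the user identity from the authenticated token rather than trusting a client-supplied filters.user_id. The sketch only checks that filters is an object.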
Handling Messages
ws.onmessage = (event) => {
  const message = JSON.parse(event.data);
  switch (message.type) {
    case 'query_update':
      handleQueryUpdate(message.data);
      break;
    case 'system_update':
      handleSystemUpdate(message.data);
      break;
    case 'ingestion_progress':
      handleIngestionProgress(message.data);
      break;
    // ... handle other message types
  }
};
Integration Examples
Query Processing with Real-time Updates
from backend.api.services.websocket_notifications import websocket_notifications
async def process_query_with_updates(query_id: str, query_text: str):
    # Notify query started
    await websocket_notifications.send_query_update(
        query_id=query_id,
        status="processing",
        progress=0.0
    )
    # Process query with progress updates
    for step, progress in enumerate([0.2, 0.5, 0.8, 1.0]):
        # ... processing logic ...
        await websocket_notifications.send_query_update(
            query_id=query_id,
            status="processing",
            progress=progress,
            partial_result=f"Step {step + 1} complete..."
        )
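The integration code passes query_id, status, progress and an optional partial_result to send_query_update. Below is a sketch of the frame such a call might produce. It assumes the {"type", "data"} envelope the client handlers read, and it assumes progress is a fraction in [0, 1], as the 0.0 to 1.0 values above suggest. build_query_update is a hypothetical helper, not the service's API.

```python
from typing import Any, Dict, Optional


def build_query_update(query_id: str, status: str, progress: float,
                       partial_result: Optional[str] = None) -> Dict[str, Any]:
    """Hypothetical helper: the frame a send_query_update(...) call might emit."""
    # Reject out-of-range values (NaN included, since any comparison with NaN
    # is False) so clients can render progress as a fraction without re-checking.
    if not 0.0 <= progress <= 1.0:
        raise ValueError(f"progress must be within [0, 1], got {progress!r}")
    data: Dict[str, Any] = {"query_id": query_id, "status": status, "progress": progress}
    if partial_result is not None:
        data["partial_result"] = partial_result  # omitted when there is nothing to stream
    return {"type": "query_update", "data": data}
```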
Data Ingestion Progress
from typing import List
from backend.api.services.websocket_notifications import websocket_notifications
# Document and process_document are the project's own model and processing helper.
async def ingest_documents_with_progress(job_id: str, documents: List[Document]):
    total = len(documents)
    for i, doc in enumerate(documents):
        # Process document
        await process_document(doc)
        # Send progress update
        await websocket_notifications.send_ingestion_progress(
            job_id=job_id,
            status="processing",
            progress=(i + 1) / total,
            documents_processed=i + 1,
            total_documents=total
        )
Testing
Unit Tests
The implementation includes comprehensive unit tests:
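import pytest  # the async tests assume the pytest-asyncio plugin
# SubscriptionRequest, SubscriptionType and QueryUpdateMessage come from the project's models.
# Test connection management
@pytest.mark.asyncio
async def test_connect(connection_manager, mock_websocket):
    connection_id = await connection_manager.connect(mock_websocket)
    assert connection_manager.active_connections == 1
# Test message broadcasting
@pytest.mark.asyncio
async def test_broadcast_query_update(connection_manager, mock_websocket):
    connection_id = await connection_manager.connect(mock_websocket)
    # Subscribe to updates (field name mirrors the JSON subscribe payload)
    request = SubscriptionRequest(subscription_types=[SubscriptionType.QUERY_UPDATES])
    await connection_manager.subscribe(connection_id, request)
    # Ignore connection/subscription confirmations so only the broadcast is counted
    mock_websocket.send_json.reset_mock()
    # Broadcast update (fields assumed to match send_query_update's arguments)
    update = QueryUpdateMessage(query_id="q1", status="processing", progress=0.5)
    await connection_manager.broadcast_query_update(update)
    # Verify message received
    mock_websocket.send_json.assert_called_once()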
Integration Tests
Integration tests verify end-to-end functionality:
def test_websocket_subscription(client):
    with client.websocket_connect("/api/v1/ws/updates") as websocket:
        # Test subscription flow
        websocket.send_json({
            "action": "subscribe",
            "data": {"subscription_types": ["query_updates"]}
        })
        response = websocket.receive_json()
        assert response["type"] == "subscription_created"
Security Considerations
- Authentication: WebSocket connections support JWT token authentication via query parameters
- Authorization: User-based message filtering ensures users only receive authorized updates
- Rate Limiting: Connection limits prevent resource exhaustion
- Input Validation: All incoming messages are validated before processing
- Error Handling: Graceful error handling prevents connection leaks
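Connection limits can be enforced with counters checked before the handshake is accepted. This is a minimal single-process sketch; ConnectionLimiter, ConnectionLimitExceeded and the default caps are hypothetical. It caps concurrent connections rather than limiting message rate.

```python
from typing import Dict


class ConnectionLimitExceeded(Exception):
    """Raised when a new socket would exceed a configured cap (hypothetical)."""


class ConnectionLimiter:
    """Caps concurrent WebSocket connections per user and server-wide (illustrative)."""

    def __init__(self, per_user: int = 5, total: int = 1000) -> None:
        self.per_user = per_user
        self.total = total
        self.active = 0
        self._per_user_counts: Dict[str, int] = {}

    # No await between the checks and the updates, so each call runs atomically
    # with respect to other coroutines on the same event loop; no lock is needed.
    def acquire(self, user_id: str) -> None:
        current = self._per_user_counts.get(user_id, 0)
        if self.active >= self.total:
            raise ConnectionLimitExceeded("server-wide connection limit reached")
        if current >= self.per_user:
            raise ConnectionLimitExceeded(f"user {user_id!r} already has {current} connections")
        self._per_user_counts[user_id] = current + 1
        self.active += 1

    def release(self, user_id: str) -> None:
        current = self._per_user_counts.get(user_id, 0)
        if current == 0:
            return                      # unknown user or double release: ignore
        if current == 1:
            del self._per_user_counts[user_id]
        else:
            self._per_user_counts[user_id] = current - 1
        self.active -= 1
```

A handler would call acquire() before accepting the socket. On ConnectionLimitExceeded, it would close the socket with code 1008 (policy violation in RFC 6455). It would call release() in a finally block. With several server processes, the counts would need a shared store.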
Performance Optimizations
- Connection Pooling: Efficient management of concurrent connections
- Message Batching: Multiple updates can be batched for efficiency
- Selective Broadcasting: Only send messages to subscribed connections
- Async Processing: Non-blocking message handling
- Stale Connection Cleanup: Automatic removal of inactive connections
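One simple form of message batching is coalescing. When several progress updates for the same query arrive between flushes, only the newest needs to be sent. This sketch assumes updates are dicts carrying a query_id; UpdateCoalescer and its send callback are hypothetical.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict


class UpdateCoalescer:
    """Keeps only the newest pending update per query_id and sends them in one pass.

    Hypothetical helper: `send` delivers one message (for example a broadcast
    coroutine); the class itself never touches sockets.
    """

    def __init__(self, send: Callable[[Dict[str, Any]], Awaitable[None]]) -> None:
        self._send = send
        self._pending: Dict[str, Dict[str, Any]] = {}

    def add(self, update: Dict[str, Any]) -> None:
        # Overwriting an existing key keeps its original position, so queries are
        # flushed in the order they first appeared; only the newest state survives.
        self._pending[update["query_id"]] = update

    async def flush(self) -> int:
        batch = list(self._pending.values())
        self._pending.clear()           # updates added while sending wait for the next flush
        for update in batch:
            await self._send(update)
        return len(batch)

    async def run(self, interval: float, stop: asyncio.Event) -> None:
        """Flush every `interval` seconds until `stop` is set, then flush once more."""
        while not stop.is_set():
            await asyncio.sleep(interval)
            await self.flush()
        await self.flush()
```

Coalescing drops intermediate values. It therefore suits state-like messages such as progress, but not partial_result chunks that a client must receive in full.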
Monitoring and Metrics
The WebSocket system exposes metrics for monitoring:
- Active connection count
- Subscription statistics by type
- Message throughput rates
- Error rates and types
- Connection duration metrics
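These metrics can be collected with in-process counters that an exporter reads. This is a minimal sketch; WebSocketMetrics and its hook names are hypothetical, and wiring them into a monitoring system is out of scope. Monitoring backends usually derive throughput rates from a monotonically increasing counter, so the sketch stores only totals.

```python
import time
from collections import Counter
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class WebSocketMetrics:
    """In-process counters for the WebSocket metrics (illustrative sketch)."""
    active_connections: int = 0
    messages_sent: Counter = field(default_factory=Counter)    # by message type
    errors: Counter = field(default_factory=Counter)           # by error kind
    subscriptions: Counter = field(default_factory=Counter)    # by subscription type
    # Unbounded for simplicity; a real exporter would use a histogram instead.
    connection_durations: List[float] = field(default_factory=list)  # seconds
    _opened_at: Dict[str, float] = field(default_factory=dict, init=False, repr=False)

    def on_connect(self, connection_id: str) -> None:
        self.active_connections += 1
        self._opened_at[connection_id] = time.monotonic()

    def on_disconnect(self, connection_id: str) -> None:
        opened = self._opened_at.pop(connection_id, None)
        if opened is None:
            return                          # unknown id or duplicate disconnect
        self.active_connections -= 1
        self.connection_durations.append(time.monotonic() - opened)

    def on_subscribe(self, subscription_type: str) -> None:
        self.subscriptions[subscription_type] += 1

    def on_message(self, message_type: str) -> None:
        self.messages_sent[message_type] += 1

    def on_error(self, kind: str) -> None:
        self.errors[kind] += 1

    def snapshot(self) -> Dict[str, object]:
        durations = self.connection_durations
        return {
            "active_connections": self.active_connections,
            "messages_sent_total": sum(self.messages_sent.values()),
            "messages_by_type": dict(self.messages_sent),
            "errors_by_kind": dict(self.errors),
            "subscriptions_by_type": dict(self.subscriptions),
            "mean_connection_seconds": sum(durations) / len(durations) if durations else 0.0,
        }
```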
Future Enhancements
- Message Persistence: Store messages for offline clients
- Compression: Implement message compression for large payloads
- Clustering: Support WebSocket connections across multiple servers
- Binary Protocol: Support binary message format for efficiency
- GraphQL Subscriptions: Add GraphQL subscription support
Conclusion
The WebSocket implementation provides a robust foundation for real-time features in the SmartRAG system. It enables responsive user experiences through live updates while maintaining security, scalability, and reliability.