Real-Time Streaming
License Server Detail provides comprehensive real-time streaming capabilities through WebSocket and Server-Sent Events (SSE), with automatic fallback mechanisms for maximum reliability.
Overview
The streaming architecture supports:
- WebSocket - Full-duplex communication with connection pooling
- Server-Sent Events - Unidirectional server push with automatic reconnection
- Graceful Degradation - Automatic fallback from WebSocket to SSE
- Health Monitoring - Real-time connection health and RTT tracking
WebSocket Client
Features
| Feature | Description |
|---|---|
| Connection Pooling | Efficient connection management with configurable limits |
| Load Balancing | Multiple strategies: round-robin, least-connections, health-weighted, sticky |
| Health Monitoring | Real-time RTT measurement and connection health tracking |
| Circuit Breaker | Automatic failure detection with recovery |
| Memory Management | Automatic cleanup and leak prevention |
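To make the circuit breaker row more concrete, here is a minimal sketch of the usual closed / open / half-open pattern. It is an illustration only, not the client's actual implementation: the `CircuitBreaker` class and its method names are hypothetical, and only the `failureThreshold` and `cooldownMs` names are borrowed from the configuration in this document.

```typescript
// Hypothetical sketch of a circuit breaker; not the library's own code.
type BreakerState = 'closed' | 'open' | 'half-open';

class CircuitBreaker {
  private failures = 0;
  private openedAt = 0;
  private state: BreakerState = 'closed';
  private readonly failureThreshold: number;
  private readonly cooldownMs: number;
  private readonly now: () => number;

  constructor(failureThreshold: number, cooldownMs: number, now: () => number = Date.now) {
    this.failureThreshold = failureThreshold;
    this.cooldownMs = cooldownMs;
    this.now = now; // injectable clock so the behaviour can be tested
  }

  // Returns the current state; an open circuit becomes half-open after the cooldown.
  getState(): BreakerState {
    if (this.state === 'open' && this.now() - this.openedAt >= this.cooldownMs) {
      this.state = 'half-open';
    }
    return this.state;
  }

  // Callers should skip connection attempts while the circuit is open.
  canAttempt(): boolean {
    return this.getState() !== 'open';
  }

  recordSuccess(): void {
    this.failures = 0;
    this.state = 'closed';
  }

  recordFailure(): void {
    this.failures += 1;
    // A failed trial in half-open, or too many consecutive failures, opens the circuit.
    if (this.getState() === 'half-open' || this.failures >= this.failureThreshold) {
      this.state = 'open';
      this.openedAt = this.now();
    }
  }
}
```

With `failureThreshold: 5` and `cooldownMs: 10000`, a breaker like this would stop attempts after five consecutive failures and allow a single trial attempt ten seconds later.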
Configuration
    import { LicenseMonitorWebSocketClient } from '@/lib/websocket/license-monitor-client';

    const wsClient = new LicenseMonitorWebSocketClient({
      url: 'ws://license-monitor:8080/ws/logs',

      // Connection pooling
      pooling: {
        enabled: true,
        maxConnections: 5,
        minConnections: 1,
        loadBalancingStrategy: 'health-weighted',
      },

      // Reconnection settings
      reconnect: {
        enabled: true,
        maxAttempts: 10,
        baseDelay: 1000,
        maxDelay: 30000,
        jitter: true,
      },

      // Health monitoring
      health: {
        pingInterval: 30000,
        pingTimeout: 5000,
        unhealthyThreshold: 3,
      },

      // Circuit breaker
      circuitBreaker: {
        enabled: true,
        failureThreshold: 5,
        cooldownMs: 10000,
      },
    });

Load Balancing Strategies
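As a rough illustration of how the strategies named in this section might choose a connection, the sketch below implements three of them over an in-memory pool. The `PooledConnection` shape, its field names, and the helper functions are assumptions made for the example; the real client may weigh health differently (for instance, using more than RTT).

```typescript
// Hypothetical shape of a pooled connection; field names are assumptions.
interface PooledConnection {
  id: string;
  activeMessages: number; // messages currently in flight on this connection
  rttMs: number;          // last measured round-trip time in milliseconds
}

// Round-robin: cycle through the pool in order. The pool must be non-empty.
function createRoundRobin(pool: PooledConnection[]): () => PooledConnection {
  let next = 0;
  return () => {
    const conn = pool[next % pool.length];
    next += 1;
    return conn;
  };
}

// Least-connections: pick the connection with the fewest in-flight messages.
function pickLeastConnections(pool: PooledConnection[]): PooledConnection {
  return pool.reduce((best, c) => (c.activeMessages < best.activeMessages ? c : best));
}

// Health-weighted (simplified to "lowest RTT wins" for this sketch).
function pickHealthiest(pool: PooledConnection[]): PooledConnection {
  return pool.reduce((best, c) => (c.rttMs < best.rttMs ? c : best));
}
```

Ties go to the earlier connection in the pool, since both `reduce` callbacks only replace the current best on a strictly smaller value.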
Round-robin distributes connections evenly across the pool:

    const client = new LicenseMonitorWebSocketClient({
      pooling: {
        enabled: true,
        loadBalancingStrategy: 'round-robin',
      },
    });

Least-connections sends to the connection with the fewest active messages:

    const client = new LicenseMonitorWebSocketClient({
      pooling: {
        enabled: true,
        loadBalancingStrategy: 'least-connections',
      },
    });

Health-weighted prefers healthier connections based on RTT:

    const client = new LicenseMonitorWebSocketClient({
      pooling: {
        enabled: true,
        loadBalancingStrategy: 'health-weighted',
      },
    });

Sticky routes the same message types to the same connection:

    const client = new LicenseMonitorWebSocketClient({
      pooling: {
        enabled: true,
        loadBalancingStrategy: 'sticky',
      },
    });

Event Handling
    // Subscribe to events
    wsClient.on('message', (data) => {
      console.log('Received:', data);
    });

    wsClient.on('connect', () => {
      console.log('Connected to WebSocket');
    });

    wsClient.on('disconnect', (reason) => {
      console.log('Disconnected:', reason);
    });

    wsClient.on('error', (error) => {
      console.error('WebSocket error:', error);
    });

    wsClient.on('reconnecting', (attempt) => {
      console.log(`Reconnection attempt ${attempt}`);
    });

    // Connect and subscribe to streams
    await wsClient.connect();
    wsClient.subscribe('logs', { level: 'error' });
    wsClient.subscribe('metrics', { interval: 5000 });

Health Metrics
    // Get connection pool health
    const health = wsClient.getHealth();

    console.log({
      totalConnections: health.totalConnections,
      activeConnections: health.activeConnections,
      healthyConnections: health.healthyConnections,
      averageRTT: health.averageRTT,
      circuitBreakerState: health.circuitBreaker.state,
    });

    // Output:
    // {
    //   totalConnections: 5,
    //   activeConnections: 3,
    //   healthyConnections: 5,
    //   averageRTT: 45,
    //   circuitBreakerState: 'closed'
    // }

Server-Sent Events Client
Features
| Feature | Description |
|---|---|
| EventSource Management | Full control over SSE connections |
| Automatic Reconnection | Exponential backoff with configurable limits |
| Message Filtering | Advanced filtering and validation |
| Health Monitoring | Connection health metrics |
| API Key Auth | Secure authentication for endpoints |
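The "exponential backoff with configurable limits" behaviour can be sketched as a small delay calculation. This is an assumption about how `baseDelay` and `maxDelay` interact (doubling per attempt, capped at the maximum), not a description of the client's internals; `reconnectDelay` and `BackoffOptions` are hypothetical names, and the optional jitter mirrors the `jitter` flag from the WebSocket configuration.

```typescript
// Hypothetical helper; option names mirror the reconnect settings in this document.
interface BackoffOptions {
  baseDelay: number; // delay before the first retry, in ms
  maxDelay: number;  // upper bound for any single delay, in ms
  jitter?: boolean;  // randomise the delay to avoid synchronised retries
}

function reconnectDelay(
  attempt: number, // 1-based attempt counter
  opts: BackoffOptions,
  random: () => number = Math.random,
): number {
  // Double the delay on every attempt: base, 2x base, 4x base, ...
  const exponential = opts.baseDelay * Math.pow(2, attempt - 1);
  const capped = Math.min(opts.maxDelay, exponential);
  // "Full jitter": pick a value uniformly between 0 and the capped delay.
  return opts.jitter ? Math.floor(random() * capped) : capped;
}
```

Under these assumptions, `baseDelay: 1000` and `maxDelay: 30000` give delays of 1 s, 2 s, 4 s, 8 s, 16 s, and then 30 s for every later attempt.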
Configuration
    import { LicenseMonitorSSE } from '@/lib/sse/license-monitor-sse';

    const sseClient = new LicenseMonitorSSE({
      baseUrl: 'http://license-monitor:8080',
      apiKey: process.env.LICENSE_MONITOR_API_KEY,

      // Reconnection settings
      reconnect: {
        enabled: true,
        maxAttempts: 10,
        baseDelay: 1000,
        maxDelay: 30000,
      },

      // Message handling
      messageValidation: true,
      batchSize: 100,
      batchTimeout: 1000,
    });

Stream Types
    // Log stream
    const logStream = sseClient.streamLogs({
      level: 'warn',
      onMessage: (log) => {
        console.log(`[${log.level}] ${log.message}`);
      },
      onError: (error) => {
        console.error('Log stream error:', error);
      },
    });

    // Metrics stream
    const metricsStream = sseClient.streamMetrics({
      interval: 5000,
      onMessage: (metrics) => {
        updateDashboard(metrics);
      },
    });

    // Close streams when done
    logStream.close();
    metricsStream.close();

Event Filtering
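One way to picture the `filter` and `transform` options in this section is as a two-stage pipeline in front of `onMessage`. The sketch below assumes that filtering runs before transformation, which the source does not state explicitly; `createPipeline`, `LogMessage`, and `ParsedLog` are hypothetical names used only for illustration.

```typescript
// Hypothetical message shape; the real log payload may carry more fields.
interface LogMessage {
  serverId: string;
  level: string;
  message: string;
  timestamp: string; // ISO 8601 string as received from the stream
}

type ParsedLog = Omit<LogMessage, 'timestamp'> & { timestamp: Date };

interface PipelineOptions<In, Out> {
  filter?: (message: In) => boolean;
  transform: (message: In) => Out;
  onMessage: (message: Out) => void;
}

// Builds a handler that drops rejected messages, then transforms and delivers the rest.
function createPipeline<In, Out>(opts: PipelineOptions<In, Out>): (message: In) => void {
  return (message) => {
    if (opts.filter && !opts.filter(message)) return; // rejected by the filter
    opts.onMessage(opts.transform(message));
  };
}

const delivered: ParsedLog[] = [];
const handleLog = createPipeline<LogMessage, ParsedLog>({
  filter: (m) => m.serverId === 'server-1',
  transform: (m) => ({ ...m, timestamp: new Date(m.timestamp) }),
  onMessage: (m) => {
    delivered.push(m);
  },
});
```

Calling `handleLog` with a message from `server-2` leaves `delivered` untouched, while a message from `server-1` arrives with its timestamp converted to a `Date`.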
    // Filter messages by criteria
    const filteredStream = sseClient.streamLogs({
      level: 'error',
      filter: (message) => {
        // Only include messages from specific servers
        return message.serverId === 'server-1';
      },
      transform: (message) => {
        // Transform message before delivery
        return {
          ...message,
          timestamp: new Date(message.timestamp),
        };
      },
      onMessage: (log) => {
        handleFilteredLog(log);
      },
    });

Graceful Degradation
The streaming client automatically falls back from WebSocket to SSE when the WebSocket connection fails.
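The fallback behaviour can be modelled as a small state transition function. The sketch below is a simplified assumption about the logic, not the client's actual code: the event names, the `StreamState` shape, and `nextState` are hypothetical, and the recovery limit plays the role of a setting such as `maxRecoveryAttempts`.

```typescript
// Hypothetical model of the WebSocket-to-SSE fallback; names are illustrative.
type Transport = 'none' | 'websocket' | 'sse';
type StreamEvent = 'websocket-open' | 'websocket-failed' | 'recovery-failed';

interface StreamState {
  transport: Transport;
  recoveryAttempts: number; // failed WebSocket recovery attempts since falling back
  recoveryEnabled: boolean; // whether further recovery attempts should be scheduled
}

function nextState(
  state: StreamState,
  event: StreamEvent,
  maxRecoveryAttempts: number,
): StreamState {
  switch (event) {
    case 'websocket-open':
      // Initial connect or successful recovery: WebSocket is the primary transport.
      return { transport: 'websocket', recoveryAttempts: 0, recoveryEnabled: true };
    case 'websocket-failed':
      // WebSocket is unavailable: fall back to SSE and start counting recoveries.
      return { transport: 'sse', recoveryAttempts: 0, recoveryEnabled: maxRecoveryAttempts > 0 };
    case 'recovery-failed': {
      // Stay on SSE; stop retrying once the limit is reached.
      const attempts = state.recoveryAttempts + 1;
      return {
        transport: 'sse',
        recoveryAttempts: attempts,
        recoveryEnabled: attempts < maxRecoveryAttempts,
      };
    }
  }
}
```

Keeping the decision logic in a pure function like this makes the fallback rules easy to test separately from the actual network connections.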
Architecture
    ┌─────────────────────────────────────────────────────────────┐
    │                      Streaming Client                       │
    ├─────────────────────────────────────────────────────────────┤
    │                                                             │
    │ ┌─────────────────────────────────────────────────────────┐ │
    │ │                   Connection Manager                    │ │
    │ │                                                         │ │
    │ │  1. Try WebSocket connection                            │ │
    │ │  2. If WebSocket fails → fallback to SSE                │ │
    │ │  3. Periodically attempt WebSocket recovery             │ │
    │ │  4. Seamless data flow during transitions               │ │
    │ │                                                         │ │
    │ └─────────────────────────────────────────────────────────┘ │
    │                                                             │
    │  ┌──────────────────┐      ┌──────────────────────────────┐ │
    │  │    WebSocket     │      │             SSE              │ │
    │  │      Client      │ ───> │            Client            │ │
    │  │                  │      │                              │ │
    │  │     Primary      │      │           Fallback           │ │
    │  │   Full-duplex    │      │       Server-push only       │ │
    │  └──────────────────┘      └──────────────────────────────┘ │
    └─────────────────────────────────────────────────────────────┘

Configuration
    import { LicenseMonitorStreamingClient } from '@/lib/streaming/license-monitor-streaming-client';

    const streamingClient = new LicenseMonitorStreamingClient({
      // WebSocket configuration
      websocket: {
        url: 'ws://license-monitor:8080/ws/logs',
        pooling: {
          enabled: true,
          maxConnections: 3,
        },
      },

      // SSE fallback configuration
      sse: {
        baseUrl: 'http://license-monitor:8080',
        apiKey: process.env.LICENSE_MONITOR_API_KEY,
      },

      // Graceful degradation settings
      degradation: {
        enabled: true,
        fallbackDelay: 5000,      // Wait before falling back
        recoveryInterval: 60000,  // Try WebSocket recovery every minute
        maxRecoveryAttempts: 5,
      },
    });

    // The client handles protocol selection automatically
    streamingClient.on('connectionChange', (info) => {
      console.log(`Connection type: ${info.type}`); // 'websocket' or 'sse'
      console.log(`Reason: ${info.reason}`);
    });

    streamingClient.on('message', (data) => {
      // Messages flow seamlessly regardless of connection type
      handleMessage(data);
    });

    streamingClient.on('degraded', () => {
      console.warn('Fallen back to SSE');
    });

    streamingClient.on('recovered', () => {
      console.log('Restored WebSocket connection');
    });

    // Start streaming
    await streamingClient.connect();
    streamingClient.subscribe('logs');
    streamingClient.subscribe('metrics');

React Hooks
useGracefulDegradation

    import { useEffect } from 'react';
    import { useGracefulDegradation } from '@/hooks/useGracefulDegradation';

    function MonitoringDashboard() {
      const {
        connectionType,
        isConnected,
        isDegraded,
        messages,
        subscribe,
        unsubscribe,
      } = useGracefulDegradation({
        websocketUrl: 'ws://license-monitor:8080/ws/combined',
        sseBaseUrl: 'http://license-monitor:8080',
        apiKey: process.env.NEXT_PUBLIC_LICENSE_MONITOR_API_KEY,
      });

      // Subscribe on mount and clean up on unmount
      useEffect(() => {
        subscribe('logs', { level: 'error' });
        subscribe('metrics', { interval: 5000 });

        return () => {
          unsubscribe('logs');
          unsubscribe('metrics');
        };
      }, []);

      return (
        <div>
          <StatusIndicator
            connected={isConnected}
            degraded={isDegraded}
            type={connectionType}
          />
          <MessageList messages={messages} />
        </div>
      );
    }

useServerHealth
    import { useServerHealth } from '@/hooks/useServerHealth';

    function ServerStatus({ serverId }: { serverId: string }) {
      const { health, isLoading, error, refetch } = useServerHealth(serverId, {
        refetchInterval: 30000,
        enableStreaming: true,
      });

      if (isLoading) return <Skeleton />;
      if (error) return <ErrorMessage error={error} />;

      return (
        <HealthCard
          status={health.status}
          responseTime={health.responseTime}
          uptime={health.uptime}
          lastCheck={health.timestamp}
        />
      );
    }

Connection Lifecycle
State Diagram

    ┌─────────────┐
    │ DISCONNECTED│
    └──────┬──────┘
           │ connect()
           ▼
    ┌─────────────┐
    │ CONNECTING  │──────────────────────────────┐
    └──────┬──────┘                              │
           │ success                             │ failure
           ▼                                     ▼
    ┌─────────────┐                       ┌─────────────┐
    │ CONNECTED   │                       │ RECONNECTING│
    └──────┬──────┘                       └──────┬──────┘
           │                                     │
           │ error/close                         │ max attempts
           │ ────────────────────────────────────│
           │                                     │
           ▼                                     ▼
    ┌─────────────┐                       ┌─────────────┐
    │ DEGRADED    │◄──────────────────────│ FAILED      │
    │ (SSE)       │                       └─────────────┘
    └──────┬──────┘
           │ recovery
           ▼
    ┌─────────────┐
    │ CONNECTED   │
    │ (WebSocket) │
    └─────────────┘

Performance Considerations
Memory Management

    // The client automatically manages memory
    const client = new LicenseMonitorWebSocketClient({
      // Message buffer limits
      messageBuffer: {
        maxSize: 1000,
        pruneThreshold: 800,
      },

      // Connection cleanup
      cleanup: {
        idleTimeout: 300000,  // Close idle connections after 5 min
        maxMessageAge: 60000, // Discard messages older than 1 min
      },
    });

Batching
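The interplay between a batch size and a batch timeout can be sketched as follows. This is a hypothetical `MessageBatcher`, not the SSE client's internals; it assumes a batch is flushed either when it reaches `batchSize` messages or when `batchTimeout` milliseconds have passed since the first buffered message, whichever comes first.

```typescript
// Hypothetical batcher illustrating size-or-timeout flushing.
class MessageBatcher<T> {
  private buffer: T[] = [];
  private timer: ReturnType<typeof setTimeout> | undefined;
  private readonly batchSize: number;
  private readonly batchTimeout: number;
  private readonly onBatch: (messages: T[]) => void;

  constructor(batchSize: number, batchTimeout: number, onBatch: (messages: T[]) => void) {
    this.batchSize = batchSize;
    this.batchTimeout = batchTimeout;
    this.onBatch = onBatch;
  }

  add(message: T): void {
    this.buffer.push(message);
    if (this.buffer.length >= this.batchSize) {
      // Size limit reached: deliver immediately.
      this.flush();
    } else if (this.timer === undefined) {
      // First message of a new batch: start the timeout clock.
      this.timer = setTimeout(() => this.flush(), this.batchTimeout);
    }
  }

  flush(): void {
    if (this.timer !== undefined) {
      clearTimeout(this.timer);
      this.timer = undefined;
    }
    if (this.buffer.length === 0) return;
    const batch = this.buffer;
    this.buffer = [];
    this.onBatch(batch);
  }
}
```

Calling `flush()` before shutdown delivers any partially filled batch and cancels the pending timer.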
    // Batch message processing for performance
    const sseClient = new LicenseMonitorSSE({
      batchSize: 100,     // Process up to 100 messages per batch
      batchTimeout: 1000, // Or flush every second
      onBatch: (messages) => {
        // Process batch efficiently
        bulkUpdateUI(messages);
      },
    });

Next Steps
- Dashboard Features - Using real-time data in the UI
- Data Model - Understanding data structures
- Performance - Optimization tips