Skip to content

Real-Time Streaming

License Server Detail provides comprehensive real-time streaming capabilities through WebSocket and Server-Sent Events (SSE), with automatic fallback mechanisms for maximum reliability.

The streaming architecture supports:

  • WebSocket - Full-duplex communication with connection pooling
  • Server-Sent Events - Unidirectional server push with automatic reconnection
  • Graceful Degradation - Automatic fallback from WebSocket to SSE
  • Health Monitoring - Real-time connection health and RTT tracking
FeatureDescription
Connection PoolingEfficient connection management with configurable limits
Load BalancingMultiple strategies: round-robin, least-connections, health-weighted, sticky
Health MonitoringReal-time RTT measurement and connection health tracking
Circuit BreakerAutomatic failure detection with recovery
Memory ManagementAutomatic cleanup and leak prevention
import { LicenseMonitorWebSocketClient } from '@/lib/websocket/license-monitor-client';
const wsClient = new LicenseMonitorWebSocketClient({
url: 'ws://license-monitor:8080/ws/logs',
// Connection pooling
pooling: {
enabled: true,
maxConnections: 5,
minConnections: 1,
loadBalancingStrategy: 'health-weighted',
},
// Reconnection settings
reconnect: {
enabled: true,
maxAttempts: 10,
baseDelay: 1000,
maxDelay: 30000,
jitter: true,
},
// Health monitoring
health: {
pingInterval: 30000,
pingTimeout: 5000,
unhealthyThreshold: 3,
},
// Circuit breaker
circuitBreaker: {
enabled: true,
failureThreshold: 5,
cooldownMs: 10000,
},
});
// Distributes connections evenly across the pool
const client = new LicenseMonitorWebSocketClient({
pooling: {
enabled: true,
loadBalancingStrategy: 'round-robin',
},
});
// Subscribe to events
wsClient.on('message', (data) => {
console.log('Received:', data);
});
wsClient.on('connect', () => {
console.log('Connected to WebSocket');
});
wsClient.on('disconnect', (reason) => {
console.log('Disconnected:', reason);
});
wsClient.on('error', (error) => {
console.error('WebSocket error:', error);
});
wsClient.on('reconnecting', (attempt) => {
console.log(`Reconnection attempt ${attempt}`);
});
// Connect and subscribe to streams
await wsClient.connect();
wsClient.subscribe('logs', { level: 'error' });
wsClient.subscribe('metrics', { interval: 5000 });
// Get connection pool health
const health = wsClient.getHealth();
console.log({
totalConnections: health.totalConnections,
activeConnections: health.activeConnections,
healthyConnections: health.healthyConnections,
averageRTT: health.averageRTT,
circuitBreakerState: health.circuitBreaker.state,
});
// Output:
// {
// totalConnections: 5,
// activeConnections: 3,
// healthyConnections: 5,
// averageRTT: 45,
// circuitBreakerState: 'closed'
// }
FeatureDescription
EventSource ManagementFull control over SSE connections
Automatic ReconnectionExponential backoff with configurable limits
Message FilteringAdvanced filtering and validation
Health MonitoringConnection health metrics
API Key AuthSecure authentication for endpoints
import { LicenseMonitorSSE } from '@/lib/sse/license-monitor-sse';
const sseClient = new LicenseMonitorSSE({
baseUrl: 'http://license-monitor:8080',
apiKey: process.env.LICENSE_MONITOR_API_KEY,
// Reconnection settings
reconnect: {
enabled: true,
maxAttempts: 10,
baseDelay: 1000,
maxDelay: 30000,
},
// Message handling
messageValidation: true,
batchSize: 100,
batchTimeout: 1000,
});
// Log stream
const logStream = sseClient.streamLogs({
level: 'warn',
onMessage: (log) => {
console.log(`[${log.level}] ${log.message}`);
},
onError: (error) => {
console.error('Log stream error:', error);
},
});
// Metrics stream
const metricsStream = sseClient.streamMetrics({
interval: 5000,
onMessage: (metrics) => {
updateDashboard(metrics);
},
});
// Close streams when done
logStream.close();
metricsStream.close();
// Filter messages by criteria
const filteredStream = sseClient.streamLogs({
level: 'error',
filter: (message) => {
// Only include messages from specific servers
return message.serverId === 'server-1';
},
transform: (message) => {
// Transform message before delivery
return {
...message,
timestamp: new Date(message.timestamp),
};
},
onMessage: (log) => {
handleFilteredLog(log);
},
});

The streaming client automatically handles fallback from WebSocket to SSE:

┌─────────────────────────────────────────────────────────────┐
│ Streaming Client │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────┐│
│ │ Connection Manager ││
│ │ ││
│ │ 1. Try WebSocket connection ││
│ │ 2. If WebSocket fails → fallback to SSE ││
│ │ 3. Periodically attempt WebSocket recovery ││
│ │ 4. Seamless data flow during transitions ││
│ │ ││
│ └─────────────────────────────────────────────────────────┘│
│ │
│ ┌──────────────────┐ ┌──────────────────────────────┐ │
│ │ WebSocket │ │ SSE │ │
│ │ Client │ ───> │ Client │ │
│ │ │ │ │ │
│ │ Primary │ │ Fallback │ │
│ │ Full-duplex │ │ Server-push only │ │
│ └──────────────────┘ └──────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
import { LicenseMonitorStreamingClient } from '@/lib/streaming/license-monitor-streaming-client';
const streamingClient = new LicenseMonitorStreamingClient({
// WebSocket configuration
websocket: {
url: 'ws://license-monitor:8080/ws/logs',
pooling: {
enabled: true,
maxConnections: 3,
},
},
// SSE fallback configuration
sse: {
baseUrl: 'http://license-monitor:8080',
apiKey: process.env.LICENSE_MONITOR_API_KEY,
},
// Graceful degradation settings
degradation: {
enabled: true,
fallbackDelay: 5000, // Wait before falling back
recoveryInterval: 60000, // Try WebSocket recovery every minute
maxRecoveryAttempts: 5,
},
});
// The client handles protocol selection automatically
streamingClient.on('connectionChange', (info) => {
console.log(`Connection type: ${info.type}`); // 'websocket' or 'sse'
console.log(`Reason: ${info.reason}`);
});
streamingClient.on('message', (data) => {
// Messages flow seamlessly regardless of connection type
handleMessage(data);
});
streamingClient.on('degraded', () => {
console.warn('Fallen back to SSE');
});
streamingClient.on('recovered', () => {
console.log('Restored WebSocket connection');
});
// Start streaming
await streamingClient.connect();
streamingClient.subscribe('logs');
streamingClient.subscribe('metrics');
import { useGracefulDegradation } from '@/hooks/useGracefulDegradation';
function MonitoringDashboard() {
const {
connectionType,
isConnected,
isDegraded,
messages,
subscribe,
unsubscribe,
} = useGracefulDegradation({
websocketUrl: 'ws://license-monitor:8080/ws/combined',
sseBaseUrl: 'http://license-monitor:8080',
apiKey: process.env.NEXT_PUBLIC_LICENSE_MONITOR_API_KEY,
});
useEffect(() => {
subscribe('logs', { level: 'error' });
subscribe('metrics', { interval: 5000 });
return () => {
unsubscribe('logs');
unsubscribe('metrics');
};
}, []);
return (
<div>
<StatusIndicator
connected={isConnected}
degraded={isDegraded}
type={connectionType}
/>
<MessageList messages={messages} />
</div>
);
}
import { useServerHealth } from '@/hooks/useServerHealth';
function ServerStatus({ serverId }: { serverId: string }) {
const { health, isLoading, error, refetch } = useServerHealth(serverId, {
refetchInterval: 30000,
enableStreaming: true,
});
if (isLoading) return <Skeleton />;
if (error) return <ErrorMessage error={error} />;
return (
<HealthCard
status={health.status}
responseTime={health.responseTime}
uptime={health.uptime}
lastCheck={health.timestamp}
/>
);
}
┌─────────────┐
│ DISCONNECTED│
└──────┬──────┘
│ connect()
┌─────────────┐
│ CONNECTING │──────────────────────────────┐
└──────┬──────┘ │
│ success │ failure
▼ ▼
┌─────────────┐ ┌─────────────┐
│ CONNECTED │ │ RECONNECTING│
└──────┬──────┘ └──────┬──────┘
│ │
│ error/close │ max attempts
│ ────────────────────────────────────│
│ │
▼ ▼
┌─────────────┐ ┌─────────────┐
│ DEGRADED │◄──────────────────────│ FAILED │
│ (SSE) │ └─────────────┘
└──────┬──────┘
│ recovery
┌─────────────┐
│ CONNECTED │
│ (WebSocket) │
└─────────────┘
// The client automatically manages memory
const client = new LicenseMonitorWebSocketClient({
// Message buffer limits
messageBuffer: {
maxSize: 1000,
pruneThreshold: 800,
},
// Connection cleanup
cleanup: {
idleTimeout: 300000, // Close idle connections after 5 min
maxMessageAge: 60000, // Discard messages older than 1 min
},
});
// Batch message processing for performance
const sseClient = new LicenseMonitorSSE({
batchSize: 100, // Process up to 100 messages per batch
batchTimeout: 1000, // Or flush every second
onBatch: (messages) => {
// Process batch efficiently
bulkUpdateUI(messages);
},
});