Building Real-Time AI Apps: Complete Guide to WebSockets and LLM Streaming

MatterAI Agent

Building Real-Time AI Applications with WebSockets and Streaming Responses

Real-time AI applications require bidirectional communication with low-latency token streaming. This guide covers implementing WebSocket connections that pipe LLM response streams directly to clients, enabling immediate text rendering as tokens are generated.

Architecture Overview

The data flow follows this pattern: Client sends prompt via WebSocket → Server validates and forwards to AI API → AI API streams tokens → Server forwards chunks via WebSocket → Client renders incrementally.

Key components:

  • WebSocket Server: Maintains persistent connections, handles authentication and message routing
  • AI Stream Consumer: Async generator that yields tokens from the LLM API
  • Message Framing: JSON-wrapped chunks with metadata (type, content, done status)
  • Connection Management: Reconnection logic, heartbeats, and timeout handling
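The message framing described above can be pinned down as a handful of helpers that build server frames and classify client frames. This is a minimal sketch assuming the frame shapes used by the server code in this guide ("token", "done", "error", and "pong" frames from the server; "ping" frames or {"prompt": ...} objects from the client). The helper names are hypothetical.

```python
import json
from typing import Any, Dict

# Hypothetical helpers for the JSON frames used throughout this guide.
MAX_PROMPT_CHARS = 4000  # same limit as the server-side prompt validation


def token_frame(content: str) -> str:
    return json.dumps({"type": "token", "content": content, "done": False})


def done_frame() -> str:
    return json.dumps({"type": "done", "done": True})


def error_frame(message: str) -> str:
    return json.dumps({"type": "error", "message": message})


def parse_client_message(raw: str) -> Dict[str, Any]:
    """Classify a client -> server frame as a ping or a prompt.

    Raises ValueError for anything else, so the caller can reply with error_frame().
    """
    try:
        data = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValueError(f"invalid JSON: {exc.msg}") from None
    if not isinstance(data, dict):
        raise ValueError("frame must be a JSON object")
    if data.get("type") == "ping":
        return {"kind": "ping"}
    prompt = data.get("prompt")
    if not isinstance(prompt, str) or not 1 <= len(prompt) <= MAX_PROMPT_CHARS:
        raise ValueError("prompt must be a string of 1-4000 characters")
    return {"kind": "prompt", "prompt": prompt}
```

Keeping every frame a JSON object with a "type" field lets both sides dispatch with a single switch, and adding a new frame type never breaks older clients that ignore unknown types.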

Server-Side Implementation

FastAPI WebSocket Handler

from fastapi import FastAPI, WebSocket, WebSocketDisconnect, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.websockets import WebSocketState
from pydantic import BaseModel, constr
import json
import asyncio
import time
from collections import defaultdict
from typing import Optional
from openai import AsyncOpenAI

app = FastAPI()

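# CORS middleware applies to HTTP routes only: Starlette does not run CORS checks
# on WebSocket handshakes, so the WebSocket handler must validate the Origin header itself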
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://yourdomain.com"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# Reuse AI client across requests
ai_client = AsyncOpenAI()

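# In-memory rate limiting: at most 10 connections (one prompt each) per minute per IP.
# State is per process, so multiple instances need a shared store.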
rate_limiter = defaultdict(list)
RATE_LIMIT = 10
RATE_WINDOW = 60

# Cleanup task for rate limiter memory
async def cleanup_rate_limiter():
    while True:
        await asyncio.sleep(RATE_WINDOW)
        now = time.time()
        for ip in list(rate_limiter.keys()):
            rate_limiter[ip] = [t for t in rate_limiter[ip] if now - t < RATE_WINDOW]
            if not rate_limiter[ip]:
                del rate_limiter[ip]

class PromptRequest(BaseModel):
    prompt: constr(min_length=1, max_length=4000)

async def stream_llm_response(prompt: str):
    """Async generator yielding tokens from AI API"""
    stream = await ai_client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    
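    try:
        async for chunk in stream:
            # Guard against chunks that arrive with an empty choices list
            if chunk.choices and chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content
    finally:
        # Runs when the generator finishes or is closed early (aclose() on
        # client disconnect), releasing the upstream HTTP stream
        await stream.close()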

async def verify_auth(websocket: WebSocket) -> bool:
    """Verify a bearer token from the Authorization header, falling back to the ?token= query parameter"""
    # Check Authorization header first
    token = websocket.headers.get("authorization")
    
    # Fallback to query params for clients that can't set headers
    if not token:
        token = websocket.query_params.get("token")
    
    if not token or not token.startswith("Bearer "):
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
        return False
    
    # Extract and validate token with your auth provider
    # token_value = token.split(" ", 1)[1]
    # if not validate_jwt_token(token_value):
    #     await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
    #     return False
    
    return True

async def check_rate_limit(client_ip: str, websocket: WebSocket) -> bool:
    """Enforce rate limiting"""
    now = time.time()
    rate_limiter[client_ip] = [t for t in rate_limiter[client_ip] if now - t < RATE_WINDOW]
    if len(rate_limiter[client_ip]) >= RATE_LIMIT:
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
        return False
    rate_limiter[client_ip].append(now)
    return True

@app.websocket("/ws/ai")
async def websocket_endpoint(websocket: WebSocket):
    # CORSMiddleware does not cover WebSocket handshakes, so check Origin here.
    # Closing before accept() rejects the handshake.
    if websocket.headers.get("origin") != "https://yourdomain.com":
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
        return

    await websocket.accept()

    # Behind a reverse proxy, use the client IP from a trusted X-Forwarded-For header instead
    client_ip = websocket.client.host if websocket.client else "unknown"

    # One task owns every receive call: it answers pings and queues all other
    # messages. Two concurrent receivers on one socket would steal each other's frames.
    incoming: asyncio.Queue = asyncio.Queue()
    reader_task = None

    async def read_messages():
        try:
            while True:
                message = await websocket.receive_json()
                if message.get("type") == "ping":
                    await websocket.send_json({"type": "pong"})
                else:
                    await incoming.put(message)
        except Exception as exc:
            # Disconnects and malformed frames end the reader; hand the error to the main flow
            await incoming.put(exc)

    try:
        # Authentication
        if not await verify_auth(websocket):
            return

        # Rate limiting
        if not await check_rate_limit(client_ip, websocket):
            return

        reader_task = asyncio.create_task(read_messages())

        # Receive the user prompt with a timeout
        try:
            message = await asyncio.wait_for(incoming.get(), timeout=30.0)
            if isinstance(message, Exception):
                raise message
            request = PromptRequest(**message)
        except asyncio.TimeoutError:
            await websocket.send_json({"type": "error", "message": "Request timeout"})
            return
        except WebSocketDisconnect:
            raise
        except Exception as e:
            await websocket.send_json({"type": "error", "message": f"Invalid request: {e}"})
            return

        # Stream the AI response; the timeout applies to each send, not the whole stream
        tokens = stream_llm_response(request.prompt)
        try:
            async for chunk in tokens:
                await asyncio.wait_for(
                    websocket.send_json({"type": "token", "content": chunk, "done": False}),
                    timeout=5.0,
                )
        except asyncio.TimeoutError:
            # The client is not reading fast enough; stop and let `finally` close the socket
            return
        finally:
            # Close the generator now so its `finally` releases the upstream stream
            await tokens.aclose()

        # Signal completion
        await websocket.send_json({"type": "done", "done": True})

    except WebSocketDisconnect:
        pass
    except Exception as e:
        # Best effort: the socket may already be gone
        try:
            await websocket.send_json({"type": "error", "message": str(e)})
        except Exception:
            pass
    finally:
        # Stop the reader task
        if reader_task:
            reader_task.cancel()
            try:
                await reader_task
            except asyncio.CancelledError:
                pass

        # verify_auth/check_rate_limit may already have closed the socket; never close twice
        if websocket.application_state == WebSocketState.CONNECTED:
            try:
                await websocket.close()
            except Exception:
                pass  # the client already went away

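# Start the cleanup task on startup and keep a reference so it is not garbage-collected
# (newer FastAPI versions prefer a lifespan handler over on_event)
@app.on_event("startup")
async def startup_event():
    app.state.rate_limiter_cleanup = asyncio.create_task(cleanup_rate_limiter())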
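The per-IP limiter above mixes its bookkeeping with WebSocket calls, which makes it awkward to unit-test. Here is a minimal sketch of the same sliding-window policy as a standalone class, assuming an injectable clock. The class name SlidingWindowLimiter is hypothetical. It stores timestamps in a deque, so expired entries are dropped from the left instead of rebuilding a list on every call.

```python
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict


class SlidingWindowLimiter:
    """Allow at most `limit` events per `window` seconds for each key (e.g. client IP)."""

    def __init__(self, limit: int = 10, window: float = 60.0,
                 clock: Callable[[], float] = time.monotonic):
        self.limit = limit
        self.window = window
        self.clock = clock  # monotonic by default, so wall-clock jumps do not matter
        self._events: Dict[str, Deque[float]] = defaultdict(deque)

    def _expire(self, events: Deque[float], now: float) -> None:
        # Drop timestamps that have slid out of the window
        while events and now - events[0] >= self.window:
            events.popleft()

    def allow(self, key: str) -> bool:
        now = self.clock()
        events = self._events[key]
        self._expire(events, now)
        if len(events) >= self.limit:
            return False
        events.append(now)
        return True

    def prune(self) -> None:
        """Forget keys with no events left in the window (the job cleanup_rate_limiter does)."""
        now = self.clock()
        for key in list(self._events):
            self._expire(self._events[key], now)
            if not self._events[key]:
                del self._events[key]
```

In the endpoint, a single module-level instance would replace check_rate_limit's list handling: call limiter.allow(client_ip) and close with code 1008 when it returns False.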

Node.js Alternative

const WebSocket = require('ws');
const OpenAI = require('openai');

const openai = new OpenAI();

const wss = new WebSocket.Server({ 
    port: 8080,
    perMessageDeflate: {
        zlibDeflateOptions: {
            level: 3
        },
        zlibInflateOptions: {},
        clientNoContextTakeover: true,
        serverNoContextTakeover: true,
        serverMaxWindowBits: 15,
        concurrencyLimit: 10,
        threshold: 1024
    },
    verifyClient: (info, cb) => {
        // Origin check: browsers send Origin on the handshake (CORS itself does not apply to WebSockets)
        const origin = info.origin;
        const allowedOrigins = ['https://yourdomain.com'];
        if (!allowedOrigins.includes(origin)) {
            // The second argument is the HTTP status code for the rejected handshake
            return cb(false, 403, 'Forbidden');
        }
        
        // Auth check: prefer the Authorization header, fall back to ?token= for browsers
        const authHeader = info.req.headers['authorization'];
        const token = authHeader || new URL(info.req.url, 'http://localhost').searchParams.get('token');
        
        if (!token || !token.startsWith('Bearer ')) {
            return cb(false, 401, 'Unauthorized');
        }
        
        // Validate the token with your auth provider here
        cb(true);
    }
});

// Rate limiting store
const rateLimiter = new Map();
const RATE_LIMIT = 10;
const RATE_WINDOW = 60000;

// Cleanup inactive IPs periodically
setInterval(() => {
    const now = Date.now();
    for (const [ip, requests] of rateLimiter.entries()) {
        const validRequests = requests.filter(t => now - t < RATE_WINDOW);
        if (validRequests.length === 0) {
            rateLimiter.delete(ip);
        } else {
            rateLimiter.set(ip, validRequests);
        }
    }
}, RATE_WINDOW);

function checkRateLimit(ip) {
    const now = Date.now();
    const requests = rateLimiter.get(ip) || [];
    const validRequests = requests.filter(t => now - t < RATE_WINDOW);
    
    if (validRequests.length >= RATE_LIMIT) {
        return false;
    }
    
    validRequests.push(now);
    rateLimiter.set(ip, validRequests);
    return true;
}

function safeSend(ws, data) {
    try {
        if (ws.readyState === WebSocket.OPEN) {
            ws.send(data);
            return true;
        }
    } catch (error) {
        console.error('Send error:', error.message);
    }
    return false;
}

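wss.on('connection', (ws, req) => {
    // Behind a reverse proxy, read the client IP from a trusted X-Forwarded-For header instead
    const clientIp = req.socket.remoteAddress;
    let activeStream = null;
    
    if (!checkRateLimit(clientIp)) {
        ws.close(1008, 'Rate limit exceeded');
        return;
    }
    
    // Protocol-level ping frames are answered automatically by ws. Browsers cannot send
    // them, so the client uses application-level {"type": "ping"} messages handled below.
    
    ws.on('close', () => {
        // Cancel active AI stream if connection closes
        if (activeStream) {
            activeStream.controller.abort();
        }
    });
    
    ws.on('message', async (message) => {
        let data;
        try {
            data = JSON.parse(message.toString());
        } catch {
            data = null;
        }
        if (!data || typeof data !== 'object') {
            safeSend(ws, JSON.stringify({ type: 'error', message: 'Invalid JSON' }));
            return;
        }
        
        // Handle application-level ping messages
        if (data.type === 'ping') {
            safeSend(ws, JSON.stringify({ type: 'pong' }));
            return;
        }
        
        // Validate prompt
        if (typeof data.prompt !== 'string' || data.prompt.length === 0 || data.prompt.length > 4000) {
            safeSend(ws, JSON.stringify({ type: 'error', message: 'Invalid prompt' }));
            return;
        }
        
        // Allow one in-flight generation per connection
        if (activeStream) {
            safeSend(ws, JSON.stringify({ type: 'error', message: 'A response is already streaming' }));
            return;
        }
        
        const controller = new AbortController();
        activeStream = { controller };
        
        // Abort the upstream request and close the socket after 120 s
        const timeout = setTimeout(() => {
            controller.abort();
            safeSend(ws, JSON.stringify({ type: 'error', message: 'Request timeout' }));
            ws.close(1008, 'Timeout');
        }, 120000);
        
        try {
            // Request options such as the abort signal go in the second argument
            const stream = await openai.chat.completions.create(
                {
                    model: 'gpt-4',
                    messages: [{ role: 'user', content: data.prompt }],
                    stream: true
                },
                { signal: controller.signal }
            );
            
            for await (const chunk of stream) {
                const content = chunk.choices[0]?.delta?.content;
                if (content && !safeSend(ws, JSON.stringify({ type: 'token', content, done: false }))) {
                    break;
                }
            }
            
            safeSend(ws, JSON.stringify({ type: 'done', done: true }));
        } catch (error) {
            // Aborts are expected on disconnect or timeout; report anything else
            if (!controller.signal.aborted) {
                safeSend(ws, JSON.stringify({ type: 'error', message: error.message }));
            }
        } finally {
            // Runs on success, error, and abort, so the timer never outlives the request
            clearTimeout(timeout);
            activeStream = null;
        }
    });
});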
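The Node handler cancels the upstream request with an AbortController when the socket closes. In an asyncio server the analogous tool is task cancellation: run the relay loop as a task and cancel it when a disconnect is detected. Below is a minimal, self-contained sketch in which a fake token source stands in for the LLM API. The names fake_llm_tokens and relay are hypothetical.

```python
import asyncio


async def fake_llm_tokens(log: list):
    """Stand-in for stream_llm_response(): yields tokens slowly and records cleanup."""
    try:
        for i in range(1000):
            await asyncio.sleep(0.01)
            yield f"tok{i} "
    finally:
        # In the real generator, this is where the upstream stream would be closed
        log.append("upstream closed")


async def relay(sent: list, log: list):
    tokens = fake_llm_tokens(log)
    try:
        async for token in tokens:
            sent.append(token)  # stand-in for websocket.send_json(...)
    finally:
        # Close the generator explicitly so its cleanup runs now, not at garbage collection
        await tokens.aclose()


async def main():
    sent, log = [], []
    stream_task = asyncio.create_task(relay(sent, log))
    await asyncio.sleep(0.05)  # the client disconnects shortly after streaming starts
    stream_task.cancel()       # what a disconnect handler would do
    try:
        await stream_task
    except asyncio.CancelledError:
        pass
    return sent, log


if __name__ == "__main__":
    sent, log = asyncio.run(main())
    print(len(sent), log)
```

Cancellation is delivered at the generator's pending await, so its finally block runs exactly once before the relay task finishes. This mirrors how the AbortController in the Node handler makes the for await loop throw.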

Client-Side Implementation

WebSocket Connection Handler

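class AIWebSocket {
    constructor(url, options = {}) {
        this.url = url;
        this.buffer = '';
        this.onToken = null;
        this.onDone = null;
        this.onError = null;
        this.onConnected = null;
        this.onDisconnected = null;
        this.reconnectAttempts = 0;
        // ?? keeps explicit zeros (e.g. maxReconnectAttempts: 0), unlike ||
        this.maxReconnectAttempts = options.maxReconnectAttempts ?? 5;
        this.reconnectDelay = options.reconnectDelay ?? 1000;
        this.heartbeatInterval = options.heartbeatInterval ?? 30000;
        this.pongTimeout = options.pongTimeout ?? 10000;
        this.requestTimeout = options.requestTimeout ?? 120000;
        this.pendingMessages = [];
        this.activeRequestTimeout = null;
        this.heartbeatTimer = null;
        this.pongTimer = null;
        this.reconnectTimer = null;
        this.isManuallyClosed = false;
        
        this.connect();
    }
    
    connect() {
        // A reconnect timer may fire after close(); do not reopen in that case
        if (this.isManuallyClosed) return;
        
        this.ws = new WebSocket(this.url);
        
        this.ws.onopen = () => {
            this.reconnectAttempts = 0;
            this.startHeartbeat();
            
            // Send any messages queued while disconnected
            while (this.pendingMessages.length > 0) {
                this.ws.send(this.pendingMessages.shift());
            }
            
            this.onConnected?.();
        };
        
        this.ws.onmessage = (event) => {
            let data;
            try {
                data = JSON.parse(event.data);
            } catch {
                return; // ignore malformed frames
            }
            
            switch (data.type) {
                case 'token':
                    this.buffer += data.content;
                    this.onToken?.(data.content, this.buffer);
                    break;
                case 'done':
                    this.clearRequestTimeout();
                    this.onDone?.(this.buffer);
                    break;
                case 'error':
                    this.clearRequestTimeout();
                    this.onError?.(data.message);
                    break;
                case 'pong':
                    // Connection is alive: cancel the dead-connection timer
                    clearTimeout(this.pongTimer);
                    this.pongTimer = null;
                    break;
            }
        };
        
        this.ws.onclose = () => {
            const wasWaiting = this.activeRequestTimeout !== null;
            this.stopHeartbeat();
            this.clearRequestTimeout();
            this.onDisconnected?.();
            
            // Without this, a drop mid-response would leave the UI waiting forever
            if (wasWaiting) {
                this.onError?.('Connection closed before the response completed');
            }
            
            if (!this.isManuallyClosed && this.reconnectAttempts < this.maxReconnectAttempts) {
                // Exponential backoff: reconnectDelay, 2x, 4x, ...
                const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts);
                this.reconnectAttempts++;
                this.reconnectTimer = setTimeout(() => this.connect(), delay);
            }
        };
        
        this.ws.onerror = () => {
            this.clearRequestTimeout();
            this.pendingMessages = [];
            this.onError?.('WebSocket connection error');
        };
    }
    
    startHeartbeat() {
        this.stopHeartbeat();
        this.heartbeatTimer = setInterval(() => {
            if (this.ws.readyState !== WebSocket.OPEN) return;
            this.ws.send(JSON.stringify({ type: 'ping' }));
            
            // No pong within pongTimeout: treat the connection as dead and close it,
            // which fires onclose and starts the reconnection logic
            if (!this.pongTimer) {
                this.pongTimer = setTimeout(() => this.ws.close(), this.pongTimeout);
            }
        }, this.heartbeatInterval);
    }
    
    stopHeartbeat() {
        clearInterval(this.heartbeatTimer);
        clearTimeout(this.pongTimer);
        this.heartbeatTimer = null;
        this.pongTimer = null;
    }
    
    setRequestTimeout() {
        this.clearRequestTimeout();
        this.activeRequestTimeout = setTimeout(() => {
            this.activeRequestTimeout = null;
            this.pendingMessages = [];
            this.onError?.('Request timeout');
            // Closing the socket aborts the server-side stream; onclose then reconnects
            this.ws.close();
        }, this.requestTimeout);
    }
    
    clearRequestTimeout() {
        if (this.activeRequestTimeout) {
            clearTimeout(this.activeRequestTimeout);
            this.activeRequestTimeout = null;
        }
    }
    
    sendPrompt(prompt) {
        const message = JSON.stringify({ prompt });
        
        // Reset the buffer and start the timeout whether the prompt is sent now or queued
        this.buffer = '';
        this.setRequestTimeout();
        
        if (this.ws.readyState === WebSocket.OPEN) {
            this.ws.send(message);
        } else {
            // Queue message for when connection is established
            this.pendingMessages.push(message);
        }
    }
    
    close() {
        this.isManuallyClosed = true;
        clearTimeout(this.reconnectTimer);
        this.stopHeartbeat();
        this.clearRequestTimeout();
        this.ws.close();
    }
}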
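The reconnect delay in AIWebSocket doubles on every attempt (reconnectDelay * 2^attempt). The sketch below computes that schedule and adds two common refinements that the class does not include: a cap on the delay, and optional "full jitter" so that many clients dropped at the same moment do not reconnect in lockstep. The function name and the max_delay_ms and rng parameters are hypothetical.

```python
import random
from typing import List, Optional


def backoff_delays(max_attempts: int = 5, base_ms: int = 1000,
                   max_delay_ms: int = 30_000,
                   rng: Optional[random.Random] = None) -> List[int]:
    """Reconnect delays in ms: base_ms * 2**attempt, capped at max_delay_ms.

    If rng is given, apply full jitter: pick uniformly from [0, capped delay].
    """
    delays = []
    for attempt in range(max_attempts):
        delay = min(max_delay_ms, base_ms * 2 ** attempt)
        if rng is not None:
            delay = rng.randint(0, delay)
        delays.append(delay)
    return delays
```

With the class defaults (5 attempts, 1000 ms), backoff_delays() returns [1000, 2000, 4000, 8000, 16000]. The cap only takes effect from the sixth attempt onward.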

React Integration Example

import { useState, useEffect, useRef, useCallback } from 'react';

function AIChat() {
    const [messages, setMessages] = useState([]);
    const [input, setInput] = useState('');
    const [isStreaming, setIsStreaming] = useState(false);
    const [isConnected, setIsConnected] = useState(false);
    const wsRef = useRef(null);
    const mountedRef = useRef(true);
    const currentMessageIdRef = useRef(null);
    const nextIdRef = useRef(0); // unique keys for user, assistant, and system messages
    
    useEffect(() => {
        mountedRef.current = true;
        currentMessageIdRef.current = null;
        
        // Browsers cannot set an Authorization header on the WebSocket handshake, so the
        // token travels in the query string: use wss:// and a short-lived token issued by
        // your backend, never a long-lived API key
        wsRef.current = new AIWebSocket('wss://yourdomain.com/ws/ai?token=Bearer%20your-short-lived-token', {
            maxReconnectAttempts: 5,
            reconnectDelay: 1000,
            heartbeatInterval: 30000,
            requestTimeout: 120000
        });
        
        wsRef.current.onConnected = () => {
            if (mountedRef.current) {
                setIsConnected(true);
            }
        };
        
        wsRef.current.onDisconnected = () => {
            if (mountedRef.current) {
                setIsConnected(false);
            }
        };
        
        wsRef.current.onToken = (token, fullText) => {
            if (!mountedRef.current) return;
            
            // Choose the message ID outside the state updater: updaters must be pure,
            // and React may call them twice in Strict Mode
            const messageId = currentMessageIdRef.current;
            
            if (messageId === null) {
                // First token: append a new assistant message
                const id = ++nextIdRef.current;
                currentMessageIdRef.current = id;
                setMessages(prev => [...prev, { id, role: 'assistant', content: fullText }]);
            } else {
                // Subsequent tokens: update the existing assistant message by ID
                setMessages(prev => prev.map(msg =>
                    msg.id === messageId ? { ...msg, content: fullText } : msg
                ));
            }
        };
        
        wsRef.current.onDone = () => {
            if (mountedRef.current) {
                setIsStreaming(false);
                currentMessageIdRef.current = null;
            }
        };
        
        wsRef.current.onError = (error) => {
            if (mountedRef.current) {
                setIsStreaming(false);
                currentMessageIdRef.current = null;
                const id = ++nextIdRef.current;
                setMessages(prev => [...prev, { id, role: 'system', content: `Error: ${error}` }]);
            }
        };
        
        return () => {
            mountedRef.current = false;
            if (wsRef.current) {
                wsRef.current.close();
            }
        };
    }, []);
    
    const sendMessage = useCallback(() => {
        if (!input.trim() || !isConnected || isStreaming) return;
        
        const id = ++nextIdRef.current;
        setMessages(prev => [...prev, { id, role: 'user', content: input }]);
        setIsStreaming(true);
        wsRef.current.sendPrompt(input);
        setInput('');
    }, [input, isConnected, isStreaming]);
    
    return (
        <div>
            <div style={{ marginBottom: '10px' }}>
                Status: {isConnected ? 'Connected' : 'Disconnected'}
            </div>
            {messages.map((msg) => (
                <div key={msg.id}>
                    <strong>{msg.role}:</strong> {msg.content}
                </div>
            ))}
            {isStreaming && <span className="cursor"></span>}
            <input 
                value={input} 
                onChange={(e) => setInput(e.target.value)}
                onKeyDown={(e) => e.key === 'Enter' && sendMessage()}
                disabled={isStreaming || !isConnected}
            />
        </div>
    );
}
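The streaming UI logic reduces to a small state machine. The first token frame opens an assistant message, later token frames replace its text with the accumulated buffer, and done or error closes it. Written as a pure function that returns new state instead of mutating, the logic can be tested outside React. The following is a minimal Python sketch; apply_frame, ChatState, and Message are hypothetical names chosen to mirror the component's messages array and current-message ID.

```python
from dataclasses import dataclass, replace
from typing import Optional, Tuple


@dataclass(frozen=True)
class Message:
    id: int
    role: str
    content: str


@dataclass(frozen=True)
class ChatState:
    messages: Tuple[Message, ...] = ()
    buffer: str = ""                  # accumulated tokens of the current response
    current_id: Optional[int] = None  # id of the assistant message being streamed
    next_id: int = 1


def apply_frame(state: ChatState, frame: dict) -> ChatState:
    """Return the state after one server frame; never mutates `state`."""
    kind = frame.get("type")
    if kind == "token":
        buffer = state.buffer + frame["content"]
        if state.current_id is None:
            # First token: open a new assistant message
            msg = Message(state.next_id, "assistant", buffer)
            return replace(state, messages=state.messages + (msg,), buffer=buffer,
                           current_id=msg.id, next_id=state.next_id + 1)
        # Later tokens: rewrite only the message being streamed
        messages = tuple(replace(m, content=buffer) if m.id == state.current_id else m
                         for m in state.messages)
        return replace(state, messages=messages, buffer=buffer)
    if kind == "done":
        return replace(state, buffer="", current_id=None)
    if kind == "error":
        msg = Message(state.next_id, "system", f"Error: {frame.get('message', '')}")
        return replace(state, messages=state.messages + (msg,), buffer="",
                       current_id=None, next_id=state.next_id + 1)
    return state  # pong and unknown frames leave the transcript unchanged
```

Because each call builds a new state from its input alone, running it twice on the same input (as React Strict Mode does with updaters) yields the same result.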

Optimization and Best Practices

Backpressure Management

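Batch tokens so the server sends one WebSocket message per group of tokens rather than one per token. This cuts per-message framing overhead and the number of client re-renders. Batching is not backpressure by itself: to apply backpressure, wait for each send to be flushed before producing more (in Node's ws, via the send callback; in Python, whether awaiting send_json waits for the socket to drain depends on the ASGI server), so a slow client slows the producer instead of growing an unbounded send buffer. Flush on a short time limit as well as on a token count, so batching does not delay time-to-first-token: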

Python:

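import time

BUFFER_SIZE = 10       # flush after this many tokens...
FLUSH_INTERVAL = 0.05  # ...or once this many seconds have passed since the last flush

async def stream_batched(websocket, prompt: str):
    """Send tokens from stream_llm_response() in batches instead of one message per token."""
    buffer = []
    last_flush = time.monotonic()

    async def flush():
        nonlocal buffer, last_flush
        if buffer:
            # Depending on the ASGI server, this await may also wait for the socket to drain
            await websocket.send_json({
                "type": "token",
                "content": "".join(buffer),
                "done": False
            })
            buffer = []
        last_flush = time.monotonic()

    # The time check runs as each token arrives
    async for chunk in stream_llm_response(prompt):
        buffer.append(chunk)
        if len(buffer) >= BUFFER_SIZE or time.monotonic() - last_flush >= FLUSH_INTERVAL:
            await flush()

    # Flush the remaining buffer before sending the "done" frame
    await flush()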

JavaScript:

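const BUFFER_SIZE = 10;        // flush after this many tokens...
const FLUSH_INTERVAL_MS = 50;  // ...or once this much time has passed since the last flush

// Resolves once ws has written the frame out, so a slow client slows the loop
function sendAsync(ws, data) {
    return new Promise((resolve, reject) => {
        ws.send(data, (err) => (err ? reject(err) : resolve()));
    });
}

async function streamBatched(ws, stream) {
    let buffer = [];
    let lastFlush = Date.now();

    const flush = async () => {
        if (buffer.length > 0) {
            const content = buffer.join('');
            buffer = [];
            await sendAsync(ws, JSON.stringify({ type: 'token', content, done: false }));
        }
        lastFlush = Date.now();
    };

    for await (const chunk of stream) {
        const content = chunk.choices[0]?.delta?.content;
        if (!content) continue;

        buffer.push(content);
        if (buffer.length >= BUFFER_SIZE || Date.now() - lastFlush >= FLUSH_INTERVAL_MS) {
            await flush();
        }
    }

    // Flush the remaining buffer before sending the 'done' frame
    await flush();
}

// Usage inside the message handler, after creating the OpenAI stream:
// await streamBatched(ws, stream);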
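A batching policy is easiest to reason about as a pure function. The sketch below applies the count-or-time flush rule to (arrival time, token) pairs, so the rule can be checked without a socket. batch_tokens and its max_wait and start parameters are hypothetical names. Flushing on time as well as on count means a slow first token goes out immediately, while a fast burst is still grouped.

```python
from typing import Iterable, List, Tuple


def batch_tokens(events: Iterable[Tuple[float, str]], max_tokens: int = 10,
                 max_wait: float = 0.05, start: float = 0.0) -> List[str]:
    """Group (arrival_time_seconds, token) pairs into outgoing message payloads.

    A batch is flushed when it holds max_tokens tokens, or when a token arrives
    max_wait seconds or more after the previous flush. Leftovers are flushed at the end.
    """
    batches: List[str] = []
    buffer: List[str] = []
    last_flush = start  # time the stream was opened
    for ts, token in events:
        buffer.append(token)
        if len(buffer) >= max_tokens or ts - last_flush >= max_wait:
            batches.append("".join(buffer))
            buffer = []
            last_flush = ts
    if buffer:
        batches.append("".join(buffer))
    return batches
```

Like the streaming versions, the time check runs only when a token arrives. A stalled stream therefore holds its partial batch until the next token or the final flush.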

Connection Heartbeats

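The AIWebSocket client sends an application-level {"type": "ping"} message every heartbeatInterval milliseconds, and the server replies with {"type": "pong"}. Browsers cannot send protocol-level WebSocket ping frames, which is why the heartbeat travels as a JSON message. Sending pings alone only keeps idle connections (and intermediate proxies) alive. To detect a dead connection, the client must also expect the pong: if none arrives within a timeout, it closes the socket so the onclose handler's reconnection logic takes over.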
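The dead-connection check is a deadline: after a ping, a pong must arrive within some timeout. Here is a minimal sketch of that bookkeeping with an injectable clock. HeartbeatMonitor and pong_timeout are hypothetical names. A client would call ping_sent() whenever it sends a ping, call pong_received() on every pong frame, and close the socket (triggering reconnection) when is_dead() returns true.

```python
import time
from typing import Callable, Optional


class HeartbeatMonitor:
    """Tracks whether the oldest unanswered ping is older than pong_timeout seconds."""

    def __init__(self, pong_timeout: float = 10.0,
                 clock: Callable[[], float] = time.monotonic):
        self.pong_timeout = pong_timeout
        self.clock = clock
        self._awaiting_since: Optional[float] = None  # time of the oldest unanswered ping

    def ping_sent(self) -> None:
        # Keep the oldest outstanding ping: a newer ping must not extend the deadline
        if self._awaiting_since is None:
            self._awaiting_since = self.clock()

    def pong_received(self) -> None:
        self._awaiting_since = None

    def is_dead(self) -> bool:
        return (self._awaiting_since is not None
                and self.clock() - self._awaiting_since >= self.pong_timeout)
```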

Error Handling Strategies

  • Reconnection Logic: Exponential backoff with configurable max attempts
  • Stream Recovery: On reconnection, request the tokens generated after the last one the client received (requires the server to keep per-request state)
  • Timeout Handling: Abort long-running requests after threshold (default 120s)
  • Memory Leak Prevention: Close the WebSocket in the effect cleanup and guard React callbacks with a mounted ref so no state updates run after unmount
  • Message Queuing: Buffer messages sent before connection is ready
  • Rate Limiter Cleanup: Periodic removal of inactive IP entries to prevent memory growth
  • Stream Cancellation: Abort AI API streams when WebSocket closes to prevent wasted resources
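Stream recovery needs the server to remember what it already sent. The sketch below rests on two assumptions: each token frame carries a sequence number (the frames in this guide do not), and on reconnecting the client reports the last sequence number it rendered. Under those assumptions, the server keeps a bounded per-request log and replays everything after that point. ReplayBuffer, seq, and last_seq are hypothetical names. The log lives in the process that served the request.

```python
from collections import deque
from typing import Deque, List, Tuple


class ReplayBuffer:
    """Bounded log of (seq, content) token frames for one in-flight response."""

    def __init__(self, max_frames: int = 2048):
        self._frames: Deque[Tuple[int, str]] = deque(maxlen=max_frames)
        self._next_seq = 0

    def append(self, content: str) -> int:
        seq = self._next_seq
        self._frames.append((seq, content))  # oldest frame is evicted when full
        self._next_seq += 1
        return seq  # attach to the outgoing frame, e.g. {"type": "token", "seq": seq, ...}

    def since(self, last_seq: int) -> List[Tuple[int, str]]:
        """Frames after last_seq (use -1 if nothing was received).

        Raises LookupError if some of the needed frames were already evicted.
        """
        if self._frames and last_seq + 1 < self._frames[0][0]:
            raise LookupError("requested frames were evicted; restart the request")
        return [(seq, c) for seq, c in self._frames if seq > last_seq]
```

The bound keeps memory predictable for long responses. A client that falls too far behind gets a clear error and can fall back to re-sending the prompt.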

Load Balancing Considerations

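WebSocket connections need a load balancer that forwards the HTTP Upgrade handshake and tolerates long-lived connections (heartbeats keep them under idle timeouts). Once upgraded, a WebSocket is a single TCP connection, so all of its frames reach the backend that accepted it. Affinity matters when a client reconnects. Any state held in memory, such as a replay buffer for stream recovery or the in-memory rate limiter shown above, lives on one instance. Either route reconnects to the same server (sticky sessions keyed on a cookie or source IP) or move that state into a shared store. Stateless architectures require a pub/sub system (Redis, NATS) to broadcast messages across instances.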
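Source-IP affinity amounts to mapping each client key to a backend deterministically. The sketch below uses a stable hash, because Python's built-in hash() is randomized per process and cannot be shared across load-balancer instances. With plain modulo hashing, most clients move whenever the backend list changes; rendezvous (highest-random-weight) hashing moves only the clients of a removed backend, which is why the sketch uses it. pick_backend and the backend names are hypothetical.

```python
import hashlib
from typing import Sequence


def _score(key: str, backend: str) -> int:
    # Stable across processes and machines, unlike the built-in hash()
    digest = hashlib.sha256(f"{key}|{backend}".encode()).digest()
    return int.from_bytes(digest[:8], "big")


def pick_backend(client_key: str, backends: Sequence[str]) -> str:
    """Rendezvous hashing: every backend gets a score per client; the top score wins."""
    if not backends:
        raise ValueError("no backends available")
    return max(backends, key=lambda b: _score(client_key, b))
```

Removing a backend only changes the winner for clients whose top score belonged to it. Everyone else keeps reconnecting to the same instance and its in-memory state.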

Getting Started

  1. Choose backend framework: FastAPI (Python) or ws (Node.js)
  2. Configure authentication (JWT/API keys) and CORS origins
  3. Set up WebSocket endpoint with prompt validation and rate limiting
  4. Implement async generator for AI API streaming with timeout handling
  5. Create client WebSocket wrapper with message queuing and reconnection logic
  6. Add token buffering and application-level heartbeat mechanisms
  7. Enable WebSocket compression (permessage-deflate) for bandwidth optimization
  8. Configure load balancer with sticky sessions for multi-instance deployments
  9. Test with varying prompt lengths, connection speeds, and failure scenarios
  10. Monitor latency metrics: time-to-first-token and tokens-per-second
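Both metrics in step 10 can be derived from three timestamps per request: when the prompt was sent, when the first token arrived, and when the last token arrived. Below is a minimal sketch of the arithmetic. The function name is hypothetical. Measuring tokens per second over the generation phase only (first to last token) is an assumed convention, and counting streamed chunks instead of tokenizer tokens is an approximation.

```python
from typing import Dict, Sequence


def stream_metrics(request_sent: float, token_times: Sequence[float]) -> Dict[str, float]:
    """Time-to-first-token and tokens-per-second from arrival timestamps (seconds)."""
    if not token_times:
        raise ValueError("no tokens received")
    first, last = token_times[0], token_times[-1]
    ttft = first - request_sent
    generation = last - first
    # n tokens span n - 1 inter-arrival intervals; one token has no interval to measure
    tps = (len(token_times) - 1) / generation if generation > 0 else 0.0
    return {"ttft_s": ttft, "tokens_per_s": tps}
```

For example, a request sent at t = 0.0 whose 11 tokens arrive every 0.1 s starting at t = 0.5 has a TTFT of 0.5 s and a rate of 10 tokens per second.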

Critical implementation detail: Always flush the final buffer chunk and send a completion signal to prevent UI hanging. Ensure React components clean up WebSocket connections and check mounted state before state updates.
