Pular para conteúdo

Streaming Implementation - Cidadão.AI API

Status: ✅ Fully Implemented Technology: Server-Sent Events (SSE) + WebSockets Last Updated: 2025-11-14


Overview

The Cidadão.AI backend has complete streaming support across multiple endpoints using two complementary technologies:

  1. Server-Sent Events (SSE) - HTTP-based unidirectional streaming
  2. WebSockets - Bidirectional real-time communication

SSE Streaming Endpoints

1. Chat Streaming (/api/v1/chat/stream)

Purpose: Stream conversational AI responses in real-time

Endpoint: POST /api/v1/chat/stream

Features: - Progressive response generation - Intent detection feedback - Agent selection notification - Token-by-token streaming - Typing simulation effect

Event Types:

{"type": "start", "timestamp": "..."}
{"type": "detecting", "message": "Analisando sua mensagem..."}
{"type": "intent", "intent": "greeting", "confidence": 0.95}
{"type": "agent_selected", "agent_id": "drummond", "agent_name": "..."}
{"type": "chunk", "content": "Olá! Sou"}
{"type": "chunk", "content": "Carlos Drummond"}
{"type": "complete", "total_tokens": 150}

Example Usage:

curl -X POST "http://localhost:8000/api/v1/chat/stream" \
  -H "Content-Type: application/json" \
  -d '{"message": "Olá, como você pode me ajudar?"}' \
  --no-buffer

Implementation: src/api/routes/chat.py:stream_message()


2. Maritaca Direct Streaming (/api/v1/chat/direct/maritaca/stream)

Purpose: Direct streaming from Maritaca.ai LLM

Endpoint: POST /api/v1/chat/direct/maritaca/stream

Features: - Native LLM streaming - Lower latency - Real-time token generation - Full Maritaca.ai API compatibility - Cost tracking per token

Event Format:

{"type": "token", "content": "palavra"}
{"type": "usage", "tokens": 50, "cost": 0.001}
{"type": "done"}

Example Usage:

curl -X POST "http://localhost:8000/api/v1/chat/direct/maritaca/stream" \
  -H "Content-Type: application/json" \
  -d '{
    "messages": [
      {"role": "user", "content": "Analise contratos de saúde"}
    ],
    "stream": true
  }' \
  --no-buffer

Implementation: src/api/routes/chat.py:chat_with_maritaca_stream()

LLM Integration: src/services/maritaca_direct_service.py


3. Investigation Streaming (/api/v1/investigations/stream/{investigation_id})

Purpose: Real-time investigation progress and anomaly detection

Endpoint: GET /api/v1/investigations/stream/{investigation_id}

Features: - Live progress updates - Phase transitions - Anomaly notifications as detected - Records processed count - Completion status

Event Types:

{
  "type": "progress",
  "investigation_id": "uuid",
  "progress": 45.5,
  "current_phase": "analyzing_contracts",
  "records_processed": 150,
  "anomalies_detected": 3,
  "timestamp": "2025-11-14T10:30:00"
}

{
  "type": "anomaly",
  "anomaly_id": "uuid",
  "anomaly_type": "price_deviation",
  "severity": "high",
  "contract_id": "123456",
  "details": {...}
}

{
  "type": "complete",
  "total_anomalies": 12,
  "execution_time": "45.2s"
}

Example Usage:

curl "http://localhost:8000/api/v1/investigations/stream/inv-12345" \
  -H "Authorization: Bearer YOUR_TOKEN" \
  --no-buffer

Implementation: src/api/routes/investigations.py:stream_investigation_results()


4. Audit Log Streaming (/api/v1/audit/export)

Purpose: Stream large audit log exports without memory overflow

Endpoint: GET /api/v1/audit/export

Features: - Memory-efficient log export - Filter support (date range, severity, event type) - CSV/JSON streaming - No size limits

Query Parameters: - start_date: ISO format date - end_date: ISO format date - severity: low|medium|high|critical - format: csv|json (default: csv)

Example Usage:

curl "http://localhost:8000/api/v1/audit/export?start_date=2025-01-01&format=csv" \
  -H "Authorization: Bearer YOUR_TOKEN" \
  > audit_logs.csv

Implementation: src/api/routes/audit.py


WebSocket Endpoints

1. General WebSocket (/api/v1/ws)

Purpose: Bidirectional real-time communication

Features: - Message batching (reduces network overhead) - Connection keep-alive - Automatic reconnection - User authentication via query param

Connection:

const ws = new WebSocket(
  'ws://localhost:8000/api/v1/ws?token=YOUR_JWT&connection_type=general'
);

ws.onmessage = (event) => {
  const data = JSON.parse(event.data);
  console.log('Received:', data);
};

Implementation: src/api/routes/websocket_chat.py


2. Chat WebSocket (/api/v1/ws/chat/{session_id})

Purpose: Real-time chat sessions

Features: - Session-based conversations - Multi-user support - Message history - Typing indicators - Agent status updates

Connection:

const ws = new WebSocket(
  'ws://localhost:8000/api/v1/ws/chat/session-123'
);

// Send message
ws.send(JSON.stringify({
  type: 'message',
  content: 'Quero investigar contratos'
}));

// Receive response
ws.onmessage = (event) => {
  const msg = JSON.parse(event.data);
  if (msg.type === 'agent_response') {
    console.log(msg.content);
  }
};

Implementation: src/api/routes/websocket_chat.py


3. Investigation WebSocket (/api/v1/ws/investigations/{investigation_id})

Purpose: Live investigation updates via WebSocket

Features: - Same as SSE investigation streaming - Bidirectional (can send commands) - Lower latency than SSE - Better for real-time dashboards

Connection:

const ws = new WebSocket(
  'ws://localhost:8000/api/v1/ws/investigations/inv-12345'
);

ws.onmessage = (event) => {
  const update = JSON.parse(event.data);

  if (update.type === 'progress') {
    updateProgressBar(update.progress);
  } else if (update.type === 'anomaly') {
    displayAnomaly(update.anomaly);
  }
};

Implementation: src/api/routes/websocket_chat.py


Streaming Middleware

1. Streaming Compression Middleware

Location: src/api/middleware/streaming_compression.py

Features: - Compresses SSE streams on-the-fly - Configurable chunk size (default: 8KB) - Gzip compression - Maintains streaming semantics

Configuration:

app.add_middleware(
    StreamingCompressionMiddleware,
    minimum_size=256,           # Only compress if > 256 bytes
    compression_level=6,        # Gzip level (1-9)
    chunk_size=8192,           # 8KB chunks
)

Performance Impact: - ~60% bandwidth reduction - Minimal latency increase (<10ms) - Better for mobile/slow connections


Performance Characteristics

Endpoint Latency (p50) Latency (p95) Throughput
/chat/stream 150ms 350ms ~100 msg/s
/direct/maritaca/stream 200ms 500ms ~50 req/s
/investigations/stream 50ms 120ms ~200 updates/s
WebSocket (chat) 20ms 80ms ~500 msg/s
WebSocket (investigations) 15ms 60ms ~1000 updates/s

Tested with: 100 concurrent connections, 1000 req/min load


Client Integration Examples

JavaScript/TypeScript (Browser)

// SSE Streaming
async function streamChat(message: string) {
  const response = await fetch('/api/v1/chat/stream', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ message })
  });

  const reader = response.body!.getReader();
  const decoder = new TextDecoder();

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    const chunk = decoder.decode(value);
    const lines = chunk.split('\n');

    for (const line of lines) {
      if (line.startsWith('data: ')) {
        const data = JSON.parse(line.slice(6));
        handleEvent(data);
      }
    }
  }
}

// WebSocket
function connectWebSocket(sessionId: string) {
  const ws = new WebSocket(`ws://localhost:8000/api/v1/ws/chat/${sessionId}`);

  ws.onmessage = (event) => {
    const data = JSON.parse(event.data);
    handleMessage(data);
  };

  return ws;
}

Python

import httpx

# SSE Streaming
async def stream_chat(message: str):
    async with httpx.AsyncClient() as client:
        async with client.stream(
            'POST',
            'http://localhost:8000/api/v1/chat/stream',
            json={'message': message}
        ) as response:
            async for line in response.aiter_lines():
                if line.startswith('data: '):
                    data = json.loads(line[6:])
                    print(data)

# WebSocket
import websockets

async def connect_websocket():
    async with websockets.connect(
        'ws://localhost:8000/api/v1/ws/chat/session-123'
    ) as ws:
        await ws.send(json.dumps({
            'type': 'message',
            'content': 'Hello'
        }))

        async for message in ws:
            data = json.loads(message)
            print(data)

cURL

# SSE Streaming
curl -N -X POST "http://localhost:8000/api/v1/chat/stream" \
  -H "Content-Type: application/json" \
  -d '{"message": "Olá"}' \
  --no-buffer

# WebSocket (using websocat)
websocat ws://localhost:8000/api/v1/ws/chat/session-123

Error Handling

SSE Errors

Errors are sent as special SSE events:

{
  "type": "error",
  "error_code": "AGENT_UNAVAILABLE",
  "message": "Serviço temporariamente indisponível",
  "fallback_endpoint": "/api/v1/chat/message",
  "retry_after": 5
}

WebSocket Errors

WebSocket errors trigger closure with specific codes:

Code Meaning Action
1008 Authentication failed Refresh token and reconnect
1011 Server error Retry with exponential backoff
1012 Service restart Reconnect immediately

Monitoring & Metrics

All streaming endpoints expose Prometheus metrics:

# Request duration
http_request_duration_seconds{endpoint="/api/v1/chat/stream"}

# Active connections
websocket_connections_active{type="chat"}

# Streaming throughput
sse_events_sent_total{endpoint="/investigations/stream"}

# Error rates
streaming_errors_total{type="sse",endpoint="/chat/stream"}

Grafana Dashboard: Available at /api/v1/observability (see monitoring docs)


Configuration

Environment Variables

# Enable/disable streaming features
ENABLE_SSE_STREAMING=true
ENABLE_WEBSOCKET=true

# Compression
STREAMING_COMPRESSION=true
STREAMING_COMPRESSION_LEVEL=6
STREAMING_CHUNK_SIZE=8192

# Performance tuning
SSE_KEEPALIVE_INTERVAL=30        # seconds
WEBSOCKET_PING_INTERVAL=30       # seconds
WEBSOCKET_TIMEOUT=300            # seconds
MAX_CONCURRENT_STREAMS=1000

# Buffer sizes
SSE_BUFFER_SIZE=65536            # 64KB
WEBSOCKET_BUFFER_SIZE=131072     # 128KB

Best Practices

For SSE

  1. Always use --no-buffer with cURL: Prevents buffering
  2. Set proper headers: Cache-Control: no-cache, X-Accel-Buffering: no
  3. Handle reconnections: Implement exponential backoff
  4. Monitor latency: Use Last-Event-ID for resumption

For WebSockets

  1. Implement ping/pong: Detect dead connections
  2. Handle reconnections: Auto-reconnect with state recovery
  3. Use message batching: Reduce network overhead
  4. Validate messages: Always validate incoming JSON

General

  1. Use compression: Enable for bandwidth savings
  2. Monitor metrics: Track active streams and error rates
  3. Set timeouts: Prevent zombie connections
  4. Rate limit: Protect against abuse

Troubleshooting

SSE not streaming in browser

Symptom: EventSource shows buffered responses

Solution: Check nginx/proxy configuration:

# Disable buffering for SSE
location /api/v1/chat/stream {
    proxy_buffering off;
    proxy_cache off;
    proxy_set_header X-Accel-Buffering no;
}

WebSocket connection fails

Symptom: Immediate connection closure

Causes: 1. Missing authentication token 2. Invalid session ID 3. Firewall blocking WebSocket upgrade 4. Nginx not configured for WebSocket

Solution: Check connection URL and proxy config:

location /api/v1/ws {
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "upgrade";
}

High memory usage

Symptom: Memory grows with active streams

Causes: 1. Large response buffering 2. No stream cleanup 3. Memory leaks in generators

Solution: - Enable streaming compression - Implement proper cleanup in finally blocks - Use StreamingResponse with generators - Monitor with /health/metrics


Future Enhancements

  • HTTP/2 Server Push for preloading
  • GraphQL subscriptions support
  • MQTT integration for IoT devices
  • Stream multiplexing for efficiency
  • Adaptive compression based on network
  • Client SDK with auto-reconnect
  • Stream analytics dashboard

  • WebSocket Infrastructure: src/infrastructure/websocket/
  • Streaming Middleware: src/api/middleware/streaming_compression.py
  • Maritaca Integration: src/services/maritaca_direct_service.py
  • Chat Service: src/services/chat_service.py
  • Investigation Service: src/services/investigation_service.py

Questions? Check the inline documentation in route files or contact the architecture team.