Streaming Implementation - Cidadão.AI API
Status: ✅ Fully Implemented
Technology: Server-Sent Events (SSE) + WebSockets
Last Updated: 2025-11-14
Overview
The Cidadão.AI backend has complete streaming support across multiple endpoints using two complementary technologies:
- Server-Sent Events (SSE) - HTTP-based unidirectional streaming
- WebSockets - Bidirectional real-time communication
SSE Streaming Endpoints
1. Chat Streaming (/api/v1/chat/stream)
Purpose: Stream conversational AI responses in real-time
Endpoint: POST /api/v1/chat/stream
Features:
- Progressive response generation
- Intent detection feedback
- Agent selection notification
- Token-by-token streaming
- Typing simulation effect
Event Types:
{"type": "start", "timestamp": "..."}
{"type": "detecting", "message": "Analisando sua mensagem..."}
{"type": "intent", "intent": "greeting", "confidence": 0.95}
{"type": "agent_selected", "agent_id": "drummond", "agent_name": "..."}
{"type": "chunk", "content": "Olá! Sou"}
{"type": "chunk", "content": "Carlos Drummond"}
{"type": "complete", "total_tokens": 150}
Example Usage:
curl -X POST "http://localhost:8000/api/v1/chat/stream" \
  -H "Content-Type: application/json" \
  -d '{"message": "Olá, como você pode me ajudar?"}' \
  --no-buffer
Implementation: src/api/routes/chat.py:stream_message()
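The event sequence above can be folded into a single result on the client. The following is a minimal sketch, not the project's client code: it assumes each event arrives as an SSE `data: ` line carrying one JSON object (the framing the client examples in this document parse), and `assemble_reply` is a hypothetical helper name.

```python
import json
from typing import Iterable


def assemble_reply(sse_lines: Iterable[str]) -> dict:
    """Fold chat stream events into one result dictionary."""
    result = {"intent": None, "agent_id": None, "text": "", "total_tokens": None}
    for line in sse_lines:
        if not line.startswith("data: "):
            continue  # skip blank separator lines and anything that is not a data line
        event = json.loads(line[len("data: "):])
        kind = event.get("type")
        if kind == "intent":
            result["intent"] = event.get("intent")
        elif kind == "agent_selected":
            result["agent_id"] = event.get("agent_id")
        elif kind == "chunk":
            # Chunks are joined verbatim; this assumes the server includes
            # any whitespace between chunks itself.
            result["text"] += event.get("content", "")
        elif kind == "complete":
            result["total_tokens"] = event.get("total_tokens")
            break  # nothing meaningful follows the completion event
    return result
```

Event types the sketch does not recognize (such as `start` or `detecting`) are ignored, so a client written this way keeps working if new event types are added.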
2. Maritaca Direct Streaming (/api/v1/chat/direct/maritaca/stream)
Purpose: Direct streaming from Maritaca.ai LLM
Endpoint: POST /api/v1/chat/direct/maritaca/stream
Features:
- Native LLM streaming
- Lower latency
- Real-time token generation
- Full Maritaca.ai API compatibility
- Cost tracking per token
Event Format:
{"type": "token", "content": "palavra"}
{"type": "usage", "tokens": 50, "cost": 0.001}
{"type": "done"}
Example Usage:
curl -X POST "http://localhost:8000/api/v1/chat/direct/maritaca/stream" \
  -H "Content-Type: application/json" \
  -d '{
    "messages": [
      {"role": "user", "content": "Analise contratos de saúde"}
    ],
    "stream": true
  }' \
  --no-buffer
Implementation: src/api/routes/chat.py:chat_with_maritaca_stream()
LLM Integration: src/services/maritaca_direct_service.py
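To illustrate how the `token`, `usage`, and `done` events fit together, here is a small sketch of a client-side accumulator. `DirectStreamResult` and its fields are hypothetical names, and the per-token cost is simply derived from the two fields of the `usage` event shown above; the service itself may compute cost differently.

```python
from dataclasses import dataclass, field


@dataclass
class DirectStreamResult:
    """Accumulates events from the direct Maritaca stream (illustrative only)."""
    tokens: list = field(default_factory=list)
    usage_tokens: int = 0
    cost: float = 0.0
    done: bool = False

    def apply(self, event: dict) -> None:
        kind = event.get("type")
        if kind == "token":
            self.tokens.append(event.get("content", ""))
        elif kind == "usage":
            self.usage_tokens = event.get("tokens", 0)
            self.cost = event.get("cost", 0.0)
        elif kind == "done":
            self.done = True

    @property
    def text(self) -> str:
        return "".join(self.tokens)

    @property
    def cost_per_token(self) -> float:
        # Guard against a stream that ended before a usage event arrived.
        return self.cost / self.usage_tokens if self.usage_tokens else 0.0
```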
3. Investigation Streaming (/api/v1/investigations/stream/{investigation_id})
Purpose: Real-time investigation progress and anomaly detection
Endpoint: GET /api/v1/investigations/stream/{investigation_id}
Features:
- Live progress updates
- Phase transitions
- Anomaly notifications as detected
- Records processed count
- Completion status
Event Types:
{
  "type": "progress",
  "investigation_id": "uuid",
  "progress": 45.5,
  "current_phase": "analyzing_contracts",
  "records_processed": 150,
  "anomalies_detected": 3,
  "timestamp": "2025-11-14T10:30:00"
}
{
  "type": "anomaly",
  "anomaly_id": "uuid",
  "anomaly_type": "price_deviation",
  "severity": "high",
  "contract_id": "123456",
  "details": {...}
}
{
  "type": "complete",
  "total_anomalies": 12,
  "execution_time": "45.2s"
}
Example Usage:
curl "http://localhost:8000/api/v1/investigations/stream/inv-12345" \
  -H "Authorization: Bearer YOUR_TOKEN" \
  --no-buffer
Implementation: src/api/routes/investigations.py:stream_investigation_results()
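On the wire, each of these events is one SSE frame. The sketch below shows how a server-side generator might frame them; it is an illustration under assumptions, not the code in `stream_investigation_results()`. The helper names `format_sse` and `sse_frames` are hypothetical, and the `data: <json>` framing is inferred from the client examples in this document.

```python
import json
from typing import Iterable, Iterator


def format_sse(event: dict) -> str:
    """Serialize one event as an SSE frame: a data line plus a blank line."""
    return f"data: {json.dumps(event, ensure_ascii=False)}\n\n"


def sse_frames(updates: Iterable[dict]) -> Iterator[str]:
    """Yield one frame per update and stop after the completion event."""
    for update in updates:
        yield format_sse(update)
        if update.get("type") == "complete":
            return  # close the stream once the investigation has finished
```

Because the generator yields frames one at a time, memory use stays flat no matter how many progress or anomaly events an investigation produces.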
4. Audit Log Streaming (/api/v1/audit/export)
Purpose: Stream large audit log exports without memory overflow
Endpoint: GET /api/v1/audit/export
Features:
- Memory-efficient log export
- Filter support (date range, severity, event type)
- CSV/JSON streaming
- No size limits
Query Parameters:
- start_date: ISO format date
- end_date: ISO format date
- severity: low|medium|high|critical
- format: csv|json (default: csv)
Example Usage:
curl "http://localhost:8000/api/v1/audit/export?start_date=2025-01-01&format=csv" \
  -H "Authorization: Bearer YOUR_TOKEN" \
  > audit_logs.csv
Implementation: src/api/routes/audit.py
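The memory-efficient export described above can be sketched with a plain generator that emits CSV text in batches. This is an illustration only: the column names in `FIELDS`, the `iter_csv` helper, and the batch size are assumptions, not the contents of `src/api/routes/audit.py`.

```python
import csv
import io
from typing import Iterable, Iterator

# Hypothetical column set; the real audit log schema is not shown in this document.
FIELDS = ["timestamp", "severity", "event_type", "message"]


def iter_csv(records: Iterable[dict], batch_size: int = 500) -> Iterator[str]:
    """Yield CSV text in batches so the full export never sits in memory."""
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=FIELDS, extrasaction="ignore")
    writer.writeheader()
    pending = 0
    for record in records:
        writer.writerow(record)
        pending += 1
        if pending >= batch_size:
            yield buffer.getvalue()
            # Reuse the buffer instead of growing it.
            buffer.seek(0)
            buffer.truncate(0)
            pending = 0
    # Flush the header (for an empty export) and any remaining rows.
    if buffer.tell():
        yield buffer.getvalue()
```

In a FastAPI or Starlette route, a generator like this could be handed to `StreamingResponse(iter_csv(records), media_type="text/csv")`, so rows reach the client as soon as each batch is ready.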
WebSocket Endpoints
1. General WebSocket (/api/v1/ws)
Purpose: Bidirectional real-time communication
Features:
- Message batching (reduces network overhead)
- Connection keep-alive
- Automatic reconnection
- User authentication via query param
Connection:
const ws = new WebSocket(
  'ws://localhost:8000/api/v1/ws?token=YOUR_JWT&connection_type=general'
);
ws.onmessage = (event) => {
  const data = JSON.parse(event.data);
  console.log('Received:', data);
};
Implementation: src/api/routes/websocket_chat.py
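Message batching, listed among the features above, can be pictured as follows. This is a simplified sketch rather than the project's implementation: the `MessageBatcher` class, the `{"type": "batch", ...}` envelope, and the size and delay thresholds are all assumptions made for illustration.

```python
import json
import time
from typing import Callable, Optional


class MessageBatcher:
    """Collects messages and sends them as one frame per batch."""

    def __init__(self, send: Callable[[str], None], max_size: int = 10,
                 max_delay: float = 0.05,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._send = send
        self._max_size = max_size
        self._max_delay = max_delay
        self._clock = clock
        self._pending: list = []
        self._first_at: Optional[float] = None

    def add(self, message: dict) -> None:
        if not self._pending:
            self._first_at = self._clock()
        self._pending.append(message)
        # The age check only runs when a message is added; a production
        # version would also flush from a timer.
        too_old = self._clock() - self._first_at >= self._max_delay
        if len(self._pending) >= self._max_size or too_old:
            self.flush()

    def flush(self) -> None:
        if self._pending:
            self._send(json.dumps({"type": "batch", "messages": self._pending}))
            self._pending = []
            self._first_at = None
```

Sending one frame per batch trades a small, bounded delay for fewer WebSocket frames, which is where the reduction in network overhead comes from.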
2. Chat WebSocket (/api/v1/ws/chat/{session_id})
Purpose: Real-time chat sessions
Features:
- Session-based conversations
- Multi-user support
- Message history
- Typing indicators
- Agent status updates
Connection:
const ws = new WebSocket(
  'ws://localhost:8000/api/v1/ws/chat/session-123'
);
// Send message
ws.send(JSON.stringify({
  type: 'message',
  content: 'Quero investigar contratos'
}));
// Receive response
ws.onmessage = (event) => {
  const msg = JSON.parse(event.data);
  if (msg.type === 'agent_response') {
    console.log(msg.content);
  }
};
Implementation: src/api/routes/websocket_chat.py
3. Investigation WebSocket (/api/v1/ws/investigations/{investigation_id})
Purpose: Live investigation updates via WebSocket
Features:
- Same as SSE investigation streaming
- Bidirectional (can send commands)
- Lower latency than SSE
- Better for real-time dashboards
Connection:
const ws = new WebSocket(
  'ws://localhost:8000/api/v1/ws/investigations/inv-12345'
);
ws.onmessage = (event) => {
  const update = JSON.parse(event.data);
  if (update.type === 'progress') {
    updateProgressBar(update.progress);
  } else if (update.type === 'anomaly') {
    displayAnomaly(update.anomaly);
  }
};
Implementation: src/api/routes/websocket_chat.py
Streaming Middleware
1. Streaming Compression Middleware
Location: src/api/middleware/streaming_compression.py
Features:
- Compresses SSE streams on-the-fly
- Configurable chunk size (default: 8KB)
- Gzip compression
- Maintains streaming semantics
Configuration:
app.add_middleware(
    StreamingCompressionMiddleware,
    minimum_size=256,      # Only compress if > 256 bytes
    compression_level=6,   # Gzip level (1-9)
    chunk_size=8192,       # 8KB chunks
)
Performance Impact:
- ~60% bandwidth reduction
- Minimal latency increase (<10ms)
- Better for mobile/slow connections
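Compressing a stream without breaking its streaming semantics comes down to flushing the compressor after every chunk. The sketch below shows that idea with the standard library's `zlib`; it is not the code in `streaming_compression.py`, and `gzip_stream` is a hypothetical name.

```python
import zlib
from typing import Iterable, Iterator


def gzip_stream(chunks: Iterable[bytes], level: int = 6) -> Iterator[bytes]:
    """Gzip a stream chunk by chunk so each event stays decodable on arrival."""
    # wbits=31 selects the gzip container (16 + a 15-bit window).
    compressor = zlib.compressobj(level, zlib.DEFLATED, 31)
    for chunk in chunks:
        data = compressor.compress(chunk)
        # Z_SYNC_FLUSH pushes out everything buffered so far. Without it the
        # compressor could hold an event back until more input arrives.
        data += compressor.flush(zlib.Z_SYNC_FLUSH)
        if data:
            yield data
    # The final flush (Z_FINISH) writes the gzip trailer.
    yield compressor.flush()
```

Flushing per chunk costs some compression ratio compared with compressing the whole body at once, which is the usual trade-off for keeping latency low.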
Performance Characteristics
| Endpoint | Latency (p50) | Latency (p95) | Throughput |
|---|---|---|---|
| /chat/stream | 150ms | 350ms | ~100 msg/s |
| /direct/maritaca/stream | 200ms | 500ms | ~50 req/s |
| /investigations/stream | 50ms | 120ms | ~200 updates/s |
| WebSocket (chat) | 20ms | 80ms | ~500 msg/s |
| WebSocket (investigations) | 15ms | 60ms | ~1000 updates/s |
Tested with: 100 concurrent connections, 1000 req/min load
Client Integration Examples
JavaScript/TypeScript (Browser)
// handleEvent and handleMessage are application callbacks defined elsewhere.

// SSE Streaming
async function streamChat(message: string) {
  const response = await fetch('/api/v1/chat/stream', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ message })
  });
  if (!response.ok || !response.body) {
    throw new Error(`Stream request failed: ${response.status}`);
  }
  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    // stream: true keeps multi-byte characters split across reads intact
    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split('\n');
    // The last element may be a partial line; keep it for the next read
    buffer = lines.pop() ?? '';
    for (const line of lines) {
      if (line.startsWith('data: ')) {
        const data = JSON.parse(line.slice(6));
        handleEvent(data);
      }
    }
  }
}

// WebSocket
function connectWebSocket(sessionId: string) {
  const ws = new WebSocket(`ws://localhost:8000/api/v1/ws/chat/${sessionId}`);
  ws.onmessage = (event) => {
    const data = JSON.parse(event.data);
    handleMessage(data);
  };
  return ws;
}
Python
import asyncio
import json

import httpx
import websockets


# SSE Streaming
async def stream_chat(message: str):
    # timeout=None: httpx's default 5-second read timeout can cut off a
    # stream that pauses between events
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream(
            'POST',
            'http://localhost:8000/api/v1/chat/stream',
            json={'message': message}
        ) as response:
            async for line in response.aiter_lines():
                if line.startswith('data: '):
                    data = json.loads(line[6:])
                    print(data)


# WebSocket
async def connect_websocket():
    async with websockets.connect(
        'ws://localhost:8000/api/v1/ws/chat/session-123'
    ) as ws:
        await ws.send(json.dumps({
            'type': 'message',
            'content': 'Hello'
        }))
        async for message in ws:
            data = json.loads(message)
            print(data)


if __name__ == '__main__':
    asyncio.run(stream_chat('Olá'))
cURL
# SSE Streaming (-N is the short form of --no-buffer, so one of them is enough)
curl -X POST "http://localhost:8000/api/v1/chat/stream" \
  -H "Content-Type: application/json" \
  -d '{"message": "Olá"}' \
  --no-buffer
# WebSocket (using websocat)
websocat ws://localhost:8000/api/v1/ws/chat/session-123
Error Handling
SSE Errors
Errors are sent as special SSE events:
{
  "type": "error",
  "error_code": "AGENT_UNAVAILABLE",
  "message": "Serviço temporariamente indisponível",
  "fallback_endpoint": "/api/v1/chat/message",
  "retry_after": 5
}
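A client can turn such an error event into a recovery decision. The following is a hedged sketch: `RecoveryPlan` and `plan_recovery` are hypothetical names, `retry_after` is assumed to be in seconds, and the 1-second default used when the field is absent is an arbitrary choice.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class RecoveryPlan:
    retry_after: float                 # assumed to be seconds
    fallback_endpoint: Optional[str]   # non-streaming endpoint to try instead


def plan_recovery(event: dict, default_retry: float = 1.0) -> Optional[RecoveryPlan]:
    """Return a recovery plan for an error event, or None for any other event."""
    if event.get("type") != "error":
        return None
    return RecoveryPlan(
        retry_after=float(event.get("retry_after", default_retry)),
        fallback_endpoint=event.get("fallback_endpoint"),
    )
```

A caller might wait `retry_after` before reopening the stream, or switch to `fallback_endpoint` if the stream keeps failing.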
WebSocket Errors
WebSocket errors trigger closure with specific codes:
| Code | Meaning | Action |
|---|---|---|
| 1008 | Authentication failed | Refresh token and reconnect |
| 1011 | Server error | Retry with exponential backoff |
| 1012 | Service restart | Reconnect immediately |
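The table translates directly into client logic. This sketch maps a close code to the next step; `ReconnectStep` and `next_step` are hypothetical names, the base delay and cap are illustrative values, and treating codes outside the table like 1011 is an assumption.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ReconnectStep:
    refresh_token: bool
    delay_seconds: float


def next_step(close_code: int, attempt: int,
              base: float = 1.0, cap: float = 30.0) -> ReconnectStep:
    """Decide how to reconnect after the socket closed with close_code."""
    if close_code == 1008:
        # Authentication failed: get a fresh token, then reconnect.
        return ReconnectStep(refresh_token=True, delay_seconds=0.0)
    if close_code == 1012:
        # Service restart: reconnect immediately.
        return ReconnectStep(refresh_token=False, delay_seconds=0.0)
    # 1011 and, as an assumption, every other code: exponential backoff,
    # doubling per attempt (attempt starts at 0) up to the cap.
    return ReconnectStep(refresh_token=False,
                         delay_seconds=min(cap, base * 2 ** attempt))
```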
Monitoring & Metrics
All streaming endpoints expose Prometheus metrics:
# Request duration
http_request_duration_seconds{endpoint="/api/v1/chat/stream"}
# Active connections
websocket_connections_active{type="chat"}
# Streaming throughput
sse_events_sent_total{endpoint="/investigations/stream"}
# Error rates
streaming_errors_total{type="sse",endpoint="/chat/stream"}
Grafana Dashboard: Available at /api/v1/observability (see monitoring docs)
Configuration
Environment Variables
# Enable/disable streaming features
ENABLE_SSE_STREAMING=true
ENABLE_WEBSOCKET=true
# Compression
STREAMING_COMPRESSION=true
STREAMING_COMPRESSION_LEVEL=6
STREAMING_CHUNK_SIZE=8192
# Performance tuning
SSE_KEEPALIVE_INTERVAL=30 # seconds
WEBSOCKET_PING_INTERVAL=30 # seconds
WEBSOCKET_TIMEOUT=300 # seconds
MAX_CONCURRENT_STREAMS=1000
# Buffer sizes
SSE_BUFFER_SIZE=65536 # 64KB
WEBSOCKET_BUFFER_SIZE=131072 # 128KB
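One way an application could read a subset of these variables into a typed object is sketched below. The `StreamingSettings` class is hypothetical, and it treats the values listed above as defaults, which this document does not state explicitly.

```python
import os
from dataclasses import dataclass
from typing import Mapping


def _flag(env: Mapping[str, str], name: str, default: str) -> bool:
    """Interpret common truthy spellings of an environment flag."""
    return env.get(name, default).strip().lower() in {"1", "true", "yes", "on"}


@dataclass(frozen=True)
class StreamingSettings:
    enable_sse: bool
    enable_websocket: bool
    compression: bool
    compression_level: int
    chunk_size: int
    max_concurrent_streams: int

    @classmethod
    def from_env(cls, env: Mapping[str, str] = os.environ) -> "StreamingSettings":
        level = int(env.get("STREAMING_COMPRESSION_LEVEL", "6"))
        if not 1 <= level <= 9:
            raise ValueError("STREAMING_COMPRESSION_LEVEL must be between 1 and 9")
        return cls(
            enable_sse=_flag(env, "ENABLE_SSE_STREAMING", "true"),
            enable_websocket=_flag(env, "ENABLE_WEBSOCKET", "true"),
            compression=_flag(env, "STREAMING_COMPRESSION", "true"),
            compression_level=level,
            chunk_size=int(env.get("STREAMING_CHUNK_SIZE", "8192")),
            max_concurrent_streams=int(env.get("MAX_CONCURRENT_STREAMS", "1000")),
        )
```

Passing a plain dictionary instead of `os.environ` makes the parsing easy to exercise in tests, for example `StreamingSettings.from_env({"ENABLE_WEBSOCKET": "false"})`.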
Best Practices
For SSE
- Always use --no-buffer with cURL: Prevents buffering
- Set proper headers: Cache-Control: no-cache, X-Accel-Buffering: no
- Handle reconnections: Implement exponential backoff
- Monitor latency: Use Last-Event-ID for resumption
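Resumption with `Last-Event-ID` only works if the client remembers the most recent `id:` field it saw. The sketch below shows that bookkeeping; `ResumableCursor` is a hypothetical name, and whether the endpoints in this document emit `id:` fields is an assumption, since the event examples above do not show them.

```python
from typing import Optional


class ResumableCursor:
    """Tracks the last SSE event id so a reconnect can ask to resume."""

    def __init__(self) -> None:
        self.last_event_id: Optional[str] = None

    def observe(self, line: str) -> None:
        """Feed every raw SSE line through this method."""
        if line.startswith("id:"):
            value = line[3:]
            # The SSE format drops a single leading space after the colon.
            if value.startswith(" "):
                value = value[1:]
            self.last_event_id = value

    def request_headers(self) -> dict:
        """Headers for the next (re)connection attempt."""
        headers = {"Accept": "text/event-stream", "Cache-Control": "no-cache"}
        if self.last_event_id is not None:
            headers["Last-Event-ID"] = self.last_event_id
        return headers
```

On reconnect, the headers from `request_headers()` can be passed to the HTTP client so the server knows which event the client saw last.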
For WebSockets
- Implement ping/pong: Detect dead connections
- Handle reconnections: Auto-reconnect with state recovery
- Use message batching: Reduce network overhead
- Validate messages: Always validate incoming JSON
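Validating incoming JSON can be as simple as the sketch below. The allowed type set contains only the `message` type shown in this document's examples, and the length limit and the `parse_client_message` name are assumptions made for illustration.

```python
import json
from typing import Optional

ALLOWED_TYPES = {"message"}   # the only client message type shown in this document
MAX_CONTENT_LENGTH = 4000     # hypothetical limit


def parse_client_message(raw: str) -> Optional[dict]:
    """Return a cleaned message, or None if the frame should be rejected."""
    try:
        payload = json.loads(raw)
    except json.JSONDecodeError:
        return None
    if not isinstance(payload, dict):
        return None
    msg_type = payload.get("type")
    if not isinstance(msg_type, str) or msg_type not in ALLOWED_TYPES:
        return None
    content = payload.get("content")
    if not isinstance(content, str) or not content.strip():
        return None
    if len(content) > MAX_CONTENT_LENGTH:
        return None
    # Return only the known fields so unexpected keys never reach handlers.
    return {"type": msg_type, "content": content}
```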
General
- Use compression: Enable for bandwidth savings
- Monitor metrics: Track active streams and error rates
- Set timeouts: Prevent zombie connections
- Rate limit: Protect against abuse
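Troubleshooting
SSE not streaming in browser
Symptom: EventSource shows buffered responses
Solution: Check nginx/proxy configuration:
# Disable buffering for SSE
location /api/v1/chat/stream {
    proxy_buffering off;
    proxy_cache off;
    # X-Accel-Buffering is a response header, so proxy_set_header has no
    # effect here. As an alternative to proxy_buffering off, the backend
    # can send "X-Accel-Buffering: no" with the streaming response.
}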
WebSocket connection fails
Symptom: Immediate connection closure
Causes:
1. Missing authentication token
2. Invalid session ID
3. Firewall blocking WebSocket upgrade
4. Nginx not configured for WebSocket
Solution: Check connection URL and proxy config:
location /api/v1/ws {
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "upgrade";
}
High memory usage
Symptom: Memory grows with active streams
Causes:
1. Large response buffering
2. No stream cleanup
3. Memory leaks in generators
Solution:
- Enable streaming compression
- Implement proper cleanup in finally blocks
- Use StreamingResponse with generators
- Monitor with /health/metrics
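The cleanup advice above can be illustrated with an async generator that releases its bookkeeping in a `finally` block. This is a sketch under assumptions: the `active_streams` registry, `event_stream`, and `read_first` are hypothetical names, not the project's code.

```python
import asyncio
from typing import AsyncIterator, Set

# Hypothetical registry of open streams, standing in for any per-stream resource.
active_streams: Set[str] = set()


async def event_stream(stream_id: str, total: int) -> AsyncIterator[str]:
    """Yield SSE frames and always release the registry entry on exit."""
    active_streams.add(stream_id)
    try:
        for i in range(total):
            yield f"data: {i}\n\n"
            await asyncio.sleep(0)  # hand control back to the event loop
    finally:
        # Runs on normal completion, on errors, and when the consumer
        # closes the generator early (for example after a disconnect).
        active_streams.discard(stream_id)


async def read_first(stream_id: str) -> str:
    """Consume a single frame, then close the stream explicitly."""
    stream = event_stream(stream_id, total=1000)
    try:
        return await stream.__anext__()
    finally:
        await stream.aclose()
```

Closing the generator with `aclose()` triggers the `finally` block even though 999 frames were never produced, so an abandoned stream leaves nothing behind in the registry.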
Future Enhancements
- HTTP/2 Server Push for preloading
- GraphQL subscriptions support
- MQTT integration for IoT devices
- Stream multiplexing for efficiency
- Adaptive compression based on network
- Client SDK with auto-reconnect
- Stream analytics dashboard
Related Documentation
- WebSocket Infrastructure: src/infrastructure/websocket/
- Streaming Middleware: src/api/middleware/streaming_compression.py
- Maritaca Integration: src/services/maritaca_direct_service.py
- Chat Service: src/services/chat_service.py
- Investigation Service: src/services/investigation_service.py
Questions? Check the inline documentation in route files or contact the architecture team.