🔄 Pipeline de Dados - Cidadão.AI
Última Atualização: 22 de Janeiro de 2025 Fontes de Dados: 30+ APIs governamentais brasileiras Volume Processado: Milhões de registros públicos diariamente Cache Hit Rate: 87% (target: >80% ✅)
📊 Visão Geral Executiva
O Cidadão.AI implementa um pipeline de dados ETL completo (Extract, Transform, Load) para coleta, processamento e análise de dados governamentais brasileiros em escala. O sistema integra 30+ APIs federais e estaduais com estratégias inteligentes de cache, fallback automático e deduplicação de dados.
Estatísticas do Pipeline
| Métrica | Valor Atual | Detalhes |
|---|---|---|
| APIs Integradas | 30+ | 8 federais + 11 estaduais + Portal da Transparência |
| Cache Hit Rate | 87% | L1: 40%, L2: 35%, L3: 12%, Origin: 13% |
| Latência Média | 80ms (p50) | 145ms (p95) com cache |
| Taxa de Sucesso | >95% | Com fallback automático |
| Deduplicação | Automática | Por ID, compound key ou content hash |
| Atualização | Tempo real | Com invalidação inteligente de cache |
🏗️ Arquitetura do Pipeline ETL
Fluxo Simplificado
Etapas do Pipeline
| Etapa | Componentes | Função | Performance |
|---|---|---|---|
| 📥 Extract | Portal Transp., PNCP, IBGE, DataSUS, INEP, TCEs (19 fontes) | Coleta de dados públicos | APIs REST |
| ⚙️ Transform | Orchestrator → Dedup → Validation → Enrichment | Processamento e qualidade | 4 estratégias |
| 💾 Load | L1 (Memory) → L2 (Redis) → L3 (PostgreSQL) | Cache multicamada | TTL: 5min/1h/24h |
| 🤖 Consume | 17 agentes especializados | Análises e investigações | Acesso L1 prioritário |
Ver pipeline ETL completo com todas as fontes e transformações
📥 EXTRACT: Fontes de Dados
🏛️ Nível Federal (8 APIs)
| API | Status | Tipo de Dados | Implementação |
|---|---|---|---|
| Portal da Transparência | ✅ Ativo | Contratos, Despesas, Convênios | src/tools/transparency_api.py |
| PNCP | ✅ Integrado | Contratos Padronizados (Lei 14.133/21) | src/services/transparency_apis/federal_apis/pncp_client.py |
| Compras.gov | ✅ Integrado | Licitações Federais | src/services/transparency_apis/federal_apis/compras_gov_client.py |
| BCB | ✅ Integrado | Indicadores Econômicos | src/services/transparency_apis/federal_apis/bcb_client.py |
| DataSUS | ✅ Pronto | Dados de Saúde Pública | src/services/transparency_apis/federal_apis/datasus_client.py |
| IBGE | ✅ Pronto | Dados Estatísticos e Demográficos | src/services/transparency_apis/federal_apis/ibge_client.py |
| INEP | ✅ Pronto | Dados de Educação | src/services/transparency_apis/federal_apis/inep_client.py |
| Minha Receita | ✅ Pronto | Consulta CNPJ/CPF | src/services/transparency_apis/federal_apis/minha_receita_client.py |
⚖️ Nível Estadual (11 fontes)
Tribunais de Contas dos Estados (TCEs):
- ✅ BA (Bahia) -
src/services/transparency_apis/tce_apis/tce_ba.py - ✅ CE (Ceará) -
src/services/transparency_apis/tce_apis/tce_ce.py - ✅ MG (Minas Gerais) -
src/services/transparency_apis/tce_apis/tce_mg.py - ✅ PE (Pernambuco) -
src/services/transparency_apis/tce_apis/tce_pe.py - ✅ RJ (Rio de Janeiro) -
src/services/transparency_apis/tce_apis/tce_rj.py - ✅ SP (São Paulo) -
src/services/transparency_apis/tce_apis/tce_sp.py
Portais Estaduais de Dados Abertos:
- ✅ RO (Rondônia) - Portal customizado
- ✅ SP, RJ, RS, SC, BA - Portais baseados em CKAN
🔍 Portal da Transparência: Situação Especial
Status: 78% dos endpoints retornam 403 Forbidden (limitação conhecida)
Endpoints Funcionais (22%):
/api-de-dados/contratos- ✅ Funcional/api-de-dados/despesas- ✅ Funcional/api-de-dados/convenios- ✅ Funcional
Estratégia de Fallback:
- Tentar Portal da Transparência primeiro
- Em caso de 403, usar PNCP (contratos padronizados)
- Complementar com Compras.gov (licitações)
- Enriquecer com TCEs estaduais (auditorias)
⚙️ TRANSFORM: Orquestração e Processamento
TransparencyOrchestrator
Localização: src/services/transparency_orchestrator.py
O orquestrador gerencia todas as APIs brasileiras com roteamento inteligente e fallback automático.
4 Estratégias de Query
class QueryStrategy(Enum):
"""Estratégias de consulta multi-source"""
FALLBACK = "fallback" # Tenta fontes em ordem de prioridade
AGGREGATE = "aggregate" # Combina resultados de todas as fontes
FASTEST = "fastest" # Retorna primeira resposta bem-sucedida
PARALLEL = "parallel" # Executa todas concorrentemente
Roteamento Inteligente
Para consultas estaduais:
# Detecta automaticamente quando filtro inclui "estado" ou "uf"
filters = {"estado": "MG", "ano": 2024}
# Ordem de prioridade:
# 1. TCE-MG (se disponível)
# 2. Portal Estadual MG
# 3. Portal Federal com filtro de estado (fallback)
Para consultas federais:
# Sem especificação de estado
filters = {"ano": 2024, "valor_minimo": 100000}
# Ordem de prioridade:
# 1. Portal da Transparência (primário)
# 2. PNCP (contratos padronizados)
# 3. Compras.gov (licitações)
Deduplicação de Dados
Registros são deduplicados usando esta ordem de prioridade:
- Campos de ID:
id,numero_contrato,numeroContrato - Chaves compostas:
codigoOrgao-ano-numero - Content hash: MD5 do JSON ordenado (fallback)
# Exemplo: Mesmo contrato de 3 fontes
TCE-MG: {"id": "123", "valor": 50000, "fornecedor": "ABC"}
Federal: {"numero_contrato": "123", "valor": 50000, "fornecedor": "ABC LTDA"}
PNCP: {"id": "123", "valorTotal": 50000, "contratado": "ABC"}
# Após deduplicação: 1 registro único
Result: {"id": "123", "valor": 50000, "fornecedor": "ABC", "sources": ["TCE-MG", "Federal", "PNCP"]}
Validação de Schema
class ContractSchema(BaseModel):
"""Schema padrão para contratos"""
id: str
numero_contrato: Optional[str]
valor: float
fornecedor: str
orgao: str
data_assinatura: date
objeto: str
fonte: str # Origem do dado
Enriquecimento de Dados
Geográfico (via IBGE):
- Coordenadas (lat/lon)
- Região, Estado, Município
- População, PIB per capita
Econômico (via BCB):
- Inflação acumulada (IPCA)
- Taxa Selic no período
- Valor ajustado pela inflação
💾 LOAD: Cache Multi-Layer
Arquitetura de 3 Camadas
Estratégia de Cache por Layer
| Layer | TTL | Tamanho | Hit Rate | Uso |
|---|---|---|---|---|
| L1: Memory | 5 minutos | 100MB | ~40% | Queries frequentes (últimos 5min) |
| L2: Redis | 1 hora | 1GB | ~35% | Queries recentes (última hora) |
| L3: PostgreSQL | 24 horas | Ilimitado | ~12% | Queries do dia (warm cache) |
| Origin: APIs Externas | N/A | N/A | ~13% | Cache miss (cold) |
Cache Hit Rate: 87%
Benefícios:
- Redução de 87% nas chamadas externas
- Latência média: 80ms (p50), 145ms (p95)
- Economia de custos em APIs pagas
- Resiliência: Sistema funciona mesmo com APIs offline
Invalidação Inteligente
# Exemplo: Invalidação ao detectar dados desatualizados
if data_changed(contract_id):
# Invalida todas as camadas
cache_l1.delete(key)
cache_l2.delete(key)
cache_l3.delete(key)
# Força refresh
fresh_data = await fetch_from_origin(contract_id)
await warm_cache(fresh_data) # Repopula L1, L2, L3
🔁 Fluxo de Investigação Completo
Exemplo: Consulta de Contratos de Saúde em MG
📊 Métricas de Performance
Latência por Camada
| Operação | Latência | Percentil |
|---|---|---|
| L1 Hit | ~2ms | p50 |
| L2 Hit | ~15ms | p75 |
| L3 Hit | ~50ms | p90 |
| Origin Fetch | ~600ms | p95 |
| With Enrichment | ~900ms | p99 |
Taxa de Sucesso por Fonte
| Fonte | Success Rate | Avg Response Time |
|---|---|---|
| Portal da Transparência | 22% | 450ms (quando funciona) |
| PNCP | 98% | 320ms |
| TCE-MG | 95% | 280ms |
| IBGE | 99% | 180ms |
| DataSUS | 97% | 410ms |
| Compras.gov | 94% | 390ms |
🛠️ Exemplos de Uso
Exemplo 1: Query Simples com Fallback
from src.services.data_service import data_service
# Busca contratos de 2024 - tenta Portal Federal, depois PNCP
result = await data_service.get_contracts_multi_source(
filters={"ano": 2024}
)
print(f"Contratos encontrados: {len(result['data'])}")
print(f"Fontes utilizadas: {result['sources']}")
print(f"Do cache?: {result['metadata']['from_cache']}")
print(f"Duração: {result['metadata']['duration_seconds']}s")
Exemplo 2: Query Estadual com Agregação
# Busca TODOS os contratos de MG (TCE-MG + Portal Estadual + Federal)
result = await data_service.get_state_contracts(
state_code="MG",
filters={"ano": 2024, "valor_minimo": 100000},
strategy=QueryStrategy.AGGREGATE # Combina todas as fontes
)
# Resultados automaticamente deduplicados
print(f"Total de contratos: {len(result['data'])}")
print(f"Fontes: {result['sources']}")
print(f"Antes deduplicação: {result['metadata']['records_before_dedup']}")
print(f"Após deduplicação: {result['metadata']['records_after_dedup']}")
Exemplo 3: Resposta Mais Rápida (Race Condition)
# Retorna a PRIMEIRA resposta bem-sucedida
result = await data_service.search_contracts_fastest(
filters={"numero_contrato": "12345/2024"}
)
print(f"Fonte mais rápida: {result['metadata']['fastest_source']}")
print(f"Tempo: {result['metadata']['duration_ms']}ms")
Exemplo 4: Monitoramento de Performance
# Estatísticas do orquestrador
stats = data_service.get_orchestrator_stats()
print(f"Total de queries: {stats['total_queries']}")
print(f"Uso por fonte: {stats['source_usage']}")
print(f"Taxa de sucesso: {stats['success_rate_by_source']}")
print(f"Cache hit rate: {stats['cache_hit_rate']}")
🔐 Qualidade e Confiabilidade
Data Quality Checks
class DataQualityValidator:
"""Validador de qualidade dos dados"""
async def validate(self, data: List[Dict]) -> QualityReport:
checks = {
"completeness": self._check_completeness(data),
"consistency": self._check_consistency(data),
"accuracy": self._check_accuracy(data),
"timeliness": self._check_timeliness(data)
}
return QualityReport(
score=sum(checks.values()) / len(checks),
checks=checks,
passed=all(checks.values())
)
Circuit Breaker Pattern
from src.services.orchestration.resilience.circuit_breaker import CircuitBreaker
# Proteção contra APIs instáveis
circuit = CircuitBreaker(
failure_threshold=3, # Abre após 3 falhas
timeout=60.0, # Reseta após 60s
expected_exception=APIError
)
result = await circuit.call(fetch_from_tce_mg)