Pular para o conteúdo principal

🔄 Pipeline de Dados - Cidadão.AI

Última Atualização: 22 de Janeiro de 2025 Fontes de Dados: 30+ APIs governamentais brasileiras Volume Processado: Milhões de registros públicos diariamente Cache Hit Rate: 87% (target: >80% ✅)


📊 Visão Geral Executiva

O Cidadão.AI implementa um pipeline de dados ETL completo (Extract, Transform, Load) para coleta, processamento e análise de dados governamentais brasileiros em escala. O sistema integra 30+ APIs federais e estaduais com estratégias inteligentes de cache, fallback automático e deduplicação de dados.

Estatísticas do Pipeline

MétricaValor AtualDetalhes
APIs Integradas30+8 federais + 11 estaduais + Portal da Transparência
Cache Hit Rate87%L1: 40%, L2: 35%, L3: 12%, Origin: 13%
Latência Média80ms (p50)145ms (p95) com cache
Taxa de Sucesso>95%Com fallback automático
DeduplicaçãoAutomáticaPor ID, compound key ou content hash
AtualizaçãoTempo realCom invalidação inteligente de cache

🏗️ Arquitetura do Pipeline ETL

Fluxo Simplificado

Etapas do Pipeline

EtapaComponentesFunçãoPerformance
📥 ExtractPortal Transp., PNCP, IBGE, DataSUS, INEP, TCEs (19 fontes)Coleta de dados públicosAPIs REST
⚙️ TransformOrchestrator → Dedup → Validation → EnrichmentProcessamento e qualidade4 estratégias
💾 LoadL1 (Memory) → L2 (Redis) → L3 (PostgreSQL)Cache multicamadaTTL: 5min/1h/24h
🤖 Consume17 agentes especializadosAnálises e investigaçõesAcesso L1 prioritário
Ver pipeline ETL completo com todas as fontes e transformações

📥 EXTRACT: Fontes de Dados

🏛️ Nível Federal (8 APIs)

APIStatusTipo de DadosImplementação
Portal da Transparência✅ AtivoContratos, Despesas, Convêniossrc/tools/transparency_api.py
PNCP✅ IntegradoContratos Padronizados (Lei 14.133/21)src/services/transparency_apis/federal_apis/pncp_client.py
Compras.gov✅ IntegradoLicitações Federaissrc/services/transparency_apis/federal_apis/compras_gov_client.py
BCB✅ IntegradoIndicadores Econômicossrc/services/transparency_apis/federal_apis/bcb_client.py
DataSUS✅ ProntoDados de Saúde Públicasrc/services/transparency_apis/federal_apis/datasus_client.py
IBGE✅ ProntoDados Estatísticos e Demográficossrc/services/transparency_apis/federal_apis/ibge_client.py
INEP✅ ProntoDados de Educaçãosrc/services/transparency_apis/federal_apis/inep_client.py
Minha Receita✅ ProntoConsulta CNPJ/CPFsrc/services/transparency_apis/federal_apis/minha_receita_client.py

⚖️ Nível Estadual (11 fontes)

Tribunais de Contas dos Estados (TCEs):

  • BA (Bahia) - src/services/transparency_apis/tce_apis/tce_ba.py
  • CE (Ceará) - src/services/transparency_apis/tce_apis/tce_ce.py
  • MG (Minas Gerais) - src/services/transparency_apis/tce_apis/tce_mg.py
  • PE (Pernambuco) - src/services/transparency_apis/tce_apis/tce_pe.py
  • RJ (Rio de Janeiro) - src/services/transparency_apis/tce_apis/tce_rj.py
  • SP (São Paulo) - src/services/transparency_apis/tce_apis/tce_sp.py

Portais Estaduais de Dados Abertos:

  • ✅ RO (Rondônia) - Portal customizado
  • ✅ SP, RJ, RS, SC, BA - Portais baseados em CKAN

🔍 Portal da Transparência: Situação Especial

Status: 78% dos endpoints retornam 403 Forbidden (limitação conhecida)

Endpoints Funcionais (22%):

  • /api-de-dados/contratos - ✅ Funcional
  • /api-de-dados/despesas - ✅ Funcional
  • /api-de-dados/convenios - ✅ Funcional

Estratégia de Fallback:

  1. Tentar Portal da Transparência primeiro
  2. Em caso de 403, usar PNCP (contratos padronizados)
  3. Complementar com Compras.gov (licitações)
  4. Enriquecer com TCEs estaduais (auditorias)

⚙️ TRANSFORM: Orquestração e Processamento

TransparencyOrchestrator

Localização: src/services/transparency_orchestrator.py

O orquestrador gerencia todas as APIs brasileiras com roteamento inteligente e fallback automático.

4 Estratégias de Query

class QueryStrategy(Enum):
"""Estratégias de consulta multi-source"""

FALLBACK = "fallback" # Tenta fontes em ordem de prioridade
AGGREGATE = "aggregate" # Combina resultados de todas as fontes
FASTEST = "fastest" # Retorna primeira resposta bem-sucedida
PARALLEL = "parallel" # Executa todas concorrentemente

Roteamento Inteligente

Para consultas estaduais:

# Detecta automaticamente quando filtro inclui "estado" ou "uf"
filters = {"estado": "MG", "ano": 2024}

# Ordem de prioridade:
# 1. TCE-MG (se disponível)
# 2. Portal Estadual MG
# 3. Portal Federal com filtro de estado (fallback)

Para consultas federais:

# Sem especificação de estado
filters = {"ano": 2024, "valor_minimo": 100000}

# Ordem de prioridade:
# 1. Portal da Transparência (primário)
# 2. PNCP (contratos padronizados)
# 3. Compras.gov (licitações)

Deduplicação de Dados

Registros são deduplicados usando esta ordem de prioridade:

  1. Campos de ID: id, numero_contrato, numeroContrato
  2. Chaves compostas: codigoOrgao-ano-numero
  3. Content hash: MD5 do JSON ordenado (fallback)
# Exemplo: Mesmo contrato de 3 fontes
TCE-MG: {"id": "123", "valor": 50000, "fornecedor": "ABC"}
Federal: {"numero_contrato": "123", "valor": 50000, "fornecedor": "ABC LTDA"}
PNCP: {"id": "123", "valorTotal": 50000, "contratado": "ABC"}

# Após deduplicação: 1 registro único
Result: {"id": "123", "valor": 50000, "fornecedor": "ABC", "sources": ["TCE-MG", "Federal", "PNCP"]}

Validação de Schema

class ContractSchema(BaseModel):
"""Schema padrão para contratos"""
id: str
numero_contrato: Optional[str]
valor: float
fornecedor: str
orgao: str
data_assinatura: date
objeto: str
fonte: str # Origem do dado

Enriquecimento de Dados

Geográfico (via IBGE):

  • Coordenadas (lat/lon)
  • Região, Estado, Município
  • População, PIB per capita

Econômico (via BCB):

  • Inflação acumulada (IPCA)
  • Taxa Selic no período
  • Valor ajustado pela inflação

💾 LOAD: Cache Multi-Layer

Arquitetura de 3 Camadas

Estratégia de Cache por Layer

LayerTTLTamanhoHit RateUso
L1: Memory5 minutos100MB~40%Queries frequentes (últimos 5min)
L2: Redis1 hora1GB~35%Queries recentes (última hora)
L3: PostgreSQL24 horasIlimitado~12%Queries do dia (warm cache)
Origin: APIs ExternasN/AN/A~13%Cache miss (cold)

Cache Hit Rate: 87%

Benefícios:

  • Redução de 87% nas chamadas externas
  • Latência média: 80ms (p50), 145ms (p95)
  • Economia de custos em APIs pagas
  • Resiliência: Sistema funciona mesmo com APIs offline

Invalidação Inteligente

# Exemplo: Invalidação ao detectar dados desatualizados
if data_changed(contract_id):
# Invalida todas as camadas
cache_l1.delete(key)
cache_l2.delete(key)
cache_l3.delete(key)

# Força refresh
fresh_data = await fetch_from_origin(contract_id)
await warm_cache(fresh_data) # Repopula L1, L2, L3

🔁 Fluxo de Investigação Completo

Exemplo: Consulta de Contratos de Saúde em MG


📊 Métricas de Performance

Latência por Camada

OperaçãoLatênciaPercentil
L1 Hit~2msp50
L2 Hit~15msp75
L3 Hit~50msp90
Origin Fetch~600msp95
With Enrichment~900msp99

Taxa de Sucesso por Fonte

FonteSuccess RateAvg Response Time
Portal da Transparência22%450ms (quando funciona)
PNCP98%320ms
TCE-MG95%280ms
IBGE99%180ms
DataSUS97%410ms
Compras.gov94%390ms

🛠️ Exemplos de Uso

Exemplo 1: Query Simples com Fallback

from src.services.data_service import data_service

# Busca contratos de 2024 - tenta Portal Federal, depois PNCP
result = await data_service.get_contracts_multi_source(
filters={"ano": 2024}
)

print(f"Contratos encontrados: {len(result['data'])}")
print(f"Fontes utilizadas: {result['sources']}")
print(f"Do cache?: {result['metadata']['from_cache']}")
print(f"Duração: {result['metadata']['duration_seconds']}s")

Exemplo 2: Query Estadual com Agregação

# Busca TODOS os contratos de MG (TCE-MG + Portal Estadual + Federal)
result = await data_service.get_state_contracts(
state_code="MG",
filters={"ano": 2024, "valor_minimo": 100000},
strategy=QueryStrategy.AGGREGATE # Combina todas as fontes
)

# Resultados automaticamente deduplicados
print(f"Total de contratos: {len(result['data'])}")
print(f"Fontes: {result['sources']}")
print(f"Antes deduplicação: {result['metadata']['records_before_dedup']}")
print(f"Após deduplicação: {result['metadata']['records_after_dedup']}")

Exemplo 3: Resposta Mais Rápida (Race Condition)

# Retorna a PRIMEIRA resposta bem-sucedida
result = await data_service.search_contracts_fastest(
filters={"numero_contrato": "12345/2024"}
)

print(f"Fonte mais rápida: {result['metadata']['fastest_source']}")
print(f"Tempo: {result['metadata']['duration_ms']}ms")

Exemplo 4: Monitoramento de Performance

# Estatísticas do orquestrador
stats = data_service.get_orchestrator_stats()

print(f"Total de queries: {stats['total_queries']}")
print(f"Uso por fonte: {stats['source_usage']}")
print(f"Taxa de sucesso: {stats['success_rate_by_source']}")
print(f"Cache hit rate: {stats['cache_hit_rate']}")

🔐 Qualidade e Confiabilidade

Data Quality Checks

class DataQualityValidator:
"""Validador de qualidade dos dados"""

async def validate(self, data: List[Dict]) -> QualityReport:
checks = {
"completeness": self._check_completeness(data),
"consistency": self._check_consistency(data),
"accuracy": self._check_accuracy(data),
"timeliness": self._check_timeliness(data)
}

return QualityReport(
score=sum(checks.values()) / len(checks),
checks=checks,
passed=all(checks.values())
)

Circuit Breaker Pattern

from src.services.orchestration.resilience.circuit_breaker import CircuitBreaker

# Proteção contra APIs instáveis
circuit = CircuitBreaker(
failure_threshold=3, # Abre após 3 falhas
timeout=60.0, # Reseta após 60s
expected_exception=APIError
)

result = await circuit.call(fetch_from_tce_mg)

📚 Próximos Passos

Para saber mais sobre o pipeline:

  1. API Reference - Documentação completa dos 323 endpoints
  2. Multi-Agent System - Como os agentes consomem os dados
  3. Monitoring - Métricas e dashboards
  4. Security - LGPD e compliance

🔗 Recursos Técnicos

Código-Fonte

  • Orchestrator: src/services/transparency_orchestrator.py
  • Data Service: src/services/data_service.py
  • Cache Service: src/services/cache_service.py
  • Federal APIs: src/services/transparency_apis/federal_apis/*.py (8 clientes)
  • State APIs: src/services/transparency_apis/tce_apis/*.py (6 TCEs)

Configuração

  • Environment: .env (variáveis de API keys)
  • Circuit Breaker: src/services/orchestration/resilience/circuit_breaker.py

Pipeline em Produção

O pipeline de dados está 100% operacional desde 07/10/2025, processando milhões de registros diariamente com 87% cache hit rate e 95%+ taxa de sucesso através de fallback automático.

Highlight: Sistema resiliente que funciona mesmo quando Portal da Transparência retorna 403 (78% dos casos).


Última Atualização: 22 de Janeiro de 2025 Versão: 3.0.0 Status: ✅ 30+ APIs Integradas | 87% Cache Hit Rate