🎯 Arquitetura de Orquestração Multi-API para Investigações Automáticas¶
Autor: Anderson Henrique da Silva Data: 2025-10-14 16:30:00 -03:00 Status: PROPOSTA TÉCNICA
🎯 PROBLEMA¶
Com 200+ APIs governamentais disponíveis, como os agentes realizam investigações automáticas coordenando múltiplas fontes de dados?
Desafios: 1. ❌ Decidir quais APIs consultar para cada investigação 2. ❌ Correlacionar dados de fontes heterogêneas 3. ❌ Gerenciar latência (10+ chamadas de API) 4. ❌ Respeitar rate limits de cada API 5. ❌ Minimizar custos (algumas APIs são pagas) 6. ❌ Cachear dados inteligentemente 7. ❌ Lidar com falhas parciais
✅ SOLUÇÃO: 3 Camadas de Orquestração¶
┌─────────────────────────────────────────────────────────┐
│ CAMADA 1: Query Understanding & Planning │
│ (Entende a pergunta, decide quais APIs usar) │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ CAMADA 2: Data Federation Layer │
│ (Orquestra chamadas, correlaciona dados) │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ CAMADA 3: Unified Entity Graph │
│ (Grafo de conhecimento unificado) │
└─────────────────────────────────────────────────────────┘
🧠 CAMADA 1: Query Understanding & Planning¶
1.1 Intent Classification¶
Objetivo: Classificar a intenção do usuário e mapear para APIs relevantes.
class InvestigationIntent:
"""Tipos de investigação e APIs necessárias."""
SUPPLIER_INVESTIGATION = {
"name": "Investigação de Fornecedor",
"apis": [
"minha_receita", # Dados cadastrais CNPJ
"pncp", # Licitações atuais
"compras_gov", # Histórico de contratos
"cnd", # Regularidade fiscal
"tse", # Doações eleitorais (se aplicável)
],
"sequence": "sequential", # Ordem de execução
"parallel_groups": [
["minha_receita"], # 1. Primeiro
["pncp", "compras_gov"], # 2. Paralelo após CNPJ
["cnd", "tse"], # 3. Paralelo após contratos
]
}
CONTRACT_ANOMALY_DETECTION = {
"name": "Detecção de Anomalias em Contratos",
"apis": [
"pncp", # Contratos para análise
"ibge", # Índices econômicos (contexto)
"bcb", # SELIC, inflação (contexto)
"minha_receita", # Dados de fornecedores
],
"sequence": "mixed",
"parallel_groups": [
["pncp"], # 1. Buscar contratos
["ibge", "bcb"], # 2. Contexto econômico (paralelo)
["minha_receita"], # 3. Enriquecer com CNPJs
]
}
HEALTH_BUDGET_ANALYSIS = {
"name": "Análise Orçamentária de Saúde",
"apis": [
"siconfi", # Finanças municipais/estaduais
"datasus", # Dados de saúde
"ibge", # Demografia
"portal_transp", # Gastos federais
],
"sequence": "mixed",
}
class QueryPlanner:
"""Planeja execução de queries baseado na intenção."""
async def plan_investigation(
self,
query: str,
context: InvestigationContext
) -> ExecutionPlan:
"""
Analisa a query e cria plano de execução.
Args:
query: Pergunta do usuário
context: Contexto da investigação (CPF, CNPJ, período)
Returns:
ExecutionPlan com APIs, ordem, paralelização
"""
# 1. Classificar intenção usando LLM
intent = await self._classify_intent(query)
# 2. Extrair entidades (CNPJ, CPF, datas, órgãos)
entities = await self._extract_entities(query, context)
# 3. Selecionar APIs relevantes
relevant_apis = self._select_apis(intent, entities)
# 4. Criar plano de execução otimizado
plan = self._create_execution_plan(
intent,
entities,
relevant_apis
)
return plan
1.2 Exemplo de Classificação¶
# Input
query = "Investigue contratos da empresa XYZ LTDA nos últimos 2 anos"
# Output do QueryPlanner
ExecutionPlan(
intent="SUPPLIER_INVESTIGATION",
entities={
"company_name": "XYZ LTDA",
"cnpj": None, # Será descoberto
"date_range": ("2023-01-01", "2025-10-14"),
},
stages=[
Stage(
name="Identificação",
apis=["minha_receita"],
method="search_by_name",
parallel=False,
reason="Descobrir CNPJ da empresa",
),
Stage(
name="Busca de Contratos",
apis=["pncp", "compras_gov"],
method="search_contracts",
parallel=True, # Executar em paralelo
depends_on=["Identificação"],
reason="Buscar licitações e contratos",
),
Stage(
name="Enriquecimento",
apis=["cnd", "portal_transp"],
method="check_compliance",
parallel=True,
depends_on=["Busca de Contratos"],
reason="Verificar regularidade e valores",
),
],
estimated_duration_seconds=15,
cache_strategy="aggressive",
)
🔄 CAMADA 2: Data Federation Layer¶
2.1 Unified Data Access¶
Objetivo: Camada de abstração que unifica acesso a todas as APIs.
class DataFederationService:
"""Serviço de federação de dados de múltiplas APIs."""
def __init__(self):
self.api_registry = APIRegistry()
self.cache = UnifiedCache()
self.metrics = FederationMetrics()
async def execute_investigation_plan(
self,
plan: ExecutionPlan
) -> InvestigationResult:
"""
Executa plano de investigação com orquestração inteligente.
Recursos:
- Paralelização automática
- Circuit breaker para APIs instáveis
- Fallback para fontes alternativas
- Deduplicação de chamadas
- Cache distribuído
"""
results = InvestigationResult()
for stage in plan.stages:
stage_results = await self._execute_stage(stage, results)
results.add_stage_results(stage.name, stage_results)
return results
async def _execute_stage(
self,
stage: Stage,
previous_results: InvestigationResult
) -> dict:
"""Executa um estágio do plano."""
if stage.parallel:
# Executar APIs em paralelo
tasks = [
self._call_api(api, stage, previous_results)
for api in stage.apis
]
results = await asyncio.gather(*tasks, return_exceptions=True)
else:
# Executar sequencialmente
results = []
for api in stage.apis:
result = await self._call_api(api, stage, previous_results)
results.append(result)
return self._aggregate_stage_results(results)
async def _call_api(
self,
api_name: str,
stage: Stage,
context: InvestigationResult
) -> APIResponse:
"""
Chama API com circuit breaker e fallback.
"""
# 1. Verificar cache
cache_key = self._generate_cache_key(api_name, stage, context)
if cached := await self.cache.get(cache_key):
self.metrics.record_cache_hit(api_name)
return cached
# 2. Obter cliente da API
client = self.api_registry.get_client(api_name)
# 3. Preparar parâmetros baseado no contexto
params = self._prepare_api_params(api_name, stage, context)
# 4. Executar com circuit breaker
try:
result = await self._call_with_circuit_breaker(
client,
stage.method,
params
)
# 5. Cachear resultado
await self.cache.set(
cache_key,
result,
ttl=self._get_cache_ttl(api_name)
)
return result
except APIError as e:
# 6. Tentar fallback se disponível
if fallback_api := self._get_fallback_api(api_name):
logger.warning(f"{api_name} failed, trying {fallback_api}")
return await self._call_api(fallback_api, stage, context)
# 7. Falha parcial aceitável
logger.error(f"API {api_name} failed: {e}")
return APIResponse(status="failed", error=str(e))
2.2 Circuit Breaker Pattern¶
class CircuitBreaker:
"""Previne chamadas para APIs instáveis."""
STATES = ["CLOSED", "OPEN", "HALF_OPEN"]
def __init__(
self,
failure_threshold: int = 5,
timeout: int = 60,
expected_exception: type = APIError
):
self.failure_threshold = failure_threshold
self.timeout = timeout
self.expected_exception = expected_exception
self.failure_count = 0
self.last_failure_time = None
self.state = "CLOSED"
async def call(self, func, *args, **kwargs):
"""Executa função com circuit breaker."""
if self.state == "OPEN":
if self._should_attempt_reset():
self.state = "HALF_OPEN"
else:
raise CircuitBreakerOpen(
f"Circuit breaker OPEN for {func.__name__}"
)
try:
result = await func(*args, **kwargs)
self._on_success()
return result
except self.expected_exception as e:
self._on_failure()
raise
def _on_success(self):
"""Resetar contador em sucesso."""
self.failure_count = 0
if self.state == "HALF_OPEN":
self.state = "CLOSED"
def _on_failure(self):
"""Incrementar contador em falha."""
self.failure_count += 1
self.last_failure_time = datetime.now()
if self.failure_count >= self.failure_threshold:
self.state = "OPEN"
2.3 Fallback Strategy¶
class FallbackRegistry:
"""Mapeia APIs para alternativas."""
FALLBACKS = {
"portal_transp": [
"pncp", # Para contratos
"compras_gov", # Para licitações antigas
],
"serpro_cnpj": [
"minha_receita", # Alternativa gratuita
],
"cnd_federal": [
"portal_transp", # Pode ter info parcial
],
}
def get_fallback(self, api_name: str) -> Optional[str]:
"""Retorna API de fallback."""
fallbacks = self.FALLBACKS.get(api_name, [])
return fallbacks[0] if fallbacks else None
🕸️ CAMADA 3: Unified Entity Graph¶
3.1 Knowledge Graph¶
Objetivo: Grafo de conhecimento unificado que conecta entidades de todas as APIs.
class EntityGraph:
"""
Grafo de entidades unificado.
Permite consultas como:
- "Todos os contratos da empresa X"
- "Fornecedores do órgão Y que têm pendências fiscais"
- "Correlação entre doações eleitorais e contratos"
"""
def __init__(self):
self.graph = nx.MultiDiGraph()
self.entity_index = {}
async def add_entity(
self,
entity_type: str,
entity_id: str,
data: dict,
source_api: str
):
"""
Adiciona entidade ao grafo.
Args:
entity_type: "company", "contract", "person", "agency"
entity_id: Identificador único (CNPJ, CPF, etc)
data: Dados da entidade
source_api: API de origem
"""
node_id = f"{entity_type}:{entity_id}"
self.graph.add_node(
node_id,
entity_type=entity_type,
entity_id=entity_id,
data=data,
source_api=source_api,
updated_at=datetime.now(),
)
# Indexar para busca rápida
self.entity_index[entity_id] = node_id
async def link_entities(
self,
from_entity: str,
to_entity: str,
relationship: str,
metadata: dict = None
):
"""
Cria relacionamento entre entidades.
Exemplos:
- Company X "won_contract" Contract Y
- Person A "is_partner_of" Company B
- Company C "donated_to" Politician D
"""
self.graph.add_edge(
from_entity,
to_entity,
relationship=relationship,
metadata=metadata or {},
created_at=datetime.now(),
)
async def query_graph(self, cypher_query: str) -> list:
"""
Consulta o grafo usando query similar a Cypher.
Example:
query = '''
MATCH (c:company)-[:won_contract]->(ct:contract)
WHERE ct.value > 1000000
RETURN c.name, ct.value, ct.date
'''
"""
# Implementação de query engine
pass
async def find_anomalies(self, entity_id: str) -> list[Anomaly]:
"""
Detecta anomalias analisando o grafo.
Exemplos:
- Empresa nova com contratos grandes
- Concentração de contratos em poucos fornecedores
- Sócios em comum entre concorrentes
"""
anomalies = []
# Análise de padrões suspeitos
node = self.graph.nodes[entity_id]
# 1. Verificar idade da empresa vs tamanho de contratos
if self._is_new_company_large_contracts(node):
anomalies.append(Anomaly(
type="new_company_large_contract",
severity="high",
description="Empresa recente com contratos acima da média",
))
# 2. Verificar rede de relacionamentos
if self._has_suspicious_network(node):
anomalies.append(Anomaly(
type="suspicious_network",
severity="medium",
description="Relacionamentos suspeitos detectados",
))
return anomalies
3.2 Exemplo de Grafo Construído¶
┌──────────────┐
│ Company: │
│ 12.345.678/ │────won_contract───┐
│ 0001-01 │ │
└──────────────┘ ↓
│ ┌──────────────┐
│ │ Contract: │
│ │ 2024/001 │
is_partner_of │ R$ 5.000.000 │
│ └──────────────┘
↓ │
┌──────────────┐ │
│ Person: │ │
│ 111.222.333 │──donated_to───────┤
│ -44 │ │
└──────────────┘ ↓
┌──────────────┐
│ Agency: │
│ Ministry X │
└──────────────┘
🚀 IMPLEMENTAÇÃO PRÁTICA¶
Fase 1: Query Planning Service (2 semanas)¶
# src/services/query_planner/
├── intent_classifier.py # Classificação de intenção
├── entity_extractor.py # Extração de entidades
├── execution_planner.py # Criação de planos
└── templates/
├── supplier_investigation.yaml
├── contract_anomaly.yaml
└── budget_analysis.yaml
Fase 2: Data Federation (3 semanas)¶
# src/services/data_federation/
├── federation_service.py # Orquestração
├── circuit_breaker.py # Resiliência
├── fallback_registry.py # Fallbacks
├── unified_cache.py # Cache distribuído
└── api_registry.py # Registro de APIs
Fase 3: Entity Graph (4 semanas)¶
# src/services/entity_graph/
├── graph_builder.py # Construção do grafo
├── entity_linker.py # Link entre entidades
├── query_engine.py # Engine de consultas
└── anomaly_detector.py # Detecção de anomalias
📊 EXEMPLO DE FLUXO COMPLETO¶
Input¶
Processamento¶
1. Query Planning (2s)
plan = QueryPlanner.plan_investigation(query)
# Intent: CONTRACT_ANOMALY_DETECTION
# Entities: {"company_name": "ACME LTDA", "year": 2024}
# APIs: minha_receita, pncp, compras_gov, bcb, ibge
2. Data Federation (10s)
# Stage 1: Identificar empresa (2s)
company = await minha_receita.search("ACME LTDA")
# CNPJ: 12.345.678/0001-01
# Stage 2: Buscar contratos (paralelo, 5s)
contracts_pncp = await pncp.search_contracts(cnpj="12.345.678/0001-01", year=2024)
contracts_old = await compras_gov.search_contracts(cnpj="12.345.678/0001-01")
# Stage 3: Contexto econômico (paralelo, 3s)
selic_2024 = await bcb.get_selic(year=2024)
ipca_2024 = await ibge.get_ipca(year=2024)
3. Entity Graph Construction (2s)
# Adicionar ao grafo
graph.add_entity("company", company.cnpj, company.dict())
for contract in contracts:
graph.add_entity("contract", contract.id, contract.dict())
graph.link_entities(company.cnpj, contract.id, "won_contract")
4. Anomaly Detection (1s)
anomalies = graph.find_anomalies(company.cnpj)
# - Empresa criada em 2023, contratos de R$ 10M em 2024
# - Preços 30% acima da média do mercado
# - Único vencedor em 80% das licitações do órgão X
5. Response (15s total)
InvestigationResult(
company=company,
contracts_found=25,
total_value=50_000_000,
anomalies=[
Anomaly(type="new_company_large_contract", severity="high"),
Anomaly(type="price_above_market", severity="medium"),
Anomaly(type="supplier_concentration", severity="high"),
],
data_sources=["Minha Receita", "PNCP", "BCB", "IBGE"],
confidence_score=0.85,
)
🎯 BENEFÍCIOS DA ARQUITETURA¶
✅ Para os Agentes¶
- Automação Total: Agentes não precisam saber quais APIs usar
- Velocidade: Paralelização inteligente reduz latência
- Resiliência: Fallbacks automáticos em falhas
- Contexto Rico: Grafo fornece visão 360° das entidades
✅ Para o Sistema¶
- Escalabilidade: Adicionar novas APIs sem modificar agentes
- Observabilidade: Métricas de cada API e estágio
- Custo: Cache agressivo reduz chamadas repetidas
- Qualidade: Correlação de múltiplas fontes aumenta confiança
✅ Para os Usuários¶
- Investigações Profundas: 10+ fontes em uma consulta
- Respostas Rápidas: 15s para investigações complexas
- Confiabilidade: Sistema continua funcionando com APIs parcialmente offline
- Transparência: Sabe exatamente quais fontes foram usadas
📝 PRÓXIMOS PASSOS¶
- ✅ Documentar arquitetura (FEITO)
- ⏳ Implementar QueryPlanner (2 semanas)
- ⏳ Implementar DataFederation (3 semanas)
- ⏳ Implementar EntityGraph (4 semanas)
- ⏳ Integrar com agentes existentes (1 semana)
- ⏳ Testes de carga e benchmarks (1 semana)
Total: 11 semanas (2.5 meses)
Última Atualização: 2025-10-14 16:30:00 -03:00 Status: PROPOSTA TÉCNICA COMPLETA Responsável: Anderson Henrique da Silva