🔐 Segurança e Autenticação
Segurança em Primeiro Lugar
O Cidadão.AI implementa múltiplas camadas de segurança seguindo as melhores práticas da indústria, incluindo autenticação JWT, OAuth2, rate limiting, proteção contra ataques e auditoria completa.
🎯 Visão Geral
O sistema de segurança do Cidadão.AI fornece:
- Autenticação Multi-Provedor: JWT + provedores OAuth2
- Autorização Granular: RBAC (Role-Based Access Control)
- Rate Limiting: Por IP, usuário e endpoint
- Proteção contra Ataques: XSS, SQL Injection, CSRF
- Auditoria Completa: Logs de segurança e compliance
🏗️ Arquitetura de Segurança
🔑 Autenticação JWT
Configuração
# Required environment variables
# Signing secret for JWTs; replace this placeholder (its text asks for at least 32 characters)
JWT_SECRET_KEY="your-secret-key-min-32-chars"
# JWT signing algorithm identifier
JWT_ALGORITHM="HS256"
# presumably the access-token lifetime in minutes — confirm against the token-issuing code
ACCESS_TOKEN_EXPIRE_MINUTES=30
# presumably the refresh-token lifetime in days — confirm against the token-issuing code
REFRESH_TOKEN_EXPIRE_DAYS=7
Fluxo de Autenticação
Implementação
from src.api.auth import AuthManager
# Login endpoint
@app.post("/auth/login")
async def login(credentials: LoginRequest):
    """Authenticate by e-mail and password and issue an access/refresh token pair.

    Args:
        credentials: Request body carrying ``email`` and ``password``.

    Returns:
        A dict with ``access_token``, ``refresh_token`` and ``token_type``.

    Raises:
        HTTPException: 401 when ``authenticate_user`` returns a falsy value.
    """
    manager = AuthManager()

    # Validate the credentials; the same generic message is used for every failure
    account = manager.authenticate_user(credentials.email, credentials.password)
    if not account:
        raise HTTPException(status_code=401, detail="Invalid credentials")

    # The access token carries the role claim; the refresh token only the subject
    access_claims = {"sub": account.email, "role": account.role}
    return {
        "access_token": manager.create_access_token(data=access_claims),
        "refresh_token": manager.create_refresh_token(data={"sub": account.email}),
        "token_type": "bearer",
    }
Token Structure
{
"sub": "user@example.com",
"role": "analyst",
"exp": 1706789456,
"iat": 1706787656,
"jti": "unique-token-id",
"iss": "cidadao.ai"
}
🌐 OAuth2 Integration
Providers Suportados
# Per-provider OAuth2 settings, keyed by the provider name used in the
# /auth/oauth/{provider} and /auth/callback/{provider} routes.
OAUTH_PROVIDERS = {
    # Google
    "google": {
        "client_id": GOOGLE_CLIENT_ID,
        "client_secret": GOOGLE_CLIENT_SECRET,
        "authorize_url": "https://accounts.google.com/o/oauth2/auth",
        "token_url": "https://oauth2.googleapis.com/token",
        "userinfo_url": "https://www.googleapis.com/oauth2/v1/userinfo",
        "scopes": ["openid", "email", "profile"],
    },
    # GitHub
    "github": {
        "client_id": GITHUB_CLIENT_ID,
        "client_secret": GITHUB_CLIENT_SECRET,
        "authorize_url": "https://github.com/login/oauth/authorize",
        "token_url": "https://github.com/login/oauth/access_token",
        "userinfo_url": "https://api.github.com/user",
        "scopes": ["user:email"],
    },
    # Microsoft ("common" tenant endpoints)
    "microsoft": {
        "client_id": MICROSOFT_CLIENT_ID,
        "client_secret": MICROSOFT_CLIENT_SECRET,
        "authorize_url": "https://login.microsoftonline.com/common/oauth2/v2.0/authorize",
        "token_url": "https://login.microsoftonline.com/common/oauth2/v2.0/token",
        "userinfo_url": "https://graph.microsoft.com/v1.0/me",
        "scopes": ["openid", "email", "profile"],
    },
    # Gov.br single sign-on
    # NOTE(review): the Gov.br documentation appears to spell the last scope
    # `govbr_confiabilidades` (plural) — confirm before relying on it.
    "govbr": {
        "client_id": GOVBR_CLIENT_ID,
        "client_secret": GOVBR_CLIENT_SECRET,
        "authorize_url": "https://sso.acesso.gov.br/authorize",
        "token_url": "https://sso.acesso.gov.br/token",
        "userinfo_url": "https://sso.acesso.gov.br/userinfo",
        "scopes": ["openid", "email", "profile", "govbr_confiabilidade"],
    },
}
OAuth2 Flow
# Start the OAuth2 flow
@app.get("/auth/oauth/{provider}")
async def oauth_login(provider: str):
    """Return the authorization URL the client must visit for ``provider``.

    Args:
        provider: Provider name taken from the URL path.

    Returns:
        A dict with a single ``authorization_url`` entry.
    """
    client = OAuth2Client(provider)
    # The provider redirects back to the matching callback route of this API
    callback_uri = f"{API_URL}/auth/callback/{provider}"
    return {
        "authorization_url": client.get_authorization_url(redirect_uri=callback_uri)
    }
# OAuth2 callback
@app.get("/auth/callback/{provider}")
async def oauth_callback(provider: str, code: str):
    """Finish the OAuth2 flow and issue an application JWT.

    Args:
        provider: Provider name taken from the URL path.
        code: Authorization code sent back by the provider.

    Returns:
        A dict with a single ``access_token`` entry.
    """
    # NOTE(review): no `state` value is checked here — confirm that CSRF
    # protection for the OAuth2 round trip is handled elsewhere.
    oauth_client = OAuth2Client(provider)

    # Exchange the authorization code for a provider token
    token = await oauth_client.get_token(code)

    # Fetch the user's profile from the provider
    user_info = await oauth_client.get_user_info(token)

    # Create or update the local user record
    user = await create_or_update_oauth_user(user_info, provider)

    # Issue the JWT. The token issuer is built here because no `auth_manager`
    # name is otherwise bound inside this function.
    auth_manager = AuthManager()
    access_token = auth_manager.create_access_token(
        data={"sub": user.email, "role": user.role}
    )
    return {"access_token": access_token}
🚦 Rate Limiting
Configuração
class RateLimitConfig:
    """Static rate-limiting settings: global, per-endpoint and per-role values."""

    # Global limits
    REQUESTS_PER_MINUTE = 60
    REQUESTS_PER_HOUR = 1000
    BURST_SIZE = 10

    # Per-endpoint limits, keyed by path (a trailing "*" is used as a wildcard).
    # presumably "rpm"/"rph" mean requests per minute / per hour — confirm
    ENDPOINT_LIMITS = {
        "/api/v1/investigations": {"rpm": 30, "rph": 500},
        "/api/v1/agents/*": {"rpm": 100, "rph": 2000},
        "/api/v1/reports": {"rpm": 10, "rph": 100},
        "/auth/*": {"rpm": 20, "rph": 200},
    }

    # Per-role factors, keyed by role name.
    # presumably multiplied into the base limit by the limiter — confirm
    ROLE_MULTIPLIERS = {
        "admin": 10.0,
        "analyst": 2.0,
        "user": 1.0,
        "guest": 0.5,
    }
Implementação
class RateLimiter:
    """In-memory, per-IP rate limiter using a 60-second sliding window."""

    def __init__(self):
        # Per-IP request timestamps (time.time() values), oldest first
        self.requests = defaultdict(lambda: deque(maxlen=1000))
        # IP -> naive UTC datetime until which that IP stays blocked
        self.blocked_ips = {}

    async def check_rate_limit(
        self,
        request: Request,
        user: Optional[User] = None
    ) -> bool:
        """Record one request and enforce the caller's per-minute limit.

        Args:
            request: Incoming request; its client host and URL path are read.
            user: Authenticated user, if any, passed on to the limit calculation.

        Returns:
            True when the request is within the limit.

        Raises:
            HTTPException: 429 when the IP is currently blocked or has just
                exceeded its limit (which also blocks it for five minutes).
        """
        # `request.client` can be None when the server has no peer address;
        # such requests share one bucket instead of crashing on `.host`.
        ip = request.client.host if request.client else "unknown"
        endpoint = request.url.path

        # Reject blocked IPs; drop the entry once the block has expired
        blocked_until = self.blocked_ips.get(ip)
        if blocked_until is not None:
            if datetime.utcnow() < blocked_until:
                raise HTTPException(
                    status_code=429,
                    detail="IP temporarily blocked"
                )
            del self.blocked_ips[ip]

        # Limit depends on the endpoint and on the user
        limit = self._calculate_limit(endpoint, user)

        now = time.time()
        window_start = now - 60  # 1 minute window

        # Timestamps are appended in arrival order, so expired entries sit at
        # the left end; pop them instead of rebuilding the deque every call.
        history = self.requests[ip]
        while history and history[0] <= window_start:
            history.popleft()

        if len(history) >= limit:
            # Block the IP temporarily
            self.blocked_ips[ip] = datetime.utcnow() + timedelta(minutes=5)
            raise HTTPException(
                status_code=429,
                detail="Rate limit exceeded"
            )

        # Record this request
        history.append(now)
        return True
🛡️ Proteção contra Ataques
XSS Protection
# Security response headers
SECURITY_HEADERS = {
    "X-Content-Type-Options": "nosniff",
    "X-Frame-Options": "DENY",
    # The legacy browser XSS auditor is deprecated and "1; mode=block" can
    # itself introduce vulnerabilities; "0" switches it off explicitly and
    # leaves protection to the Content-Security-Policy below.
    "X-XSS-Protection": "0",
    "Content-Security-Policy": "default-src 'self'",
    "Referrer-Policy": "strict-origin-when-cross-origin",
}
# Input validation
def sanitize_input(value: str, patterns=None) -> str:
    """Strip HTML tags and suspicious patterns, then HTML-escape the result.

    Args:
        value: Untrusted text.
        patterns: Optional iterable of regex patterns to remove
            (case-insensitively). Defaults to ``SUSPICIOUS_PATTERNS``.

    Returns:
        The cleaned, HTML-escaped text.
    """
    if patterns is None:
        patterns = SUSPICIOUS_PATTERNS

    # Remove HTML tags
    value = re.sub(r'<[^>]*>', '', value)

    # Remove suspicious patterns until nothing changes: deleting one match
    # can join its neighbours into a new match ("javajavascript:script:").
    # Each change shortens the string, so the loop always terminates.
    previous = None
    while previous != value:
        previous = value
        for pattern in patterns:
            value = re.sub(pattern, '', value, flags=re.IGNORECASE)

    # Escape last, so pattern removal can never cut into an HTML entity
    return html.escape(value)
SQL Injection Prevention
# Always use prepared (bound) parameters
async def get_investigation(investigation_id: str):
    """Fetch one investigation row by id using a bound query parameter.

    Args:
        investigation_id: Identifier matched against the ``id`` column.

    Returns:
        Whatever ``database.fetch_one`` yields for the query.
    """
    # CORRECT - bound parameter; the value never becomes part of the SQL text
    query = """
        SELECT * FROM investigations
        WHERE id = :investigation_id
    """
    result = await database.fetch_one(
        query=query,
        values={"investigation_id": investigation_id}
    )

    # WRONG - string concatenation (kept only as a counter-example)
    # query = f"SELECT * FROM investigations WHERE id = '{investigation_id}'"

    # Hand the row back to the caller; it was previously fetched and dropped
    return result
CSRF Protection
# CSRF token
class CSRFProtection:
    """Issue and verify HMAC-signed, time-limited CSRF tokens.

    A token has the form ``"<issued_at>:<hex signature>"``, where the
    signature covers both the session id and the issue time. Carrying the
    issue time inside the token is what lets validation recompute the same
    signature and enforce ``max_age``.
    """

    # Per-instance signing key; None means fall back to settings.SECRET_KEY
    _secret_key = None

    def __init__(self, secret_key=None):
        """Optionally override the signing key (defaults to settings.SECRET_KEY)."""
        self._secret_key = secret_key

    def _sign(self, session_id: str, issued_at: int) -> str:
        """Return the hex HMAC-SHA256 of ``session_id`` and ``issued_at``."""
        secret = self._secret_key
        if secret is None:
            secret = settings.SECRET_KEY
        message = f"{session_id}:{issued_at}"
        return hmac.new(
            secret.encode(),
            message.encode(),
            hashlib.sha256
        ).hexdigest()

    def generate_token(self, session_id: str) -> str:
        """Generate a CSRF token bound to ``session_id`` and the current time."""
        import time

        issued_at = int(time.time())
        return f"{issued_at}:{self._sign(session_id, issued_at)}"

    def validate_token(
        self,
        token: str,
        session_id: str,
        max_age: int = 3600
    ) -> bool:
        """Validate a CSRF token.

        Args:
            token: Token previously returned by ``generate_token``.
            session_id: Session the token must belong to.
            max_age: Maximum accepted token age in seconds.

        Returns:
            True only for a well-formed, unexpired token whose signature
            matches ``session_id``; False otherwise.
        """
        import time

        issued_text, separator, signature = token.partition(":")
        if not separator or not (issued_text.isascii() and issued_text.isdigit()):
            return False
        try:
            issued_at = int(issued_text)
        except ValueError:
            # int() refuses absurdly long digit strings
            return False

        if time.time() - issued_at > max_age:
            return False

        expected = self._sign(session_id, issued_at)
        # Compare as bytes: compare_digest rejects non-ASCII str arguments
        return hmac.compare_digest(signature.encode(), expected.encode())
🔒 Autorização (RBAC)
Roles e Permissões
class Role(Enum):
    """Access roles; each member's value is the role's lowercase name."""

    ADMIN = "admin"
    ANALYST = "analyst"
    USER = "user"
    GUEST = "guest"
# Permission strings ("<resource>:<action>") granted to each role.
# NOTE(review): ADMIN lists no "agent:use" or "report:*" entries — confirm
# that is intended if permission checks compare exact strings.
ROLE_PERMISSIONS = {
    Role.ADMIN: [
        "investigation:create",
        "investigation:read",
        "investigation:update",
        "investigation:delete",
        "agent:manage",
        "user:manage",
        "system:configure",
    ],
    Role.ANALYST: [
        "investigation:create",
        "investigation:read",
        "investigation:update",
        "agent:use",
        "report:generate",
    ],
    Role.USER: [
        "investigation:create",
        "investigation:read",
        "report:view",
    ],
    Role.GUEST: [
        "investigation:read",
        "report:view",
    ],
}
Decorator de Permissão
def require_permission(permission: str):
    """Build a decorator that restricts an async endpoint to one permission.

    The wrapped endpoint must receive the authenticated user as the
    ``current_user`` keyword argument.

    Args:
        permission: Permission string the caller's role must include.

    Returns:
        A decorator for async endpoint functions.

    Raises:
        HTTPException: raised by the wrapper at call time — 401 when
            ``current_user`` is missing or falsy, 403 when the user's role
            does not grant ``permission``.
    """
    import functools

    def decorator(func):
        # wraps() keeps the endpoint's name, docstring and signature visible;
        # without it the framework would inspect `(*args, **kwargs)` and
        # could not inject `current_user`.
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            # Get the current user
            user = kwargs.get("current_user")
            if not user:
                raise HTTPException(
                    status_code=401,
                    detail="Authentication required"
                )

            # Accept either a Role member or its string value; unknown roles
            # end up with no permissions.
            try:
                role = Role(user.role)
            except ValueError:
                role = None

            # Check the permission
            user_permissions = ROLE_PERMISSIONS.get(role, [])
            if permission not in user_permissions:
                raise HTTPException(
                    status_code=403,
                    detail="Insufficient permissions"
                )
            return await func(*args, **kwargs)
        return wrapper
    return decorator
# Usage
@app.post("/api/v1/investigations")
@require_permission("investigation:create")
async def create_investigation(
    request: InvestigationRequest,
    current_user: User = Depends(get_current_user),
):
    """Example endpoint guarded by the "investigation:create" permission.

    The body is a placeholder; the permission decorator runs before it.
    """
🗝️ Vault Integration
HashiCorp Vault
class VaultClient:
    """Wrapper around an hvac client for reading and rotating KV v2 secrets."""

    def __init__(self):
        self.client = hvac.Client(
            url=settings.VAULT_URL,
            token=settings.VAULT_TOKEN
        )

    async def get_secret(self, path: str) -> Optional[Dict]:
        """Read a secret from Vault.

        Args:
            path: Secret path under the "secret" mount point.

        Returns:
            The secret's data dict, or None when the read fails for any
            reason (the failure is logged with its traceback).
        """
        import asyncio

        try:
            # The hvac call is blocking; run it in a worker thread so the
            # event loop keeps serving other requests meanwhile.
            response = await asyncio.to_thread(
                self.client.secrets.kv.v2.read_secret_version,
                path=path,
                mount_point="secret",
            )
            return response["data"]["data"]
        except Exception as e:
            # Deliberately best-effort: callers receive None on any failure
            logger.exception("Vault error: %s", e)
            return None

    async def rotate_jwt_secret(self):
        """Generate a new JWT secret, store it in Vault and return it.

        Returns:
            The newly generated secret, stored under "jwt/secret_key" with
            the key "value".
        """
        import asyncio

        new_secret = secrets.token_urlsafe(32)
        # Blocking hvac call, off-loaded to a worker thread
        await asyncio.to_thread(
            self.client.secrets.kv.v2.create_or_update_secret,
            path="jwt/secret_key",
            secret={"value": new_secret},
            mount_point="secret",
        )
        return new_secret
📝 Auditoria de Segurança
Event Logging
class SecurityAudit:
    """Builds security audit events, logs them and escalates critical ones."""

    async def log_security_event(
        self,
        event_type: str,
        severity: str,
        user_id: Optional[str],
        ip_address: str,
        details: Dict[str, Any],
    ):
        """Record one security event and alert when it is critical.

        Args:
            event_type: Name of the event.
            severity: Severity label; "CRITICAL" additionally triggers an alert.
            user_id: Identifier of the user involved, if known.
            ip_address: Address the event originated from.
            details: Free-form event data; also the input of the event hash.
        """
        recorded_at = datetime.utcnow().isoformat()
        # NOTE(review): the hash covers only `details`, not the other fields
        details_hash = self._calculate_event_hash(details)
        event = {
            "timestamp": recorded_at,
            "event_type": event_type,
            "severity": severity,
            "user_id": user_id,
            "ip_address": ip_address,
            "details": details,
            "hash": details_hash,
        }

        # Write to the audit log
        audit_logger.log(event)

        # Only critical events raise an alert
        if severity != "CRITICAL":
            return
        await self.send_security_alert(event)
Eventos Monitorados
# Severity label assigned to each monitored security event type
SECURITY_EVENTS = {
    "login_success": "INFO",
    "login_failure": "WARNING",
    "multiple_login_failures": "HIGH",
    "suspicious_activity": "HIGH",
    "unauthorized_access": "CRITICAL",
    "rate_limit_exceeded": "WARNING",
    "ip_blocked": "HIGH",
    "token_theft_attempt": "CRITICAL",
    "sql_injection_attempt": "CRITICAL",
    "xss_attempt": "HIGH",
}
🚀 Best Practices
1. Configuração Segura
- Use variáveis de ambiente para secrets
- Rotacione secrets regularmente
- Use HTTPS sempre
- Configure CORS adequadamente
2. Validação de Dados
- Valide toda entrada do usuário
- Use schemas Pydantic
- Sanitize as saídas (outputs)
- Limite tamanhos de request
3. Monitoramento
- Monitore tentativas de login malsucedidas
- Detecte padrões suspeitos
- Alerte sobre atividades anormais
- Mantenha logs de auditoria
Próximo: Configuração de OAuth2 →