Dependency Injection com Depends
A injeção de dependências é um padrão fundamental em arquiteturas bem construídas. No FastAPI, o sistema de Depends permite que você declare dependências de forma limpa e reutilizável, evitando código duplicado e melhorando a testabilidade. Diferentemente de frameworks mais antigos, FastAPI resolve dependências automaticamente durante a execução da requisição, injetando-as nos parâmetros da sua função manipuladora.
O conceito funciona assim: você cria funções que retornam valores ou objetos que serão injetados em outras funções. Essas funções podem ter suas próprias dependências, criando uma árvore de resoluções que o FastAPI processa de forma inteligente, inclusive cache para requisições da mesma função dentro do mesmo escopo.
Dependências Simples
As dependências mais básicas são funções que retornam um valor específico. Imagine validar um token ou recuperar um usuário do banco de dados:
from fastapi import FastAPI, Depends, HTTPException, status
app = FastAPI()
# Simulando um banco de dados em memória
users_db = {
"user123": {"id": "user123", "name": "João Silva", "email": "joao@example.com"}
}
def get_current_user(user_id: str = "user123") -> dict:
"""Dependência que busca o usuário atual."""
if user_id not in users_db:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Usuário não encontrado")
return users_db[user_id]
@app.get("/profile")
def get_profile(current_user: dict = Depends(get_current_user)):
"""Endpoint que usa a dependência de usuário."""
return {"profile": current_user, "message": f"Bem-vindo, {current_user['name']}"}
Neste exemplo, get_current_user é uma dependência que valida se o usuário existe. Quando uma rota usa Depends(get_current_user), o FastAPI chama a função automaticamente e passa seu resultado. Se a função lançar uma exceção, o FastAPI trata e retorna a resposta HTTP apropriada.
Dependências com Sub-dependências
Uma dependência pode depender de outras dependências. Isso cria uma hierarquia que o FastAPI resolve recursivamente:
from fastapi import FastAPI, Depends, HTTPException, status
from typing import Optional
app = FastAPI()
def verify_token(authorization: Optional[str] = None) -> str:
"""Verifica se o token está presente."""
if not authorization or not authorization.startswith("Bearer "):
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Token inválido")
return authorization.replace("Bearer ", "")
def get_current_user(token: str = Depends(verify_token)) -> dict:
"""Dependência que verifica o token e retorna o usuário."""
# Em produção, você decodificaria o JWT aqui
if token == "valid_token_123":
return {"id": "user1", "name": "Maria", "email": "maria@example.com"}
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Token expirado")
@app.get("/secure-data")
def get_secure_data(current_user: dict = Depends(get_current_user)):
"""Endpoint protegido que usa sub-dependências."""
return {"data": "Informação confidencial", "user": current_user["name"]}
Aqui, get_current_user depende de verify_token. O FastAPI chama verify_token primeiro, depois passa seu resultado para get_current_user. Se qualquer dependência falhar, a cadeia inteira é interrompida.
Dependências de Classe
Para casos mais complexos, você pode usar classes como dependências. Isso é particularmente útil quando você precisa reutilizar lógica ou estado:
from fastapi import FastAPI, Depends, HTTPException, status
app = FastAPI()
class PaginationParams:
"""Classe para parametrização de paginação."""
def __init__(self, skip: int = 0, limit: int = 10):
if skip < 0:
raise HTTPException(status_code=400, detail="skip não pode ser negativo")
if limit <= 0 or limit > 100:
raise HTTPException(status_code=400, detail="limit deve estar entre 1 e 100")
self.skip = skip
self.limit = limit
@app.get("/items")
def list_items(pagination: PaginationParams = Depends()):
"""Endpoint que usa classe como dependência."""
items = [f"item_{i}" for i in range(pagination.skip, pagination.skip + pagination.limit)]
return {"skip": pagination.skip, "limit": pagination.limit, "items": items}
Quando você usa Depends() sem argumentos em uma classe, o FastAPI automaticamente chama o método __init__ e injeta os parâmetros da URL/query. Isso é extremamente elegante para parametrização reutilizável.
Middleware: Processando Requisições Globalmente
Middleware são funções que processam requisições e respostas em um nível global, antes que cheguem aos endpoints e depois que saem deles. Diferentemente de dependências, que são específicas de rotas, middleware afeta toda a aplicação. Você pode usá-los para logging, autenticação global, modificação de headers, ou tratamento transversal de erros.
No FastAPI (que usa Starlette internamente), você adiciona middleware usando o padrão @app.middleware("http"). A função middleware recebe a requisição, passa para a próxima camada da aplicação, captura a resposta e pode modificá-la antes de retornar.
Middleware Básico de Logging
Um caso de uso clássico é registrar informações sobre as requisições:
from fastapi import FastAPI
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
import time
import logging
app = FastAPI()
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@app.middleware("http")
async def log_requests(request: Request, call_next):
"""Middleware que registra todas as requisições."""
start_time = time.time()
# Processa a requisição
response = await call_next(request)
# Calcula tempo de processamento
process_time = time.time() - start_time
# Adiciona header customizado
response.headers["X-Process-Time"] = str(process_time)
logger.info(
f"Método: {request.method} | Path: {request.url.path} | "
f"Status: {response.status_code} | Tempo: {process_time:.2f}s"
)
return response
@app.get("/hello")
def hello():
return {"message": "Olá, mundo!"}
Este middleware captura o tempo de início, passa a requisição adiante, e depois adiciona o tempo de processamento como header na resposta. Todos os endpoints da aplicação passarão por esse middleware automaticamente.
Middleware de Autenticação Global
Você pode usar middleware para aplicar autenticação em toda a aplicação, sem precisar adicionar Depends em cada rota:
from fastapi import FastAPI, HTTPException, status
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import JSONResponse
app = FastAPI()
class AuthMiddleware(BaseHTTPMiddleware):
"""Middleware customizado que verifica autenticação."""
async def dispatch(self, request: Request, call_next):
# Rotas públicas que não precisam de autenticação
public_paths = ["/health", "/docs", "/openapi.json"]
if request.url.path in public_paths:
return await call_next(request)
# Verifica o header de autorização
auth_header = request.headers.get("Authorization")
if not auth_header or not auth_header.startswith("Bearer "):
return JSONResponse(
status_code=status.HTTP_401_UNAUTHORIZED,
content={"detail": "Credenciais inválidas"}
)
token = auth_header.replace("Bearer ", "")
if token != "secret_token_123":
return JSONResponse(
status_code=status.HTTP_401_UNAUTHORIZED,
content={"detail": "Token inválido"}
)
# Se passou na validação, prossegue
response = await call_next(request)
return response
# Adiciona o middleware à aplicação
app.add_middleware(AuthMiddleware)
@app.get("/health")
def health_check():
"""Rota pública."""
return {"status": "ok"}
@app.get("/protected")
def protected_route():
"""Rota protegida que requer token."""
return {"data": "Este é um dados protegidos"}
Note que middleware são processados em ordem inversa de adição. Se você adicionar múltiplos middleware, o último adicionado será o primeiro a processar a requisição.
Middleware para Modificar Respostas
Você também pode usar middleware para modificar o corpo da resposta, adicionar informações ou transformar dados:
from fastapi import FastAPI
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import Response
import json
app = FastAPI()
class ResponseWrapperMiddleware(BaseHTTPMiddleware):
"""Middleware que envolve todas as respostas em um padrão."""
async def dispatch(self, request: Request, call_next):
response = await call_next(request)
# Lê o corpo da resposta
body = b""
async for chunk in response.body_iterator:
body += chunk
# Decodifica e envolve
try:
data = json.loads(body.decode())
except:
data = body.decode()
wrapped = {
"success": 200 <= response.status_code < 300,
"status_code": response.status_code,
"data": data
}
# Retorna nova resposta
return Response(
content=json.dumps(wrapped),
status_code=response.status_code,
media_type="application/json"
)
app.add_middleware(ResponseWrapperMiddleware)
@app.get("/example")
def example():
return {"message": "Olá"}
Quando você chama GET /example, a resposta será envelopada automaticamente:
{
"success": true,
"status_code": 200,
"data": {"message": "Olá"}
}
Background Tasks: Executar Operações Assincronamente
Background tasks (tarefas em background) permitem que você inicie operações que não precisam completar antes de retornar a resposta ao cliente. Isso é fundamental para melhorar a experiência do usuário: você responde imediatamente e processa operações demoradas (envio de email, processamento pesado, etc.) em segundo plano.
FastAPI oferece duas abordagens principais: BackgroundTasks para tarefas simples na mesma aplicação, e Celery/RQ para sistemas distribuídos mais robustos. Começaremos com BackgroundTasks, que já vem integrada.
Background Tasks Básicas
A maneira mais simples é adicionar um parâmetro BackgroundTasks ao seu endpoint:
from fastapi import FastAPI, BackgroundTasks
import time
app = FastAPI()
def send_email(email: str, message: str):
"""Simula envio de email."""
time.sleep(2) # Simula operação demorada
print(f"Email enviado para {email}: {message}")
def process_data(data: dict):
"""Simula processamento pesado."""
time.sleep(3)
print(f"Dados processados: {data}")
@app.post("/send-notification")
def send_notification(email: str, background_tasks: BackgroundTasks):
"""Endpoint que envia notificação e retorna imediatamente."""
background_tasks.add_task(send_email, email, "Sua requisição foi recebida!")
return {"status": "Notificação será enviada em breve"}
@app.post("/process")
def process(data: dict, background_tasks: BackgroundTasks):
"""Endpoint que inicia processamento em background."""
background_tasks.add_task(process_data, data)
return {"status": "Processamento iniciado"}
Quando você chama POST /send-notification?email=user@example.com, o endpoint retorna imediatamente com status 200, enquanto o envio de email ocorre em segundo plano. O client não espera pelos 2 segundos de simulação.
Múltiplas Tasks e Dependências
Você pode adicionar várias tarefas na mesma requisição. Elas serão executadas sequencialmente após a resposta ser enviada:
from fastapi import FastAPI, BackgroundTasks, Depends
from datetime import datetime
app = FastAPI()
def log_event(event: str):
"""Registra um evento."""
print(f"[{datetime.now()}] {event}")
def send_email_task(email: str):
"""Envia email."""
print(f"Email enviado para {email}")
def update_analytics(action: str):
"""Atualiza analytics."""
print(f"Analytics atualizado: {action}")
def verify_email(email: str) -> str:
"""Dependência que valida email."""
if "@" not in email:
raise ValueError("Email inválido")
return email
@app.post("/register")
def register(email: str, background_tasks: BackgroundTasks, validated_email: str = Depends(verify_email)):
"""Endpoint que usa dependência e múltiplas background tasks."""
# Adiciona tarefas em ordem
background_tasks.add_task(log_event, f"Novo usuário: {email}")
background_tasks.add_task(send_email_task, email)
background_tasks.add_task(update_analytics, "registration")
return {"status": "Registro iniciado", "email": validated_email}
As tarefas são executadas na ordem em que foram adicionadas, sequencialmente.
Background Tasks com Contexto
Em aplicações reais, você frequentemente precisa passar contexto ou estado para as tarefas. Uma abordagem elegante é usar classes:
from fastapi import FastAPI, BackgroundTasks
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
import time
app = FastAPI()
class TaskProcessor:
"""Classe que processa tarefas em background com contexto."""
def __init__(self, db_connection_string: str):
self.db = create_engine(db_connection_string)
def send_welcome_email(self, user_id: int, email: str):
"""Envia email de boas-vindas com acesso ao banco."""
# Simula acesso ao banco para obter template
print(f"Buscando template no banco para usuário {user_id}")
time.sleep(1)
print(f"Email de boas-vindas enviado para {email}")
def cleanup_old_sessions(self):
"""Limpa sessões antigas."""
print("Limpando sessões antigas do banco")
time.sleep(0.5)
# Instância global do processador
task_processor = TaskProcessor("sqlite:///./test.db")
@app.post("/signup")
def signup(email: str, background_tasks: BackgroundTasks):
"""Endpoint que usa task processor com contexto."""
user_id = 42 # Em produção, viria do banco
background_tasks.add_task(task_processor.send_welcome_email, user_id, email)
background_tasks.add_task(task_processor.cleanup_old_sessions)
return {"status": "Conta criada", "user_id": user_id}
WebSockets: Comunicação Bidirecional em Tempo Real
WebSockets permitem comunicação full-duplex entre cliente e servidor em tempo real. Diferentemente de requisições HTTP tradicionais que são unidirecionais, WebSockets mantêm uma conexão aberta onde ambos os lados podem enviar mensagens a qualquer momento. Isso é perfeito para chats, notificações ao vivo, dashboards em tempo real ou qualquer aplicação que precise de atualizações imediatas.
FastAPI, baseado em Starlette, oferece suporte robusto a WebSockets com sintaxe simples. A conexão é mantida viva enquanto ambos os lados mantiverem a conexão aberta, e você pode gerenciar múltiplas conexões simultâneas.
WebSocket Básico: Echo Server
Comecemos com o exemplo mais simples: um servidor que retorna tudo que recebe:
from fastapi import FastAPI, WebSocket
from starlette.websockets import WebSocketDisconnect
app = FastAPI()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
"""WebSocket básico que ecoa mensagens recebidas."""
await websocket.accept()
try:
while True:
# Recebe mensagem do cliente
data = await websocket.receive_text()
# Envia de volta
await websocket.send_text(f"Você disse: {data}")
except WebSocketDisconnect:
print("Cliente desconectado")
# Cliente para testar (em outro arquivo ou terminal)
# python -m pip install websockets
# python cliente_ws.py
Cliente Python para testar:
import asyncio
import websockets
async def test():
async with websockets.connect("ws://localhost:8000/ws") as websocket:
# Envia mensagem
await websocket.send("Olá, servidor!")
# Recebe resposta
response = await websocket.recv()
print(f"Resposta: {response}")
asyncio.run(test())
Quando você conecta, o servidor chama accept() para aceitar a conexão. Depois, entra em um loop esperando mensagens com receive_text(). Quando o cliente desconecta, a exceção WebSocketDisconnect é capturada.
Chat Multiusuário com Manager de Conexões
Uma aplicação real requer gerenciar múltiplas conexões ativas. O padrão é usar uma classe ConnectionManager:
from fastapi import FastAPI, WebSocket
from starlette.websockets import WebSocketDisconnect
from typing import List
from datetime import datetime
app = FastAPI()
class ConnectionManager:
"""Gerencia múltiplas conexões WebSocket."""
def __init__(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
"""Aceita e registra nova conexão."""
await websocket.accept()
self.active_connections.append(websocket)
print(f"Cliente conectado. Total: {len(self.active_connections)}")
async def disconnect(self, websocket: WebSocket):
"""Remove conexão desconectada."""
self.active_connections.remove(websocket)
print(f"Cliente desconectado. Total: {len(self.active_connections)}")
async def broadcast(self, message: str):
"""Envia mensagem para todos os clientes conectados."""
for connection in self.active_connections:
try:
await connection.send_text(message)
except Exception as e:
print(f"Erro ao enviar para cliente: {e}")
async def broadcast_json(self, message: dict):
"""Envia JSON para todos os clientes."""
import json
for connection in self.active_connections:
try:
await connection.send_text(json.dumps(message))
except Exception as e:
print(f"Erro ao enviar JSON: {e}")
manager = ConnectionManager()
@app.websocket("/ws/chat/{username}")
async def websocket_chat(websocket: WebSocket, username: str):
"""WebSocket para chat com broadcast."""
await manager.connect(websocket)
try:
# Notifica que usuário entrou
await manager.broadcast(f"{username} entrou no chat")
while True:
# Recebe mensagem
data = await websocket.receive_text()
# Envia para todos
message = {
"timestamp": datetime.now().isoformat(),
"username": username,
"message": data
}
await manager.broadcast_json(message)
except WebSocketDisconnect:
await manager.disconnect(websocket)
await manager.broadcast(f"{username} saiu do chat")
Cliente JavaScript para testar:
<!DOCTYPE html>
<html>
<head>
<title>Chat WebSocket</title>
</head>
<body>
<input id="messageInput" type="text" placeholder="Digite uma mensagem">
<button onclick="sendMessage()">Enviar</button>
<ul id="messages"></ul>
<script>
const username = prompt("Seu nome:");
const ws = new WebSocket(`ws://localhost:8000/ws/chat/${username}`);
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
const li = document.createElement("li");
li.textContent = `${data.username}: ${data.message}`;
document.getElementById("messages").appendChild(li);
};
function sendMessage() {
const input = document.getElementById("messageInput");
ws.send(input.value);
input.value = "";
}
</script>
</body>
</html>
Este é um chat funcional onde qualquer mensagem enviada por um cliente é broadcast para todos os outros clientes conectados.
WebSocket com Autenticação e Dados Estruturados
Em produção, você precisa autenticar WebSocket e trabalhar com dados estruturados. Aqui está um exemplo mais robusto:
from fastapi import FastAPI, WebSocket, WebSocketException, status, Query
from pydantic import BaseModel
from starlette.websockets import WebSocketDisconnect
import json
import jwt
from datetime import datetime
app = FastAPI()
class Message(BaseModel):
"""Modelo para mensagens estruturadas."""
action: str
content: str
class AuthenticatedConnectionManager:
"""Manager com autenticação."""
def __init__(self):
self.active_connections: dict = {} # {user_id: websocket}
async def connect(self, websocket: WebSocket, user_id: str):
await websocket.accept()
self.active_connections[user_id] = websocket
async def disconnect(self, user_id: str):
self.active_connections.pop(user_id, None)
async def send_personal_message(self, user_id: str, message: str):
"""Envia mensagem para um usuário específico."""
if user_id in self.active_connections:
await self.active_connections[user_id].send_text(message)
async def broadcast(self, message: dict):
"""Broadcast com estrutura."""
for user_id, connection in self.active_connections.items():
try:
await connection.send_json(message)
except Exception as e:
print(f"Erro ao enviar: {e}")
manager = AuthenticatedConnectionManager()
def verify_token(token: str) -> str:
"""Simula verificação de JWT."""
try:
# Em produção, use jwt.decode() com secret key
if token.startswith("valid_"):
return token.replace("valid_", "")
raise Exception("Token inválido")
except:
return None
@app.websocket("/ws/notifications")
async def websocket_notifications(websocket: WebSocket, token: str = Query(...)):
"""WebSocket com autenticação via query param."""
user_id = verify_token(token)
if not user_id:
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
return
await manager.connect(websocket, user_id)
try:
await manager.broadcast({
"type": "user_joined",
"user_id": user_id,
"timestamp": datetime.now().isoformat()
})
while True:
data = await websocket.receive_json()
message = Message(**data)
# Processa baseado na ação
if message.action == "notify_all":
await manager.broadcast({
"type": "notification",
"from": user_id,
"content": message.content
})
elif message.action == "notify_user":
# Em produção, parsearia o destino de message.content
await manager.send_personal_message(
"other_user",
json.dumps({"type": "dm", "from": user_id, "content": message.content})
)
except WebSocketDisconnect:
await manager.disconnect(user_id)
await manager.broadcast({
"type": "user_left",
"user_id": user_id
})
Cliente JavaScript com estrutura:
const token = "valid_user123";
const ws = new WebSocket(`ws://localhost:8000/ws/notifications?token=${token}`);
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
console.log(`Tipo: ${data.type}`, data);
};
function sendNotification() {
ws.send(JSON.stringify({
action: "notify_all",
content: "Olá a todos!"
}));
}
Conclusão
Você dominou quatro pilares essenciais do FastAPI avançado. O sistema de Depends fornece injeção de dependências poderosa e flexível, permitindo código limpo e reutilizável com resolução automática de sub-dependências. Middleware processam requisições globalmente, ideal para logging, autenticação centralizada e transformação de dados em toda a aplicação, funcionando em camadas bem definidas.
BackgroundTasks liberam suas rotas para responder imediatamente ao cliente enquanto operações demoradas executam em segundo plano, melhorando drasticamente a experiência do usuário. Por último, WebSockets abrem a porta para comunicação em tempo real, permitindo desde chats até dashboards que se atualizam automaticamente, com suporte robusto a múltiplas conexões e autenticação.
A verdadeira maestria vem de combinar esses elementos: use Depends para injetar um usuário autenticado, aplique Middleware para validação global, lance uma BackgroundTask para limpeza pesada, e mantenha WebSockets abertos para feedback imediato. Comece simples e escale conforme a complexidade da aplicação demande.