Python Admin

Dominando Celery em Python: Filas de Tarefas, Workers e Beat Scheduler em Projetos Reais Já leu

O que é Celery e por que você precisa disso Celery é uma biblioteca Python assíncrona que permite executar tarefas em background de forma distribuída. Se você já se viu preso em uma situação onde sua aplicação web fica travada enquanto processa um arquivo grande, envia emails ou faz requisições lentas para APIs externas, você entendeu o problema que Celery resolve. A ideia central é simples: em vez de executar operações pesadas no mesmo processo que atende as requisições HTTP, você enfileira essas tarefas em um broker de mensagens (como Redis ou RabbitMQ) e deixa que workers independentes processem tudo em paralelo. Sua aplicação responde instantaneamente ao usuário enquanto o trabalho acontece nos bastidores. Celery também oferece agendamento de tarefas através do Beat Scheduler, permitindo executar jobs em horários específicos, como um cron job tradicional, mas com toda a robustez de um sistema distribuído. Arquitetura fundamental do Celery O Celery funciona com três componentes principais: a aplicação que enfileira tarefas,

O que é Celery e por que você precisa disso

Celery é uma biblioteca Python assíncrona que permite executar tarefas em background de forma distribuída. Se você já se viu preso em uma situação onde sua aplicação web fica travada enquanto processa um arquivo grande, envia emails ou faz requisições lentas para APIs externas, você entendeu o problema que Celery resolve.

A ideia central é simples: em vez de executar operações pesadas no mesmo processo que atende as requisições HTTP, você enfileira essas tarefas em um broker de mensagens (como Redis ou RabbitMQ) e deixa que workers independentes processem tudo em paralelo. Sua aplicação responde instantaneamente ao usuário enquanto o trabalho acontece nos bastidores. Celery também oferece agendamento de tarefas através do Beat Scheduler, permitindo executar jobs em horários específicos, como um cron job tradicional, mas com toda a robustez de um sistema distribuído.

Arquitetura fundamental do Celery

O Celery funciona com três componentes principais: a aplicação que enfileira tarefas, o broker que armazena essas mensagens, e os workers que as executem. A aplicação envia mensagens para o broker, dizendo "execute essa função com esses parâmetros". Os workers monitoram o broker continuamente, pegam mensagens, executam o código correspondente e retornam o resultado. Tudo isso sem bloquear a aplicação principal.

O resultado das tarefas é armazenado em um backend de resultados (também pode ser Redis, banco de dados, etc), permitindo que você consulte o status e recupere o resultado quando necessário. Essa separação de responsabilidades torna o sistema escalável: você pode adicionar quantos workers quiser conforme a demanda cresce.

Configuração inicial e primeiros passos

Antes de qualquer coisa, você precisa instalar Celery e um broker de mensagens. Redis é a escolha mais simples para começar:

pip install celery redis

Agora vamos criar uma estrutura mínima de projeto. Crie um arquivo celery_app.py:

from celery import Celery

# Cria a instância do Celery
app = Celery('meu_projeto')

# Configura o broker (Redis rodando localmente na porta padrão)
app.conf.broker_url = 'redis://localhost:6379/0'

# Configura o backend de resultados
app.conf.result_backend = 'redis://localhost:6379/0'

# Define um timeout para tarefas (em segundos)
app.conf.task_time_limit = 30 * 60  # 30 minutos
app.conf.task_soft_time_limit = 25 * 60  # 25 minutos (aviso antes do timeout)

Agora defina suas tarefas em um arquivo tasks.py:

from celery_app import app
import time

@app.task
def enviar_email(destinatario, assunto, corpo):
    """Simula envio de email"""
    print(f"Enviando email para {destinatario}...")
    time.sleep(2)  # Simula operação lenta
    print(f"Email enviado para {destinatario}")
    return f"Email enviado com sucesso para {destinatario}"

@app.task
def processar_arquivo(caminho_arquivo):
    """Simula processamento de arquivo grande"""
    print(f"Processando arquivo: {caminho_arquivo}")
    time.sleep(5)  # Simula trabalho pesado
    print(f"Arquivo processado")
    return {"status": "concluído", "arquivo": caminho_arquivo}

Para testar localmente sem precisar rodar um worker separado, você pode usar:

# teste.py
from tasks import enviar_email, processar_arquivo

# Executa a tarefa de forma síncrona (apenas para testes)
resultado = enviar_email.apply_async(
    args=('usuario@example.com', 'Bem-vindo', 'Teste de email'),
    task_id='teste-123'
)

print(f"ID da tarefa: {resultado.id}")
print(f"Status: {resultado.status}")
print(f"Resultado: {resultado.get(timeout=10)}")

Iniciando o Redis e os Workers

Abra um terminal e inicie o Redis (ou use Docker):

docker run -d -p 6379:6379 redis:latest

Em outro terminal, inicie um worker Celery:

celery -A celery_app worker --loglevel=info

Agora você pode executar suas tarefas de forma assíncrona. A resposta será instantânea e o worker processará em background.

Filas, Prioridades e Roteamento de Tarefas

Por padrão, todas as tarefas entram em uma única fila chamada celery. Para aplicações maiores, você quer separar tarefas por tipo e dar prioridades diferentes. Isso é feito através do roteamento.

Vamos expandir o arquivo de configuração celery_app.py:

from celery import Celery
from kombu import Exchange, Queue

app = Celery('meu_projeto')

app.conf.broker_url = 'redis://localhost:6379/0'
app.conf.result_backend = 'redis://localhost:6379/0'

# Define as exchanges (tópicos/grupos de mensagens)
default_exchange = Exchange('celery', type='direct')
email_exchange = Exchange('email', type='direct')
processamento_exchange = Exchange('processamento', type='direct')

# Define as filas
app.conf.task_queues = (
    Queue('default', exchange=default_exchange, routing_key='default'),
    Queue('email', exchange=email_exchange, routing_key='email'),
    Queue('processamento', exchange=processamento_exchange, routing_key='processamento'),
)

# Define o roteamento: qual fila cada tarefa deve usar
app.conf.task_routes = {
    'tasks.enviar_email': {'queue': 'email', 'routing_key': 'email'},
    'tasks.processar_arquivo': {'queue': 'processamento', 'routing_key': 'processamento'},
}

Agora suas tarefas irão para filas específicas:

# tasks.py
from celery_app import app

@app.task(queue='email')
def enviar_email(destinatario, assunto, corpo):
    print(f"Enviando email para {destinatario}...")
    return f"Email enviado para {destinatario}"

@app.task(queue='processamento')
def processar_arquivo(caminho_arquivo):
    print(f"Processando arquivo: {caminho_arquivo}")
    return {"status": "concluído"}

Controlando a prioridade de execução

Se você quer que certos emails urgentes sejam processados antes de outros, use o parâmetro priority:

from tasks import enviar_email

# Email com alta prioridade
enviar_email.apply_async(
    args=('ceo@empresa.com', 'Urgente', 'Relatório crítico'),
    priority=9  # 0 a 9, quanto maior, maior a prioridade
)

# Email com prioridade normal
enviar_email.apply_async(
    args=('usuario@empresa.com', 'Newsletter', 'Conteúdo semanal'),
    priority=5
)

Você pode iniciar múltiplos workers especializados, cada um consumindo filas diferentes:

# Worker exclusivo para emails (processa prioritariamente)
celery -A celery_app worker -Q email --concurrency=2 --loglevel=info

# Worker para processamento pesado
celery -A celery_app worker -Q processamento --concurrency=1 --loglevel=info

# Worker para tarefas gerais
celery -A celery_app worker -Q default --concurrency=4 --loglevel=info

Celery Beat: Agendamento de Tarefas Periódicas

O Celery Beat é um scheduler que executa tarefas em intervalos regulares ou em horários específicos. É a solução Celery para o que você faria com cron, mas integrada e distribuída.

Configurando o Beat Scheduler

Adicione à sua configuração do Celery:

# celery_app.py
from celery import Celery
from celery.schedules import crontab
from kombu import Exchange, Queue
import os

app = Celery('meu_projeto')

app.conf.broker_url = 'redis://localhost:6379/0'
app.conf.result_backend = 'redis://localhost:6379/0'

# Configuração do Beat
app.conf.beat_scheduler = 'celery.beat:PersistentScheduler'
app.conf.beat_schedule = {
    # Tarefa que executa a cada 30 segundos
    'limpar-cache-cada-30s': {
        'task': 'tasks.limpar_cache',
        'schedule': 30.0,  # em segundos
    },
    # Tarefa que executa a cada 5 minutos
    'verificar-emails-pendentes': {
        'task': 'tasks.verificar_emails_pendentes',
        'schedule': 300.0,  # 5 minutos
    },
    # Tarefa que executa diariamente às 3 da manhã
    'backup-diario': {
        'task': 'tasks.fazer_backup',
        'schedule': crontab(hour=3, minute=0),
    },
    # Tarefa que executa segunda a sexta às 9 da manhã
    'relatorio-semanal': {
        'task': 'tasks.enviar_relatorio',
        'schedule': crontab(hour=9, minute=0, day_of_week='1-5'),
    },
}

Agora defina as tarefas correspondentes:

# tasks.py
from celery_app import app
from datetime import datetime

@app.task
def limpar_cache():
    """Remove itens expirados do cache"""
    print(f"[{datetime.now()}] Limpando cache...")
    # Sua lógica de limpeza aqui
    return "Cache limpo"

@app.task
def verificar_emails_pendentes():
    """Verifica emails aguardando envio"""
    print(f"[{datetime.now()}] Verificando emails pendentes...")
    # Sua lógica aqui
    return "Verificação concluída"

@app.task
def fazer_backup():
    """Faz backup do banco de dados"""
    print(f"[{datetime.now()}] Iniciando backup...")
    # Sua lógica de backup
    return "Backup concluído"

@app.task
def enviar_relatorio():
    """Envia relatório semanal"""
    print(f"[{datetime.now()}] Gerando e enviando relatório...")
    # Sua lógica de relatório
    return "Relatório enviado"

Executando o Beat Scheduler

Em um terminal separado, inicie o Beat:

celery -A celery_app beat --loglevel=info

E deixe um ou mais workers rodando em outros terminais:

celery -A celery_app worker --loglevel=info

O Beat irá enfileirar as tarefas nos horários configurados, e os workers as executarão. Você verá no log do worker as tarefas sendo processadas automaticamente.

Combinando Beat com roteamento de filas

Para tarefas agendadas irem para filas específicas:

# celery_app.py
app.conf.beat_schedule = {
    'backup-diario': {
        'task': 'tasks.fazer_backup',
        'schedule': crontab(hour=3, minute=0),
        'options': {'queue': 'processamento', 'priority': 8}
    },
}

Monitoramento, Tratamento de Erros e Boas Práticas

Tratamento de erros e retry automático

Tarefas podem falhar por vários motivos. Celery oferece mecanismos de retry inteligentes:

# tasks.py
from celery_app import app
from celery import Task
import requests

class CallbackTask(Task):
    """Task base com callback para erros"""
    autoretry_for = (Exception,)
    retry_kwargs = {'max_retries': 3}
    retry_backoff = True  # Espera exponencial entre tentativas
    retry_backoff_max = 600  # Máximo de 10 minutos de espera
    retry_jitter = True  # Adiciona aleatoriedade para evitar thundering herd

@app.task(bind=True, base=CallbackTask)
def chamar_api_externa(self, url):
    """Chama API externa com retry automático"""
    try:
        response = requests.get(url, timeout=5)
        response.raise_for_status()
        return {"status": "sucesso", "dados": response.json()}
    except requests.RequestException as exc:
        # Tenta novamente em 60 segundos na primeira falha,
        # 120 na segunda, 240 na terceira
        raise self.retry(exc=exc, countdown=2 ** self.request.retries * 60)

@app.task(bind=True)
def tarefa_critica(self, dados):
    """Tarefa com retry customizado"""
    try:
        # Sua lógica aqui
        resultado = processar(dados)
        return resultado
    except ValueError as exc:
        # Não faz retry para ValueError
        raise
    except Exception as exc:
        # Faz retry exponencial
        self.retry(exc=exc, countdown=10)

def processar(dados):
    """Função auxiliar"""
    return {"processado": True}

Monitoramento com Flower

Flower é uma interface web para monitorar Celery em tempo real:

pip install flower

# Inicie o Flower (acessível em http://localhost:5555)
celery -A celery_app flower

No Flower você pode:
- Ver workers conectados e seu status
- Monitorar tarefas em tempo real
- Ver histórico de execução
- Executar tarefas manualmente
- Visualizar logs detalhados

Padrão de idempotência para tarefas

Tarefas podem ser executadas mais de uma vez (se o worker falhar ou por deduplicação). Garanta que são idempotentes:

# tasks.py
from celery_app import app
import hashlib

# Simulando um banco de dados
processados = set()

@app.task(bind=True)
def processar_pagamento(self, pedido_id, valor):
    """Processa pagamento de forma idempotente"""
    # Cria um ID único para este processamento
    task_hash = hashlib.md5(
        f"{pedido_id}-{valor}-{self.request.id}".encode()
    ).hexdigest()

    # Se já foi processado, retorna sem erro
    if task_hash in processados:
        print(f"Pagamento já processado (ID: {pedido_id})")
        return {"status": "já_processado", "pedido_id": pedido_id}

    # Processa o pagamento
    print(f"Processando pagamento: Pedido {pedido_id}, Valor R${valor}")
    # Sua lógica de pagamento aqui (chamar API do gateway, etc)

    # Marca como processado
    processados.add(task_hash)

    return {"status": "sucesso", "pedido_id": pedido_id, "valor": valor}

Exemplo completo com Flask

Aqui está uma aplicação web completa usando Celery:

# app.py
from flask import Flask, jsonify, request
from tasks import enviar_email, processar_arquivo
from celery_app import app as celery_app

app = Flask(__name__)

@app.route('/enviar-email', methods=['POST'])
def enviar_email_endpoint():
    """Enfileira um email para envio assíncrono"""
    dados = request.get_json()

    # Enfileira a tarefa sem esperar a resposta
    task = enviar_email.apply_async(
        args=(
            dados['destinatario'],
            dados['assunto'],
            dados['corpo']
        ),
        priority=dados.get('prioridade', 5)
    )

    return jsonify({
        "mensagem": "Email enfileirado",
        "task_id": task.id
    }), 202

@app.route('/status/<task_id>', methods=['GET'])
def obter_status(task_id):
    """Retorna status de uma tarefa"""
    task = celery_app.AsyncResult(task_id)

    return jsonify({
        "task_id": task_id,
        "status": task.status,
        "resultado": task.result if task.successful() else None
    })

@app.route('/processar', methods=['POST'])
def processar_endpoint():
    """Enfileira processamento de arquivo"""
    dados = request.get_json()

    task = processar_arquivo.apply_async(
        args=(dados['arquivo'],),
        queue='processamento'
    )

    return jsonify({
        "mensagem": "Processamento iniciado",
        "task_id": task.id
    }), 202

if __name__ == '__main__':
    app.run(debug=True)

Use a API assim:

# Enfileira um email
curl -X POST http://localhost:5000/enviar-email \
  -H "Content-Type: application/json" \
  -d '{
    "destinatario": "usuario@example.com",
    "assunto": "Bem-vindo",
    "corpo": "Teste de email",
    "prioridade": 7
  }'

# Verifica o status (use a task_id retornada)
curl http://localhost:5000/status/abc123def456

Conclusão

Você aprendeu que Celery resolve o problema de operações síncronas bloqueantes permitindo executar tarefas em background através de um broker de mensagens. Isso torna suas aplicações web responsivas mesmo com operações pesadas acontecendo simultaneamente.

O roteamento inteligente de tarefas em filas diferentes com prioridades oferece controle fino sobre como seu sistema processa trabalho, permitindo escalar verticalmente (mais workers) ou horizontalmente (máquinas diferentes) conforme necessário.

O Celery Beat traz agendamento confiável e distribuído, eliminando a necessidade de cron jobs tradicionais e integrando perfeitamente com sua arquitetura de workers. Combinado com tratamento de erros, retry exponencial e idempotência, você tem um sistema robusto capaz de processar bilhões de tarefas em produção.

Referências


Artigos relacionados