Python Admin

Como Usar Redis com Python: Cache, Pub/Sub e Filas com redis-py em Produção Já leu

Redis: O Fundamento Redis é um armazenamento de dados em memória (in-memory data store) de código aberto que funciona como banco de dados chave-valor. Diferentemente de bancos de dados tradicionais que persistem dados em disco, Redis mantém tudo em RAM, o que o torna extraordinariamente rápido. Sua velocidade o qualifica para três casos de uso principais: cache, sistemas de pub/sub (publicador/assinante) e filas de processamento. O diferencial do Redis é sua capacidade de trabalhar com estruturas de dados complexas — strings, listas, conjuntos, hashes e sorted sets — todas operadas atomicamente. Isso significa que operações são indivisíveis e ocorrem sem interferência de requisições concorrentes. Quando você precisa de subsistemas que trocam mensagens rapidamente ou precisam armazenar dados temporários com extrema performance, Redis é a ferramenta ideal. A biblioteca é o cliente Python oficial que abstrai a comunicação com o servidor Redis, permitindo que você interaja com toda essa potência através de uma API Python intuitiva. Instalação e Configuração Básica Antes

Redis: O Fundamento

Redis é um armazenamento de dados em memória (in-memory data store) de código aberto que funciona como banco de dados chave-valor. Diferentemente de bancos de dados tradicionais que persistem dados em disco, Redis mantém tudo em RAM, o que o torna extraordinariamente rápido. Sua velocidade o qualifica para três casos de uso principais: cache, sistemas de pub/sub (publicador/assinante) e filas de processamento.

O diferencial do Redis é sua capacidade de trabalhar com estruturas de dados complexas — strings, listas, conjuntos, hashes e sorted sets — todas operadas atomicamente. Isso significa que operações são indivisíveis e ocorrem sem interferência de requisições concorrentes. Quando você precisa de subsistemas que trocam mensagens rapidamente ou precisam armazenar dados temporários com extrema performance, Redis é a ferramenta ideal. A biblioteca redis-py é o cliente Python oficial que abstrai a comunicação com o servidor Redis, permitindo que você interaja com toda essa potência através de uma API Python intuitiva.

Instalação e Configuração Básica

Antes de qualquer código, você precisa do servidor Redis rodando e da biblioteca redis-py instalada. Redis é disponibilizado em repositórios oficiais da maioria das distribuições Linux. No macOS, use Homebrew. No Windows, você pode usar WSL2 ou containers Docker.

# Linux (Debian/Ubuntu)
sudo apt-get install redis-server

# macOS
brew install redis

# Iniciar o servidor Redis
redis-server

# Em outro terminal, verificar se está rodando
redis-cli ping

Com o servidor pronto, instale a biblioteca Python:

pip install redis

Agora você consegue conectar:

import redis

# Conexão padrão localhost:6379
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

# Testar a conexão
print(r.ping())  # Output: True

O parâmetro decode_responses=True é essencial na maioria dos casos: ele converte automaticamente bytes retornados pelo Redis em strings Python. Sem ele, você receberia b'valor' em vez de 'valor', dificultando o trabalho. O parâmetro db=0 especifica qual banco de dados Redis usar (0 a 15 por padrão).

Cache com Redis

O Conceito de Cache

Cache é um armazenamento temporário de dados frequentemente acessados. A estratégia funciona assim: antes de executar uma operação custosa (como uma consulta em banco de dados), você verifica se o resultado já existe no cache. Se existir, retorna-o imediatamente. Se não existir, executa a operação, armazena o resultado no cache para futuras requisições e retorna ao cliente. Isso reduz carga no banco de dados e diminui latência drasticamente.

Redis é ideal para cache porque oferece expiração automática de chaves (TTL — Time To Live). Você pode dizer "armazene este valor, mas Delete-o após 5 minutos". Isso é crítico em cache, pois dados que nunca expiram ocupam memória eternamente.

Implementando Cache de Dados

Vamos criar um exemplo prático: um sistema que busca informações de usuários de um banco de dados lento. Sem cache, cada requisição query o banco. Com cache, consultas subsequentes são servidas em microsegundos.

import redis
import json
import time

r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

# Simular uma operação custosa (leitura de BD)
def buscar_usuario_no_bd(user_id):
    time.sleep(2)  # Simula latência de BD
    return {
        'id': user_id,
        'nome': f'Usuário {user_id}',
        'email': f'user{user_id}@example.com'
    }

# Função com cache
def obter_usuario(user_id, cache_ttl=3600):
    # Criar uma chave única para este usuário
    cache_key = f'usuario:{user_id}'

    # Tentar recuperar do cache
    cached = r.get(cache_key)
    if cached:
        print(f'✓ Retornado do cache')
        return json.loads(cached)

    # Cache miss: buscar do "BD"
    print(f'✗ Cache miss - consultando BD')
    usuario = buscar_usuario_no_bd(user_id)

    # Armazenar no cache com TTL
    r.setex(cache_key, cache_ttl, json.dumps(usuario))
    return usuario

# Teste
print('Primeira requisição:')
user1 = obter_usuario(1)  # Demora 2 segundos
print(user1)

print('\nSegunda requisição (mesmo usuário):')
user1_again = obter_usuario(1)  # Instantâneo
print(user1_again)

print('\nTerceira requisição (usuário diferente):')
user2 = obter_usuario(2)  # Demora 2 segundos novamente
print(user2)

Output esperado:

Primeira requisição:
✗ Cache miss - consultando BD
{'id': 1, 'nome': 'Usuário 1', 'email': 'user1@example.com'}

Segunda requisição (mesmo usuário):
✓ Retornado do cache
{'id': 1, 'nome': 'Usuário 1', 'email': 'user1@example.com'}

Terceira requisição (usuário diferente):
✗ Cache miss - consultando BD
{'id': 2, 'nome': 'Usuário 2', 'email': 'user2@example.com'}

Aqui usamos setex() que é a fusão de set() + expire(): armazena um valor e define sua expiração em uma operação atômica. Após 3600 segundos, a chave é deletada automaticamente. Você também pode usar set() com parâmetro ex:

r.set(cache_key, json.dumps(usuario), ex=3600)

Estratégias de Invalidação

Um cache que nunca é invalidado pode servir dados desatualizados. Existem estratégias para lidar com isso. A mais simples é a expiração por TTL (já vista). Outra é invalidar manualmente quando os dados mudam:

def atualizar_usuario(user_id, dados_novos):
    # Atualizar no BD (pseudo-código)
    # db.update_user(user_id, dados_novos)

    # Invalidar o cache
    cache_key = f'usuario:{user_id}'
    r.delete(cache_key)
    print(f'Cache invalidado para usuário {user_id}')

atualizar_usuario(1, {'nome': 'Novo Nome'})
# Próxima requisição de obter_usuario(1) terá cache miss e buscará dados novos

Você também pode usar o padrão de invalidação em cascata com chaves padrão:

# Deletar todas as chaves que correspondem a um padrão
r.delete(*r.keys('usuario:*'))  # Deleta todos os caches de usuários

Pub/Sub: Comunicação Entre Componentes

Entendendo Pub/Sub

Pub/Sub (Publicador/Assinante) é um padrão de mensageria onde produtores de dados (publishers) enviam mensagens para tópicos sem conhecer quem as receberá. Consumidores (subscribers) se inscrevem em tópicos de interesse e recebem mensagens assim que são publicadas. Isso desacopla componentes do sistema: publishers não dependem de subscribers existirem, e novos subscribers podem ser adicionados sem alterar publishers.

Redis implementa Pub/Sub de forma elegante. Um publisher envia uma mensagem para um canal, e todos os subscribers daquele canal a recebem instantaneamente. É importante notar que Redis Pub/Sub não persiste mensagens: se nenhum subscriber estiver escutando quando a mensagem é publicada, ela se perde. Para persistência, use Streams ou filas (próxima seção).

Implementando Pub/Sub

Vamos criar um exemplo de notificações em tempo real: um serviço publica atualizações de pedidos, e múltiplos clientes as recebem.

import redis
import threading
import json
from datetime import datetime

def criar_publisher():
    """Simula um serviço que publica atualizações"""
    r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

    def publicar_atualizacao_pedido(pedido_id, status):
        mensagem = {
            'pedido_id': pedido_id,
            'status': status,
            'timestamp': datetime.now().isoformat()
        }
        # Publicar no canal 'pedidos'
        num_subscribers = r.publish('pedidos', json.dumps(mensagem))
        print(f'[PUBLISHER] Mensagem publicada - {num_subscribers} subscribers receberam')

    # Simular publicações
    import time
    time.sleep(1)  # Aguardar subscribers conectarem

    publicar_atualizacao_pedido(101, 'confirmado')
    time.sleep(1)
    publicar_atualizacao_pedido(101, 'processando')
    time.sleep(1)
    publicar_atualizacao_pedido(101, 'enviado')

def criar_subscriber(nome_subscriber):
    """Simula um cliente que consome notificações"""
    r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
    pubsub = r.pubsub()

    # Inscrever no canal
    pubsub.subscribe('pedidos')
    print(f'[{nome_subscriber}] Inscrito no canal pedidos')

    # Escutar mensagens
    for mensagem in pubsub.listen():
        if mensagem['type'] == 'message':
            dados = json.loads(mensagem['data'])
            print(f'[{nome_subscriber}] Recebeu: Pedido {dados["pedido_id"]} - {dados["status"]}')

# Executar em threads paralelas
thread_pub = threading.Thread(target=criar_publisher)
thread_sub1 = threading.Thread(target=criar_subscriber, args=('Cliente1',))
thread_sub2 = threading.Thread(target=criar_subscriber, args=('Cliente2',))

thread_sub1.start()
thread_sub2.start()
thread_pub.start()

thread_sub1.join()
thread_sub2.join()
thread_pub.join()

Output esperado:

[Cliente1] Inscrito no canal pedidos
[Cliente2] Inscrito no canal pedidos
[PUBLISHER] Mensagem publicada - 2 subscribers receberam
[Cliente1] Recebeu: Pedido 101 - confirmado
[Cliente2] Recebeu: Pedido 101 - confirmado
[PUBLISHER] Mensagem publicada - 2 subscribers receberam
[Cliente1] Recebeu: Pedido 101 - processando
[Cliente2] Recebeu: Pedido 101 - processando
[PUBLISHER] Mensagem publicada - 2 subscribers receberam
[Cliente1] Recebeu: Pedido 101 - enviado
[Cliente2] Recebeu: Pedido 101 - enviado

O método listen() cria um gerador que bloqueia aguardando mensagens. O loop roda indefinidamente até a conexão ser encerrada. Cada mensagem recebida é um dicionário com chaves type (tipo de evento: 'message', 'subscribe', etc.) e data (conteúdo da mensagem).

Padrões de Inscrição

Você pode inscrever-se em múltiplos canais ou usar padrões com wildcards:

pubsub = r.pubsub()

# Inscrever em múltiplos canais
pubsub.subscribe('pedidos', 'notificacoes', 'alertas')

# Inscrever em padrões (requer psubscribe)
pubsub.psubscribe('usuario:*:update')  # Recebe de usuario:1:update, usuario:2:update, etc.

# No publisher, publicar em canal específico
r.publish('usuario:42:update', json.dumps({'acao': 'perfil_atualizado'}))

Filas com Redis

Diferença Entre Pub/Sub e Filas

Filas e Pub/Sub parecem similares, mas têm propósitos diferentes. Pub/Sub é um para muitos e não persiste: uma mensagem é entregue a todos os subscribers conectados naquele momento. Filas são um para um (ou múltiplos consumidores) e persistem: mensagens ficam armazenadas até serem consumidas, e cada mensagem é entregue a um único consumidor (em arquitetura de fila tradicional) ou processada uma única vez mesmo com múltiplos consumidores.

Redis não tem suporte nativo a filas com reconhecimento de entrega (como RabbitMQ), mas você pode implementar uma fila simples e eficiente usando listas. Para casos que exigem garantias de entrega e reprocessamento, use Redis Streams. Aqui focamos em filas com listas pela simplicidade didática.

Implementando Filas com Listas

Uma lista Redis é uma sequência ordenada de valores. As operações LPUSH (inserir à esquerda) e RPOP (remover à direita) criam uma estrutura FIFO (First In, First Out).

import redis
import json
import threading
import time

r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

def produtor_tarefas():
    """Enfileira tarefas"""
    tarefas = [
        {'id': 1, 'tipo': 'email', 'destinatario': 'user1@example.com'},
        {'id': 2, 'tipo': 'sms', 'destinatario': '+5511999999999'},
        {'id': 3, 'tipo': 'email', 'destinatario': 'user2@example.com'},
        {'id': 4, 'tipo': 'push', 'destinatario': 'device_token_xyz'},
    ]

    for tarefa in tarefas:
        # Enfileirar no final da fila
        r.rpush('fila_tarefas', json.dumps(tarefa))
        print(f'[PRODUTOR] Enfileirada tarefa {tarefa["id"]}')
        time.sleep(0.5)

    print('[PRODUTOR] Todas as tarefas enfileiradas')

def consumidor_tarefas(nome_consumidor):
    """Consome e processa tarefas"""
    while True:
        # Remover do início da fila (ou aguardar)
        tarefa_json = r.lpop('fila_tarefas')

        if tarefa_json is None:
            print(f'[{nome_consumidor}] Fila vazia, aguardando...')
            time.sleep(1)
            continue

        tarefa = json.loads(tarefa_json)
        print(f'[{nome_consumidor}] Processando tarefa {tarefa["id"]} ({tarefa["tipo"]})')

        # Simular processamento
        time.sleep(1)
        print(f'[{nome_consumidor}] Tarefa {tarefa["id"]} concluída')

# Iniciar produtor e consumidores em threads paralelas
thread_prod = threading.Thread(target=produtor_tarefas)
thread_cons1 = threading.Thread(target=consumidor_tarefas, args=('Consumidor1',), daemon=True)
thread_cons2 = threading.Thread(target=consumidor_tarefas, args=('Consumidor2',), daemon=True)

thread_cons1.start()
thread_cons2.start()
thread_prod.start()

thread_prod.join()
time.sleep(10)  # Aguardar consumidores processarem

Output esperado (pode variar em ordem):

[Consumidor1] Fila vazia, aguardando...
[Consumidor2] Fila vazia, aguardando...
[PRODUTOR] Enfileirada tarefa 1
[PRODUTOR] Enfileirada tarefa 2
[Consumidor1] Processando tarefa 1 (email)
[PRODUTOR] Enfileirada tarefa 3
[Consumidor2] Processando tarefa 2 (sms)
[PRODUTOR] Enfileirada tarefa 4
[PRODUTOR] Todas as tarefas enfileiradas
[Consumidor1] Tarefa 1 concluída
[Consumidor2] Tarefa 2 concluída
[Consumidor1] Processando tarefa 3 (email)
[Consumidor2] Processando tarefa 4 (push)
[Consumidor1] Tarefa 3 concluída
[Consumidor2] Tarefa 4 concluída

Aqui, rpush() adiciona à direita e lpop() remove pela esquerda, garantindo ordem FIFO. Múltiplos consumidores podem trabalhar em paralelo, cada um pegando uma tarefa diferente.

Bloqueando Pop para Eficiência

Usar lpop() em loop com sleep é ineficiente: desperdiça CPU verificando a fila continuamente. Redis oferece blpop() (blocking left pop) que aguarda até que um elemento esteja disponível:

def consumidor_tarefas_bloqueante(nome_consumidor):
    """Versão mais eficiente com blocking pop"""
    while True:
        # Aguardar até 0 segundos (indefinido) por uma tarefa
        resultado = r.blpop('fila_tarefas', timeout=0)

        if resultado is None:
            print(f'[{nome_consumidor}] Timeout atingido')
            continue

        # blpop retorna tupla (chave, valor)
        chave, tarefa_json = resultado
        tarefa = json.loads(tarefa_json)
        print(f'[{nome_consumidor}] Processando tarefa {tarefa["id"]}')

        # Processar...
        time.sleep(1)
        print(f'[{nome_consumidor}] Concluída')

blpop() é superior porque não consome CPU enquanto aguarda. O timeout em segundos (0 = indefinido) define quanto tempo esperar. Se excedido sem nenhuma mensagem, retorna None.

Filas com Prioridade

Para tarefas com prioridades diferentes, use sorted sets:

def enfileirar_com_prioridade(chave_fila, tarefa, prioridade=0):
    """prioridade: -10 (alta) a 10 (baixa)"""
    r.zadd(chave_fila, {json.dumps(tarefa): prioridade})

def desfileirar_com_prioridade(chave_fila):
    """Remove tarefa com maior prioridade (menor score)"""
    resultado = r.zrange(chave_fila, 0, 0)
    if resultado:
        tarefa = resultado[0]
        r.zrem(chave_fila, tarefa)
        return json.loads(tarefa)
    return None

# Exemplo
enfileirar_com_prioridade('fila_prioritaria', 
                         {'id': 1, 'tipo': 'email'}, 
                         prioridade=5)
enfileirar_com_prioridade('fila_prioritaria', 
                         {'id': 2, 'tipo': 'alerta'}, 
                         prioridade=-10)  # Alta prioridade

tarefa = desfileirar_com_prioridade('fila_prioritaria')
print(tarefa)  # {'id': 2, 'tipo': 'alerta'} (menor score = maior prioridade)

Padrões Avançados e Boas Práticas

Connection Pooling

Em aplicações reais, você não cria uma nova conexão Redis para cada operação. Conexões são custosas. Use connection pooling: um conjunto de conexões reutilizáveis mantidas abertas.

import redis

# Pool padrão (máximo 50 conexões)
pool = redis.ConnectionPool(host='localhost', port=6379, db=0)
r = redis.Redis(connection_pool=pool)

# Ou, mais simples, redis-py cria um pool automaticamente:
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
# Internamente já usa pooling

Pipelines para Múltiplas Operações

Se você precisa executar múltiplas operações Redis, use pipelines para agrupá-las e enviar em uma única requisição:

# Sem pipeline: 3 requisições
r.set('chave1', 'valor1')
r.set('chave2', 'valor2')
r.set('chave3', 'valor3')

# Com pipeline: 1 requisição
pipe = r.pipeline()
pipe.set('chave1', 'valor1')
pipe.set('chave2', 'valor2')
pipe.set('chave3', 'valor3')
results = pipe.execute()  # Execute todas de uma vez

Pipelines reduzem latência de rede significativamente em operações em batch.

Tratamento de Erros

Redis pode ficar indisponível. Sempre trate exceções:

import redis
from redis.exceptions import ConnectionError, TimeoutError

try:
    r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
    r.ping()
except ConnectionError:
    print('Falha ao conectar ao Redis')
except TimeoutError:
    print('Timeout na conexão com Redis')

# Para operações, use try-except
try:
    valor = r.get('chave')
except redis.RedisError as e:
    print(f'Erro Redis: {e}')

Expiração e Limpeza

Sempre defina TTL em dados temporários. Dados sem expiração causam vazamento de memória:

# Ruim: nenhuma expiração
r.set('sessao_user', json.dumps(dados_sessao))

# Bom: expira em 24 horas
r.setex('sessao_user', 86400, json.dumps(dados_sessao))

# Ou:
r.set('sessao_user', json.dumps(dados_sessao), ex=86400)

Você pode monitorar memória usada com:

info = r.info('memory')
print(f"Memória usada: {info['used_memory_human']}")
print(f"Pico de memória: {info['used_memory_peak_human']}")

Conclusão

Dominando Redis com Python, você adquiriu três superpoderes. Primeiro, cache inteligente que reduz carga massivamente em sistemas: dados frequentemente acessados são servidos em microsegundos, não segundos. Redis transforma performance de aplicações. Segundo, Pub/Sub para comunicação em tempo real: desacopla componentes do sistema permitindo que diferentes serviços se comuniquem sem conhecerem-se mutuamente, criando arquiteturas escaláveis e resilientes. Terceiro, filas robustas para processamento assíncrono: tarefas custosas são enfileiradas e processadas em background por workers, mantendo a aplicação responsiva.

A biblioteca redis-py abstrai a complexidade do protocolo Redis e oferece uma API Pythônica. Combine essas capacidades — cache + Pub/Sub + filas — e você resolve problemas que muitos desenvolvedores enfrentam com soluções caras e complexas. Use as práticas apresentadas: sempre defina TTLs, trate erros, use connection pooling e pipelines. Redis não é apenas um cache; é um alicerce para sistemas modernos de alta performance.

Referências


Artigos relacionados