Redis: O Fundamento
Redis é um armazenamento de dados em memória (in-memory data store) de código aberto que funciona como banco de dados chave-valor. Diferentemente de bancos de dados tradicionais que persistem dados em disco, Redis mantém tudo em RAM, o que o torna extraordinariamente rápido. Sua velocidade o qualifica para três casos de uso principais: cache, sistemas de pub/sub (publicador/assinante) e filas de processamento.
O diferencial do Redis é sua capacidade de trabalhar com estruturas de dados complexas — strings, listas, conjuntos, hashes e sorted sets — todas operadas atomicamente. Isso significa que operações são indivisíveis e ocorrem sem interferência de requisições concorrentes. Quando você precisa de subsistemas que trocam mensagens rapidamente ou precisam armazenar dados temporários com extrema performance, Redis é a ferramenta ideal. A biblioteca redis-py é o cliente Python oficial que abstrai a comunicação com o servidor Redis, permitindo que você interaja com toda essa potência através de uma API Python intuitiva.
Instalação e Configuração Básica
Antes de qualquer código, você precisa do servidor Redis rodando e da biblioteca redis-py instalada. Redis é disponibilizado em repositórios oficiais da maioria das distribuições Linux. No macOS, use Homebrew. No Windows, você pode usar WSL2 ou containers Docker.
# Linux (Debian/Ubuntu)
sudo apt-get install redis-server
# macOS
brew install redis
# Iniciar o servidor Redis
redis-server
# Em outro terminal, verificar se está rodando
redis-cli ping
Com o servidor pronto, instale a biblioteca Python:
pip install redis
Agora você consegue conectar:
import redis
# Conexão padrão localhost:6379
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
# Testar a conexão
print(r.ping()) # Output: True
O parâmetro decode_responses=True é essencial na maioria dos casos: ele converte automaticamente bytes retornados pelo Redis em strings Python. Sem ele, você receberia b'valor' em vez de 'valor', dificultando o trabalho. O parâmetro db=0 especifica qual banco de dados Redis usar (0 a 15 por padrão).
Cache com Redis
O Conceito de Cache
Cache é um armazenamento temporário de dados frequentemente acessados. A estratégia funciona assim: antes de executar uma operação custosa (como uma consulta em banco de dados), você verifica se o resultado já existe no cache. Se existir, retorna-o imediatamente. Se não existir, executa a operação, armazena o resultado no cache para futuras requisições e retorna ao cliente. Isso reduz carga no banco de dados e diminui latência drasticamente.
Redis é ideal para cache porque oferece expiração automática de chaves (TTL — Time To Live). Você pode dizer "armazene este valor, mas Delete-o após 5 minutos". Isso é crítico em cache, pois dados que nunca expiram ocupam memória eternamente.
Implementando Cache de Dados
Vamos criar um exemplo prático: um sistema que busca informações de usuários de um banco de dados lento. Sem cache, cada requisição query o banco. Com cache, consultas subsequentes são servidas em microsegundos.
import redis
import json
import time
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
# Simular uma operação custosa (leitura de BD)
def buscar_usuario_no_bd(user_id):
time.sleep(2) # Simula latência de BD
return {
'id': user_id,
'nome': f'Usuário {user_id}',
'email': f'user{user_id}@example.com'
}
# Função com cache
def obter_usuario(user_id, cache_ttl=3600):
# Criar uma chave única para este usuário
cache_key = f'usuario:{user_id}'
# Tentar recuperar do cache
cached = r.get(cache_key)
if cached:
print(f'✓ Retornado do cache')
return json.loads(cached)
# Cache miss: buscar do "BD"
print(f'✗ Cache miss - consultando BD')
usuario = buscar_usuario_no_bd(user_id)
# Armazenar no cache com TTL
r.setex(cache_key, cache_ttl, json.dumps(usuario))
return usuario
# Teste
print('Primeira requisição:')
user1 = obter_usuario(1) # Demora 2 segundos
print(user1)
print('\nSegunda requisição (mesmo usuário):')
user1_again = obter_usuario(1) # Instantâneo
print(user1_again)
print('\nTerceira requisição (usuário diferente):')
user2 = obter_usuario(2) # Demora 2 segundos novamente
print(user2)
Output esperado:
Primeira requisição:
✗ Cache miss - consultando BD
{'id': 1, 'nome': 'Usuário 1', 'email': 'user1@example.com'}
Segunda requisição (mesmo usuário):
✓ Retornado do cache
{'id': 1, 'nome': 'Usuário 1', 'email': 'user1@example.com'}
Terceira requisição (usuário diferente):
✗ Cache miss - consultando BD
{'id': 2, 'nome': 'Usuário 2', 'email': 'user2@example.com'}
Aqui usamos setex() que é a fusão de set() + expire(): armazena um valor e define sua expiração em uma operação atômica. Após 3600 segundos, a chave é deletada automaticamente. Você também pode usar set() com parâmetro ex:
r.set(cache_key, json.dumps(usuario), ex=3600)
Estratégias de Invalidação
Um cache que nunca é invalidado pode servir dados desatualizados. Existem estratégias para lidar com isso. A mais simples é a expiração por TTL (já vista). Outra é invalidar manualmente quando os dados mudam:
def atualizar_usuario(user_id, dados_novos):
# Atualizar no BD (pseudo-código)
# db.update_user(user_id, dados_novos)
# Invalidar o cache
cache_key = f'usuario:{user_id}'
r.delete(cache_key)
print(f'Cache invalidado para usuário {user_id}')
atualizar_usuario(1, {'nome': 'Novo Nome'})
# Próxima requisição de obter_usuario(1) terá cache miss e buscará dados novos
Você também pode usar o padrão de invalidação em cascata com chaves padrão:
# Deletar todas as chaves que correspondem a um padrão
r.delete(*r.keys('usuario:*')) # Deleta todos os caches de usuários
Pub/Sub: Comunicação Entre Componentes
Entendendo Pub/Sub
Pub/Sub (Publicador/Assinante) é um padrão de mensageria onde produtores de dados (publishers) enviam mensagens para tópicos sem conhecer quem as receberá. Consumidores (subscribers) se inscrevem em tópicos de interesse e recebem mensagens assim que são publicadas. Isso desacopla componentes do sistema: publishers não dependem de subscribers existirem, e novos subscribers podem ser adicionados sem alterar publishers.
Redis implementa Pub/Sub de forma elegante. Um publisher envia uma mensagem para um canal, e todos os subscribers daquele canal a recebem instantaneamente. É importante notar que Redis Pub/Sub não persiste mensagens: se nenhum subscriber estiver escutando quando a mensagem é publicada, ela se perde. Para persistência, use Streams ou filas (próxima seção).
Implementando Pub/Sub
Vamos criar um exemplo de notificações em tempo real: um serviço publica atualizações de pedidos, e múltiplos clientes as recebem.
import redis
import threading
import json
from datetime import datetime
def criar_publisher():
"""Simula um serviço que publica atualizações"""
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
def publicar_atualizacao_pedido(pedido_id, status):
mensagem = {
'pedido_id': pedido_id,
'status': status,
'timestamp': datetime.now().isoformat()
}
# Publicar no canal 'pedidos'
num_subscribers = r.publish('pedidos', json.dumps(mensagem))
print(f'[PUBLISHER] Mensagem publicada - {num_subscribers} subscribers receberam')
# Simular publicações
import time
time.sleep(1) # Aguardar subscribers conectarem
publicar_atualizacao_pedido(101, 'confirmado')
time.sleep(1)
publicar_atualizacao_pedido(101, 'processando')
time.sleep(1)
publicar_atualizacao_pedido(101, 'enviado')
def criar_subscriber(nome_subscriber):
"""Simula um cliente que consome notificações"""
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
pubsub = r.pubsub()
# Inscrever no canal
pubsub.subscribe('pedidos')
print(f'[{nome_subscriber}] Inscrito no canal pedidos')
# Escutar mensagens
for mensagem in pubsub.listen():
if mensagem['type'] == 'message':
dados = json.loads(mensagem['data'])
print(f'[{nome_subscriber}] Recebeu: Pedido {dados["pedido_id"]} - {dados["status"]}')
# Executar em threads paralelas
thread_pub = threading.Thread(target=criar_publisher)
thread_sub1 = threading.Thread(target=criar_subscriber, args=('Cliente1',))
thread_sub2 = threading.Thread(target=criar_subscriber, args=('Cliente2',))
thread_sub1.start()
thread_sub2.start()
thread_pub.start()
thread_sub1.join()
thread_sub2.join()
thread_pub.join()
Output esperado:
[Cliente1] Inscrito no canal pedidos
[Cliente2] Inscrito no canal pedidos
[PUBLISHER] Mensagem publicada - 2 subscribers receberam
[Cliente1] Recebeu: Pedido 101 - confirmado
[Cliente2] Recebeu: Pedido 101 - confirmado
[PUBLISHER] Mensagem publicada - 2 subscribers receberam
[Cliente1] Recebeu: Pedido 101 - processando
[Cliente2] Recebeu: Pedido 101 - processando
[PUBLISHER] Mensagem publicada - 2 subscribers receberam
[Cliente1] Recebeu: Pedido 101 - enviado
[Cliente2] Recebeu: Pedido 101 - enviado
O método listen() cria um gerador que bloqueia aguardando mensagens. O loop roda indefinidamente até a conexão ser encerrada. Cada mensagem recebida é um dicionário com chaves type (tipo de evento: 'message', 'subscribe', etc.) e data (conteúdo da mensagem).
Padrões de Inscrição
Você pode inscrever-se em múltiplos canais ou usar padrões com wildcards:
pubsub = r.pubsub()
# Inscrever em múltiplos canais
pubsub.subscribe('pedidos', 'notificacoes', 'alertas')
# Inscrever em padrões (requer psubscribe)
pubsub.psubscribe('usuario:*:update') # Recebe de usuario:1:update, usuario:2:update, etc.
# No publisher, publicar em canal específico
r.publish('usuario:42:update', json.dumps({'acao': 'perfil_atualizado'}))
Filas com Redis
Diferença Entre Pub/Sub e Filas
Filas e Pub/Sub parecem similares, mas têm propósitos diferentes. Pub/Sub é um para muitos e não persiste: uma mensagem é entregue a todos os subscribers conectados naquele momento. Filas são um para um (ou múltiplos consumidores) e persistem: mensagens ficam armazenadas até serem consumidas, e cada mensagem é entregue a um único consumidor (em arquitetura de fila tradicional) ou processada uma única vez mesmo com múltiplos consumidores.
Redis não tem suporte nativo a filas com reconhecimento de entrega (como RabbitMQ), mas você pode implementar uma fila simples e eficiente usando listas. Para casos que exigem garantias de entrega e reprocessamento, use Redis Streams. Aqui focamos em filas com listas pela simplicidade didática.
Implementando Filas com Listas
Uma lista Redis é uma sequência ordenada de valores. As operações LPUSH (inserir à esquerda) e RPOP (remover à direita) criam uma estrutura FIFO (First In, First Out).
import redis
import json
import threading
import time
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
def produtor_tarefas():
"""Enfileira tarefas"""
tarefas = [
{'id': 1, 'tipo': 'email', 'destinatario': 'user1@example.com'},
{'id': 2, 'tipo': 'sms', 'destinatario': '+5511999999999'},
{'id': 3, 'tipo': 'email', 'destinatario': 'user2@example.com'},
{'id': 4, 'tipo': 'push', 'destinatario': 'device_token_xyz'},
]
for tarefa in tarefas:
# Enfileirar no final da fila
r.rpush('fila_tarefas', json.dumps(tarefa))
print(f'[PRODUTOR] Enfileirada tarefa {tarefa["id"]}')
time.sleep(0.5)
print('[PRODUTOR] Todas as tarefas enfileiradas')
def consumidor_tarefas(nome_consumidor):
"""Consome e processa tarefas"""
while True:
# Remover do início da fila (ou aguardar)
tarefa_json = r.lpop('fila_tarefas')
if tarefa_json is None:
print(f'[{nome_consumidor}] Fila vazia, aguardando...')
time.sleep(1)
continue
tarefa = json.loads(tarefa_json)
print(f'[{nome_consumidor}] Processando tarefa {tarefa["id"]} ({tarefa["tipo"]})')
# Simular processamento
time.sleep(1)
print(f'[{nome_consumidor}] Tarefa {tarefa["id"]} concluída')
# Iniciar produtor e consumidores em threads paralelas
thread_prod = threading.Thread(target=produtor_tarefas)
thread_cons1 = threading.Thread(target=consumidor_tarefas, args=('Consumidor1',), daemon=True)
thread_cons2 = threading.Thread(target=consumidor_tarefas, args=('Consumidor2',), daemon=True)
thread_cons1.start()
thread_cons2.start()
thread_prod.start()
thread_prod.join()
time.sleep(10) # Aguardar consumidores processarem
Output esperado (pode variar em ordem):
[Consumidor1] Fila vazia, aguardando...
[Consumidor2] Fila vazia, aguardando...
[PRODUTOR] Enfileirada tarefa 1
[PRODUTOR] Enfileirada tarefa 2
[Consumidor1] Processando tarefa 1 (email)
[PRODUTOR] Enfileirada tarefa 3
[Consumidor2] Processando tarefa 2 (sms)
[PRODUTOR] Enfileirada tarefa 4
[PRODUTOR] Todas as tarefas enfileiradas
[Consumidor1] Tarefa 1 concluída
[Consumidor2] Tarefa 2 concluída
[Consumidor1] Processando tarefa 3 (email)
[Consumidor2] Processando tarefa 4 (push)
[Consumidor1] Tarefa 3 concluída
[Consumidor2] Tarefa 4 concluída
Aqui, rpush() adiciona à direita e lpop() remove pela esquerda, garantindo ordem FIFO. Múltiplos consumidores podem trabalhar em paralelo, cada um pegando uma tarefa diferente.
Bloqueando Pop para Eficiência
Usar lpop() em loop com sleep é ineficiente: desperdiça CPU verificando a fila continuamente. Redis oferece blpop() (blocking left pop) que aguarda até que um elemento esteja disponível:
def consumidor_tarefas_bloqueante(nome_consumidor):
"""Versão mais eficiente com blocking pop"""
while True:
# Aguardar até 0 segundos (indefinido) por uma tarefa
resultado = r.blpop('fila_tarefas', timeout=0)
if resultado is None:
print(f'[{nome_consumidor}] Timeout atingido')
continue
# blpop retorna tupla (chave, valor)
chave, tarefa_json = resultado
tarefa = json.loads(tarefa_json)
print(f'[{nome_consumidor}] Processando tarefa {tarefa["id"]}')
# Processar...
time.sleep(1)
print(f'[{nome_consumidor}] Concluída')
blpop() é superior porque não consome CPU enquanto aguarda. O timeout em segundos (0 = indefinido) define quanto tempo esperar. Se excedido sem nenhuma mensagem, retorna None.
Filas com Prioridade
Para tarefas com prioridades diferentes, use sorted sets:
def enfileirar_com_prioridade(chave_fila, tarefa, prioridade=0):
"""prioridade: -10 (alta) a 10 (baixa)"""
r.zadd(chave_fila, {json.dumps(tarefa): prioridade})
def desfileirar_com_prioridade(chave_fila):
"""Remove tarefa com maior prioridade (menor score)"""
resultado = r.zrange(chave_fila, 0, 0)
if resultado:
tarefa = resultado[0]
r.zrem(chave_fila, tarefa)
return json.loads(tarefa)
return None
# Exemplo
enfileirar_com_prioridade('fila_prioritaria',
{'id': 1, 'tipo': 'email'},
prioridade=5)
enfileirar_com_prioridade('fila_prioritaria',
{'id': 2, 'tipo': 'alerta'},
prioridade=-10) # Alta prioridade
tarefa = desfileirar_com_prioridade('fila_prioritaria')
print(tarefa) # {'id': 2, 'tipo': 'alerta'} (menor score = maior prioridade)
Padrões Avançados e Boas Práticas
Connection Pooling
Em aplicações reais, você não cria uma nova conexão Redis para cada operação. Conexões são custosas. Use connection pooling: um conjunto de conexões reutilizáveis mantidas abertas.
import redis
# Pool padrão (máximo 50 conexões)
pool = redis.ConnectionPool(host='localhost', port=6379, db=0)
r = redis.Redis(connection_pool=pool)
# Ou, mais simples, redis-py cria um pool automaticamente:
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
# Internamente já usa pooling
Pipelines para Múltiplas Operações
Se você precisa executar múltiplas operações Redis, use pipelines para agrupá-las e enviar em uma única requisição:
# Sem pipeline: 3 requisições
r.set('chave1', 'valor1')
r.set('chave2', 'valor2')
r.set('chave3', 'valor3')
# Com pipeline: 1 requisição
pipe = r.pipeline()
pipe.set('chave1', 'valor1')
pipe.set('chave2', 'valor2')
pipe.set('chave3', 'valor3')
results = pipe.execute() # Execute todas de uma vez
Pipelines reduzem latência de rede significativamente em operações em batch.
Tratamento de Erros
Redis pode ficar indisponível. Sempre trate exceções:
import redis
from redis.exceptions import ConnectionError, TimeoutError
try:
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
r.ping()
except ConnectionError:
print('Falha ao conectar ao Redis')
except TimeoutError:
print('Timeout na conexão com Redis')
# Para operações, use try-except
try:
valor = r.get('chave')
except redis.RedisError as e:
print(f'Erro Redis: {e}')
Expiração e Limpeza
Sempre defina TTL em dados temporários. Dados sem expiração causam vazamento de memória:
# Ruim: nenhuma expiração
r.set('sessao_user', json.dumps(dados_sessao))
# Bom: expira em 24 horas
r.setex('sessao_user', 86400, json.dumps(dados_sessao))
# Ou:
r.set('sessao_user', json.dumps(dados_sessao), ex=86400)
Você pode monitorar memória usada com:
info = r.info('memory')
print(f"Memória usada: {info['used_memory_human']}")
print(f"Pico de memória: {info['used_memory_peak_human']}")
Conclusão
Dominando Redis com Python, você adquiriu três superpoderes. Primeiro, cache inteligente que reduz carga massivamente em sistemas: dados frequentemente acessados são servidos em microsegundos, não segundos. Redis transforma performance de aplicações. Segundo, Pub/Sub para comunicação em tempo real: desacopla componentes do sistema permitindo que diferentes serviços se comuniquem sem conhecerem-se mutuamente, criando arquiteturas escaláveis e resilientes. Terceiro, filas robustas para processamento assíncrono: tarefas custosas são enfileiradas e processadas em background por workers, mantendo a aplicação responsiva.
A biblioteca redis-py abstrai a complexidade do protocolo Redis e oferece uma API Pythônica. Combine essas capacidades — cache + Pub/Sub + filas — e você resolve problemas que muitos desenvolvedores enfrentam com soluções caras e complexas. Use as práticas apresentadas: sempre defina TTLs, trate erros, use connection pooling e pipelines. Redis não é apenas um cache; é um alicerce para sistemas modernos de alta performance.