DynamoDB Streams: Capturando Mudanças em Tempo Real
DynamoDB Streams permite capturar modificações (insert, update, delete) em uma tabela e processá-las de forma assíncrona. Cada stream registra a sequência exata de alterações com informações da chave, imagem anterior e nova. Isso é essencial para arquiteturas que precisam sincronizar dados entre sistemas, alimentar data lakes ou disparar notificações em tempo real.
Para ativar streams, você configura na tabela o tipo de informação capturada: KEYS_ONLY, NEW_IMAGE, OLD_IMAGE ou NEW_AND_OLD_IMAGES. A integração natural com Lambda permite processar registros automaticamente sem gerenciar infraestrutura. Aqui está um exemplo funcional que processa eventos de stream:
import json
import boto3
from datetime import datetime
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('usuarios')
def lambda_handler(event, context):
"""Processa eventos do DynamoDB Stream"""
for record in event['Records']:
event_name = record['eventName'] # INSERT, MODIFY, REMOVE
if event_name == 'INSERT':
new_image = record['dynamodb']['NewImage']
user_id = new_image['userId']['S']
email = new_image['email']['S']
print(f"Novo usuário: {user_id} - {email}")
# Aqui você poderia enviar email de boas-vindas via SNS
elif event_name == 'MODIFY':
old_image = record['dynamodb']['OldImage']
new_image = record['dynamodb']['NewImage']
print(f"Alteração detectada: {json.dumps(new_image)}")
# Comparar e registrar mudanças em auditoria
elif event_name == 'REMOVE':
keys = record['dynamodb']['Keys']
print(f"Registro deletado: {keys}")
return {'statusCode': 200, 'body': 'Processado com sucesso'}
Configuração de Stream na Tabela
Para criar uma tabela com stream ativo via Terraform ou CloudFormation, defina o parâmetro StreamSpecification. A melhor prática é usar NEW_AND_OLD_IMAGES apenas quando realmente necessário, pois aumenta o custo. Para auditoria simples, KEYS_ONLY é mais econômico.
DynamoDB Transactions: Garantindo Consistência Multi-Registro
Transactions no DynamoDB garantem que múltiplas operações sejam executadas atomicamente: todas bem-sucedidas ou todas revertidas. Isso é crítico em cenários como transferência entre contas, atualização de estoque com desconto aplicado ou criação de relacionamentos que não podem ficar inconsistentes.
Existem dois modos: TransactWriteItems (até 25 operações) e TransactGetItems (até 25 leituras). Ao contrário de SQL, não há locks de leitura no DynamoDB — use transações apenas quando escritas precisam ser atômicas. Veja um exemplo prático de transferência de pontos entre dois usuários:
import boto3
from botocore.exceptions import ClientError
dynamodb = boto3.client('dynamodb')
def transferir_pontos(usuario_origem, usuario_destino, pontos):
"""
Transfere pontos entre usuários de forma atômica.
Se qualquer operação falhar, ambas são revertidas.
"""
try:
response = dynamodb.transact_write_items(
TransactItems=[
{
'Update': {
'TableName': 'usuarios',
'Key': {'userId': {'S': usuario_origem}},
'UpdateExpression': 'SET pontos = pontos - :pts, atualizadoEm = :agora',
'ExpressionAttributeValues': {
':pts': {'N': str(pontos)},
':agora': {'S': datetime.now().isoformat()}
},
'ConditionExpression': 'pontos >= :pts' # Valida antes
}
},
{
'Update': {
'TableName': 'usuarios',
'Key': {'userId': {'S': usuario_destino}},
'UpdateExpression': 'SET pontos = pontos + :pts, atualizadoEm = :agora',
'ExpressionAttributeValues': {
':pts': {'N': str(pontos)},
':agora': {'S': datetime.now().isoformat()}
}
}
}
]
)
print("Transferência bem-sucedida")
return True
except ClientError as e:
if e.response['Error']['Code'] == 'TransactionCanceledException':
print(f"Transação cancelada: {e.response['Error']['Message']}")
# Pode ser falta de saldo ou outro erro de validação
return False
from datetime import datetime
Padrões de Erro e Tratamento
Exceções de transação requerem retry logic com exponential backoff. A chave é implementar idempotência: sua operação deve produzir o mesmo resultado se executada múltiplas vezes. Inclua um requestId único para detectar duplicatas.
DAX Cache: Acelerando Leituras com Cache em Memória
DAX (DynamoDB Accelerator) é um cache gerenciado que fica entre sua aplicação e DynamoDB, reduzindo latência de milissegundos para microsegundos e economizando unidades de leitura. É especialmente eficaz para workloads com hot keys (poucos registros acessados repetidamente).
DAX funciona transparentemente: você usa o mesmo SDK do DynamoDB, apenas apontando para o endpoint do cluster DAX. Caches são invalidados automaticamente quando você escreve via DAX, mantendo consistência. Veja como integrar:
import amazondax
import boto3
# Criar cliente DAX em vez do DynamoDB normal
dax_endpoint = 'my-cluster.dax.amazonaws.com'
dax = amazondax.AmazonDaxClient.resource(endpoint_url=f'http://{dax_endpoint}:8111')
# Usar normalmente como uma tabela DynamoDB
table = dax.Table('usuarios')
def buscar_usuario_com_cache(user_id):
"""
Primeira chamada consulta DynamoDB e armazena em cache (TTL padrão 5min).
Chamadas subsequentes vêm do cache sem custo de RCU.
"""
try:
response = table.get_item(Key={'userId': user_id})
user = response.get('Item')
print(f"Usuário {user_id} recuperado - TTL padrão: 5 minutos")
return user
except Exception as e:
print(f"Erro ao acessar DAX: {e}")
# Fallback para DynamoDB direto
dynamodb = boto3.resource('dynamodb')
return dynamodb.Table('usuarios').get_item(Key={'userId': user_id}).get('Item')
# Simular carga com acesso repetido
for i in range(100):
usuario = buscar_usuario_com_cache('user-123') # Mesmo user a cada iteração
Quando Usar DAX
Use DAX quando você tem padrão de leitura repetida ou workload de leitura pesada. Não recomendo para aplicações com muitas escritas distintas ou quando dados mudam frequentemente. Custo típico: $0.25/hora para cluster com 3 nós. Analise ROI antes de implementar.
Conclusão
Dominando esses três pilares você constrói aplicações DynamoDB verdadeiramente resilientes. Streams permitem propagação de dados assíncrona sem acoplamento. Transactions garantem consistência onde importa: transferências, inventário, estados críticos. DAX multiplica performance de leitura quando você tem padrões previsíveis. A combinação desses recursos resolve 95% dos problemas reais em produção.