SQS: Arquitetura e Diferenças Fundamentais
AWS SQS (Simple Queue Service) é o serviço de fila mais maduro da AWS, essencial para desacoplar aplicações em arquiteturas distribuídas. Existem dois tipos principais: Standard e FIFO, cada um com garantias e características distintas que definem sua aplicação em produção.
Standard Queue oferece throughput ilimitado e entrega at-least-once, mas sem garantia de ordem. Use quando velocidade é crítica e duplicações podem ser tratadas idempotentemente. FIFO Queue garante entrega exatamente uma vez (exactly-once) e preserva ordem, ideal para operações financeiras ou sequências críticas. A escolha impacta diretamente sua arquitetura de consumidores e tolerância a falhas.
Implementação Prática: Standard vs FIFO
Standard Queue
import boto3
import json
from datetime import datetime
sqs = boto3.client('sqs')
queue_url = 'https://queue.amazonaws.com/123456789/my-standard-queue'
# Produzir mensagens
def send_message_standard(body, attributes=None):
response = sqs.send_message(
QueueUrl=queue_url,
MessageBody=json.dumps(body),
MessageAttributes=attributes or {}
)
return response['MessageId']
# Consumir com polling
def consume_standard():
messages = sqs.receive_message(
QueueUrl=queue_url,
MaxNumberOfMessages=10,
WaitTimeSeconds=20, # Long polling
VisibilityTimeout=30
)
for msg in messages.get('Messages', []):
try:
body = json.loads(msg['Body'])
process_message(body)
# Deletar apenas após processamento bem-sucedido
sqs.delete_message(
QueueUrl=queue_url,
ReceiptHandle=msg['ReceiptHandle']
)
except Exception as e:
print(f"Erro ao processar: {e}")
# Não deletar = volta à fila após visibility timeout
def process_message(body):
print(f"Processando: {body} às {datetime.now()}")
Standard queues permitem envios rápidos sem validações de ordem. A chave aqui é o long polling com WaitTimeSeconds=20, reduzindo custos e latência. Mensagens processadas com sucesso devem ser deletadas explicitamente; falhas resultam em reprocessamento automático.
FIFO Queue
# Produzir mensagens FIFO
def send_message_fifo(body, deduplication_id, group_id):
response = sqs.send_message(
QueueUrl='https://queue.amazonaws.com/123456789/my-fifo.fifo',
MessageBody=json.dumps(body),
MessageDeduplicationId=deduplication_id, # Garante idempotência
MessageGroupId=group_id # Preserva ordem por grupo
)
return response['MessageId']
# Consumir FIFO respeitando ordem
def consume_fifo():
messages = sqs.receive_message(
QueueUrl='https://queue.amazonaws.com/123456789/my-fifo.fifo',
MaxNumberOfMessages=10,
WaitTimeSeconds=20,
VisibilityTimeout=60 # Geralmente maior que Standard
)
for msg in messages.get('Messages', []):
receipt_handle = msg['ReceiptHandle']
body = json.loads(msg['Body'])
if process_order(body):
sqs.delete_message(
QueueUrl='https://queue.amazonaws.com/123456789/my-fifo.fifo',
ReceiptHandle=receipt_handle
)
def process_order(order):
# Operação idempotente: pode ser executada múltiplas vezes
user_id = order['user_id']
amount = order['amount']
print(f"Processando pedido {user_id}: R${amount}")
return True
Em FIFO, o MessageGroupId garante que mensagens do mesmo grupo (ex: mesmo usuário) sejam processadas sequencialmente. O MessageDeduplicationId previne duplicatas automáticas em caso de retransmissão.
Visibility Timeout e Dead Letter Queue (DLQ)
Visibility Timeout em Profundidade
O visibility timeout é o período em que uma mensagem fica invisível após recebida. Se não for deletada nesse intervalo, retorna à fila para reprocessamento. Em produção, defina um valor maior que o tempo máximo de processamento esperado.
# Ajustar visibility timeout dinamicamente
def consume_with_dynamic_timeout():
messages = sqs.receive_message(
QueueUrl=queue_url,
MaxNumberOfMessages=10,
VisibilityTimeout=30
)
for msg in messages.get('Messages', []):
receipt_handle = msg['ReceiptHandle']
start = datetime.now()
try:
# Processar mensagem
result = heavy_processing(msg['Body'])
# Se demorar muito, estender timeout
elapsed = (datetime.now() - start).total_seconds()
if elapsed > 20:
sqs.change_message_visibility(
QueueUrl=queue_url,
ReceiptHandle=receipt_handle,
VisibilityTimeout=int(elapsed + 10)
)
sqs.delete_message(
QueueUrl=queue_url,
ReceiptHandle=receipt_handle
)
except Exception as e:
print(f"Falha permanente: {e}")
# Não fazer nada: deixar timeout expirar
def heavy_processing(body):
import time
time.sleep(5)
return json.loads(body)
Dead Letter Queue (DLQ)
Configure uma DLQ na política de redrive para capturar mensagens que falham repetidamente. Isso evita loops infinitos e permite debugging posterior.
# Criar DLQ
dlq_response = sqs.create_queue(QueueName='my-dlq')
dlq_url = dlq_response['QueueUrl']
# Obter ARN da DLQ
dlq_attrs = sqs.get_queue_attributes(QueueUrl=dlq_url, AttributeNames=['QueueArn'])
dlq_arn = dlq_attrs['Attributes']['QueueArn']
# Configurar redrive policy na fila principal
sqs.set_queue_attributes(
QueueUrl=queue_url,
Attributes={
'RedrivePolicy': json.dumps({
'deadLetterTargetArn': dlq_arn,
'maxReceiveCount': 3 # Após 3 tentativas, vai para DLQ
})
}
)
# Consumir DLQ para análise
def process_dlq():
messages = sqs.receive_message(
QueueUrl=dlq_url,
MaxNumberOfMessages=10
)
for msg in messages.get('Messages', []):
print(f"Mensagem morta: {msg['Body']}")
# Log para análise ou reprocessamento manual
sqs.delete_message(QueueUrl=dlq_url, ReceiptHandle=msg['ReceiptHandle'])
Uma DLQ bem configurada é sua rede de segurança em produção. Monitore-a constantemente; mensagens na DLQ indicam bugs ou dados inválidos que requerem investigação.
Otimizações e Padrões de Produção
Batch Processing e Métricas
import logging
logger = logging.getLogger(__name__)
def batch_consumer(queue_url, worker_threads=5):
from concurrent.futures import ThreadPoolExecutor
def worker():
while True:
messages = sqs.receive_message(
QueueUrl=queue_url,
MaxNumberOfMessages=10,
WaitTimeSeconds=20,
VisibilityTimeout=60
)
if 'Messages' not in messages:
continue
processed = 0
failed = 0
for msg in messages['Messages']:
try:
process_message(msg['Body'])
sqs.delete_message(
QueueUrl=queue_url,
ReceiptHandle=msg['ReceiptHandle']
)
processed += 1
except Exception as e:
logger.error(f"Falha: {e}", exc_info=True)
failed += 1
logger.info(f"Batch: {processed} sucesso, {failed} falhas")
with ThreadPoolExecutor(max_workers=worker_threads) as executor:
for _ in range(worker_threads):
executor.submit(worker)
Práticas essenciais: use long polling (WaitTimeSeconds > 0) para reduzir custos; processe em lotes para eficiência; implemente idempotência em consumidores; monitore ApproximateAgeOfOldestMessage para detectar atrasos.
Dica de Ouro: Em produção, sempre configure CloudWatch alarms para tamanho da fila, tempo de retenção de mensagens e contagem de mensagens na DLQ. Falhas silenciosas são piores que crashes detectáveis.
Conclusão
Dominar SQS requer compreensão de três pilares: (1) escolher Standard ou FIFO baseado em garantias de ordem vs throughput; (2) configurar visibility timeout e DLQ apropriadamente para resiliência; (3) implementar consumidores idempotentes com processamento em lotes e monitoramento ativo. A transição para produção é segura quando você entende que filas desacoplam responsabilidades, não eliminam erros—eles apenas os tornam gerenciáveis.