AWS Admin

Como Usar SQS em Profundidade: Standard, FIFO, DLQ e Visibility Timeout em Produção Já leu

SQS: Arquitetura e Diferenças Fundamentais AWS SQS (Simple Queue Service) é o serviço de fila mais maduro da AWS, essencial para desacoplar aplicações em arquiteturas distribuídas. Existem dois tipos principais: Standard e FIFO, cada um com garantias e características distintas que definem sua aplicação em produção. Standard Queue oferece throughput ilimitado e entrega at-least-once, mas sem garantia de ordem. Use quando velocidade é crítica e duplicações podem ser tratadas idempotentemente. FIFO Queue garante entrega exatamente uma vez (exactly-once) e preserva ordem, ideal para operações financeiras ou sequências críticas. A escolha impacta diretamente sua arquitetura de consumidores e tolerância a falhas. Implementação Prática: Standard vs FIFO Standard Queue Standard queues permitem envios rápidos sem validações de ordem. A chave aqui é o long polling com , reduzindo custos e latência. Mensagens processadas com sucesso devem ser deletadas explicitamente; falhas resultam em reprocessamento automático. FIFO Queue Em FIFO, o garante que mensagens do mesmo grupo (ex: mesmo usuário) sejam processadas sequencialmente. O

SQS: Arquitetura e Diferenças Fundamentais

AWS SQS (Simple Queue Service) é o serviço de fila mais maduro da AWS, essencial para desacoplar aplicações em arquiteturas distribuídas. Existem dois tipos principais: Standard e FIFO, cada um com garantias e características distintas que definem sua aplicação em produção.

Standard Queue oferece throughput ilimitado e entrega at-least-once, mas sem garantia de ordem. Use quando velocidade é crítica e duplicações podem ser tratadas idempotentemente. FIFO Queue garante entrega exatamente uma vez (exactly-once) e preserva ordem, ideal para operações financeiras ou sequências críticas. A escolha impacta diretamente sua arquitetura de consumidores e tolerância a falhas.

Implementação Prática: Standard vs FIFO

Standard Queue

import boto3
import json
from datetime import datetime

sqs = boto3.client('sqs')
queue_url = 'https://queue.amazonaws.com/123456789/my-standard-queue'

# Produzir mensagens
def send_message_standard(body, attributes=None):
    response = sqs.send_message(
        QueueUrl=queue_url,
        MessageBody=json.dumps(body),
        MessageAttributes=attributes or {}
    )
    return response['MessageId']

# Consumir com polling
def consume_standard():
    messages = sqs.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=10,
        WaitTimeSeconds=20,  # Long polling
        VisibilityTimeout=30
    )

    for msg in messages.get('Messages', []):
        try:
            body = json.loads(msg['Body'])
            process_message(body)
            # Deletar apenas após processamento bem-sucedido
            sqs.delete_message(
                QueueUrl=queue_url,
                ReceiptHandle=msg['ReceiptHandle']
            )
        except Exception as e:
            print(f"Erro ao processar: {e}")
            # Não deletar = volta à fila após visibility timeout

def process_message(body):
    print(f"Processando: {body} às {datetime.now()}")

Standard queues permitem envios rápidos sem validações de ordem. A chave aqui é o long polling com WaitTimeSeconds=20, reduzindo custos e latência. Mensagens processadas com sucesso devem ser deletadas explicitamente; falhas resultam em reprocessamento automático.

FIFO Queue

# Produzir mensagens FIFO
def send_message_fifo(body, deduplication_id, group_id):
    response = sqs.send_message(
        QueueUrl='https://queue.amazonaws.com/123456789/my-fifo.fifo',
        MessageBody=json.dumps(body),
        MessageDeduplicationId=deduplication_id,  # Garante idempotência
        MessageGroupId=group_id  # Preserva ordem por grupo
    )
    return response['MessageId']

# Consumir FIFO respeitando ordem
def consume_fifo():
    messages = sqs.receive_message(
        QueueUrl='https://queue.amazonaws.com/123456789/my-fifo.fifo',
        MaxNumberOfMessages=10,
        WaitTimeSeconds=20,
        VisibilityTimeout=60  # Geralmente maior que Standard
    )

    for msg in messages.get('Messages', []):
        receipt_handle = msg['ReceiptHandle']
        body = json.loads(msg['Body'])

        if process_order(body):
            sqs.delete_message(
                QueueUrl='https://queue.amazonaws.com/123456789/my-fifo.fifo',
                ReceiptHandle=receipt_handle
            )

def process_order(order):
    # Operação idempotente: pode ser executada múltiplas vezes
    user_id = order['user_id']
    amount = order['amount']
    print(f"Processando pedido {user_id}: R${amount}")
    return True

Em FIFO, o MessageGroupId garante que mensagens do mesmo grupo (ex: mesmo usuário) sejam processadas sequencialmente. O MessageDeduplicationId previne duplicatas automáticas em caso de retransmissão.

Visibility Timeout e Dead Letter Queue (DLQ)

Visibility Timeout em Profundidade

O visibility timeout é o período em que uma mensagem fica invisível após recebida. Se não for deletada nesse intervalo, retorna à fila para reprocessamento. Em produção, defina um valor maior que o tempo máximo de processamento esperado.

# Ajustar visibility timeout dinamicamente
def consume_with_dynamic_timeout():
    messages = sqs.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=10,
        VisibilityTimeout=30
    )

    for msg in messages.get('Messages', []):
        receipt_handle = msg['ReceiptHandle']
        start = datetime.now()

        try:
            # Processar mensagem
            result = heavy_processing(msg['Body'])

            # Se demorar muito, estender timeout
            elapsed = (datetime.now() - start).total_seconds()
            if elapsed > 20:
                sqs.change_message_visibility(
                    QueueUrl=queue_url,
                    ReceiptHandle=receipt_handle,
                    VisibilityTimeout=int(elapsed + 10)
                )

            sqs.delete_message(
                QueueUrl=queue_url,
                ReceiptHandle=receipt_handle
            )
        except Exception as e:
            print(f"Falha permanente: {e}")
            # Não fazer nada: deixar timeout expirar

def heavy_processing(body):
    import time
    time.sleep(5)
    return json.loads(body)

Dead Letter Queue (DLQ)

Configure uma DLQ na política de redrive para capturar mensagens que falham repetidamente. Isso evita loops infinitos e permite debugging posterior.

# Criar DLQ
dlq_response = sqs.create_queue(QueueName='my-dlq')
dlq_url = dlq_response['QueueUrl']

# Obter ARN da DLQ
dlq_attrs = sqs.get_queue_attributes(QueueUrl=dlq_url, AttributeNames=['QueueArn'])
dlq_arn = dlq_attrs['Attributes']['QueueArn']

# Configurar redrive policy na fila principal
sqs.set_queue_attributes(
    QueueUrl=queue_url,
    Attributes={
        'RedrivePolicy': json.dumps({
            'deadLetterTargetArn': dlq_arn,
            'maxReceiveCount': 3  # Após 3 tentativas, vai para DLQ
        })
    }
)

# Consumir DLQ para análise
def process_dlq():
    messages = sqs.receive_message(
        QueueUrl=dlq_url,
        MaxNumberOfMessages=10
    )

    for msg in messages.get('Messages', []):
        print(f"Mensagem morta: {msg['Body']}")
        # Log para análise ou reprocessamento manual
        sqs.delete_message(QueueUrl=dlq_url, ReceiptHandle=msg['ReceiptHandle'])

Uma DLQ bem configurada é sua rede de segurança em produção. Monitore-a constantemente; mensagens na DLQ indicam bugs ou dados inválidos que requerem investigação.

Otimizações e Padrões de Produção

Batch Processing e Métricas

import logging

logger = logging.getLogger(__name__)

def batch_consumer(queue_url, worker_threads=5):
    from concurrent.futures import ThreadPoolExecutor

    def worker():
        while True:
            messages = sqs.receive_message(
                QueueUrl=queue_url,
                MaxNumberOfMessages=10,
                WaitTimeSeconds=20,
                VisibilityTimeout=60
            )

            if 'Messages' not in messages:
                continue

            processed = 0
            failed = 0

            for msg in messages['Messages']:
                try:
                    process_message(msg['Body'])
                    sqs.delete_message(
                        QueueUrl=queue_url,
                        ReceiptHandle=msg['ReceiptHandle']
                    )
                    processed += 1
                except Exception as e:
                    logger.error(f"Falha: {e}", exc_info=True)
                    failed += 1

            logger.info(f"Batch: {processed} sucesso, {failed} falhas")

    with ThreadPoolExecutor(max_workers=worker_threads) as executor:
        for _ in range(worker_threads):
            executor.submit(worker)

Práticas essenciais: use long polling (WaitTimeSeconds > 0) para reduzir custos; processe em lotes para eficiência; implemente idempotência em consumidores; monitore ApproximateAgeOfOldestMessage para detectar atrasos.

Dica de Ouro: Em produção, sempre configure CloudWatch alarms para tamanho da fila, tempo de retenção de mensagens e contagem de mensagens na DLQ. Falhas silenciosas são piores que crashes detectáveis.

Conclusão

Dominar SQS requer compreensão de três pilares: (1) escolher Standard ou FIFO baseado em garantias de ordem vs throughput; (2) configurar visibility timeout e DLQ apropriadamente para resiliência; (3) implementar consumidores idempotentes com processamento em lotes e monitoramento ativo. A transição para produção é segura quando você entende que filas desacoplam responsabilidades, não eliminam erros—eles apenas os tornam gerenciáveis.

Referências


Artigos relacionados