Entendendo os Três Pilares do Kinesis
O AWS Kinesis é um serviço gerenciado para processar dados em tempo real em larga escala. Existem três componentes principais: Kinesis Data Streams (para captura de dados brutos contínuos), Kinesis Firehose (para entrega automática em data lakes) e Kinesis Analytics (para análise SQL em tempo real). Cada um resolve um problema específico e frequentemente são usados em conjunto em arquiteturas modernas.
Compreender quando usar cada um é fundamental. Streams oferece controle fino e baixa latência—ideal para processamento real-time exigente. Firehose simplifica a ingestão com transformações automáticas—perfeito para arquivos em S3 ou Redshift. Analytics permite consultas SQL sem escrever código de processamento—indicado para monitoramento e alertas rápidos.
Kinesis Data Streams na Prática
Produzindo Dados
O Data Streams funciona como fila distribuída com shards paralelos. Você envia registros com uma PartitionKey que determina qual shard receberá o dado. Veja um produtor Python real:
import boto3
import json
from datetime import datetime
kinesis = boto3.client('kinesis', region_name='us-east-1')
def enviar_evento_usuario(user_id, acao):
payload = {
'user_id': user_id,
'acao': acao,
'timestamp': datetime.utcnow().isoformat(),
'origem': 'mobile-app'
}
response = kinesis.put_record(
StreamName='eventos-usuarios',
Data=json.dumps(payload),
PartitionKey=str(user_id) # Agrupa eventos do mesmo usuário
)
print(f"Registro {response['ShardId']}: {response['SequenceNumber']}")
# Uso
enviar_evento_usuario(12345, 'login')
enviar_evento_usuario(12345, 'visualizou_produto')
Consumindo Dados
Consumidores leem dados em ordem dentro de cada shard. Este exemplo processa eventos em tempo real:
import json
kinesis = boto3.client('kinesis')
def processar_stream():
response = kinesis.describe_stream(StreamName='eventos-usuarios')
shard_ids = [shard['ShardId'] for shard in response['StreamDescription']['Shards']]
for shard_id in shard_ids:
shard_iterator = kinesis.get_shard_iterator(
StreamName='eventos-usuarios',
ShardId=shard_id,
ShardIteratorType='LATEST'
)['ShardIterator']
while shard_iterator:
records = kinesis.get_records(
ShardIterator=shard_iterator,
Limit=100
)
for record in records['Records']:
data = json.loads(record['Data'])
print(f"Processando: {data['acao']} do usuário {data['user_id']}")
shard_iterator = records['NextShardIterator']
processar_stream()
Use Kinesis Client Library (KCL) ou Lambda com event source mappings para produção—eles gerenciam shards automaticamente.
Firehose: Entrega Automatizada
Configuração e Transformação
Firehose entrega dados em lotes para S3, Redshift ou Splunk com latência de 1-5 minutos. Você pode transformar registros com Lambda durante a ingestão:
import boto3
import json
import base64
firehose = boto3.client('firehose')
def lambda_handler(event, context):
"""Função Lambda chamada pelo Firehose para transformar registros"""
output = []
for record in event['records']:
data = json.loads(base64.b64decode(record['data']))
# Transformação: adicionar campo processado
data['processado_em'] = 'lambda-transform'
data['preco'] = float(data.get('preco', 0))
# Re-codificar
transformed = {
'recordId': record['recordId'],
'result': 'Ok',
'data': base64.b64encode(
json.dumps(data).encode('utf-8')
).decode('utf-8')
}
output.append(transformed)
return {'records': output}
Enviando para Firehose
def enviar_para_data_lake(produto_id, preco, quantidade):
firehose.put_record(
DeliveryStreamName='vendas-data-lake',
Record={
'Data': json.dumps({
'produto_id': produto_id,
'preco': preco,
'quantidade': quantidade
})
}
)
# Firehose agrupa 128 MB ou 15 minutos antes de escrever em S3
enviar_para_data_lake('PROD001', 99.90, 5)
Firehose é ideal para pipelines ETL simples onde latência de minutos é aceitável. Elimina necessidade de gerenciar consumers, scaling ou checkpoints.
Kinesis Analytics para Análise em Tempo Real
Consultas SQL em Streaming
Analytics transforma dados Streams em tabelas que você consulta com SQL. Perfeito para alertas e agregações imediatas:
-- Criar aplicação Analytics conectada ao Stream
CREATE APPLICATION vendas_tempo_real;
-- Criar input (source)
CREATE OR REPLACE STREAM SOURCE_SQL_STREAM (
product_id VARCHAR(32),
valor DOUBLE,
timestamp_evento BIGINT
);
-- Criar sink (resultado)
CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
categoria VARCHAR(32),
total_vendas DOUBLE,
quantidade_vendas BIGINT,
tempo_janela TIMESTAMP
);
-- Agregação com janela de tempo (tumbling window)
INSERT INTO DESTINATION_SQL_STREAM
SELECT
product_id,
SUM(valor) as total_vendas,
COUNT(*) as quantidade_vendas,
TUMBLE_START(ROWTIME, INTERVAL '1' MINUTE) as tempo_janela
FROM SOURCE_SQL_STREAM
GROUP BY TUMBLE(ROWTIME, INTERVAL '1' MINUTE), product_id
HAVING SUM(valor) > 1000; -- Alerta: mais de R$ 1000 por minuto
Enviando Resultados
Os resultados da Analytics podem ser consumidos por Lambda ou enviados direto para SNS para alertas:
# No seu código de alerta
if total_vendas > 1000:
sns = boto3.client('sns')
sns.publish(
TopicArn='arn:aws:sns:us-east-1:123456789:alertas-vendas',
Subject='Alerta: Pico de Vendas',
Message=f'Total: R$ {total_vendas} em {quantidade_vendas} transações'
)
Combine Streams → Analytics → SNS para um sistema de monitoramento robusto sem escrever código de processamento distribuído.
Caso de Uso Integrado: Sistema de Fraude em Tempo Real
Em um projeto real, você captura transações com Streams, processa com Analytics, enriquece com Lambda, e archiva no S3 via Firehose. Transações suspeitas disparam alertas instantaneamente enquanto dados históricos alimentam modelos de ML.
Essa arquitetura processa milhões de eventos diários com latência de segundos, escalando automaticamente. Custos são previsíveis (você paga por shards e volume processado) e operação é mínima comparada a Kafka self-hosted.
Conclusão
Tres aprendizados principais: (1) Kinesis Data Streams oferece controle e baixa latência para processamento complexo—use quando precisar de throughput garantido e lógica customizada; (2) Firehose elimina complexidade operacional para entrega em data lakes—escolha quando latência de minutos é aceitável e transformações são simples; (3) Analytics ativa SQL em tempo real—implemente quando precisar de agregações, alertas e janelas de tempo sem escrever código distribuído.
Domine a escolha certa entre os três e você construirá pipelines de dados que escalam com seu negócio.