Entendendo CQRS: O Padrão de Separação de Responsabilidades
CQRS significa Command Query Responsibility Segregation — um padrão arquitetural que separa operações de escrita (Commands) de operações de leitura (Queries). A ideia central é simples: um comando altera o estado do sistema, enquanto uma query apenas consulta dados sem causar efeitos colaterais. Esta separação permite que você otimize cada lado independentemente.
Na prática, quando você tradiciona um repositório único para ler e escrever dados, você acaba enfrentando problemas como: modelos de dados complexos que tentam ser "tudo para todos", dificuldade em escalar operações de leitura independentemente de escrita, e lógica de negócio espalhada. CQRS resolve isso criando dois caminhos distintos. O lado de escrita (Command Side) processa operações que modificam estado, enquanto o lado de leitura (Query Side) mantém projeções otimizadas para consultas rápidas.
Event Sourcing: Armazenando o Histórico de Mudanças
Event Sourcing é um padrão complementar ao CQRS onde, em vez de armazenar apenas o estado atual de uma entidade, você armazena uma sequência imutável de eventos que descrevem tudo o que aconteceu. Pense em um extrato bancário: você não guarda apenas o saldo final, mas todo o histórico de transações que levou até ali.
Cada evento é um fato irrefutável do passado. Uma vez registrado, nunca muda. Você reconstrói o estado atual replicando todos os eventos na ordem em que ocorreram. Isso oferece auditoria completa, a capacidade de recriar qualquer estado anterior e até de corrigir bugs aplicando novos eventos sem perder o histórico. A desvantagem é a complexidade aumentada e a eventual consistency — nem todos terão o mesmo estado no mesmo instante em um sistema distribuído.
Implementação Prática em Go
Estrutura Base e Modelos de Domínio
Vamos construir um sistema de conta bancária simples usando CQRS e Event Sourcing. Começamos definindo os eventos do domínio:
package domain
import (
"time"
)
// Evento base que toda mudança será registrada
type Event interface {
GetAggregateID() string
GetTimestamp() time.Time
GetType() string
}
// Eventos específicos do domínio
type AccountCreated struct {
AggregateID string
Owner string
InitialBalance float64
Timestamp time.Time
}
func (e AccountCreated) GetAggregateID() string { return e.AggregateID }
func (e AccountCreated) GetTimestamp() time.Time { return e.Timestamp }
func (e AccountCreated) GetType() string { return "AccountCreated" }
type MoneyDeposited struct {
AggregateID string
Amount float64
Timestamp time.Time
}
func (e MoneyDeposited) GetAggregateID() string { return e.AggregateID }
func (e MoneyDeposited) GetTimestamp() time.Time { return e.Timestamp }
func (e MoneyDeposited) GetType() string { return "MoneyDeposited" }
type MoneyWithdrawn struct {
AggregateID string
Amount float64
Timestamp time.Time
}
func (e MoneyWithdrawn) GetAggregateID() string { return e.AggregateID }
func (e MoneyWithdrawn) GetTimestamp() time.Time { return e.Timestamp }
func (e MoneyWithdrawn) GetType() string { return "MoneyWithdrawn" }
// Entidade de Agregado (Aggregate Root)
type BankAccount struct {
ID string
Owner string
Balance float64
Version int
UncommittedEvents []Event
}
// Aplicar eventos ao agregado
func (ba *BankAccount) ApplyEvent(event Event) {
switch e := event.(type) {
case AccountCreated:
ba.ID = e.AggregateID
ba.Owner = e.Owner
ba.Balance = e.InitialBalance
case MoneyDeposited:
ba.Balance += e.Amount
case MoneyWithdrawn:
ba.Balance -= e.Amount
}
ba.Version++
}
// Comandos que geram eventos
func (ba *BankAccount) CreateAccount(id, owner string, initial float64) {
ba.ApplyEvent(AccountCreated{
AggregateID: id,
Owner: owner,
InitialBalance: initial,
Timestamp: time.Now(),
})
ba.UncommittedEvents = append(ba.UncommittedEvents, AccountCreated{
AggregateID: id,
Owner: owner,
InitialBalance: initial,
Timestamp: time.Now(),
})
}
func (ba *BankAccount) Deposit(amount float64) error {
if amount <= 0 {
return ErrInvalidAmount
}
event := MoneyDeposited{
AggregateID: ba.ID,
Amount: amount,
Timestamp: time.Now(),
}
ba.ApplyEvent(event)
ba.UncommittedEvents = append(ba.UncommittedEvents, event)
return nil
}
func (ba *BankAccount) Withdraw(amount float64) error {
if amount <= 0 {
return ErrInvalidAmount
}
if ba.Balance < amount {
return ErrInsufficientFunds
}
event := MoneyWithdrawn{
AggregateID: ba.ID,
Amount: amount,
Timestamp: time.Now(),
}
ba.ApplyEvent(event)
ba.UncommittedEvents = append(ba.UncommittedEvents, event)
return nil
}
var (
ErrInvalidAmount = errors.New("amount must be greater than zero")
ErrInsufficientFunds = errors.New("insufficient funds")
)
Armazenamento de Eventos (Event Store)
O Event Store é o coração do Event Sourcing. Ele persiste todos os eventos de forma imutável:
package persistence
import (
"encoding/json"
"sync"
"domain"
)
// EventStore armazena e recupera eventos
type EventStore struct {
events map[string][]domain.Event
mu sync.RWMutex
}
func NewEventStore() *EventStore {
return &EventStore{
events: make(map[string][]domain.Event),
}
}
// SaveEvents persiste novos eventos
func (es *EventStore) SaveEvents(aggregateID string, events []domain.Event) error {
es.mu.Lock()
defer es.mu.Unlock()
if _, exists := es.events[aggregateID]; !exists {
es.events[aggregateID] = []domain.Event{}
}
es.events[aggregateID] = append(es.events[aggregateID], events...)
return nil
}
// GetEvents recupera todos os eventos de um agregado
func (es *EventStore) GetEvents(aggregateID string) ([]domain.Event, error) {
es.mu.RLock()
defer es.mu.RUnlock()
events, exists := es.events[aggregateID]
if !exists {
return []domain.Event{}, nil
}
return events, nil
}
// RebuildAggregate reconstrói o estado a partir dos eventos
func (es *EventStore) RebuildAggregate(aggregateID string) (*domain.BankAccount, error) {
events, err := es.GetEvents(aggregateID)
if err != nil {
return nil, err
}
account := &domain.BankAccount{}
for _, event := range events {
account.ApplyEvent(event)
}
return account, nil
}
CQRS: Separando Commands e Queries
Agora implementamos o lado de Commands (escrita) e Queries (leitura):
package cqrs
import (
"domain"
"persistence"
)
// CommandBus executa comandos que modificam estado
type CommandBus struct {
eventStore *persistence.EventStore
}
func NewCommandBus(eventStore *persistence.EventStore) *CommandBus {
return &CommandBus{
eventStore: eventStore,
}
}
// CreateAccountCommand
type CreateAccountCommand struct {
AccountID string
Owner string
InitialBalance float64
}
func (cb *CommandBus) CreateAccount(cmd CreateAccountCommand) error {
account := &domain.BankAccount{}
account.CreateAccount(cmd.AccountID, cmd.Owner, cmd.InitialBalance)
return cb.eventStore.SaveEvents(cmd.AccountID, account.UncommittedEvents)
}
// DepositCommand
type DepositCommand struct {
AccountID string
Amount float64
}
func (cb *CommandBus) Deposit(cmd DepositCommand) error {
account, err := cb.eventStore.RebuildAggregate(cmd.AccountID)
if err != nil {
return err
}
if err := account.Deposit(cmd.Amount); err != nil {
return err
}
return cb.eventStore.SaveEvents(cmd.AccountID, account.UncommittedEvents)
}
// WithdrawCommand
type WithdrawCommand struct {
AccountID string
Amount float64
}
func (cb *CommandBus) Withdraw(cmd WithdrawCommand) error {
account, err := cb.eventStore.RebuildAggregate(cmd.AccountID)
if err != nil {
return err
}
if err := account.Withdraw(cmd.Amount); err != nil {
return err
}
return cb.eventStore.SaveEvents(cmd.AccountID, account.UncommittedEvents)
}
// QueryBus executa consultas otimizadas
type QueryBus struct {
readModel map[string]*AccountReadModel
}
type AccountReadModel struct {
AccountID string
Owner string
Balance float64
}
func NewQueryBus() *QueryBus {
return &QueryBus{
readModel: make(map[string]*AccountReadModel),
}
}
// GetAccountQuery
type GetAccountQuery struct {
AccountID string
}
func (qb *QueryBus) GetAccount(query GetAccountQuery) (*AccountReadModel, error) {
model, exists := qb.readModel[query.AccountID]
if !exists {
return nil, ErrAccountNotFound
}
return model, nil
}
// UpdateReadModel sincroniza o modelo de leitura com novos eventos
func (qb *QueryBus) UpdateReadModel(accountID string, account *domain.BankAccount) {
qb.readModel[accountID] = &AccountReadModel{
AccountID: accountID,
Owner: account.Owner,
Balance: account.Balance,
}
}
var ErrAccountNotFound = errors.New("account not found")
Orquestrando Tudo: Application Service
Um Application Service conecta commands, eventos e a query model:
package application
import (
"cqrs"
"persistence"
)
type BankingService struct {
commandBus *cqrs.CommandBus
queryBus *cqrs.QueryBus
eventStore *persistence.EventStore
}
func NewBankingService(eventStore *persistence.EventStore) *BankingService {
return &BankingService{
commandBus: cqrs.NewCommandBus(eventStore),
queryBus: cqrs.NewQueryBus(),
eventStore: eventStore,
}
}
func (bs *BankingService) CreateAccount(accountID, owner string, initial float64) error {
cmd := cqrs.CreateAccountCommand{
AccountID: accountID,
Owner: owner,
InitialBalance: initial,
}
if err := bs.commandBus.CreateAccount(cmd); err != nil {
return err
}
// Sincroniza a read model
account, _ := bs.eventStore.RebuildAggregate(accountID)
bs.queryBus.UpdateReadModel(accountID, account)
return nil
}
func (bs *BankingService) Deposit(accountID string, amount float64) error {
cmd := cqrs.DepositCommand{
AccountID: accountID,
Amount: amount,
}
if err := bs.commandBus.Deposit(cmd); err != nil {
return err
}
account, _ := bs.eventStore.RebuildAggregate(accountID)
bs.queryBus.UpdateReadModel(accountID, account)
return nil
}
func (bs *BankingService) Withdraw(accountID string, amount float64) error {
cmd := cqrs.WithdrawCommand{
AccountID: accountID,
Amount: amount,
}
if err := bs.commandBus.Withdraw(cmd); err != nil {
return err
}
account, _ := bs.eventStore.RebuildAggregate(accountID)
bs.queryBus.UpdateReadModel(accountID, account)
return nil
}
func (bs *BankingService) GetAccountBalance(accountID string) (*cqrs.AccountReadModel, error) {
return bs.queryBus.GetAccount(cqrs.GetAccountQuery{AccountID: accountID})
}
Exemplo de Uso Completo
package main
import (
"fmt"
"application"
"persistence"
)
func main() {
// Inicializa Event Store
eventStore := persistence.NewEventStore()
// Cria o serviço da aplicação
service := application.NewBankingService(eventStore)
// Executa comandos
err := service.CreateAccount("ACC001", "João Silva", 1000.00)
if err != nil {
fmt.Println("Erro ao criar conta:", err)
return
}
err = service.Deposit("ACC001", 500.00)
if err != nil {
fmt.Println("Erro ao depositar:", err)
return
}
err = service.Withdraw("ACC001", 200.00)
if err != nil {
fmt.Println("Erro ao sacar:", err)
return
}
// Consulta o estado atual (via Read Model)
account, err := service.GetAccountBalance("ACC001")
if err != nil {
fmt.Println("Erro ao consultar:", err)
return
}
fmt.Printf("Conta: %s\n", account.AccountID)
fmt.Printf("Titular: %s\n", account.Owner)
fmt.Printf("Saldo: R$ %.2f\n", account.Balance)
// Reconstrói a conta a partir do Event Store (simulando auditar histórico)
rebuilt, _ := eventStore.RebuildAggregate("ACC001")
fmt.Printf("Saldo reconstruído: R$ %.2f\n", rebuilt.Balance)
}
Padrões Avançados e Considerações
Snapshotting para Performance
Em sistemas com muitos eventos, replicar todos pode ser lento. Snapshots guardam o estado em pontos específicos:
package persistence
type Snapshot struct {
AggregateID string
Version int
State interface{}
}
type EventStoreWithSnapshot struct {
events map[string][]domain.Event
snapshots map[string]Snapshot
mu sync.RWMutex
}
func (es *EventStoreWithSnapshot) RebuildAggregateWithSnapshot(
aggregateID string) (*domain.BankAccount, error) {
es.mu.RLock()
snapshot, hasSnapshot := es.snapshots[aggregateID]
es.mu.RUnlock()
var startVersion int
account := &domain.BankAccount{}
if hasSnapshot {
account = snapshot.State.(*domain.BankAccount)
startVersion = snapshot.Version
}
es.mu.RLock()
events := es.events[aggregateID]
es.mu.RUnlock()
for i, event := range events {
if i >= startVersion {
account.ApplyEvent(event)
}
}
return account, nil
}
func (es *EventStoreWithSnapshot) CreateSnapshot(
aggregateID string, version int, account *domain.BankAccount) {
es.mu.Lock()
defer es.mu.Unlock()
es.snapshots[aggregateID] = Snapshot{
AggregateID: aggregateID,
Version: version,
State: account,
}
}
Eventual Consistency e Sincronização
Em ambientes distribuídos, a read model pode estar atrasada. Use message brokers para disseminar eventos:
package messaging
type EventPublisher interface {
Publish(event interface{}) error
}
type InMemoryEventBus struct {
subscribers map[string][]func(interface{})
mu sync.RWMutex
}
func NewInMemoryEventBus() *InMemoryEventBus {
return &InMemoryEventBus{
subscribers: make(map[string][]func(interface{})),
}
}
func (eb *InMemoryEventBus) Subscribe(eventType string, handler func(interface{})) {
eb.mu.Lock()
defer eb.mu.Unlock()
eb.subscribers[eventType] = append(eb.subscribers[eventType], handler)
}
func (eb *InMemoryEventBus) Publish(eventType string, event interface{}) {
eb.mu.RLock()
handlers := eb.subscribers[eventType]
eb.mu.RUnlock()
for _, handler := range handlers {
go handler(event)
}
}
Conclusão
Três pontos fundamentais que você deve levar deste artigo: Primeiro, CQRS e Event Sourcing resolvem problemas reais de escalabilidade e auditoria, separando a lógica de escrita da leitura e permitindo modelos otimizados para cada lado. Segundo, a implementação prática requer disciplina arquitetural — o código deve manter agregados puros, eventos imutáveis e uma cadeia clara de responsabilidade. Terceiro, estas técnicas trazem complexidade operacional que vale a pena apenas em sistemas que realmente precisam de auditoria completa, alta escala de leitura ou reconstrução de estado histórico; para CRUD simples, você estará sobreenginearing.