Go Admin

Boas Práticas de CQRS e Event Sourcing em Go: Implementação Prática para Times Ágeis Já leu

Entendendo CQRS: O Padrão de Separação de Responsabilidades CQRS significa Command Query Responsibility Segregation — um padrão arquitetural que separa operações de escrita (Commands) de operações de leitura (Queries). A ideia central é simples: um comando altera o estado do sistema, enquanto uma query apenas consulta dados sem causar efeitos colaterais. Esta separação permite que você otimize cada lado independentemente. Na prática, quando você tradiciona um repositório único para ler e escrever dados, você acaba enfrentando problemas como: modelos de dados complexos que tentam ser "tudo para todos", dificuldade em escalar operações de leitura independentemente de escrita, e lógica de negócio espalhada. CQRS resolve isso criando dois caminhos distintos. O lado de escrita (Command Side) processa operações que modificam estado, enquanto o lado de leitura (Query Side) mantém projeções otimizadas para consultas rápidas. Event Sourcing: Armazenando o Histórico de Mudanças Event Sourcing é um padrão complementar ao CQRS onde, em vez de armazenar apenas o estado atual de uma entidade,

Entendendo CQRS: O Padrão de Separação de Responsabilidades

CQRS significa Command Query Responsibility Segregation — um padrão arquitetural que separa operações de escrita (Commands) de operações de leitura (Queries). A ideia central é simples: um comando altera o estado do sistema, enquanto uma query apenas consulta dados sem causar efeitos colaterais. Esta separação permite que você otimize cada lado independentemente.

Na prática, quando você tradiciona um repositório único para ler e escrever dados, você acaba enfrentando problemas como: modelos de dados complexos que tentam ser "tudo para todos", dificuldade em escalar operações de leitura independentemente de escrita, e lógica de negócio espalhada. CQRS resolve isso criando dois caminhos distintos. O lado de escrita (Command Side) processa operações que modificam estado, enquanto o lado de leitura (Query Side) mantém projeções otimizadas para consultas rápidas.

Event Sourcing: Armazenando o Histórico de Mudanças

Event Sourcing é um padrão complementar ao CQRS onde, em vez de armazenar apenas o estado atual de uma entidade, você armazena uma sequência imutável de eventos que descrevem tudo o que aconteceu. Pense em um extrato bancário: você não guarda apenas o saldo final, mas todo o histórico de transações que levou até ali.

Cada evento é um fato irrefutável do passado. Uma vez registrado, nunca muda. Você reconstrói o estado atual replicando todos os eventos na ordem em que ocorreram. Isso oferece auditoria completa, a capacidade de recriar qualquer estado anterior e até de corrigir bugs aplicando novos eventos sem perder o histórico. A desvantagem é a complexidade aumentada e a eventual consistency — nem todos terão o mesmo estado no mesmo instante em um sistema distribuído.

Implementação Prática em Go

Estrutura Base e Modelos de Domínio

Vamos construir um sistema de conta bancária simples usando CQRS e Event Sourcing. Começamos definindo os eventos do domínio:

package domain

import (
    "time"
)

// Evento base que toda mudança será registrada
type Event interface {
    GetAggregateID() string
    GetTimestamp() time.Time
    GetType() string
}

// Eventos específicos do domínio
type AccountCreated struct {
    AggregateID string
    Owner       string
    InitialBalance float64
    Timestamp   time.Time
}

func (e AccountCreated) GetAggregateID() string { return e.AggregateID }
func (e AccountCreated) GetTimestamp() time.Time { return e.Timestamp }
func (e AccountCreated) GetType() string { return "AccountCreated" }

type MoneyDeposited struct {
    AggregateID string
    Amount      float64
    Timestamp   time.Time
}

func (e MoneyDeposited) GetAggregateID() string { return e.AggregateID }
func (e MoneyDeposited) GetTimestamp() time.Time { return e.Timestamp }
func (e MoneyDeposited) GetType() string { return "MoneyDeposited" }

type MoneyWithdrawn struct {
    AggregateID string
    Amount      float64
    Timestamp   time.Time
}

func (e MoneyWithdrawn) GetAggregateID() string { return e.AggregateID }
func (e MoneyWithdrawn) GetTimestamp() time.Time { return e.Timestamp }
func (e MoneyWithdrawn) GetType() string { return "MoneyWithdrawn" }

// Entidade de Agregado (Aggregate Root)
type BankAccount struct {
    ID            string
    Owner         string
    Balance       float64
    Version       int
    UncommittedEvents []Event
}

// Aplicar eventos ao agregado
func (ba *BankAccount) ApplyEvent(event Event) {
    switch e := event.(type) {
    case AccountCreated:
        ba.ID = e.AggregateID
        ba.Owner = e.Owner
        ba.Balance = e.InitialBalance
    case MoneyDeposited:
        ba.Balance += e.Amount
    case MoneyWithdrawn:
        ba.Balance -= e.Amount
    }
    ba.Version++
}

// Comandos que geram eventos
func (ba *BankAccount) CreateAccount(id, owner string, initial float64) {
    ba.ApplyEvent(AccountCreated{
        AggregateID: id,
        Owner:       owner,
        InitialBalance: initial,
        Timestamp:   time.Now(),
    })
    ba.UncommittedEvents = append(ba.UncommittedEvents, AccountCreated{
        AggregateID: id,
        Owner:       owner,
        InitialBalance: initial,
        Timestamp:   time.Now(),
    })
}

func (ba *BankAccount) Deposit(amount float64) error {
    if amount <= 0 {
        return ErrInvalidAmount
    }
    event := MoneyDeposited{
        AggregateID: ba.ID,
        Amount:      amount,
        Timestamp:   time.Now(),
    }
    ba.ApplyEvent(event)
    ba.UncommittedEvents = append(ba.UncommittedEvents, event)
    return nil
}

func (ba *BankAccount) Withdraw(amount float64) error {
    if amount <= 0 {
        return ErrInvalidAmount
    }
    if ba.Balance < amount {
        return ErrInsufficientFunds
    }
    event := MoneyWithdrawn{
        AggregateID: ba.ID,
        Amount:      amount,
        Timestamp:   time.Now(),
    }
    ba.ApplyEvent(event)
    ba.UncommittedEvents = append(ba.UncommittedEvents, event)
    return nil
}

var (
    ErrInvalidAmount = errors.New("amount must be greater than zero")
    ErrInsufficientFunds = errors.New("insufficient funds")
)

Armazenamento de Eventos (Event Store)

O Event Store é o coração do Event Sourcing. Ele persiste todos os eventos de forma imutável:

package persistence

import (
    "encoding/json"
    "sync"
    "domain"
)

// EventStore armazena e recupera eventos
type EventStore struct {
    events map[string][]domain.Event
    mu     sync.RWMutex
}

func NewEventStore() *EventStore {
    return &EventStore{
        events: make(map[string][]domain.Event),
    }
}

// SaveEvents persiste novos eventos
func (es *EventStore) SaveEvents(aggregateID string, events []domain.Event) error {
    es.mu.Lock()
    defer es.mu.Unlock()

    if _, exists := es.events[aggregateID]; !exists {
        es.events[aggregateID] = []domain.Event{}
    }

    es.events[aggregateID] = append(es.events[aggregateID], events...)
    return nil
}

// GetEvents recupera todos os eventos de um agregado
func (es *EventStore) GetEvents(aggregateID string) ([]domain.Event, error) {
    es.mu.RLock()
    defer es.mu.RUnlock()

    events, exists := es.events[aggregateID]
    if !exists {
        return []domain.Event{}, nil
    }

    return events, nil
}

// RebuildAggregate reconstrói o estado a partir dos eventos
func (es *EventStore) RebuildAggregate(aggregateID string) (*domain.BankAccount, error) {
    events, err := es.GetEvents(aggregateID)
    if err != nil {
        return nil, err
    }

    account := &domain.BankAccount{}
    for _, event := range events {
        account.ApplyEvent(event)
    }

    return account, nil
}

CQRS: Separando Commands e Queries

Agora implementamos o lado de Commands (escrita) e Queries (leitura):

package cqrs

import (
    "domain"
    "persistence"
)

// CommandBus executa comandos que modificam estado
type CommandBus struct {
    eventStore *persistence.EventStore
}

func NewCommandBus(eventStore *persistence.EventStore) *CommandBus {
    return &CommandBus{
        eventStore: eventStore,
    }
}

// CreateAccountCommand
type CreateAccountCommand struct {
    AccountID      string
    Owner          string
    InitialBalance float64
}

func (cb *CommandBus) CreateAccount(cmd CreateAccountCommand) error {
    account := &domain.BankAccount{}
    account.CreateAccount(cmd.AccountID, cmd.Owner, cmd.InitialBalance)

    return cb.eventStore.SaveEvents(cmd.AccountID, account.UncommittedEvents)
}

// DepositCommand
type DepositCommand struct {
    AccountID string
    Amount    float64
}

func (cb *CommandBus) Deposit(cmd DepositCommand) error {
    account, err := cb.eventStore.RebuildAggregate(cmd.AccountID)
    if err != nil {
        return err
    }

    if err := account.Deposit(cmd.Amount); err != nil {
        return err
    }

    return cb.eventStore.SaveEvents(cmd.AccountID, account.UncommittedEvents)
}

// WithdrawCommand
type WithdrawCommand struct {
    AccountID string
    Amount    float64
}

func (cb *CommandBus) Withdraw(cmd WithdrawCommand) error {
    account, err := cb.eventStore.RebuildAggregate(cmd.AccountID)
    if err != nil {
        return err
    }

    if err := account.Withdraw(cmd.Amount); err != nil {
        return err
    }

    return cb.eventStore.SaveEvents(cmd.AccountID, account.UncommittedEvents)
}

// QueryBus executa consultas otimizadas
type QueryBus struct {
    readModel map[string]*AccountReadModel
}

type AccountReadModel struct {
    AccountID string
    Owner     string
    Balance   float64
}

func NewQueryBus() *QueryBus {
    return &QueryBus{
        readModel: make(map[string]*AccountReadModel),
    }
}

// GetAccountQuery
type GetAccountQuery struct {
    AccountID string
}

func (qb *QueryBus) GetAccount(query GetAccountQuery) (*AccountReadModel, error) {
    model, exists := qb.readModel[query.AccountID]
    if !exists {
        return nil, ErrAccountNotFound
    }
    return model, nil
}

// UpdateReadModel sincroniza o modelo de leitura com novos eventos
func (qb *QueryBus) UpdateReadModel(accountID string, account *domain.BankAccount) {
    qb.readModel[accountID] = &AccountReadModel{
        AccountID: accountID,
        Owner:     account.Owner,
        Balance:   account.Balance,
    }
}

var ErrAccountNotFound = errors.New("account not found")

Orquestrando Tudo: Application Service

Um Application Service conecta commands, eventos e a query model:

package application

import (
    "cqrs"
    "persistence"
)

type BankingService struct {
    commandBus *cqrs.CommandBus
    queryBus   *cqrs.QueryBus
    eventStore *persistence.EventStore
}

func NewBankingService(eventStore *persistence.EventStore) *BankingService {
    return &BankingService{
        commandBus: cqrs.NewCommandBus(eventStore),
        queryBus:   cqrs.NewQueryBus(),
        eventStore: eventStore,
    }
}

func (bs *BankingService) CreateAccount(accountID, owner string, initial float64) error {
    cmd := cqrs.CreateAccountCommand{
        AccountID:      accountID,
        Owner:          owner,
        InitialBalance: initial,
    }

    if err := bs.commandBus.CreateAccount(cmd); err != nil {
        return err
    }

    // Sincroniza a read model
    account, _ := bs.eventStore.RebuildAggregate(accountID)
    bs.queryBus.UpdateReadModel(accountID, account)

    return nil
}

func (bs *BankingService) Deposit(accountID string, amount float64) error {
    cmd := cqrs.DepositCommand{
        AccountID: accountID,
        Amount:    amount,
    }

    if err := bs.commandBus.Deposit(cmd); err != nil {
        return err
    }

    account, _ := bs.eventStore.RebuildAggregate(accountID)
    bs.queryBus.UpdateReadModel(accountID, account)

    return nil
}

func (bs *BankingService) Withdraw(accountID string, amount float64) error {
    cmd := cqrs.WithdrawCommand{
        AccountID: accountID,
        Amount:    amount,
    }

    if err := bs.commandBus.Withdraw(cmd); err != nil {
        return err
    }

    account, _ := bs.eventStore.RebuildAggregate(accountID)
    bs.queryBus.UpdateReadModel(accountID, account)

    return nil
}

func (bs *BankingService) GetAccountBalance(accountID string) (*cqrs.AccountReadModel, error) {
    return bs.queryBus.GetAccount(cqrs.GetAccountQuery{AccountID: accountID})
}

Exemplo de Uso Completo

package main

import (
    "fmt"
    "application"
    "persistence"
)

func main() {
    // Inicializa Event Store
    eventStore := persistence.NewEventStore()

    // Cria o serviço da aplicação
    service := application.NewBankingService(eventStore)

    // Executa comandos
    err := service.CreateAccount("ACC001", "João Silva", 1000.00)
    if err != nil {
        fmt.Println("Erro ao criar conta:", err)
        return
    }

    err = service.Deposit("ACC001", 500.00)
    if err != nil {
        fmt.Println("Erro ao depositar:", err)
        return
    }

    err = service.Withdraw("ACC001", 200.00)
    if err != nil {
        fmt.Println("Erro ao sacar:", err)
        return
    }

    // Consulta o estado atual (via Read Model)
    account, err := service.GetAccountBalance("ACC001")
    if err != nil {
        fmt.Println("Erro ao consultar:", err)
        return
    }

    fmt.Printf("Conta: %s\n", account.AccountID)
    fmt.Printf("Titular: %s\n", account.Owner)
    fmt.Printf("Saldo: R$ %.2f\n", account.Balance)

    // Reconstrói a conta a partir do Event Store (simulando auditar histórico)
    rebuilt, _ := eventStore.RebuildAggregate("ACC001")
    fmt.Printf("Saldo reconstruído: R$ %.2f\n", rebuilt.Balance)
}

Padrões Avançados e Considerações

Snapshotting para Performance

Em sistemas com muitos eventos, replicar todos pode ser lento. Snapshots guardam o estado em pontos específicos:

package persistence

type Snapshot struct {
    AggregateID string
    Version     int
    State       interface{}
}

type EventStoreWithSnapshot struct {
    events    map[string][]domain.Event
    snapshots map[string]Snapshot
    mu        sync.RWMutex
}

func (es *EventStoreWithSnapshot) RebuildAggregateWithSnapshot(
    aggregateID string) (*domain.BankAccount, error) {

    es.mu.RLock()
    snapshot, hasSnapshot := es.snapshots[aggregateID]
    es.mu.RUnlock()

    var startVersion int
    account := &domain.BankAccount{}

    if hasSnapshot {
        account = snapshot.State.(*domain.BankAccount)
        startVersion = snapshot.Version
    }

    es.mu.RLock()
    events := es.events[aggregateID]
    es.mu.RUnlock()

    for i, event := range events {
        if i >= startVersion {
            account.ApplyEvent(event)
        }
    }

    return account, nil
}

func (es *EventStoreWithSnapshot) CreateSnapshot(
    aggregateID string, version int, account *domain.BankAccount) {

    es.mu.Lock()
    defer es.mu.Unlock()

    es.snapshots[aggregateID] = Snapshot{
        AggregateID: aggregateID,
        Version:     version,
        State:       account,
    }
}

Eventual Consistency e Sincronização

Em ambientes distribuídos, a read model pode estar atrasada. Use message brokers para disseminar eventos:

package messaging

type EventPublisher interface {
    Publish(event interface{}) error
}

type InMemoryEventBus struct {
    subscribers map[string][]func(interface{})
    mu          sync.RWMutex
}

func NewInMemoryEventBus() *InMemoryEventBus {
    return &InMemoryEventBus{
        subscribers: make(map[string][]func(interface{})),
    }
}

func (eb *InMemoryEventBus) Subscribe(eventType string, handler func(interface{})) {
    eb.mu.Lock()
    defer eb.mu.Unlock()
    eb.subscribers[eventType] = append(eb.subscribers[eventType], handler)
}

func (eb *InMemoryEventBus) Publish(eventType string, event interface{}) {
    eb.mu.RLock()
    handlers := eb.subscribers[eventType]
    eb.mu.RUnlock()

    for _, handler := range handlers {
        go handler(event)
    }
}

Conclusão

Três pontos fundamentais que você deve levar deste artigo: Primeiro, CQRS e Event Sourcing resolvem problemas reais de escalabilidade e auditoria, separando a lógica de escrita da leitura e permitindo modelos otimizados para cada lado. Segundo, a implementação prática requer disciplina arquitetural — o código deve manter agregados puros, eventos imutáveis e uma cadeia clara de responsabilidade. Terceiro, estas técnicas trazem complexidade operacional que vale a pena apenas em sistemas que realmente precisam de auditoria completa, alta escala de leitura ou reconstrução de estado histórico; para CRUD simples, você estará sobreenginearing.

Referências


Artigos relacionados