Go Admin

O que Todo Dev Deve Saber sobre Padrões de Concorrência em Go: Pipeline, Fan-out, Fan-in e Worker Pool Já leu

Introdução aos Padrões de Concorrência em Go Go foi projetada desde o início com concorrência em mente. Diferente de linguagens que adicionaram suporte a concorrência posteriormente, Go integra goroutines e canais como primitivas de primeira classe. Isso torna a linguagem naturalmente adequada para construir aplicações que processam múltiplos fluxos de trabalho simultaneamente, sem a complexidade típica de threads tradicionais. Os padrões que estudaremos neste artigo — Pipeline, Fan-out, Fan-in e Worker Pool — não são exclusivos de Go, mas ganham elegância e simplicidade quando implementados com goroutines e canais. Compreender esses padrões é essencial para qualquer desenvolvedor Go que queira construir sistemas escaláveis, eficientes e maintíveis. Vamos explorar cada um progressivamente, começando pelos conceitos fundamentais e evoluindo para aplicações práticas complexas. Pipeline: Processamento em Cadeia O Conceito Um pipeline é uma série de estágios de processamento conectados em sequência, onde a saída de um estágio é a entrada do próximo. Cada estágio executa uma transformação ou validação específica nos dados,

Introdução aos Padrões de Concorrência em Go

Go foi projetada desde o início com concorrência em mente. Diferente de linguagens que adicionaram suporte a concorrência posteriormente, Go integra goroutines e canais como primitivas de primeira classe. Isso torna a linguagem naturalmente adequada para construir aplicações que processam múltiplos fluxos de trabalho simultaneamente, sem a complexidade típica de threads tradicionais.

Os padrões que estudaremos neste artigo — Pipeline, Fan-out, Fan-in e Worker Pool — não são exclusivos de Go, mas ganham elegância e simplicidade quando implementados com goroutines e canais. Compreender esses padrões é essencial para qualquer desenvolvedor Go que queira construir sistemas escaláveis, eficientes e maintíveis. Vamos explorar cada um progressivamente, começando pelos conceitos fundamentais e evoluindo para aplicações práticas complexas.

Pipeline: Processamento em Cadeia

O Conceito

Um pipeline é uma série de estágios de processamento conectados em sequência, onde a saída de um estágio é a entrada do próximo. Cada estágio executa uma transformação ou validação específica nos dados, e as goroutines se comunicam através de canais. Esse padrão é particularmente útil quando você precisa processar dados através de múltiplas etapas sequenciais, mantendo a concorrência entre elas.

A beleza de um pipeline em Go é que você pode processar dados de forma contínua sem precisar acumular resultados intermediários na memória. Dados fluem naturalmente de um estágio para o próximo, permitindo processamento eficiente mesmo com grandes volumes.

Implementação Prática

package main

import (
    "fmt"
)

// Stage 1: Gera números
func generate(count int) <-chan int {
    out := make(chan int)
    go func() {
        for i := 1; i <= count; i++ {
            out <- i
        }
        close(out)
    }()
    return out
}

// Stage 2: Multiplica cada número por 2
func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

// Stage 3: Adiciona 10 a cada resultado
func addTen(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n + 10
        }
        close(out)
    }()
    return out
}

func main() {
    // Conecta os estágios em um pipeline
    results := addTen(square(generate(5)))

    for result := range results {
        fmt.Println(result) // 11, 14, 19, 26, 35
    }
}

Neste exemplo, criamos um pipeline de três estágios. O estágio de geração produz números de 1 a 5. O estágio de quadrado eleva cada número ao quadrado. O estágio final adiciona 10. Observe como cada função retorna apenas um canal de leitura (<-chan), impedindo que estágios subsequentes escrevam nele — isso é uma prática defensiva importante.

Fan-out e Fan-in: Distribuição e Agregação

Fan-out: Distribuindo Trabalho

Fan-out é o padrão de distribuir uma entrada para múltiplos processadores que trabalham em paralelo. Use-o quando você tem um único fluxo de dados que precisa ser processado por vários workers independentemente. Isso acelera o processamento distribuindo a carga.

package main

import (
    "fmt"
    "sync"
)

// Distribui dados de um canal para múltiplos workers
func fanOut(in <-chan int, workers int) []<-chan int {
    channels := make([]<-chan int, workers)

    for i := 0; i < workers; i++ {
        ch := make(chan int)
        channels[i] = ch

        go func(out chan<- int) {
            for val := range in {
                out <- val
            }
            close(out)
        }(ch)
    }

    return channels
}

// Processa os dados (simulando trabalho)
func process(in <-chan int, id int) <-chan string {
    out := make(chan string)
    go func() {
        for val := range in {
            out <- fmt.Sprintf("Worker %d processou: %d", id, val)
        }
        close(out)
    }()
    return out
}

func main() {
    // Gera dados
    data := make(chan int)
    go func() {
        for i := 1; i <= 10; i++ {
            data <- i
        }
        close(data)
    }()

    // Fan-out: distribui para 3 workers
    workers := fanOut(data, 3)

    // Cada worker processa seus dados
    results := make([]<-chan string, 3)
    for i, worker := range workers {
        results[i] = process(worker, i+1)
    }
}

Fan-in: Agregando Resultados

Fan-in é o inverso: múltiplos canais de entrada convergem em um único canal de saída. Use este padrão quando vários workers independentes produzem resultados que precisam ser coletados e processados juntos.

package main

import (
    "fmt"
)

// Combina múltiplos canais em um único canal
func fanIn(channels ...<-chan int) <-chan int {
    out := make(chan int)
    var wg sync.WaitGroup

    // Lança uma goroutine para cada canal de entrada
    for _, ch := range channels {
        wg.Add(1)
        go func(c <-chan int) {
            for val := range c {
                out <- val
            }
            wg.Done()
        }(ch)
    }

    // Fecha o canal de saída quando todos os inputs terminarem
    go func() {
        wg.Wait()
        close(out)
    }()

    return out
}

func producer(id int, count int) <-chan int {
    out := make(chan int)
    go func() {
        for i := 1; i <= count; i++ {
            out <- id*100 + i
        }
        close(out)
    }()
    return out
}

func main() {
    // Múltiplos produtores
    p1 := producer(1, 3)
    p2 := producer(2, 3)
    p3 := producer(3, 3)

    // Fan-in: combina todos em um canal
    results := fanIn(p1, p2, p3)

    for result := range results {
        fmt.Println(result)
    }
}

Note que usamos sync.WaitGroup para rastrear quando todos os canais de entrada foram processados. Isso garante que o canal de saída seja fechado apenas após todos os inputs terminarem, evitando deadlock.

Combinando Fan-out e Fan-in

A verdadeira potência emerge quando combinamos fan-out e fan-in. Fan-out distribui o trabalho, cada worker processa de forma independente, e fan-in recolhe os resultados.

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, in <-chan int) <-chan string {
    out := make(chan string)
    go func() {
        for val := range in {
            time.Sleep(time.Millisecond * 100) // Simula trabalho
            out <- fmt.Sprintf("Worker %d: %d^2 = %d", id, val, val*val)
        }
        close(out)
    }()
    return out
}

func main() {
    // Entrada: números de 1 a 10
    numbers := make(chan int)
    go func() {
        for i := 1; i <= 10; i++ {
            numbers <- i
        }
        close(numbers)
    }()

    // Fan-out: distribui para 3 workers
    numWorkers := 3
    workers := make([]<-chan string, numWorkers)

    for i := 0; i < numWorkers; i++ {
        workers[i] = worker(i+1, numbers)
    }

    // Fan-in: coleta resultados
    results := fanIn(workers...)

    for result := range results {
        fmt.Println(result)
    }
}

func fanIn(channels ...<-chan string) <-chan string {
    out := make(chan string)
    var wg sync.WaitGroup

    for _, ch := range channels {
        wg.Add(1)
        go func(c <-chan string) {
            for val := range c {
                out <- val
            }
            wg.Done()
        }(ch)
    }

    go func() {
        wg.Wait()
        close(out)
    }()

    return out
}

Worker Pool: Pool de Trabalho Reutilizável

O Conceito e Benefícios

Um worker pool é um conjunto fixo de goroutines que aguardam por tarefas em um canal compartilhado. Em vez de criar uma nova goroutine para cada tarefa (o que pode ser caro com milhares de tarefas), reutilizamos um número limitado de workers. Esse padrão é ideal quando você tem muitas tarefas pequenas, quer controlar a concorrência máxima, ou precisa de um limite superior no consumo de recursos.

A diferença crítica entre fan-out/fan-in e worker pool é a quantidade: fan-out tipicamente distribui a entrada para um número pequeno de workers, enquanto um pool recebe múltiplas tarefas e as distribui entre workers reutilizáveis. Worker pool é mais adequado para sistemas com taxa de chegada variável de trabalho.

Implementação Clássica

package main

import (
    "fmt"
    "sync"
)

// Task representa uma unidade de trabalho
type Task struct {
    ID    int
    Value int
}

// Result representa o resultado de uma tarefa
type Result struct {
    TaskID int
    Result int
    Error  error
}

// WorkerPool gerencia um conjunto de workers
type WorkerPool struct {
    tasks   chan Task
    results chan Result
    wg      sync.WaitGroup
}

// NewWorkerPool cria um novo pool com numWorkers goroutines
func NewWorkerPool(numWorkers int) *WorkerPool {
    return &WorkerPool{
        tasks:   make(chan Task, 10), // Buffer pequeno
        results: make(chan Result, 10),
    }
}

// Start inicia os workers
func (wp *WorkerPool) Start(numWorkers int) {
    for i := 0; i < numWorkers; i++ {
        wp.wg.Add(1)
        go wp.worker(i)
    }
}

// worker é executado por cada goroutine do pool
func (wp *WorkerPool) worker(id int) {
    defer wp.wg.Done()

    for task := range wp.tasks {
        // Processa a tarefa
        result := task.Value * task.Value
        wp.results <- Result{
            TaskID: task.ID,
            Result: result,
        }
    }
}

// Submit adiciona uma tarefa ao pool
func (wp *WorkerPool) Submit(task Task) {
    wp.tasks <- task
}

// Close fecha o pool e aguarda conclusão
func (wp *WorkerPool) Close() {
    close(wp.tasks)
    wp.wg.Wait()
    close(wp.results)
}

func main() {
    pool := NewWorkerPool(0)
    pool.Start(4) // 4 workers

    // Submete 20 tarefas
    go func() {
        for i := 1; i <= 20; i++ {
            pool.Submit(Task{ID: i, Value: i})
        }
        pool.Close()
    }()

    // Coleta resultados
    for result := range pool.results {
        fmt.Printf("Task %d: %d\n", result.TaskID, result.Result)
    }
}

Worker Pool Avançado com Contexto

Em aplicações reais, frequentemente precisamos cancelar tarefas, implementar timeouts ou parar o pool graciosamente. A abordagem abaixo usa context.Context:

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

type Job struct {
    ID   int
    Dur  time.Duration
}

type JobResult struct {
    JobID int
    Value string
}

type Pool struct {
    jobs    chan Job
    results chan JobResult
    ctx     context.Context
    cancel  context.CancelFunc
    wg      sync.WaitGroup
}

func NewPool(ctx context.Context, numWorkers int) *Pool {
    ctxWithCancel, cancel := context.WithCancel(ctx)
    p := &Pool{
        jobs:    make(chan Job, 5),
        results: make(chan JobResult, 5),
        ctx:     ctxWithCancel,
        cancel:  cancel,
    }

    for i := 0; i < numWorkers; i++ {
        p.wg.Add(1)
        go p.worker(i + 1)
    }

    return p
}

func (p *Pool) worker(id int) {
    defer p.wg.Done()

    for {
        select {
        case <-p.ctx.Done():
            fmt.Printf("Worker %d terminando\n", id)
            return
        case job, ok := <-p.jobs:
            if !ok {
                return
            }

            // Simula processamento
            time.Sleep(job.Dur)
            p.results <- JobResult{
                JobID: job.ID,
                Value: fmt.Sprintf("Processado por worker %d", id),
            }
        }
    }
}

func (p *Pool) Submit(job Job) {
    select {
    case p.jobs <- job:
    case <-p.ctx.Done():
        fmt.Println("Pool foi cancelado")
    }
}

func (p *Pool) Shutdown() {
    close(p.jobs)
    p.wg.Wait()
    p.cancel()
    close(p.results)
}

func main() {
    ctx := context.Background()
    pool := NewPool(ctx, 3)

    // Submete tarefas
    for i := 1; i <= 10; i++ {
        pool.Submit(Job{ID: i, Dur: time.Millisecond * 500})
    }

    // Coleta alguns resultados e cancela
    for i := 0; i < 5; i++ {
        result := <-pool.results
        fmt.Printf("Job %d: %s\n", result.JobID, result.Value)
    }

    pool.Shutdown()

    // Coleta resultados restantes
    for result := range pool.results {
        fmt.Printf("Job %d: %s\n", result.JobID, result.Value)
    }
}

Nesta implementação, o context permite que o caller cancele todas as operações pendentes simultaneamente. Cada worker verifica o contexto antes de processar uma nova tarefa, garantindo um shutdown limpo.

Escolhendo o Padrão Certo

Pipeline

Use pipeline quando você tem dados fluindo através de uma série de transformações sequenciais. Cada estágio é independente e pode executar simultaneamente. Exemplo: ler arquivo → parsear → validar → salvar.

Fan-out / Fan-in

Use fan-out e fan-in quando uma entrada precisa ser processada por múltiplos workers em paralelo, e você precisa agregar os resultados depois. Exemplo: análise de log distribuída, processamento de imagem com múltiplos filtros, ou scraping web com múltiplos clients.

Worker Pool

Use worker pool quando você tem muitas tarefas chegando continuamente, e quer controlar o nível de concorrência. Exemplo: processador de fila de mensagens, servidor web com limite de conexões simultâneas, ou batching de requisições.

// Exemplo prático: Combinando padrões em um problema real
package main

import (
    "fmt"
    "sync"
)

// Simula obtenção de URLs de uma fonte
func getURLs() <-chan string {
    out := make(chan string)
    go func() {
        urls := []string{
            "http://example.com/1", "http://example.com/2",
            "http://example.com/3", "http://example.com/4",
            "http://example.com/5", "http://example.com/6",
        }
        for _, url := range urls {
            out <- url
        }
        close(out)
    }()
    return out
}

// Worker que "baixa" URLs
func downloader(id int, urls <-chan string) <-chan string {
    out := make(chan string)
    go func() {
        for url := range urls {
            out <- fmt.Sprintf("Worker %d baixou %s", id, url)
        }
        close(out)
    }()
    return out
}

func mergeResults(channels ...<-chan string) <-chan string {
    out := make(chan string)
    var wg sync.WaitGroup

    for _, ch := range channels {
        wg.Add(1)
        go func(c <-chan string) {
            for val := range c {
                out <- val
            }
            wg.Done()
        }(ch)
    }

    go func() {
        wg.Wait()
        close(out)
    }()

    return out
}

func main() {
    // Pipeline + Fan-out + Fan-in
    urls := getURLs()

    // Fan-out para 3 downloaders
    results := make([]<-chan string, 3)
    for i := 0; i < 3; i++ {
        results[i] = downloader(i+1, urls)
    }

    // Fan-in coleta resultados
    combined := mergeResults(results...)

    for result := range combined {
        fmt.Println(result)
    }
}

Conclusão

Estudamos quatro padrões fundamentais de concorrência em Go, cada um com seu propósito específico. O Pipeline permite processar dados através de múltiplos estágios sequenciais de forma eficiente, reutilizando goroutines para transformações diferentes. O Fan-out/Fan-in distribui uma entrada entre múltiplos processadores e depois agrega os resultados, oferecendo paralelismo transparente e elegante. O Worker Pool reutiliza um conjunto fixo de goroutines para processar muitas tarefas, controlando o consumo de recursos e evitando a explosão de goroutines.

A chave para dominar esses padrões é entender que eles não são isolados — frequentemente você combinará dois ou mais deles em uma solução real. Um sistema pode usar pipeline para estruturar o fluxo, fan-out para distribuição e um worker pool para escalar o processamento. Comece simples, teste seus canais com tipos bem definidos, e sempre considere o fechamento correto de canais e sincronização com sync.WaitGroup ou context.Context. A elegância de Go está em como esses primitivos simples (goroutines e canais) permitem construir sistemas sofisticados e maintíveis.

Referências


Artigos relacionados