Rust Admin

O que Todo Dev Deve Saber sobre Streams Assíncronos e Concorrência Avançada com Tokio Já leu

Introdução ao Tokio: Fundações de Assincronismo em Rust Tokio é o runtime assíncrono mais maduro e utilizado em Rust, baseado no modelo de trabalho com Tasks e um scheduler eficiente. Diferente de linguagens como JavaScript que possuem uma única event loop, Tokio usa múltiplas threads de trabalho por padrão, permitindo verdadeiro paralelismo em sistemas multicore. Antes de avançarmos para streams e concorrência, você precisa compreender que Tokio não cria threads para cada tarefa — em vez disso, agrupa múltiplas tasks em threads de trabalho através de work-stealing. A chave para dominar Tokio é entender que código assíncrono em Rust é apenas syntactic sugar para máquinas de estado compiladas. Quando você escreve , o compilador transforma isso em uma Future que pode ser pausada e retomada. O runtime Tokio gerencia quando essas Futures são executadas, sempre respeitando dependências de I/O e disponibilidade de recursos. Streams Assíncronos Avançados O que são Streams e por que importam Um Stream assíncrono é essencialmente um

Introdução ao Tokio: Fundações de Assincronismo em Rust

Tokio é o runtime assíncrono mais maduro e utilizado em Rust, baseado no modelo de trabalho com Tasks e um scheduler eficiente. Diferente de linguagens como JavaScript que possuem uma única event loop, Tokio usa múltiplas threads de trabalho por padrão, permitindo verdadeiro paralelismo em sistemas multicore. Antes de avançarmos para streams e concorrência, você precisa compreender que Tokio não cria threads para cada tarefa — em vez disso, agrupa múltiplas tasks em threads de trabalho através de work-stealing.

A chave para dominar Tokio é entender que código assíncrono em Rust é apenas syntactic sugar para máquinas de estado compiladas. Quando você escreve async fn, o compilador transforma isso em uma Future que pode ser pausada e retomada. O runtime Tokio gerencia quando essas Futures são executadas, sempre respeitando dependências de I/O e disponibilidade de recursos.

Streams Assíncronos Avançados

O que são Streams e por que importam

Um Stream assíncrono é essencialmente um Iterator que produz valores de forma assíncrona. Enquanto um Iterator tradicional é síncrono e bloqueia até produzir o próximo item, um Stream pode "aguardar" I/O ou outras operações sem bloquear a thread. A trait Stream do crate futures é o padrão, mas Tokio também oferece tokio_stream com implementações otimizadas.

use tokio_stream::StreamExt;
use tokio::time::{interval, Duration};

#[tokio::main]
async fn main() {
    // Criando um stream que emite a cada 100ms
    let mut stream = interval(Duration::from_millis(100));

    // Consumindo o stream com take() e filter()
    let mut contador = 0;
    while let Some(_) = stream.next().await {
        contador += 1;
        println!("Tick: {}", contador);
        if contador >= 5 {
            break;
        }
    }
}

Neste exemplo, interval() cria um stream que emite periodicamente. O StreamExt trait nos dá acesso a combinadores como take(), filter(), map() — exatamente como Iterators, mas assincronamente. A grande diferença é que cada .next().await respeita o tempo de espera sem bloquear outras tasks.

Composição de Streams com Combinadores

Streams brilham quando você precisa transformar e compor fluxos de dados. Imagine um servidor que precisa processar requisições vindas de múltiplas fontes simultaneamente:

use tokio_stream::{StreamExt, wrappers::ReceiverStream};
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(32);

    // Spawn tasks que enviam dados
    tokio::spawn(async move {
        for i in 0..10 {
            let _ = tx.send(i).await;
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
    });

    // Converter receiver em stream
    let mut stream = ReceiverStream::new(rx);

    // Aplicar transformações
    let mut processado = stream
        .map(|x| x * 2)
        .filter(|x| x % 3 != 0)
        .take(5);

    while let Some(valor) = processado.next().await {
        println!("Processado: {}", valor);
    }
}

Aqui utilizamos channels do Tokio — mpsc (multi-producer, single-consumer) — e convertemos o receiver em um Stream via ReceiverStream. Os combinadores map e filter funcionam assincronamente, permitindo que transformações sejam aplicadas sem bloquear. O segredo é que cada item passa por toda a pipeline de transformação antes do próximo ser consumido.

Concorrência Avançada com Tokio

Tasks, Spawning e Sincronização

Tokio permite criar tasks independentes com tokio::spawn(). Diferente de threads do SO, tasks são extremamente leves — você pode ter milhares delas. O verdadeiro desafio em concorrência avançada é coordenar essas tasks sem deadlocks ou race conditions.

use tokio::sync::{Mutex, RwLock, Barrier};
use std::sync::Arc;

#[tokio::main]
async fn main() {
    let contador = Arc::new(Mutex::new(0));
    let barrier = Arc::new(Barrier::new(3));

    let mut handles = vec![];

    for i in 0..3 {
        let contador = Arc::clone(&contador);
        let barrier = Arc::clone(&barrier);

        let handle = tokio::spawn(async move {
            // Sincronizar o início das 3 tasks
            barrier.wait().await;

            let mut num = contador.lock().await;
            *num += i;
            println!("Task {} incrementou para: {}", i, *num);
        });

        handles.push(handle);
    }

    for handle in handles {
        let _ = handle.await;
    }

    println!("Valor final: {}", *contador.lock().await);
}

Aqui demonstro sincronização correta: Barrier força tasks a aguardar até que todas cheguem ao ponto de sincronização, Mutex protege dados compartilhados. Note que usamos Arc (Atomic Reference Counting) para compartilhar ownership — essencial em Rust. Mutex::lock().await é não-bloqueante, diferente do std::sync::Mutex que bloquearia a thread.

Select e Timeouts para Operações Concorrentes

Em cenários reais, você frequentemente precisa executar múltiplas operações concorrentes e reagir ao primeiro resultado ou a timeouts. Tokio oferece tokio::select! para isso:

use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let mut intervalo = tokio::time::interval(Duration::from_millis(100));

    loop {
        tokio::select! {
            _ = intervalo.tick() => {
                println!("Tick!");
            }
            _ = async {
                sleep(Duration::from_secs(2)).await
            } => {
                println!("Timeout de 2 segundos atingido!");
                break;
            }
        }
    }
}

select! é uma macro que monitora múltiplas Futures. Assim que uma delas completa, o código correspondente é executado. Isso é fundamental para timeouts, operações de fallback e tratamento de múltiplos eventos simultaneamente. Diferente de join!, que aguarda todas as Futures, select! é não-bloqueante e reativo.

Padrões de Erro Comuns e Otimização

Iniciantes frequentemente cometem erros ao misturar código síncrono com assíncrono. Nunca use block_on() dentro de uma task — isso paralisa o scheduler. Se você precisa chamar uma função bloqueante (como I/O de arquivo), use tokio::task::block_in_place() ou tokio::task::spawn_blocking():

#[tokio::main]
async fn main() {
    let resultado = tokio::task::spawn_blocking(|| {
        // Código síncrono pesado
        std::thread::sleep(std::time::Duration::from_secs(1));
        42
    }).await.unwrap();

    println!("Resultado: {}", resultado);
}

Para otimização, sempre considere o tamanho do channel buffer (padrão 32 é conservador), ajuste o número de threads worker conforme sua carga, e use tokio_util::codec para protocolar streams de bytes eficientemente.

Conclusão

Dominando Tokio, você aprendeu que: (1) Streams assíncronos são Iterators que respeitam I/O não-bloqueante, permitindo composição elegante de pipelines de dados através de combinadores; (2) Concorrência em Tokio exige sincronização explícita com Mutex/RwLock assincronos, nunca síncrono, e select! é seu aliado para operações reativas; (3) O paradigma de task-based concurrency elimina a complexidade de threads tradicionais, mas exige compreensão de Futures e ownership para ser efetivo.

Referências


Artigos relacionados