Streams Avançados em Node.js: Transform, Duplex e Backpressure na Prática Já leu

Streams Avançados em Node.js: Transform, Duplex e Backpressure Streams são um dos pilares do Node.js para processar grandes volumes de dados de forma eficiente. Enquanto streams básicos (Readable e Writable) já resolvem muitos problemas, os padrões avançados — Transform, Duplex e o controle de backpressure — são essenciais para construir aplicações robustas e performáticas. Nesta aula, você aprenderá a dominar esses conceitos com exemplos práticos e imediatamente aplicáveis. Transform Streams: Transformando Dados em Tempo Real Um Transform stream é um stream que modifica os dados conforme passam por ele. Você implementa a lógica de transformação no método . É perfeito para casos como compressão, criptografia, parsing JSON em linhas ou conversão de formatos. Implementando um Transform Stream Note que recebe o chunk de dados, realiza a transformação e chama quando pronto. Você pode chamar múltiplas vezes se a transformação gerar vários chunks de saída. Um exemplo mais realista é um parser CSV que converte cada linha em JSON: Duplex Streams:

Streams Avançados em Node.js: Transform, Duplex e Backpressure

Streams são um dos pilares do Node.js para processar grandes volumes de dados de forma eficiente. Enquanto streams básicos (Readable e Writable) já resolvem muitos problemas, os padrões avançados — Transform, Duplex e o controle de backpressure — são essenciais para construir aplicações robustas e performáticas. Nesta aula, você aprenderá a dominar esses conceitos com exemplos práticos e imediatamente aplicáveis.

Transform Streams: Transformando Dados em Tempo Real

Um Transform stream é um stream que modifica os dados conforme passam por ele. Você implementa a lógica de transformação no método _transform(). É perfeito para casos como compressão, criptografia, parsing JSON em linhas ou conversão de formatos.

Implementando um Transform Stream

const { Transform } = require('stream');

class UppercaseTransform extends Transform {
  _transform(chunk, encoding, callback) {
    // chunk é um Buffer com os dados
    const transformed = chunk.toString().toUpperCase();
    this.push(transformed);
    callback(); // sinaliza que acabou a transformação
  }
}

// Uso
const fs = require('fs');
fs.createReadStream('input.txt')
  .pipe(new UppercaseTransform())
  .pipe(fs.createWriteStream('output.txt'));

Note que _transform() recebe o chunk de dados, realiza a transformação e chama callback() quando pronto. Você pode chamar this.push() múltiplas vezes se a transformação gerar vários chunks de saída. Um exemplo mais realista é um parser CSV que converte cada linha em JSON:

const { Transform } = require('stream');

class CSVtoJSON extends Transform {
  constructor(options = {}) {
    super(options);
    this.headers = null;
  }

  _transform(chunk, encoding, callback) {
    const lines = chunk.toString().split('\n');

    lines.forEach((line, index) => {
      if (index === 0 && !this.headers) {
        this.headers = line.split(',');
        return;
      }
      if (line.trim()) {
        const values = line.split(',');
        const obj = {};
        this.headers.forEach((header, i) => {
          obj[header] = values[i];
        });
        this.push(JSON.stringify(obj) + '\n');
      }
    });

    callback();
  }
}

// Teste
fs.createReadStream('dados.csv')
  .pipe(new CSVtoJSON())
  .pipe(fs.createWriteStream('dados.json'));

Duplex Streams: Leitura e Escrita Simultâneas

Um Duplex stream é aquele que funciona como Readable e Writable ao mesmo tempo — dados entram por um lado, saem por outro. Exemplos reais incluem conexões TCP, WebSockets e pipes bidirecionais. Você implementa _read() e _write() separadamente.

Criando um Duplex Stream

const { Duplex } = require('stream');
const net = require('net');

class EchoServer extends Duplex {
  constructor(options) {
    super(options);
    this.buffer = [];
  }

  _write(chunk, encoding, callback) {
    // Quando dados chegam, ecoamos de volta
    console.log('Recebido:', chunk.toString());
    this.buffer.push(chunk);
    callback();
  }

  _read(size) {
    // Quando o outro lado quer ler, enviamos dados armazenados
    if (this.buffer.length > 0) {
      this.push(this.buffer.shift());
    } else {
      this.push(null); // FIM do stream
    }
  }
}

// Teste manual
const echo = new EchoServer();
echo.write('Olá');
echo.write('Mundo');
echo.end();

echo.on('data', (chunk) => {
  console.log('Lido:', chunk.toString());
});

Um uso mais prático é um Duplex que funciona como um transformador bidirecional em uma conexão HTTP:

const { Duplex } = require('stream');

class BiDirectionalProcessor extends Duplex {
  _write(chunk, encoding, callback) {
    // Processa dados que chegam (escrita)
    const processed = chunk.toString().toUpperCase();
    console.log('→ Enviado ao cliente:', processed);
    callback();
  }

  _read(size) {
    // Simula dados sendo lidos do servidor
    if (!this.done) {
      this.push('Dados do servidor\n');
      this.done = true;
    } else {
      this.push(null);
    }
  }
}

const processor = new BiDirectionalProcessor();
processor.pipe(process.stdout);
processor.write('request do cliente');
processor.end();

Backpressure: O Controle de Fluxo Crítico

Backpressure ocorre quando o lado consumidor (Writable) não consegue processar dados tão rápido quanto o produtor (Readable) os envia. Se ignorado, acumula-se na memória. Node.js oferece mecanismos para detectar e resolver isso.

Detectando e Tratando Backpressure

Quando você chama stream.write(), ele retorna false se há backpressure. Além disso, o evento 'drain' é emitido quando o buffer interno foi esvaziado:

const fs = require('fs');

const source = fs.createReadStream('arquivo-grande.txt');
const destination = fs.createWriteStream('copia.txt');

source.on('data', (chunk) => {
  const continuar = destination.write(chunk);

  if (!continuar) {
    console.log('⚠️ Backpressure detectado!');
    source.pause(); // Pausa leitura
  }
});

destination.on('drain', () => {
  console.log('✓ Buffer esvaziado, retomando leitura');
  source.resume();
});

source.on('end', () => console.log('Concluído'));

A melhor prática, porém, é usar pipe() diretamente, que gerencia backpressure automaticamente:

// Automático e seguro — Node.js controla tudo
fs.createReadStream('grande.txt')
  .pipe(new UppercaseTransform())
  .pipe(fs.createWriteStream('resultado.txt'));

Se você implementar um Transform customizado, sempre respeite o retorno do this.push():

class SafeTransform extends Transform {
  _transform(chunk, encoding, callback) {
    const transformed = chunk.toString().toUpperCase();

    // Se push() retorna false, há backpressure
    if (!this.push(transformed)) {
      console.log('⚠️ Backpressure no transform');
    }

    callback();
  }
}

Conclusão

Você aprendeu que Transform streams permitem modificar dados em pipeline, sendo fundamentais para processamento de grandes arquivos sem carregar tudo na memória. Duplex streams funcionam bidirecionalmente, ideais para protocolos de comunicação como HTTP/2 e WebSocket. E mais importante: backpressure é crítico — ignorá-lo causa vazamento de memória. Use sempre pipe() quando possível, ou implemente manualmente com pause()/resume() e o evento drain. Com esses três conceitos solidificados, você está pronto para construir aplicações escaláveis em Node.js.

Referências


Artigos relacionados