DevToolBox免费
博客

Node.js Streams 完全指南:Readable、Writable、Transform 与 Pipeline

13 分钟作者 DevToolBox

Node.js Streams 指南:Readable、Writable、Transform、Duplex、Pipeline、背压与性能

全面掌握 Node.js Streams:Readable、Writable、Transform、Duplex 和 PassThrough 流。学习 pipeline API、背压处理、对象模式、异步迭代器、HTTP 流式传输、文件流、Web Streams API 兼容性,以及 CSV 处理、日志解析和高性能数据转换的实战模式。

TL;DRNode.js Streams 以块为单位处理数据,而非将所有内容加载到内存中,这对处理大文件、HTTP 请求和实时数据至关重要。四种流类型(Readable、Writable、Transform、Duplex)通过 pipe() 或 pipeline() API 组合,后者自动处理错误传播和清理。使用 pipe 时背压自动处理,但 write() 需要手动处理 drain 事件。对象模式流处理 JavaScript 对象而非缓冲区。异步迭代器(for await...of)提供最简洁的消费模式。Web Streams API 在 Node.js 中可用,支持跨平台兼容。
核心要点
  • Streams 以块为单位处理数据,内存占用恒定,与文件大小无关
  • 优先使用 pipeline() 而非 pipe()——它自动处理错误传播和流清理
  • 背压是关键概念:当 write() 返回 false 时暂停写入并等待 drain 事件
  • 异步迭代器(for await...of)是消费 Readable 流的最简洁方式
  • 对象模式让流处理结构化数据而非原始缓冲区
  • Transform 流是构建数据处理管道的基础组件
  • Web Streams API 在 Node.js 中可用,实现跨运行时兼容

1. Stream 基础概念

Node.js 中的 Stream 是处理流式数据的抽象接口。它们不会一次性将所有数据加载到内存中,而是以小块的方式逐步处理。这使得处理数 GB 甚至数 TB 的数据成为可能,而内存占用仅需几 MB。

流类型描述常见示例
Readable数据来源,可以读取数据fs.createReadStream, http.IncomingMessage, process.stdin
Writable数据目标,可以写入数据fs.createWriteStream, http.ServerResponse, process.stdout
Transform数据通过时进行修改zlib.createGzip, crypto.createCipher
Duplex独立的可读和可写端net.Socket, WebSocket
PassThrough不修改数据直接传递测试、监控、流分支

2. 创建自定义流

Readable 流

自定义 Readable 流需要实现 _read() 方法。当消费者请求更多数据时,Node.js 调用此方法。使用 this.push() 发送数据块,push(null) 表示流结束。

const { Readable } = require('stream');

class CounterStream extends Readable {
  constructor(max) {
    super(); // default: Buffer mode
    this.max = max;
    this.current = 0;
  }

  _read() {
    if (this.current <= this.max) {
      this.push(String(this.current++) + '\n');
    } else {
      this.push(null); // signal end of stream
    }
  }
}

const counter = new CounterStream(5);
counter.pipe(process.stdout);
// Output: 0 1 2 3 4 5

Writable 流

const { Writable } = require('stream');

class LogWriter extends Writable {
  _write(chunk, encoding, callback) {
    const line = chunk.toString().trim();
    const timestamp = new Date().toISOString();
    console.log(`[\${timestamp}] \${line}`);
    callback(); // signal done, ready for next chunk
  }

  _final(callback) {
    console.log('--- Log stream closed ---');
    callback();
  }
}

const logger = new LogWriter();
logger.write('Server started\n');
logger.end('Shutdown complete\n');

Transform 流

const { Transform } = require('stream');

class UpperCaseTransform extends Transform {
  _transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }

  _flush(callback) {
    // Called once at end — use for final cleanup
    this.push('\n--- END ---\n');
    callback();
  }
}

// Usage: stdin → uppercase → stdout
process.stdin
  .pipe(new UpperCaseTransform())
  .pipe(process.stdout);

3. 管道与 Pipeline API

pipe() 是连接流的基本方法,但它有一个重大缺陷:不会自动传播错误。如果中间流出错,源流不会被销毁,导致内存泄漏。pipeline() 解决了这些问题。

pipe() vs pipeline()

特性pipe()pipeline()
错误传播不自动传播自动传播到所有流
流清理出错时不销毁流出错时自动销毁所有流
完成通知需监听多个事件回调/Promise
推荐场景简单原型验证生产代码
const { pipeline } = require('stream/promises');
const fs = require('fs');
const zlib = require('zlib');

// pipe() — errors not propagated
fs.createReadStream('input.txt')
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('input.txt.gz'));
// If gzip errors, readStream keeps reading!

// pipeline() — production-ready
async function compressFile(src, dest) {
  await pipeline(
    fs.createReadStream(src),
    zlib.createGzip(),
    fs.createWriteStream(dest)
  );
  console.log('Compression complete');
}

compressFile('input.txt', 'input.txt.gz')
  .catch(err => console.error('Pipeline failed:', err));

4. 背压处理

背压发生在可写流无法以接收数据的速度处理数据时。如果忽略背压,数据会在内存中无限累积,最终导致进程崩溃。write() 方法在内部缓冲区满时返回 false——这是暂停写入的信号。

const fs = require('fs');

const readable = fs.createReadStream('huge-file.csv');
const writable = fs.createWriteStream('output.csv');

// Manual backpressure handling
readable.on('data', (chunk) => {
  const canContinue = writable.write(chunk);
  if (!canContinue) {
    // Buffer is full — pause reading
    readable.pause();
    writable.once('drain', () => {
      // Buffer drained — resume reading
      readable.resume();
    });
  }
});

readable.on('end', () => writable.end());

// BETTER: pipe() handles backpressure automatically
// readable.pipe(writable);
提示: 永远不要忽略 write() 的返回值。如果持续向返回 false 的流写入数据,highWaterMark 缓冲区会溢出,内存使用量将急剧增加。pipe() 和 pipeline() 自动处理背压——在生产代码中优先使用它们。

5. 对象模式流

默认情况下,流操作 Buffer 或字符串数据。对象模式允许流处理任意 JavaScript 值——对象、数组、数字等。highWaterMark 在对象模式下计数对象数量而非字节数。

const { Transform, Readable } = require('stream');

// Transform that filters JS objects
const filterAdults = new Transform({
  objectMode: true,
  transform(user, encoding, callback) {
    if (user.age >= 18) {
      this.push(user);
    }
    callback();
  }
});

const formatJSON = new Transform({
  objectMode: true,
  writableObjectMode: true,
  readableObjectMode: false,
  transform(user, encoding, callback) {
    this.push(JSON.stringify(user) + '\n');
    callback();
  }
});

// Feed objects into the pipeline
const users = Readable.from([
  { name: 'Alice', age: 25 },
  { name: 'Bob', age: 16 },
  { name: 'Charlie', age: 30 }
]);

users.pipe(filterAdults).pipe(formatJSON).pipe(process.stdout);
// {"name":"Alice","age":25}
// {"name":"Charlie","age":30}

6. 流事件与错误处理

每种流类型都会发出特定的事件。正确监听这些事件是编写健壮流代码的关键。未处理的 error 事件会导致进程崩溃。

事件流类型触发时机
dataReadable有数据块可读时
endReadable没有更多数据可读时
drainWritable缓冲区排空可继续写入时
finishWritable所有数据已刷新到底层系统
error所有类型发生错误时
close所有类型流及其底层资源已关闭
const { pipeline } = require('stream/promises');
const { Transform } = require('stream');
const fs = require('fs');

async function safeProcess() {
  try {
    await pipeline(
      fs.createReadStream('data.json'),
      new Transform({
        transform(chunk, enc, cb) {
          try {
            const parsed = JSON.parse(chunk);
            cb(null, JSON.stringify(parsed) + '\n');
          } catch (e) {
            cb(new Error('Invalid JSON: ' + e.message));
          }
        }
      }),
      fs.createWriteStream('output.jsonl')
    );
    console.log('Processing complete');
  } catch (err) {
    // All streams are automatically destroyed
    console.error('Pipeline error:', err.message);
  }
}

7. 文件流式处理

文件流是 Node.js 流最常见的用例。fs.createReadStream() 和 fs.createWriteStream() 以块为单位处理文件,内存占用恒定。对于大文件,这与 readFile/writeFile 的差距是致命的。

const fs = require('fs');
const { pipeline } = require('stream/promises');
const zlib = require('zlib');

// Read a 10GB log file — only ~64KB in memory
async function processLargeFile() {
  const input = fs.createReadStream('server.log', {
    highWaterMark: 64 * 1024,  // 64KB chunks
    encoding: 'utf8'
  });

  let lineCount = 0;
  let errorCount = 0;

  for await (const chunk of input) {
    const lines = chunk.split('\n');
    lineCount += lines.length;
    errorCount += lines.filter(
      l => l.includes('ERROR')
    ).length;
  }

  console.log(`Lines: \${lineCount}, Errors: \${errorCount}`);
}

// Compress a file with streaming
async function compressFile(src, dest) {
  await pipeline(
    fs.createReadStream(src),
    zlib.createGzip({ level: 9 }),
    fs.createWriteStream(dest)
  );
}

8. HTTP 流式传输

Node.js 的 HTTP 请求和响应都是流。http.IncomingMessage 是 Readable 流,http.ServerResponse 是 Writable 流。利用这一点可以高效处理大文件上传/下载和请求代理。

const http = require('http');
const fs = require('fs');
const { pipeline } = require('stream/promises');

const server = http.createServer(async (req, res) => {
  if (req.url === '/download') {
    // Stream a large file to the client
    res.writeHead(200, {
      'Content-Type': 'application/octet-stream',
      'Transfer-Encoding': 'chunked'
    });
    await pipeline(
      fs.createReadStream('large-dataset.csv'),
      res
    );
  }

  if (req.url === '/upload' && req.method === 'POST') {
    // Stream upload directly to disk
    await pipeline(
      req,
      fs.createWriteStream('upload.bin')
    );
    res.end('Upload complete');
  }
});

server.listen(3000);

9. 异步迭代器与流

Readable 流实现了异步可迭代协议。使用 for await...of 循环消费流数据更简洁,且自动处理背压。这是现代 Node.js 中推荐的流消费方式。

const fs = require('fs');
const readline = require('readline');

// Read file line by line with async iterators
async function processLines(filePath) {
  const rl = readline.createInterface({
    input: fs.createReadStream(filePath),
    crlfDelay: Infinity
  });

  const stats = { total: 0, errors: 0, warnings: 0 };

  for await (const line of rl) {
    stats.total++;
    if (line.includes('ERROR')) stats.errors++;
    if (line.includes('WARN')) stats.warnings++;
  }

  return stats;
}

// Create a Readable from an async generator
const { Readable } = require('stream');

async function* generateData() {
  for (let i = 0; i < 1000; i++) {
    yield JSON.stringify({ id: i, ts: Date.now() }) + '\n';
  }
}

const stream = Readable.from(generateData());
stream.pipe(process.stdout);

10. 流组合模式

将多个 Transform 流组合成可复用的数据处理管道,可以简化复杂的数据处理逻辑。每个 Transform 承担单一职责,通过 pipeline 组合。

const { Transform } = require('stream');
const { pipeline } = require('stream/promises');
const { createReadStream, createWriteStream } = require('fs');

// Reusable transforms
function parseCSVLine() {
  let header = null;
  return new Transform({
    objectMode: true,
    transform(line, enc, cb) {
      const cols = line.toString().trim().split(',');
      if (!header) { header = cols; return cb(); }
      const obj = {};
      header.forEach((h, i) => obj[h] = cols[i]);
      cb(null, obj);
    }
  });
}

function filterBy(field, value) {
  return new Transform({
    objectMode: true,
    transform(obj, enc, cb) {
      if (obj[field] === value) this.push(obj);
      cb();
    }
  });
}

function toJSON() {
  return new Transform({
    writableObjectMode: true,
    readableObjectMode: false,
    transform(obj, enc, cb) {
      cb(null, JSON.stringify(obj) + '\n');
    }
  });
}

// Compose: CSV → parse → filter → JSON
await pipeline(
  createReadStream('users.csv'),
  parseCSVLine(),
  filterBy('role', 'admin'),
  toJSON(),
  createWriteStream('admins.jsonl')
);

11. 内存效率与性能

流的核心价值在于内存效率。以下对比展示了流与非流方式处理大文件的差异。

方法1GB 文件内存适用场景
fs.readFileSync()~1GB+仅小配置文件
fs.readFile()~1GB+小于 100MB 的文件
createReadStream()~64KB任意大小文件
// Memory comparison: Buffer vs Stream
const fs = require('fs');

// BAD: loads entire file into memory
async function bufferApproach() {
  const data = await fs.promises.readFile('big.csv', 'utf8');
  const lines = data.split('\n'); // 2x memory!
  return lines.filter(l => l.includes('ERROR'));
}

// GOOD: constant memory, processes line by line
async function streamApproach() {
  const rl = require('readline').createInterface({
    input: fs.createReadStream('big.csv')
  });
  const errors = [];
  for await (const line of rl) {
    if (line.includes('ERROR')) errors.push(line);
  }
  return errors;
}

// Performance tip: tune highWaterMark
// Default: 16KB for Readable, 16384 bytes
// Increase for sequential reads of large files
fs.createReadStream('data.bin', {
  highWaterMark: 256 * 1024 // 256KB for throughput
});

12. Web Streams API(Node.js 兼容)

Web Streams API(ReadableStream、WritableStream、TransformStream)是 WHATWG 标准,在浏览器、Deno、Cloudflare Workers 和 Node.js 中通用。Node.js 提供了转换方法实现互操作。

const { Readable } = require('stream');

// Convert Node stream → Web stream
const nodeReadable = Readable.from(['hello', ' ', 'world']);
const webReadable = Readable.toWeb(nodeReadable);

// Consume with Web Streams API
const reader = webReadable.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  console.log(value);
}

// Convert Web stream → Node stream
const webStream = new ReadableStream({
  start(controller) {
    controller.enqueue('data chunk 1');
    controller.enqueue('data chunk 2');
    controller.close();
  }
});

const nodeStream = Readable.fromWeb(webStream);
nodeStream.pipe(process.stdout);

13. 实战模式

CSV 处理管道

const { Transform } = require('stream');
const { pipeline } = require('stream/promises');
const fs = require('fs');

// Split buffer chunks into individual lines
function lineSplitter() {
  let buffer = '';
  return new Transform({
    transform(chunk, enc, cb) {
      buffer += chunk.toString();
      const lines = buffer.split('\n');
      buffer = lines.pop(); // keep partial line
      for (const line of lines) {
        if (line.trim()) this.push(line);
      }
      cb();
    },
    flush(cb) {
      if (buffer.trim()) this.push(buffer);
      cb();
    }
  });
}

async function processCSV(input, output) {
  let header = null;
  let count = 0;

  await pipeline(
    fs.createReadStream(input),
    lineSplitter(),
    new Transform({
      objectMode: true,
      transform(line, enc, cb) {
        const cols = line.toString().split(',');
        if (!header) { header = cols; return cb(); }
        const row = {};
        header.forEach((h, i) => row[h.trim()] = cols[i]);
        count++;
        cb(null, JSON.stringify(row) + '\n');
      }
    }),
    fs.createWriteStream(output)
  );

  console.log(`Processed \${count} rows`);
}

日志解析与聚合

const { Transform } = require('stream');

// Parse structured log lines into objects
function logParser() {
  return new Transform({
    objectMode: true,
    transform(line, enc, cb) {
      const match = line.toString().match(
        /^\[(.*?)\]\s+(\w+)\s+(.*)/
      );
      if (match) {
        this.push({
          timestamp: new Date(match[1]),
          level: match[2],
          message: match[3]
        });
      }
      cb();
    }
  });
}

// Aggregate counts by level
function aggregator() {
  const counts = {};
  return new Transform({
    objectMode: true,
    transform(entry, enc, cb) {
      counts[entry.level] = (counts[entry.level] || 0) + 1;
      cb();
    },
    flush(cb) {
      this.push(JSON.stringify(counts, null, 2));
      cb();
    }
  });
}

Duplex 流示例

const { Duplex } = require('stream');

class MessageProtocol extends Duplex {
  constructor() {
    super();
    this._buffer = [];
  }

  _write(chunk, encoding, callback) {
    // Writable side: receive raw bytes, frame them
    const msg = chunk.toString().trim();
    const framed = Buffer.from(
      JSON.stringify({ len: msg.length, data: msg }) + '\n'
    );
    this._buffer.push(framed);
    callback();
  }

  _read(size) {
    // Readable side: output framed messages
    const item = this._buffer.shift();
    if (item) {
      this.push(item);
    } else {
      setTimeout(() => this._read(size), 10);
    }
  }
}

总结

Node.js Streams 是处理大规模数据的核心工具。掌握四种流类型(Readable、Writable、Transform、Duplex)及其组合方式,使用 pipeline() 替代 pipe() 确保错误处理和资源清理,理解背压机制避免内存溢出,利用异步迭代器写出更简洁的代码。在需要跨运行时兼容性时,使用 Web Streams API 并通过 toWeb()/fromWeb() 在两种 API 之间转换。流式思维是 Node.js 高性能编程的基础——任何超过几 MB 的数据都应该考虑使用流处理。

常见问题

Node.js 的四种流类型是什么?

Node.js 有四种基本流类型:Readable(数据源,如 fs.createReadStream)、Writable(数据目标,如 fs.createWriteStream)、Transform(数据经过时修改,如 zlib.createGzip)和 Duplex(独立的可读可写端,如 net.Socket)。还有 PassThrough,一种不修改数据的 Transform。

什么是背压以及如何处理?

背压发生在可写流无法以接收数据的速度处理数据时。当 write() 返回 false 时需要暂停写入并等待 drain 事件。pipe() 和 pipeline() 自动处理背压。忽略背压会导致内存使用过多甚至崩溃。

pipe() 和 pipeline() 有什么区别?

pipe() 连接流但不自动传播错误或清理资源。pipeline() 自动传播错误到所有流、出错时销毁所有流、并支持回调或 Promise 完成通知。生产代码应始终使用 pipeline()。

异步迭代器如何与流配合使用?

Readable 流实现了异步可迭代协议,可以使用 for await...of 循环消费。自动处理背压,当循环通过 break 或 throw 退出时流会被自动销毁。

什么是对象模式?

默认流处理 Buffer 或字符串。对象模式(objectMode: true)允许处理任意 JavaScript 值。highWaterMark 计数对象数量而非字节。适用于解析后的 JSON 记录、数据库行或 CSV 行。

如何流式处理大文件?

使用 fs.createReadStream() 按块读取文件并通过 pipeline 传输到可写目标。无论文件多大,内存占用都是恒定的。永远不要对大文件使用 readFileSync() 或 readFile()。

如何处理流管道中的错误?

使用 pipeline() 函数,它将错误传播到所有流并在完成或出错时调用回调。使用 Promise 版本时用 try/catch 包裹。始终处理流错误以避免未捕获异常导致进程崩溃。

Web Streams API 与 Node.js 流有什么区别?

Web Streams API 是浏览器、Deno、Workers 和 Node.js 通用的 WHATWG 标准。Node.js 流功能更丰富(pipe()、pipeline()、更好的背压),Web Streams 更具移植性。Node.js 提供 Readable.toWeb() 和 Readable.fromWeb() 实现转换。

𝕏 Twitterin LinkedIn
这篇文章有帮助吗?

保持更新

获取每周开发技巧和新工具通知。

无垃圾邮件,随时退订。

试试这些相关工具

{ }JSON Formatter±Text Diff CheckerB64Base64 Encoder/Decoder

相关文章

Docker 网络指南:Bridge、Host、Overlay 网络详解

Docker 网络完整指南:bridge、host、overlay 和 macvlan 网络,docker-compose 网络配置和故障排查。

API 限流指南:策略、算法与实现

API 限流完整指南。学习令牌桶、滑动窗口、漏桶算法及代码示例。包含 Express.js 中间件、Redis 分布式限流和最佳实践。