Node.js Streams 指南:Readable、Writable、Transform、Duplex、Pipeline、背压与性能
全面掌握 Node.js Streams:Readable、Writable、Transform、Duplex 和 PassThrough 流。学习 pipeline API、背压处理、对象模式、异步迭代器、HTTP 流式传输、文件流、Web Streams API 兼容性,以及 CSV 处理、日志解析和高性能数据转换的实战模式。
- Streams 以块为单位处理数据,内存占用恒定,与文件大小无关
- 优先使用 pipeline() 而非 pipe()——它自动处理错误传播和流清理
- 背压是关键概念:当 write() 返回 false 时暂停写入并等待 drain 事件
- 异步迭代器(for await...of)是消费 Readable 流的最简洁方式
- 对象模式让流处理结构化数据而非原始缓冲区
- Transform 流是构建数据处理管道的基础组件
- Web Streams API 在 Node.js 中可用,实现跨运行时兼容
1. Stream 基础概念
Node.js 中的 Stream 是处理流式数据的抽象接口。它们不会一次性将所有数据加载到内存中,而是以小块的方式逐步处理。这使得处理数 GB 甚至数 TB 的数据成为可能,而内存占用仅需几 MB。
| 流类型 | 描述 | 常见示例 |
|---|---|---|
Readable | 数据来源,可以读取数据 | fs.createReadStream, http.IncomingMessage, process.stdin |
Writable | 数据目标,可以写入数据 | fs.createWriteStream, http.ServerResponse, process.stdout |
Transform | 数据通过时进行修改 | zlib.createGzip, crypto.createCipher |
Duplex | 独立的可读和可写端 | net.Socket, WebSocket |
PassThrough | 不修改数据直接传递 | 测试、监控、流分支 |
2. 创建自定义流
Readable 流
自定义 Readable 流需要实现 _read() 方法。当消费者请求更多数据时,Node.js 调用此方法。使用 this.push() 发送数据块,push(null) 表示流结束。
const { Readable } = require('stream');
class CounterStream extends Readable {
constructor(max) {
super(); // default: Buffer mode
this.max = max;
this.current = 0;
}
_read() {
if (this.current <= this.max) {
this.push(String(this.current++) + '\n');
} else {
this.push(null); // signal end of stream
}
}
}
const counter = new CounterStream(5);
counter.pipe(process.stdout);
// Output: 0 1 2 3 4 5Writable 流
const { Writable } = require('stream');
class LogWriter extends Writable {
_write(chunk, encoding, callback) {
const line = chunk.toString().trim();
const timestamp = new Date().toISOString();
console.log(`[\${timestamp}] \${line}`);
callback(); // signal done, ready for next chunk
}
_final(callback) {
console.log('--- Log stream closed ---');
callback();
}
}
const logger = new LogWriter();
logger.write('Server started\n');
logger.end('Shutdown complete\n');Transform 流
const { Transform } = require('stream');
class UpperCaseTransform extends Transform {
_transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase());
callback();
}
_flush(callback) {
// Called once at end — use for final cleanup
this.push('\n--- END ---\n');
callback();
}
}
// Usage: stdin → uppercase → stdout
process.stdin
.pipe(new UpperCaseTransform())
.pipe(process.stdout);3. 管道与 Pipeline API
pipe() 是连接流的基本方法,但它有一个重大缺陷:不会自动传播错误。如果中间流出错,源流不会被销毁,导致内存泄漏。pipeline() 解决了这些问题。
pipe() vs pipeline()
| 特性 | pipe() | pipeline() |
|---|---|---|
| 错误传播 | 不自动传播 | 自动传播到所有流 |
| 流清理 | 出错时不销毁流 | 出错时自动销毁所有流 |
| 完成通知 | 需监听多个事件 | 回调/Promise |
| 推荐场景 | 简单原型验证 | 生产代码 |
const { pipeline } = require('stream/promises');
const fs = require('fs');
const zlib = require('zlib');
// pipe() — errors not propagated
fs.createReadStream('input.txt')
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream('input.txt.gz'));
// If gzip errors, readStream keeps reading!
// pipeline() — production-ready
async function compressFile(src, dest) {
await pipeline(
fs.createReadStream(src),
zlib.createGzip(),
fs.createWriteStream(dest)
);
console.log('Compression complete');
}
compressFile('input.txt', 'input.txt.gz')
.catch(err => console.error('Pipeline failed:', err));4. 背压处理
背压发生在可写流无法以接收数据的速度处理数据时。如果忽略背压,数据会在内存中无限累积,最终导致进程崩溃。write() 方法在内部缓冲区满时返回 false——这是暂停写入的信号。
const fs = require('fs');
const readable = fs.createReadStream('huge-file.csv');
const writable = fs.createWriteStream('output.csv');
// Manual backpressure handling
readable.on('data', (chunk) => {
const canContinue = writable.write(chunk);
if (!canContinue) {
// Buffer is full — pause reading
readable.pause();
writable.once('drain', () => {
// Buffer drained — resume reading
readable.resume();
});
}
});
readable.on('end', () => writable.end());
// BETTER: pipe() handles backpressure automatically
// readable.pipe(writable);5. 对象模式流
默认情况下,流操作 Buffer 或字符串数据。对象模式允许流处理任意 JavaScript 值——对象、数组、数字等。highWaterMark 在对象模式下计数对象数量而非字节数。
const { Transform, Readable } = require('stream');
// Transform that filters JS objects
const filterAdults = new Transform({
objectMode: true,
transform(user, encoding, callback) {
if (user.age >= 18) {
this.push(user);
}
callback();
}
});
const formatJSON = new Transform({
objectMode: true,
writableObjectMode: true,
readableObjectMode: false,
transform(user, encoding, callback) {
this.push(JSON.stringify(user) + '\n');
callback();
}
});
// Feed objects into the pipeline
const users = Readable.from([
{ name: 'Alice', age: 25 },
{ name: 'Bob', age: 16 },
{ name: 'Charlie', age: 30 }
]);
users.pipe(filterAdults).pipe(formatJSON).pipe(process.stdout);
// {"name":"Alice","age":25}
// {"name":"Charlie","age":30}6. 流事件与错误处理
每种流类型都会发出特定的事件。正确监听这些事件是编写健壮流代码的关键。未处理的 error 事件会导致进程崩溃。
| 事件 | 流类型 | 触发时机 |
|---|---|---|
| data | Readable | 有数据块可读时 |
| end | Readable | 没有更多数据可读时 |
| drain | Writable | 缓冲区排空可继续写入时 |
| finish | Writable | 所有数据已刷新到底层系统 |
| error | 所有类型 | 发生错误时 |
| close | 所有类型 | 流及其底层资源已关闭 |
const { pipeline } = require('stream/promises');
const { Transform } = require('stream');
const fs = require('fs');
async function safeProcess() {
try {
await pipeline(
fs.createReadStream('data.json'),
new Transform({
transform(chunk, enc, cb) {
try {
const parsed = JSON.parse(chunk);
cb(null, JSON.stringify(parsed) + '\n');
} catch (e) {
cb(new Error('Invalid JSON: ' + e.message));
}
}
}),
fs.createWriteStream('output.jsonl')
);
console.log('Processing complete');
} catch (err) {
// All streams are automatically destroyed
console.error('Pipeline error:', err.message);
}
}7. 文件流式处理
文件流是 Node.js 流最常见的用例。fs.createReadStream() 和 fs.createWriteStream() 以块为单位处理文件,内存占用恒定。对于大文件,这与 readFile/writeFile 的差距是致命的。
const fs = require('fs');
const { pipeline } = require('stream/promises');
const zlib = require('zlib');
// Read a 10GB log file — only ~64KB in memory
async function processLargeFile() {
const input = fs.createReadStream('server.log', {
highWaterMark: 64 * 1024, // 64KB chunks
encoding: 'utf8'
});
let lineCount = 0;
let errorCount = 0;
for await (const chunk of input) {
const lines = chunk.split('\n');
lineCount += lines.length;
errorCount += lines.filter(
l => l.includes('ERROR')
).length;
}
console.log(`Lines: \${lineCount}, Errors: \${errorCount}`);
}
// Compress a file with streaming
async function compressFile(src, dest) {
await pipeline(
fs.createReadStream(src),
zlib.createGzip({ level: 9 }),
fs.createWriteStream(dest)
);
}8. HTTP 流式传输
Node.js 的 HTTP 请求和响应都是流。http.IncomingMessage 是 Readable 流,http.ServerResponse 是 Writable 流。利用这一点可以高效处理大文件上传/下载和请求代理。
const http = require('http');
const fs = require('fs');
const { pipeline } = require('stream/promises');
const server = http.createServer(async (req, res) => {
if (req.url === '/download') {
// Stream a large file to the client
res.writeHead(200, {
'Content-Type': 'application/octet-stream',
'Transfer-Encoding': 'chunked'
});
await pipeline(
fs.createReadStream('large-dataset.csv'),
res
);
}
if (req.url === '/upload' && req.method === 'POST') {
// Stream upload directly to disk
await pipeline(
req,
fs.createWriteStream('upload.bin')
);
res.end('Upload complete');
}
});
server.listen(3000);9. 异步迭代器与流
Readable 流实现了异步可迭代协议。使用 for await...of 循环消费流数据更简洁,且自动处理背压。这是现代 Node.js 中推荐的流消费方式。
const fs = require('fs');
const readline = require('readline');
// Read file line by line with async iterators
async function processLines(filePath) {
const rl = readline.createInterface({
input: fs.createReadStream(filePath),
crlfDelay: Infinity
});
const stats = { total: 0, errors: 0, warnings: 0 };
for await (const line of rl) {
stats.total++;
if (line.includes('ERROR')) stats.errors++;
if (line.includes('WARN')) stats.warnings++;
}
return stats;
}
// Create a Readable from an async generator
const { Readable } = require('stream');
async function* generateData() {
for (let i = 0; i < 1000; i++) {
yield JSON.stringify({ id: i, ts: Date.now() }) + '\n';
}
}
const stream = Readable.from(generateData());
stream.pipe(process.stdout);10. 流组合模式
将多个 Transform 流组合成可复用的数据处理管道,可以简化复杂的数据处理逻辑。每个 Transform 承担单一职责,通过 pipeline 组合。
const { Transform } = require('stream');
const { pipeline } = require('stream/promises');
const { createReadStream, createWriteStream } = require('fs');
// Reusable transforms
function parseCSVLine() {
let header = null;
return new Transform({
objectMode: true,
transform(line, enc, cb) {
const cols = line.toString().trim().split(',');
if (!header) { header = cols; return cb(); }
const obj = {};
header.forEach((h, i) => obj[h] = cols[i]);
cb(null, obj);
}
});
}
function filterBy(field, value) {
return new Transform({
objectMode: true,
transform(obj, enc, cb) {
if (obj[field] === value) this.push(obj);
cb();
}
});
}
function toJSON() {
return new Transform({
writableObjectMode: true,
readableObjectMode: false,
transform(obj, enc, cb) {
cb(null, JSON.stringify(obj) + '\n');
}
});
}
// Compose: CSV → parse → filter → JSON
await pipeline(
createReadStream('users.csv'),
parseCSVLine(),
filterBy('role', 'admin'),
toJSON(),
createWriteStream('admins.jsonl')
);11. 内存效率与性能
流的核心价值在于内存效率。以下对比展示了流与非流方式处理大文件的差异。
| 方法 | 1GB 文件内存 | 适用场景 |
|---|---|---|
| fs.readFileSync() | ~1GB+ | 仅小配置文件 |
| fs.readFile() | ~1GB+ | 小于 100MB 的文件 |
| createReadStream() | ~64KB | 任意大小文件 |
// Memory comparison: Buffer vs Stream
const fs = require('fs');
// BAD: loads entire file into memory
async function bufferApproach() {
const data = await fs.promises.readFile('big.csv', 'utf8');
const lines = data.split('\n'); // 2x memory!
return lines.filter(l => l.includes('ERROR'));
}
// GOOD: constant memory, processes line by line
async function streamApproach() {
const rl = require('readline').createInterface({
input: fs.createReadStream('big.csv')
});
const errors = [];
for await (const line of rl) {
if (line.includes('ERROR')) errors.push(line);
}
return errors;
}
// Performance tip: tune highWaterMark
// Default: 16KB for Readable, 16384 bytes
// Increase for sequential reads of large files
fs.createReadStream('data.bin', {
highWaterMark: 256 * 1024 // 256KB for throughput
});12. Web Streams API(Node.js 兼容)
Web Streams API(ReadableStream、WritableStream、TransformStream)是 WHATWG 标准,在浏览器、Deno、Cloudflare Workers 和 Node.js 中通用。Node.js 提供了转换方法实现互操作。
const { Readable } = require('stream');
// Convert Node stream → Web stream
const nodeReadable = Readable.from(['hello', ' ', 'world']);
const webReadable = Readable.toWeb(nodeReadable);
// Consume with Web Streams API
const reader = webReadable.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) break;
console.log(value);
}
// Convert Web stream → Node stream
const webStream = new ReadableStream({
start(controller) {
controller.enqueue('data chunk 1');
controller.enqueue('data chunk 2');
controller.close();
}
});
const nodeStream = Readable.fromWeb(webStream);
nodeStream.pipe(process.stdout);13. 实战模式
CSV 处理管道
const { Transform } = require('stream');
const { pipeline } = require('stream/promises');
const fs = require('fs');
// Split buffer chunks into individual lines
function lineSplitter() {
let buffer = '';
return new Transform({
transform(chunk, enc, cb) {
buffer += chunk.toString();
const lines = buffer.split('\n');
buffer = lines.pop(); // keep partial line
for (const line of lines) {
if (line.trim()) this.push(line);
}
cb();
},
flush(cb) {
if (buffer.trim()) this.push(buffer);
cb();
}
});
}
async function processCSV(input, output) {
let header = null;
let count = 0;
await pipeline(
fs.createReadStream(input),
lineSplitter(),
new Transform({
objectMode: true,
transform(line, enc, cb) {
const cols = line.toString().split(',');
if (!header) { header = cols; return cb(); }
const row = {};
header.forEach((h, i) => row[h.trim()] = cols[i]);
count++;
cb(null, JSON.stringify(row) + '\n');
}
}),
fs.createWriteStream(output)
);
console.log(`Processed \${count} rows`);
}日志解析与聚合
const { Transform } = require('stream');
// Parse structured log lines into objects
function logParser() {
return new Transform({
objectMode: true,
transform(line, enc, cb) {
const match = line.toString().match(
/^\[(.*?)\]\s+(\w+)\s+(.*)/
);
if (match) {
this.push({
timestamp: new Date(match[1]),
level: match[2],
message: match[3]
});
}
cb();
}
});
}
// Aggregate counts by level
function aggregator() {
const counts = {};
return new Transform({
objectMode: true,
transform(entry, enc, cb) {
counts[entry.level] = (counts[entry.level] || 0) + 1;
cb();
},
flush(cb) {
this.push(JSON.stringify(counts, null, 2));
cb();
}
});
}Duplex 流示例
const { Duplex } = require('stream');
class MessageProtocol extends Duplex {
constructor() {
super();
this._buffer = [];
}
_write(chunk, encoding, callback) {
// Writable side: receive raw bytes, frame them
const msg = chunk.toString().trim();
const framed = Buffer.from(
JSON.stringify({ len: msg.length, data: msg }) + '\n'
);
this._buffer.push(framed);
callback();
}
_read(size) {
// Readable side: output framed messages
const item = this._buffer.shift();
if (item) {
this.push(item);
} else {
setTimeout(() => this._read(size), 10);
}
}
}总结
Node.js Streams 是处理大规模数据的核心工具。掌握四种流类型(Readable、Writable、Transform、Duplex)及其组合方式,使用 pipeline() 替代 pipe() 确保错误处理和资源清理,理解背压机制避免内存溢出,利用异步迭代器写出更简洁的代码。在需要跨运行时兼容性时,使用 Web Streams API 并通过 toWeb()/fromWeb() 在两种 API 之间转换。流式思维是 Node.js 高性能编程的基础——任何超过几 MB 的数据都应该考虑使用流处理。
常见问题
Node.js 的四种流类型是什么?
Node.js 有四种基本流类型:Readable(数据源,如 fs.createReadStream)、Writable(数据目标,如 fs.createWriteStream)、Transform(数据经过时修改,如 zlib.createGzip)和 Duplex(独立的可读可写端,如 net.Socket)。还有 PassThrough,一种不修改数据的 Transform。
什么是背压以及如何处理?
背压发生在可写流无法以接收数据的速度处理数据时。当 write() 返回 false 时需要暂停写入并等待 drain 事件。pipe() 和 pipeline() 自动处理背压。忽略背压会导致内存使用过多甚至崩溃。
pipe() 和 pipeline() 有什么区别?
pipe() 连接流但不自动传播错误或清理资源。pipeline() 自动传播错误到所有流、出错时销毁所有流、并支持回调或 Promise 完成通知。生产代码应始终使用 pipeline()。
异步迭代器如何与流配合使用?
Readable 流实现了异步可迭代协议,可以使用 for await...of 循环消费。自动处理背压,当循环通过 break 或 throw 退出时流会被自动销毁。
什么是对象模式?
默认流处理 Buffer 或字符串。对象模式(objectMode: true)允许处理任意 JavaScript 值。highWaterMark 计数对象数量而非字节。适用于解析后的 JSON 记录、数据库行或 CSV 行。
如何流式处理大文件?
使用 fs.createReadStream() 按块读取文件并通过 pipeline 传输到可写目标。无论文件多大,内存占用都是恒定的。永远不要对大文件使用 readFileSync() 或 readFile()。
如何处理流管道中的错误?
使用 pipeline() 函数,它将错误传播到所有流并在完成或出错时调用回调。使用 Promise 版本时用 try/catch 包裹。始终处理流错误以避免未捕获异常导致进程崩溃。
Web Streams API 与 Node.js 流有什么区别?
Web Streams API 是浏览器、Deno、Workers 和 Node.js 通用的 WHATWG 标准。Node.js 流功能更丰富(pipe()、pipeline()、更好的背压),Web Streams 更具移植性。Node.js 提供 Readable.toWeb() 和 Readable.fromWeb() 实现转换。