Node.js Stream(流)(手把手讲解)
💡一则或许对你有用的小广告
欢迎加入小哈的星球 ,你将获得:专属的项目实战 / 1v1 提问 / Java 学习路线 / 学习打卡 / 每月赠书 / 社群讨论
- 新项目:《从零手撸:仿小红书(微服务架构)》 正在持续爆肝中,基于
Spring Cloud Alibaba + Spring Boot 3.x + JDK 17...
,点击查看项目介绍 ;演示链接: http://116.62.199.48:7070 ;- 《从零手撸:前后端分离博客项目(全栈开发)》 2 期已完结,演示链接: http://116.62.199.48/ ;
截止目前, 星球 内专栏累计输出 90w+ 字,讲解图 3441+ 张,还在持续爆肝中.. 后续还会上新更多项目,目标是将 Java 领域典型的项目都整一波,如秒杀系统, 在线商城, IM 即时通讯,权限管理,Spring Cloud Alibaba 微服务等等,已有 3100+ 小伙伴加入学习 ,欢迎点击围观
前言:为什么需要学习 Node.js Stream(流)?
在现代 Web 开发中,处理大规模数据(如文件传输、实时日志、音视频流)是常见需求。Node.js 作为基于事件驱动的异步运行时环境,其核心优势之一正是高效处理 I/O 操作。而 Node.js Stream(流) 正是实现这一目标的关键机制。
想象你正在用水管传输水:如果每次必须等整桶水装满才能使用,效率会非常低。而水管允许水持续流动,随用随取。Node.js Stream 的工作原理与此类似——它允许数据以"流式"传输,无需一次性加载全部数据到内存中,从而实现高效处理。这对于处理 GB 级文件或实时数据流的应用场景至关重要。
本文将从零开始,通过循序渐进的方式,带您理解流的核心概念、实现原理,并通过实战案例掌握其使用技巧。
基础概念:流的四大核心类型
Node.js 将流分为四种基本类型,它们像乐高积木般组合,构建出复杂的流处理系统:
类型 | 特性 | 典型用途 |
---|---|---|
Readable | 只读 | 文件读取、HTTP 响应 |
Writable | 只写 | 文件写入、HTTP 请求体 |
Duplex | 可读可写 | TCP 连接、双向通信 |
Transform | 处理数据流 | 实时数据转换、压缩/解压 |
形象比喻:
Readable 是"水龙头",持续提供数据;Writable 是"排水管",接收数据;Duplex 是"双向管道",既能进水又能出水;Transform 则像"净水器",在传输过程中对水流进行过滤或净化。
流的工作原理:数据流动的"管道系统"
1. 流的事件驱动模型
流基于事件驱动机制,核心事件包括:
data
:每当有数据可读时触发end
:数据传输完成时触发error
:发生错误时触发finish
:所有数据写入完成时触发
示例代码:读取文件的简单流程
const fs = require('fs');
const readable = fs.createReadStream('large-file.txt');
readable.on('data', (chunk) => {
console.log(`收到 ${chunk.length} 字节数据`);
});
readable.on('end', () => {
console.log('文件读取完成');
});
readable.on('error', (err) => {
console.error('读取错误:', err);
});
2. 流的可拼接特性
流对象可以像管道一样连接(pipe),实现数据的高效流转:
const fs = require('fs');
const zlib = require('zlib');
// 将文件内容压缩后写入新文件
const readStream = fs.createReadStream('input.txt');
const writeStream = fs.createWriteStream('output.gz');
const gzip = zlib.createGzip();
readStream
.pipe(gzip)
.pipe(writeStream);
这个链式调用相当于搭建了三条管道:原始文件 → 压缩处理器 → 输出文件,数据在流动过程中自动完成压缩处理。
进阶知识点:流的可控制性
1. 流的可暂停/恢复机制
通过 pause()
和 resume()
方法,可以控制数据流动速度:
const readable = fs.createReadStream('big-video.mp4');
let buffer = [];
readable.on('data', (chunk) => {
buffer.push(chunk);
if (buffer.length > 100) {
readable.pause(); // 数据堆积过多时暂停
process.nextTick(() => {
// 模拟处理数据
buffer = [];
readable.resume(); // 处理完成后恢复
});
}
});
比喻说明:
这就像交通信号灯控制车流,当道路拥堵时红灯暂停,疏通后绿灯放行,避免系统过载。
2. 流的流量控制
通过 flowing
模式管理数据流动:
- 自动流动模式:当监听
data
事件时自动开启 - 手动模式:通过
read()
方法逐次读取
const readable = fs.createReadStream('file.txt');
// 手动控制模式
readable.on('readable', () => {
let chunk;
while ((chunk = readable.read())) {
console.log(`处理数据块 ${chunk.length}`);
}
});
实战案例:构建实时日志分析系统
案例背景
假设我们需要实时监控日志文件,统计每秒 PV(Page View):
const fs = require('fs');
const readline = require('readline');
// 创建可读流
const logStream = fs.createReadStream('access.log');
// 创建解析器处理逐行读取
const rl = readline.createInterface({
input: logStream,
crlfDelay: Infinity
});
let pvCount = 0;
let startTime = Date.now();
rl.on('line', (line) => {
pvCount++;
if (Date.now() - startTime >= 1000) {
console.log(`当前秒 PV: ${pvCount}`);
pvCount = 0;
startTime = Date.now();
}
});
logStream.on('end', () => {
console.log('日志文件读取结束');
});
关键点解析
- 使用
readline
模块将二进制流转换为文本行流 - 通过时间窗口统计实时数据
- 自动处理大型日志文件,避免内存溢出
高级技巧:自定义流对象
实现 Transform 流压缩数据
const { Transform } = require('stream');
class UppercaseStream extends Transform {
_transform(chunk, encoding, callback) {
const upperChunk = chunk.toString().toUpperCase();
this.push(upperChunk);
callback();
}
}
// 使用自定义流
const uppercaseStream = new UppercaseStream();
process.stdin
.pipe(uppercaseStream)
.pipe(process.stdout);
扩展思考:
通过继承 Transform
类,我们可以创建任意数据处理流。例如:
- 实时过滤敏感词
- 对 JSON 数据流进行格式化
- 在传输过程中加密/解密
性能优化:流的正确使用原则
1. 避免一次性读取全部数据
错误写法:
// 会将整个文件加载到内存
fs.readFile('big-file.mp4', (err, data) => { ... });
正确写法:
const readStream = fs.createReadStream('big-file.mp4');
readStream.on('data', (chunk) => { ... });
2. 处理流的错误事件
const readStream = fs.createReadStream('non-exist.txt');
readStream.on('error', (err) => {
if (err.code === 'ENOENT') {
console.log('文件不存在,尝试创建...');
// 自动创建空文件
fs.writeFile('non-exist.txt', '', () => {
// 重新尝试读取
});
}
});
3. 管理流的生命周期
const writeStream = fs.createWriteStream('output.txt');
// 写入完成后关闭流
writeStream.on('finish', () => {
console.log('写入完成');
writeStream.close();
});
结论:掌握流,掌控数据流动的艺术
通过本文,我们系统地学习了 Node.js Stream 的核心概念、实现原理及实战应用。流机制如同数据的"高速公路",让开发者能够优雅地处理海量数据,构建高并发、低延迟的 Web 应用。
对于初学者,建议从基础的文件流操作开始练习,逐步尝试 HTTP 流、TCP 流等进阶场景。中级开发者则可以深入探索流的复用、错误处理及自定义流的设计模式。记住:流的本质是数据的"持续流动",保持这种思维模式,您将能更高效地利用 Node.js 的异步优势。
现在,您可以尝试用流机制重构现有项目中涉及文件操作或网络传输的代码,体验内存使用率的下降和性能的提升。记住,真正的技术掌握,永远始于实践。