常见Stream应用场景
文件读写操作
Stream在文件处理中非常实用,特别是大文件场景。传统的fs.readFile
会一次性加载整个文件到内存,而使用Stream可以分块处理。
const fs = require('fs');
// Copy a large file chunk-by-chunk so it never sits in memory at once.
const readStream = fs.createReadStream('./large-file.txt', {
  highWaterMark: 64 * 1024 // read up to 64KB per chunk
});
// Write the chunks out to the destination file.
const writeStream = fs.createWriteStream('./copy-file.txt');
readStream.pipe(writeStream);
// Listen on the WRITE side's 'finish': the read stream's 'end' only
// means reading is done — the last chunks may not be flushed to disk yet.
writeStream.on('finish', () => {
  console.log('文件复制完成');
});
// Without 'error' handlers, a missing source file or a full disk would
// crash the process via an unhandled 'error' event.
readStream.on('error', (err) => console.error('读取失败:', err));
writeStream.on('error', (err) => console.error('写入失败:', err));
HTTP请求与响应
Node.js的HTTP模块本身就是基于Stream实现的。处理大型HTTP请求体时特别有用:
const http = require('http');
const fs = require('fs');
http.createServer((req, res) => {
  // Stream the file straight into the response instead of buffering it.
  const fileStream = fs.createReadStream('./large-data.json');
  res.setHeader('Content-Type', 'application/json');
  // An unhandled 'error' (e.g. missing file) would crash the whole server.
  fileStream.on('error', (err) => {
    console.error('读取文件失败:', err);
    res.statusCode = 500;
    res.end();
  });
  fileStream.pipe(res);
  // Receive the request body incrementally as chunks arrive.
  let body = '';
  req.on('data', (chunk) => {
    body += chunk;
  });
  req.on('end', () => {
    console.log('请求体接收完成');
  });
}).listen(3000);
数据转换处理
通过管道连接多个转换流可以实现复杂的数据处理流水线:
const { Transform } = require('stream');
// A Transform stream that upper-cases every chunk flowing through it.
const upperCaseTransform = new Transform({
  transform(data, enc, done) {
    // Passing the result as the callback's second argument both pushes
    // it downstream and signals that this chunk is fully processed.
    done(null, data.toString().toUpperCase());
  }
});
// Pipe stdin through the upper-casing transform to stdout:
// anything typed on stdin is echoed back upper-cased.
process.stdin
.pipe(upperCaseTransform)
.pipe(process.stdout);
数据库操作
处理大量数据库记录时,Stream可以避免内存溢出:
const { MongoClient } = require('mongodb');
// Streams every document of a large collection so the whole result set
// never has to fit in memory.
async function streamLargeData() {
  const client = await MongoClient.connect('mongodb://localhost:27017');
  const collection = client.db('test').collection('largeCollection');
  const stream = collection.find().stream();
  stream.on('data', (doc) => {
    console.log('处理文档:', doc._id);
  });
  stream.on('end', () => {
    client.close();
  });
  // Without this, a cursor failure would leak the connection and crash
  // the process with an unhandled 'error' event.
  stream.on('error', (err) => {
    console.error('游标错误:', err);
    client.close();
  });
}
实时日志处理
Stream非常适合处理持续产生的日志数据:
const fs = require('fs');
const readline = require('readline');
// Scan a log file line-by-line for error entries without loading it whole.
const logStream = fs.createReadStream('./app.log');
// A missing log file would otherwise crash the process via an
// unhandled 'error' event on the read stream.
logStream.on('error', (err) => console.error('无法读取日志:', err));
const rl = readline.createInterface({
  input: logStream,
  crlfDelay: Infinity // treat \r\n as a single line break
});
rl.on('line', (line) => {
  if (line.includes('ERROR')) {
    console.log('发现错误日志:', line);
  }
});
视频流处理
处理视频等大型媒体文件时,Stream是必需的:
const http = require('http');
const fs = require('fs');
http.createServer((req, res) => {
  const range = req.headers.range;
  if (!range) {
    // A Range header is required for partial (206) video responses.
    res.writeHead(400);
    return res.end();
  }
  const videoPath = './sample.mp4';
  const videoSize = fs.statSync(videoPath).size;
  const CHUNK_SIZE = 10 ** 6; // serve at most 1MB per request
  // Parse "bytes=start-end" properly. Stripping all non-digits would
  // fuse both numbers together: "bytes=0-499" -> 0499.
  const match = /bytes=(\d+)-/.exec(range);
  if (!match) {
    res.writeHead(416, { 'Content-Range': `bytes */${videoSize}` });
    return res.end();
  }
  const start = Number(match[1]);
  // end is INCLUSIVE, so "-1" makes a full chunk exactly CHUNK_SIZE bytes.
  const end = Math.min(start + CHUNK_SIZE - 1, videoSize - 1);
  res.writeHead(206, {
    'Content-Range': `bytes ${start}-${end}/${videoSize}`,
    'Accept-Ranges': 'bytes',
    'Content-Length': end - start + 1,
    'Content-Type': 'video/mp4'
  });
  // fs.createReadStream treats start/end as inclusive byte offsets.
  const videoStream = fs.createReadStream(videoPath, { start, end });
  videoStream.on('error', (err) => {
    console.error('视频流错误:', err);
    res.end();
  });
  videoStream.pipe(res);
}).listen(3000);
压缩与解压缩
Stream可以高效处理压缩/解压缩任务:
const fs = require('fs');
const zlib = require('zlib');
const { pipeline } = require('stream');
// 1) Compress. pipeline() propagates errors from every stage and
// destroys all streams on failure, unlike bare .pipe() chains.
pipeline(
  fs.createReadStream('./large-file.txt'),
  zlib.createGzip(),
  fs.createWriteStream('./large-file.txt.gz'),
  (err) => {
    if (err) return console.error('压缩失败:', err);
    // 2) Decompress only after the .gz is fully written — running both
    // pipelines concurrently (as before) reads a half-written archive.
    pipeline(
      fs.createReadStream('./large-file.txt.gz'),
      zlib.createGunzip(),
      fs.createWriteStream('./large-file-decompressed.txt'),
      (err2) => {
        if (err2) console.error('解压失败:', err2);
      }
    );
  }
);
CSV数据处理
处理大型CSV文件时,Stream可以逐行处理:
const csv = require('csv-parser');
const fs = require('fs');
// Process a large CSV row-by-row instead of parsing it in one piece.
const csvSource = fs.createReadStream('large-data.csv');
// pipe() does NOT forward source errors, so handle them on the
// read stream itself or a missing file crashes the process.
csvSource.on('error', (err) => console.error('读取CSV失败:', err));
csvSource
  .pipe(csv())
  .on('data', (row) => {
    console.log('处理行:', row);
  })
  .on('end', () => {
    console.log('CSV文件处理完成');
  })
  .on('error', (err) => {
    console.error('CSV解析失败:', err);
  });
多数据源合并
合并多个数据源时,Stream可以避免内存问题:
const { PassThrough } = require('stream');
const fs = require('fs');
// 创建合并流
const mergeStreams = (...streams) => {
const passThrough = new PassThrough();
let ended = 0;
const checkEnd = () => {
if (++ended === streams.length) {
passThrough.end();
}
};
streams.forEach(stream => {
stream.pipe(passThrough, { end: false });
stream.on('end', checkEnd);
});
return passThrough;
};
// Example: merge two source files into one output file.
const stream1 = fs.createReadStream('file1.txt');
const stream2 = fs.createReadStream('file2.txt');
// The merged stream ends only after both sources have ended.
const merged = mergeStreams(stream1, stream2);
merged.pipe(fs.createWriteStream('merged.txt'));
实时通信
WebSocket等实时通信协议通常基于Stream实现:
const WebSocket = require('ws');
const wss = new WebSocket.Server({ port: 8080 });
wss.on('connection', (ws) => {
  // Incoming client messages arrive as a stream of events.
  ws.on('message', (message) => {
    console.log('收到消息:', message);
  });
  // Push the server time to this client once per second.
  const interval = setInterval(() => {
    if (ws.readyState === WebSocket.OPEN) {
      ws.send(JSON.stringify({ time: Date.now() }));
    } else {
      clearInterval(interval);
    }
  }, 1000);
  // Stop the timer immediately on disconnect instead of letting it
  // keep firing until the next tick notices the closed socket.
  ws.on('close', () => clearInterval(interval));
});
图像处理
处理大型图像文件时,Stream可以分块处理:
const sharp = require('sharp');
const fs = require('fs');
// Streaming image pipeline: decode, resize to 800x600, re-encode as a
// 90%-quality JPEG, and write the result — no full-image buffering.
const resizer = sharp().resize(800, 600).jpeg({ quality: 90 });
fs.createReadStream('input.jpg')
  .pipe(resizer)
  .pipe(fs.createWriteStream('output.jpg'));
加密解密数据
处理加密数据流可以保证安全性同时降低内存使用:
const crypto = require('crypto');
const fs = require('fs');
// crypto.createCipher/createDecipher are deprecated and were removed in
// modern Node.js: derive a real key with scrypt and use an explicit IV.
const key = crypto.scryptSync('secret-key', 'salt', 32); // aes-256 needs 32 bytes
const iv = crypto.randomBytes(16);
// In a real application the IV must be stored alongside the ciphertext;
// here both streams run in the same process, so they can share it.
const cipher = crypto.createCipheriv('aes-256-cbc', key, iv);
const decipher = crypto.createDecipheriv('aes-256-cbc', key, iv);
// Encrypt the file.
const encryptedOut = fs.createWriteStream('encrypted.dat');
fs.createReadStream('secret-data.txt')
  .pipe(cipher)
  .pipe(encryptedOut);
// Decrypt only after encryption has fully flushed; reading encrypted.dat
// while it is still being written would decrypt a truncated file.
encryptedOut.on('finish', () => {
  fs.createReadStream('encrypted.dat')
    .pipe(decipher)
    .pipe(fs.createWriteStream('decrypted.txt'));
});
自定义协议实现
实现自定义网络协议时,Stream提供了很好的抽象:
const net = require('net');
const { Transform } = require('stream');
// Parses a simple length-prefixed wire format: each frame is a 4-byte
// big-endian payload length followed by that many payload bytes.
// Emits each complete payload downstream as a UTF-8 string (object mode).
class ProtocolParser extends Transform {
  constructor() {
    super({ readableObjectMode: true });
    // Carries any partial frame between _transform calls.
    this.buffer = Buffer.alloc(0);
  }

  _transform(chunk, encoding, callback) {
    this.buffer = Buffer.concat([this.buffer, chunk]);
    // Drain every complete frame currently in the buffer.
    for (;;) {
      if (this.buffer.length < 4) break; // header not yet complete
      const payloadLength = this.buffer.readUInt32BE(0);
      if (this.buffer.length < 4 + payloadLength) break; // partial payload
      const payload = this.buffer.slice(4, 4 + payloadLength);
      this.push(payload.toString());
      this.buffer = this.buffer.slice(4 + payloadLength);
    }
    callback();
  }
}
// Wire the parser to a TCP server: each socket's byte stream is decoded
// into discrete messages by ProtocolParser.
const server = net.createServer((socket) => {
socket
.pipe(new ProtocolParser())
.on('data', (message) => {
// 'data' delivers one decoded message string per complete frame
console.log('收到消息:', message);
});
});
server.listen(3000);
本站部分内容来自互联网,一切版权均归源网站或源作者所有。
如果侵犯了你的权益请来信告知我们删除。邮箱:cc@cccx.cn
上一篇:Stream的错误处理
下一篇:fs模块的核心API