阿里云主机折上折
  • 微信号
您当前的位置:网站首页 > 常见Stream应用场景

常见Stream应用场景

作者:陈川 阅读数:63724人阅读 分类: Node.js

文件读写操作

Stream在文件处理中非常实用,特别是大文件场景。传统的fs.readFile会一次性加载整个文件到内存,而使用Stream可以分块处理。

const fs = require('fs');

// 读取大文件
const readStream = fs.createReadStream('./large-file.txt', {
  highWaterMark: 64 * 1024 // 每次读取64KB
});

// 写入新文件
const writeStream = fs.createWriteStream('./copy-file.txt');

readStream.pipe(writeStream);

readStream.on('end', () => {
  console.log('文件复制完成');
});

HTTP请求与响应

Node.js的HTTP模块本身就是基于Stream实现的。处理大型HTTP请求体时特别有用:

const http = require('http');
const fs = require('fs');

http.createServer((req, res) => {
  // 流式响应
  const fileStream = fs.createReadStream('./large-data.json');
  fileStream.pipe(res);
  
  // 流式接收请求体
  let body = '';
  req.on('data', (chunk) => {
    body += chunk;
  });
  
  req.on('end', () => {
    console.log('请求体接收完成');
  });
}).listen(3000);

数据转换处理

通过管道连接多个转换流可以实现复杂的数据处理流水线:

const { Transform } = require('stream');

// 创建转换流
const upperCaseTransform = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});

// 使用转换流
process.stdin
  .pipe(upperCaseTransform)
  .pipe(process.stdout);

数据库操作

处理大量数据库记录时,Stream可以避免内存溢出:

const { MongoClient } = require('mongodb');

async function streamLargeData() {
  const client = await MongoClient.connect('mongodb://localhost:27017');
  const collection = client.db('test').collection('largeCollection');
  
  const stream = collection.find().stream();
  
  stream.on('data', (doc) => {
    console.log('处理文档:', doc._id);
  });
  
  stream.on('end', () => {
    client.close();
  });
}

实时日志处理

Stream非常适合处理持续产生的日志数据:

const fs = require('fs');
const readline = require('readline');

const logStream = fs.createReadStream('./app.log');
const rl = readline.createInterface({
  input: logStream,
  crlfDelay: Infinity
});

rl.on('line', (line) => {
  if (line.includes('ERROR')) {
    console.log('发现错误日志:', line);
  }
});

视频流处理

处理视频等大型媒体文件时,Stream是必须的:

const http = require('http');
const fs = require('fs');

http.createServer((req, res) => {
  const range = req.headers.range;
  if (!range) {
    res.writeHead(400);
    return res.end();
  }
  
  const videoPath = './sample.mp4';
  const videoSize = fs.statSync(videoPath).size;
  
  // 解析范围请求
  const CHUNK_SIZE = 10 ** 6; // 1MB
  const start = Number(range.replace(/\D/g, ''));
  const end = Math.min(start + CHUNK_SIZE, videoSize - 1);
  
  // 设置响应头
  res.writeHead(206, {
    'Content-Range': `bytes ${start}-${end}/${videoSize}`,
    'Accept-Ranges': 'bytes',
    'Content-Length': end - start + 1,
    'Content-Type': 'video/mp4'
  });
  
  // 创建视频流
  const videoStream = fs.createReadStream(videoPath, { start, end });
  videoStream.pipe(res);
}).listen(3000);

压缩与解压缩

Stream可以高效处理压缩/解压缩任务:

const fs = require('fs');
const zlib = require('zlib');

// 压缩文件
fs.createReadStream('./large-file.txt')
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('./large-file.txt.gz'));

// 解压文件
fs.createReadStream('./large-file.txt.gz')
  .pipe(zlib.createGunzip())
  .pipe(fs.createWriteStream('./large-file-decompressed.txt'));

CSV数据处理

处理大型CSV文件时,Stream可以逐行处理:

const csv = require('csv-parser');
const fs = require('fs');

fs.createReadStream('large-data.csv')
  .pipe(csv())
  .on('data', (row) => {
    console.log('处理行:', row);
  })
  .on('end', () => {
    console.log('CSV文件处理完成');
  });

多数据源合并

合并多个数据源时,Stream可以避免内存问题:

const { PassThrough } = require('stream');
const fs = require('fs');

// 创建合并流
const mergeStreams = (...streams) => {
  const passThrough = new PassThrough();
  
  let ended = 0;
  const checkEnd = () => {
    if (++ended === streams.length) {
      passThrough.end();
    }
  };
  
  streams.forEach(stream => {
    stream.pipe(passThrough, { end: false });
    stream.on('end', checkEnd);
  });
  
  return passThrough;
};

// 使用示例
const stream1 = fs.createReadStream('file1.txt');
const stream2 = fs.createReadStream('file2.txt');
const merged = mergeStreams(stream1, stream2);

merged.pipe(fs.createWriteStream('merged.txt'));

实时通信

WebSocket等实时通信协议通常基于Stream实现:

const WebSocket = require('ws');
const wss = new WebSocket.Server({ port: 8080 });

wss.on('connection', (ws) => {
  // 接收客户端消息流
  ws.on('message', (message) => {
    console.log('收到消息:', message);
  });
  
  // 创建定期发送的数据流
  const interval = setInterval(() => {
    if (ws.readyState === WebSocket.OPEN) {
      ws.send(JSON.stringify({ time: Date.now() }));
    } else {
      clearInterval(interval);
    }
  }, 1000);
});

图像处理

处理大型图像文件时,Stream可以分块处理:

const sharp = require('sharp');
const fs = require('fs');

// 流式图像处理
fs.createReadStream('input.jpg')
  .pipe(sharp()
    .resize(800, 600)
    .jpeg({ quality: 90 })
  )
  .pipe(fs.createWriteStream('output.jpg'));

加密解密数据

处理加密数据流可以保证安全性同时降低内存使用:

const crypto = require('crypto');
const fs = require('fs');

// 加密流
const cipher = crypto.createCipher('aes-256-cbc', 'secret-key');
// 解密流
const decipher = crypto.createDecipher('aes-256-cbc', 'secret-key');

// 加密文件
fs.createReadStream('secret-data.txt')
  .pipe(cipher)
  .pipe(fs.createWriteStream('encrypted.dat'));

// 解密文件
fs.createReadStream('encrypted.dat')
  .pipe(decipher)
  .pipe(fs.createWriteStream('decrypted.txt'));

自定义协议实现

实现自定义网络协议时,Stream提供了很好的抽象:

const net = require('net');
const { Transform } = require('stream');

// 自定义协议解析器
class ProtocolParser extends Transform {
  constructor() {
    super({ readableObjectMode: true });
    this.buffer = Buffer.alloc(0);
  }
  
  _transform(chunk, encoding, callback) {
    this.buffer = Buffer.concat([this.buffer, chunk]);
    
    while (this.buffer.length >= 4) {
      const length = this.buffer.readUInt32BE(0);
      if (this.buffer.length >= 4 + length) {
        const message = this.buffer.slice(4, 4 + length);
        this.push(message.toString());
        this.buffer = this.buffer.slice(4 + length);
      } else {
        break;
      }
    }
    callback();
  }
}

// 使用自定义协议
const server = net.createServer((socket) => {
  socket
    .pipe(new ProtocolParser())
    .on('data', (message) => {
      console.log('收到消息:', message);
    });
});

server.listen(3000);

本站部分内容来自互联网,一切版权均归源网站或源作者所有。

如果侵犯了你的权益请来信告知我们删除。邮箱:cc@cccx.cn

前端川

前端川,陈川的代码茶馆🍵,专治各种不服的Bug退散符💻,日常贩卖秃头警告级的开发心得🛠️,附赠一行代码笑十年的摸鱼宝典🐟,偶尔掉落咖啡杯里泡开的像素级浪漫☕。‌