数据流式传输优化
数据流式传输优化
数据流式传输在现代Web应用中越来越重要,特别是在处理大文件上传、实时视频流或大规模日志传输等场景。Koa2作为新一代Node.js框架,其轻量级和中间件机制为流式传输优化提供了良好基础。
流式传输基础概念
流(Stream)是Node.js中处理流式数据的抽象接口,分为四种基本类型:
- Readable - 可读流
- Writable - 可写流
- Duplex - 双向流
- Transform - 转换流
在Koa2中,流式处理的核心是ctx.req和ctx.res对象,它们分别是IncomingMessage和ServerResponse的实例,天然支持流式操作。
const fs = require('fs');
const Koa = require('koa');
const app = new Koa();
app.use(async ctx => {
const src = fs.createReadStream('./large-file.zip');
ctx.type = 'application/zip';
ctx.body = src;
});
app.listen(3000);
内存优化策略
传统文件传输方式会将整个文件加载到内存,而流式传输可以显著降低内存消耗:
// 传统方式 - 内存消耗大
app.use(async ctx => {
const data = fs.readFileSync('./large-file.zip');
ctx.body = data;
});
// 流式方式 - 内存友好
app.use(async ctx => {
const stream = fs.createReadStream('./large-file.zip');
ctx.body = stream;
});
传输速率优化
通过管道(pipe)和背压(backpressure)机制,可以自动控制数据流动速率:
app.use(async ctx => {
const readStream = fs.createReadStream('./video.mp4');
const transformStream = new Transform({
transform(chunk, encoding, callback) {
// 可以在这里添加处理逻辑
this.push(chunk);
callback();
}
});
ctx.type = 'video/mp4';
readStream.pipe(transformStream).pipe(ctx.res);
});
错误处理机制
流式传输需要特别注意错误处理:
app.use(async ctx => {
try {
const stream = fs.createReadStream('./file.zip');
stream.on('error', err => {
ctx.status = 404;
ctx.body = 'File not found';
});
ctx.type = 'application/zip';
ctx.body = stream;
} catch (err) {
ctx.status = 500;
ctx.body = 'Internal Server Error';
}
});
多流合并处理
Koa2中可以方便地处理多个流的合并:
const { PassThrough } = require('stream');
app.use(async ctx => {
const passThrough = new PassThrough();
const stream1 = fs.createReadStream('./part1.zip');
const stream2 = fs.createReadStream('./part2.zip');
stream1.pipe(passThrough, { end: false });
stream2.pipe(passThrough, { end: false });
stream1.on('end', () => {
stream2.on('end', () => passThrough.end());
});
ctx.type = 'application/zip';
ctx.body = passThrough;
});
进度监控实现
通过监听流事件可以实现传输进度监控:
app.use(async ctx => {
const filePath = './large-file.zip';
const stats = fs.statSync(filePath);
const fileSize = stats.size;
let uploadedBytes = 0;
const stream = fs.createReadStream(filePath);
stream.on('data', chunk => {
uploadedBytes += chunk.length;
const progress = (uploadedBytes / fileSize * 100).toFixed(2);
console.log(`Upload progress: ${progress}%`);
});
ctx.type = 'application/zip';
ctx.body = stream;
});
压缩传输优化
结合压缩流可以进一步优化传输效率:
const zlib = require('zlib');
app.use(async ctx => {
ctx.set('Content-Encoding', 'gzip');
const stream = fs.createReadStream('./large-file.log');
const gzip = zlib.createGzip();
ctx.type = 'text/plain';
ctx.body = stream.pipe(gzip);
});
断点续传支持
通过HTTP Range头实现断点续传功能:
app.use(async ctx => {
const filePath = './video.mp4';
const stats = fs.statSync(filePath);
const fileSize = stats.size;
const range = ctx.headers.range;
if (range) {
const parts = range.replace(/bytes=/, "").split("-");
const start = parseInt(parts[0], 10);
const end = parts[1] ? parseInt(parts[1], 10) : fileSize - 1;
const chunksize = (end - start) + 1;
ctx.set('Content-Range', `bytes ${start}-${end}/${fileSize}`);
ctx.set('Accept-Ranges', 'bytes');
ctx.set('Content-Length', chunksize);
ctx.status = 206;
const stream = fs.createReadStream(filePath, { start, end });
ctx.body = stream;
} else {
ctx.set('Content-Length', fileSize);
ctx.body = fs.createReadStream(filePath);
}
});
流式API设计
设计良好的流式API接口:
router.get('/stream/data', async ctx => {
// 设置流式响应头
ctx.set('Content-Type', 'application/octet-stream');
ctx.set('Transfer-Encoding', 'chunked');
// 创建自定义可读流
const readable = new Readable({
read(size) {
// 模拟数据生成
for (let i = 0; i < 5; i++) {
this.push(`数据块 ${i}\n`);
}
this.push(null); // 结束流
}
});
ctx.body = readable;
});
性能监控指标
实现流式传输的性能监控:
app.use(async (ctx, next) => {
const start = Date.now();
let bytesTransferred = 0;
await next();
if (ctx.body && typeof ctx.body.pipe === 'function') {
const originalPipe = ctx.body.pipe;
ctx.body.pipe = function(destination, options) {
ctx.body.on('data', chunk => {
bytesTransferred += chunk.length;
});
ctx.body.on('end', () => {
const duration = Date.now() - start;
console.log(`传输完成: ${bytesTransferred} bytes, 耗时: ${duration}ms`);
});
return originalPipe.call(ctx.body, destination, options);
};
}
});
浏览器端流处理
前端配合处理流式响应:
// 前端JavaScript代码
fetch('/stream/data')
.then(response => {
const reader = response.body.getReader();
const decoder = new TextDecoder();
function readChunk() {
return reader.read().then(({ done, value }) => {
if (done) return;
console.log('收到数据:', decoder.decode(value));
return readChunk();
});
}
return readChunk();
});
流式数据库查询
处理大量数据库查询结果的流式输出:
const { Pool } = require('pg');
const pool = new Pool();
app.use(async ctx => {
const client = await pool.connect();
try {
const queryStream = client.query(new Cursor('SELECT * FROM large_table'));
ctx.type = 'application/json';
ctx.body = queryStream;
// 确保连接在流结束后释放
ctx.res.on('finish', () => client.release());
ctx.res.on('close', () => client.release());
} catch (err) {
client.release();
throw err;
}
});
流式日志处理
高效处理日志文件的流式传输:
const readline = require('readline');
app.use(async ctx => {
const fileStream = fs.createReadStream('./app.log');
const rl = readline.createInterface({
input: fileStream,
crlfDelay: Infinity
});
const transform = new Transform({
transform(line, encoding, callback) {
// 过滤错误日志
if (line.includes('ERROR')) {
this.push(line + '\n');
}
callback();
}
});
ctx.type = 'text/plain';
rl.on('line', line => transform.write(line));
rl.on('close', () => transform.end());
ctx.body = transform;
});
WebSocket流式传输
结合WebSocket实现双向流式通信:
const WebSocket = require('ws');
const wss = new WebSocket.Server({ port: 8080 });
wss.on('connection', ws => {
const fileStream = fs.createReadStream('./data.bin');
fileStream.on('data', chunk => {
ws.send(chunk);
});
fileStream.on('end', () => {
ws.close();
});
ws.on('message', message => {
// 处理客户端消息
});
});
流式请求体处理
处理客户端上传的流式请求体:
const busboy = require('busboy');
app.use(async ctx => {
if (!ctx.req.headers['content-type']) {
ctx.throw(400, 'Content-Type required');
}
const bb = busboy({ headers: ctx.req.headers });
const fileWrites = [];
bb.on('file', (fieldname, file, info) => {
const saveTo = `./uploads/${info.filename}`;
const writeStream = fs.createWriteStream(saveTo);
file.pipe(writeStream);
fileWrites.push(new Promise((resolve, reject) => {
file.on('end', () => writeStream.end());
writeStream.on('finish', resolve);
writeStream.on('error', reject);
}));
});
bb.on('finish', async () => {
await Promise.all(fileWrites);
ctx.body = 'Upload complete';
});
bb.on('error', err => {
ctx.throw(500, 'Upload error');
});
ctx.req.pipe(bb);
});
流式缓存策略
实现流式数据的缓存机制:
const { Writable } = require('stream');
const cache = new Map();
class CacheStream extends Writable {
constructor(key) {
super();
this.chunks = [];
this.key = key;
}
_write(chunk, encoding, callback) {
this.chunks.push(chunk);
callback();
}
_final(callback) {
cache.set(this.key, Buffer.concat(this.chunks));
callback();
}
}
app.use(async ctx => {
const cacheKey = ctx.url;
if (cache.has(cacheKey)) {
ctx.body = cache.get(cacheKey);
return;
}
const sourceStream = fs.createReadStream('./data.json');
const cacheStream = new CacheStream(cacheKey);
sourceStream.pipe(cacheStream);
ctx.body = sourceStream;
});
流式数据转换
在传输过程中实时转换数据格式:
const { Transform } = require('stream');
const csv = require('csv-parser');
const { stringify } = require('JSONStream');
app.use(async ctx => {
const csvStream = fs.createReadStream('./data.csv')
.pipe(csv())
.pipe(new Transform({
objectMode: true,
transform(row, encoding, callback) {
// CSV行转换为对象
this.push(JSON.stringify(row) + '\n');
callback();
}
}));
ctx.type = 'application/json';
ctx.body = csvStream;
});
流式限速控制
控制流式传输的速率:
const Throttle = require('throttle');
app.use(async ctx => {
// 限制为100KB/s
const throttle = new Throttle(1024 * 100);
const videoStream = fs.createReadStream('./video.mp4');
ctx.type = 'video/mp4';
ctx.body = videoStream.pipe(throttle);
});
流式加密传输
实现流式数据的加密传输:
const crypto = require('crypto');
app.use(async ctx => {
const algorithm = 'aes-256-cbc';
const key = crypto.randomBytes(32);
const iv = crypto.randomBytes(16);
const cipher = crypto.createCipheriv(algorithm, key, iv);
ctx.set('X-Encryption-Key', key.toString('hex'));
ctx.set('X-Encryption-IV', iv.toString('hex'));
const fileStream = fs.createReadStream('./secret-data.bin');
ctx.type = 'application/octet-stream';
ctx.body = fileStream.pipe(cipher);
});
本站部分内容来自互联网,一切版权均归源网站或源作者所有。
如果侵犯了你的权益请来信告知我们删除。邮箱:cc@cccx.cn