Cluster Mode and Load Balancing

Author: 陈川 (Chen Chuan) | Views: 19839 | Category: Node.js


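Koa2 is a lightweight Node.js framework, and by default it runs in a single process. A single Node.js process executes JavaScript on one thread, so under heavy concurrency it quickly becomes the bottleneck. Combining cluster mode with load balancing lets the application use every CPU core, which markedly improves throughput and stability.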

Basic Cluster Mode Implementation

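The Node.js cluster module creates child processes that can share the same server port. The primary process (called "master" in older Node.js versions) manages the worker processes, and each worker is a separate process with its own V8 instance, event loop and memory. A basic implementation: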

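const cluster = require('cluster');
const os = require('os');
const Koa = require('koa');

// cluster.isMaster is deprecated since Node.js 16 in favour of cluster.isPrimary; fall back for older versions
const isPrimary = cluster.isPrimary ?? cluster.isMaster;

if (isPrimary) {
  // One worker per CPU core reported by the OS
  const cpuCount = os.cpus().length;

  for (let i = 0; i < cpuCount; i++) {
    cluster.fork();
  }

  // Replace crashed workers; skip workers that were shut down on purpose (disconnect()/kill())
  cluster.on('exit', (worker, code, signal) => {
    console.log(`Worker ${worker.id} died (code ${code}, signal ${signal})`);
    if (!worker.exitedAfterDisconnect) {
      cluster.fork();
    }
  });
} else {
  // Every worker runs its own Koa app; all of them share port 3000 through the primary
  const app = new Koa();

  app.use(async ctx => {
    ctx.body = `Worker ${cluster.worker.id} handled this request`;
  });

  app.listen(3000);
}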

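By default, on every platform except Windows, the primary process accepts incoming connections and hands them to the workers in round-robin order (the cluster.SCHED_RR policy); on Windows the default is cluster.SCHED_NONE, which leaves the distribution to the operating system. What gets distributed is a connection, not a request, so all requests on one keep-alive connection are served by the same worker. When a worker crashes, the primary's 'exit' handler forks a replacement right away.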
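To make the rotation concrete, here is a simplified model of round-robin assignment. It is a sketch of the idea, not Node's internal code: the real scheduler also tracks which workers are currently free to take a connection. createRoundRobin and nextWorker are illustrative names, not part of the cluster API.

```javascript
// Simplified model of round-robin distribution: each new connection goes to the next worker id in turn.
// createRoundRobin/nextWorker are hypothetical names, not part of the cluster module.
function createRoundRobin(workerIds) {
  let index = 0;
  return function nextWorker() {
    if (workerIds.length === 0) return undefined; // no workers alive
    const id = workerIds[index];
    index = (index + 1) % workerIds.length;
    return id;
  };
}

const next = createRoundRobin([1, 2, 3]);
console.log([next(), next(), next(), next()]); // [ 1, 2, 3, 1 ]
```

To request this policy explicitly, set cluster.schedulingPolicy = cluster.SCHED_RR before the first cluster.fork(), or start Node.js with the environment variable NODE_CLUSTER_SCHED_POLICY=rr; the setting is frozen once the first worker is spawned.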

Advanced Load-Balancing Strategies

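Besides the built-in round-robin policy, you can implement finer-grained load control. One caveat: when workers call listen() themselves, the cluster scheduler alone decides which worker gets each connection and offers no hook for a custom policy. The strategies below only take effect if the primary accepts connections itself and hands each socket to the chosen worker with worker.send(message, socket).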

1. Dynamic allocation by connection count (least connections)

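// primary process: track the number of active connections per worker
const workers = {};
const createWorker = () => {
  const worker = cluster.fork();
  workers[worker.id] = {
    conns: 0,
    instance: worker
  };

  // Each worker reports its current connection count
  worker.on('message', (msg) => {
    if (msg && msg.type === 'updateConn') {
      workers[worker.id].conns = msg.count;
    }
  });

  // Forget workers that have exited so they are never selected
  worker.on('exit', () => {
    delete workers[worker.id];
  });
};

// Pick the worker with the fewest connections (a linear scan, no need to sort)
const getWorker = () => {
  const candidates = Object.values(workers);
  if (candidates.length === 0) return null;
  return candidates
    .reduce((best, w) => (w.conns < best.conns ? w : best))
    .instance;
};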
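A least-connections picker only matters if the primary, not the cluster scheduler, decides where each connection goes. Below is a sketch of that wiring, assuming plain http servers in the workers (a Koa app plugs in through app.callback()): the primary accepts connections itself with pauseOnConnect and passes each socket to the least-loaded worker via worker.send(message, socket), and each worker feeds the socket to its HTTP server. startLeastConnections, pickLeastLoaded and the 'connection'/'connClosed' message shapes are made up for this example.

```javascript
const cluster = require('cluster');
const http = require('http');
const net = require('net');
const os = require('os');

// Return the worker id with the fewest active connections, or null if there is none.
function pickLeastLoaded(counts) {
  let bestId = null;
  for (const [id, count] of counts) {
    if (bestId === null || count < counts.get(bestId)) bestId = id;
  }
  return bestId;
}

// Hypothetical entry point: call it at the top level of the script. cluster.fork() re-runs
// the same script, so the primary and every worker execute this call and branch on their role.
function startLeastConnections(port, handler) {
  const isPrimary = cluster.isPrimary ?? cluster.isMaster;

  if (isPrimary) {
    const counts = new Map(); // worker.id -> connections handed out and not yet closed
    for (let i = 0; i < Math.max(1, os.cpus().length); i++) {
      const worker = cluster.fork();
      counts.set(worker.id, 0);
      worker.on('message', (msg) => {
        if (msg && msg.type === 'connClosed' && counts.has(worker.id)) {
          counts.set(worker.id, Math.max(0, counts.get(worker.id) - 1));
        }
      });
      worker.on('exit', () => counts.delete(worker.id));
    }

    // pauseOnConnect: the primary never reads from the socket before handing it over
    net.createServer({ pauseOnConnect: true }, (socket) => {
      const id = pickLeastLoaded(counts);
      const worker = id === null ? undefined : cluster.workers[id];
      if (!worker) return socket.destroy();
      counts.set(id, counts.get(id) + 1);
      worker.send({ type: 'connection' }, socket);
    }).listen(port);
  } else {
    // The worker's HTTP server never calls listen(); it only receives sockets from the primary
    const server = http.createServer(handler);
    process.on('message', (msg, socket) => {
      if (!msg || msg.type !== 'connection' || !socket) return;
      socket.on('close', () => {
        if (process.connected) process.send({ type: 'connClosed' });
      });
      server.emit('connection', socket);
      socket.resume();
    });
  }
}
```

Usage, under the same assumptions: call startLeastConnections(3000, app.callback()) at the top level of the entry script. Connections are counted rather than requests, which fits the fact that keep-alive connections stay pinned to one worker anyway.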

2. Weighted allocation by response time

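// worker process: report the average response time to the primary every 5 seconds
let responseTimes = [];
setInterval(() => {
  const avgTime = responseTimes.length
    ? responseTimes.reduce((a, b) => a + b, 0) / responseTimes.length
    : 0; // no requests in this window
  process.send({ type: 'perfMetrics', avgTime });
  responseTimes = [];
}, 5000);

// Register this middleware first so it times the whole downstream chain
app.use(async (ctx, next) => {
  const start = Date.now();
  await next();
  const duration = Date.now() - start;
  responseTimes.push(duration);
  ctx.set('X-Response-Time', `${duration}ms`);
});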
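The snippet above only measures. One way a primary that forwards sockets itself could turn the perfMetrics averages into traffic shares is to derive integer weights and feed them to smooth weighted round-robin, the algorithm nginx uses for weighted upstream servers. The weighting formula (10 × fastest average ÷ own average, at least 1, with full weight for workers that have no samples yet) is an assumption chosen for illustration, and weightsFromLatency and createSmoothWRR are hypothetical helpers.

```javascript
// Hypothetical helper: turn average response times (ms, keyed by worker id) into integer weights.
// Assumed formula: round(10 * fastest / avg), at least 1; workers with no samples (avg 0) get 10.
function weightsFromLatency(avgTimes) {
  const measured = Object.values(avgTimes).filter((t) => t > 0);
  const fastest = measured.length ? Math.min(...measured) : 1;
  const weights = {};
  for (const [id, avg] of Object.entries(avgTimes)) {
    weights[id] = avg > 0 ? Math.max(1, Math.round((10 * fastest) / avg)) : 10;
  }
  return weights;
}

// Smooth weighted round-robin (as in nginx): on every pick each peer's `current` grows by its
// weight, the largest `current` wins and is reduced by the total weight, interleaving the picks.
function createSmoothWRR(weights) {
  const peers = Object.entries(weights).map(([id, weight]) => ({ id, weight, current: 0 }));
  const total = peers.reduce((sum, p) => sum + p.weight, 0);
  return function next() {
    if (peers.length === 0) return undefined;
    let best = null;
    for (const p of peers) {
      p.current += p.weight;
      if (best === null || p.current > best.current) best = p;
    }
    best.current -= total;
    return best.id;
  };
}

const next = createSmoothWRR({ a: 5, b: 1, c: 1 });
console.log(Array.from({ length: 7 }, () => next()).join(' ')); // a a b a c a a
```

The heavy worker gets 5 of every 7 picks, but its turns are spread out instead of arriving in a burst. Rebuilding the picker whenever fresh perfMetrics arrive (every 5 seconds in the worker code above) keeps the weights current.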

Optimizing Inter-Process Communication

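Workers share no memory, so in cluster mode any state that all of them must see, such as sessions or runtime configuration, has to live outside the processes or be passed between them explicitly: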

Sharing sessions through Redis

const session = require('koa-session');
const RedisStore = require('koa-redis');

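// Use a real secret (e.g. from an environment variable); all workers must share the same keys,
// otherwise a cookie signed by one worker fails verification in another
app.keys = ['some secret key'];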
app.use(session({
  store: new RedisStore({
    host: '127.0.0.1',
    port: 6379,
    ttl: 86400 * 30
  }),
  key: 'koa:sess'
}, app));

Event broadcasting

// primary process: relay every 'broadcast' message to all workers (the sender included)
cluster.on('message', (worker, message) => {
  if (message.type === 'broadcast') {
    for (const id in cluster.workers) {
      cluster.workers[id].send(message);
    }
  }
});

// worker process: reloadConfig stands for the application's own reload logic
process.on('message', (msg) => {
  if (msg.event === 'configUpdate') {
    reloadConfig(msg.data);
  }
});

// Called in any worker to push a config change to every worker
function broadcast(data) {
  process.send({
    type: 'broadcast',
    event: 'configUpdate',
    data: data
  });
}

Zero-Downtime Deployment

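Because the primary spreads traffic over several workers, workers can be replaced one at a time while the others keep serving, which makes restarts seamless: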

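// Graceful worker shutdown (app is the worker's Koa instance)
const server = app.listen(3000);

function shutdown() {
  // Stop accepting new connections; the callback runs once open connections have closed
  server.close(() => process.exit(0));
  // Give up after 5 s, e.g. when keep-alive connections keep the server open
  setTimeout(() => process.exit(1), 5000).unref();
}

process.on('SIGTERM', shutdown);
// Also accept a 'shutdown' message from the primary
process.on('message', (msg) => {
  if (msg === 'shutdown') shutdown();
});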

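// Rolling restart (primary): replace one worker at a time so capacity never drops by more than one
function rollingRestart() {
  const queue = Object.values(cluster.workers);

  const restartNext = () => {
    const worker = queue.shift();
    if (!worker) return;

    // Start the replacement first; retire the old worker only once the new one is listening
    const replacement = cluster.fork();
    replacement.once('listening', () => {
      if (worker.isDead()) return restartNext();
      // Force-kill if the graceful shutdown takes longer than 10 s
      const timer = setTimeout(() => worker.kill(), 10000);
      worker.once('exit', () => {
        clearTimeout(timer);
        restartNext();
      });
      // Close the worker's servers, let in-flight requests finish, then drop the IPC channel
      worker.disconnect();
    });
  };

  restartNext();
}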

Performance Monitoring and Tuning

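A solid monitoring setup is essential for running a cluster. Here the primary polls every worker for memory, load and uptime figures: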

// primary: request metrics from every worker every 10 seconds
const stats = {};
setInterval(() => {
  for (const worker of Object.values(cluster.workers)) {
    worker.send({ type: 'getStats' });
  }
}, 10000);

cluster.on('message', (worker, msg) => {
  if (msg.type === 'stats') {
    stats[worker.id] = {
      memory: msg.memory,
      load: msg.load,
      uptime: msg.uptime
    };
  }
});

// Forget workers that have exited
cluster.on('exit', (worker) => {
  delete stats[worker.id];
});

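// worker: answer the primary's getStats request
const os = require('os'); // skip if os is already required in this file
process.on('message', (msg) => {
  if (!msg || msg.type !== 'getStats') return;
  const mem = process.memoryUsage();

  process.send({
    type: 'stats',
    memory: {
      rss: mem.rss,
      heapTotal: mem.heapTotal,
      heapUsed: mem.heapUsed
    },
    // 1-minute load average of the whole machine (identical for all workers; always 0 on Windows)
    load: os.loadavg()[0],
    uptime: process.uptime()
  });
});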
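Collected stats become useful once they drive an action. A minimal sketch, assuming a heap budget you choose yourself (the 512 MB below is arbitrary): list the workers over the limit so the primary can fork a replacement and then retire each of them with worker.disconnect(). workersOverHeapLimit is a hypothetical helper that works on the stats object shape built by the primary code above.

```javascript
// Hypothetical helper: ids of workers whose last reported heapUsed exceeds limitBytes.
// `stats` has the primary's shape: { [workerId]: { memory: { rss, heapTotal, heapUsed }, load, uptime } }
function workersOverHeapLimit(stats, limitBytes) {
  return Object.entries(stats)
    .filter(([, s]) => s.memory && s.memory.heapUsed > limitBytes)
    .map(([id]) => Number(id));
}

const HEAP_LIMIT = 512 * 1024 * 1024; // arbitrary example budget: 512 MB

const sample = {
  1: { memory: { rss: 300e6, heapTotal: 200e6, heapUsed: 150e6 }, load: 0.4, uptime: 120 },
  2: { memory: { rss: 900e6, heapTotal: 700e6, heapUsed: 650e6 }, load: 0.4, uptime: 3600 }
};
console.log(workersOverHeapLimit(sample, HEAP_LIMIT)); // [ 2 ]
```

A steadily growing heapUsed across several polls is a stronger leak signal than a single high reading, so in practice you may want to require the limit to be exceeded a few times in a row before recycling.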

Containerized Deployment

Running a cluster inside Docker needs some extra handling:

# Dockerfile example
FROM node:22
WORKDIR /app
COPY package*.json ./
# npm ci needs package-lock.json; --omit=dev skips devDependencies
RUN npm ci --omit=dev
COPY . .
CMD ["node", "cluster.js"]

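Watch the CPU limits at startup. os.cpus().length reports the host's cores even when the container is restricted with --cpus, so one worker per reported core can oversubscribe the container; size the pool explicitly instead. Node.js is also not designed to run as PID 1 and then ignores signals such as SIGINT unless it handles them itself, so start the container with --init to get a minimal init process that forwards signals: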

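# Match the worker count to the CPUs the container actually gets (--cpus).
# NODE_CLUSTER_SCHED_POLICY is read by Node.js itself ('rr' or 'none');
# NODE_CLUSTER_WORKERS is not a Node.js variable: cluster.js has to read it.
docker run --init --cpus=2 -e NODE_CLUSTER_SCHED_POLICY=rr -e NODE_CLUSTER_WORKERS=2 -p 3000:3000 app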
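Because Node.js itself ignores NODE_CLUSTER_WORKERS, cluster.js has to interpret it. A sketch, assuming 'max' (or an unset variable) means one worker per CPU that os.cpus() reports, which inside a container is the host's count, and that any other value is a worker count; resolveWorkerCount is a hypothetical helper.

```javascript
const os = require('os');

// NODE_CLUSTER_WORKERS is an application convention, not something Node.js reads.
// 'max' (or unset) -> one worker per CPU reported by os.cpus(); a positive integer -> that many;
// anything else -> 1 worker.
function resolveWorkerCount(value = process.env.NODE_CLUSTER_WORKERS) {
  const cpus = Math.max(1, os.cpus().length); // os.cpus() may be empty if CPU info is unavailable
  if (value === undefined || value === 'max') return cpus;
  const n = Number.parseInt(value, 10);
  return Number.isInteger(n) && n > 0 ? n : 1;
}

console.log(resolveWorkerCount('2')); // 2
```

In the primary, the fork loop would then run resolveWorkerCount() times instead of os.cpus().length times.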

Common Problems and Solutions

Port conflicts

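// EADDRINUSE appears when several independent processes (not cluster workers) bind one port.
// The reusePort listen option sets SO_REUSEPORT (Node.js v23.1.0 / v22.12.0+, Linux 3.9+,
// FreeBSD 12.0+ and a few other platforms); the kernel then spreads connections across them.
const net = require('net');
const server = net.createServer((socket) => socket.end('ok\n'));
server.listen({ port: 3000, reusePort: true });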

File descriptor limits

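# Show the current soft limit (every open socket consumes one file descriptor)
ulimit -n

# Raise it for the current shell session only (it cannot exceed the hard limit, see ulimit -Hn)
ulimit -n 100000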

Tracking down memory leaks

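// The built-in v8.writeHeapSnapshot() (Node.js v11.13.0+) does what the heapdump package did,
// without a native add-on. It blocks the event loop while writing, so trigger it on demand
// for one worker at a time: kill -USR2 <worker pid>
const v8 = require('v8');
process.on('SIGUSR2', () => {
  const filename = v8.writeHeapSnapshot(`/tmp/heapdump-${process.pid}-${Date.now()}.heapsnapshot`);
  console.log(`Heap dump written to ${filename}`);
});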
