Microservice Frameworks

Author: 陈川 | Views: 60,821 | Category: Node.js

Core Concepts of Microservice Architecture

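Microservice architecture is an approach to building a single application as a set of small services, each running in its own process and communicating through lightweight mechanisms, usually an HTTP API. The services are built around business capabilities, can be deployed independently, and may use different programming languages and data stores. Node.js is lightweight, event-driven, and uses non-blocking I/O, which makes it a good fit for I/O-bound microservices.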

A typical microservice architecture includes the following key components:

  • Service Discovery
  • API Gateway
  • Configuration Center
  • Circuit Breaker
  • Distributed Tracing
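
Of the components listed above, the API gateway is the one that clients talk to directly. As a rough illustration only, the sketch below shows the core routing decision a gateway makes: mapping a request path to an upstream service by longest matching prefix. The route table, the service host names, and the `resolveUpstream` helper are assumptions made for this example; a real gateway would also handle authentication, timeouts, and the proxying itself.

```javascript
// Hypothetical route table: path prefix -> upstream base URL.
const routes = {
  '/products': 'http://product-service:3000',
  '/orders': 'http://order-service:3000',
  '/orders/reports': 'http://report-service:3000'
};

// Return the upstream URL for a request path, or null if no route matches.
// The longest matching prefix wins, so more specific routes take priority.
function resolveUpstream(path, table = routes) {
  const match = Object.keys(table)
    .filter(prefix => path === prefix || path.startsWith(prefix + '/'))
    .sort((a, b) => b.length - a.length)[0];
  return match ? table[match] + path : null;
}

console.log(resolveUpstream('/orders/42'));
console.log(resolveUpstream('/orders/reports/daily'));
console.log(resolveUpstream('/unknown'));
```

Matching on whole path segments (`prefix + '/'`) keeps a path such as `/ordersX` from being routed to the order service by accident.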

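Choosing a Node.js Microservice Framework

Express + Middleware

Express is a minimal Node.js web framework; combined with middleware, it can serve as the base of a microservice: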

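const express = require('express');
// Assumption: productService is this service's data-access module (hypothetical, not shown in the article)
const productService = require('./product-service');

const app = express();

// express.json() is built in since Express 4.16, so body-parser is not needed
app.use(express.json());

app.get('/products/:id', async (req, res, next) => {
  try {
    const product = await productService.get(req.params.id);
    if (!product) return res.status(404).json({ error: 'Product not found' });
    res.json(product);
  } catch (err) {
    next(err); // Express 4 does not catch rejected promises on its own
  }
});

app.listen(3000, () => console.log('Product service started'));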

NestJS

NestJS is a progressive Node.js framework with built-in support for microservice architectures:

import { Controller, Get } from '@nestjs/common';
import { MessagePattern } from '@nestjs/microservices';

@Controller()
export class AppController {
  @MessagePattern({ cmd: 'sum' })
  accumulate(data: number[]): number {
    // The initial value 0 keeps reduce from throwing on an empty array
    return (data || []).reduce((a, b) => a + b, 0);
  }
}

Fastify

Fastify is a high-performance, low-overhead framework that is well suited to building microservices:

const fastify = require('fastify')({ logger: true });

fastify.get('/users', async (request, reply) => {
  return userService.getAll();
});

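// Fastify 4+ takes an options object; the listen(port, callback) form was removed in Fastify 5
fastify.listen({ port: 3000 }, (err) => {
  if (err) {
    fastify.log.error(err);
    process.exit(1);
  }
});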

Service Communication

REST API

The most common synchronous communication style:

// Call another service with axios
const axios = require('axios');

async function getOrderDetails(orderId) {
  try {
    const response = await axios.get(`http://order-service/orders/${orderId}`);
    return response.data;
  } catch (error) {
    console.error('Failed to fetch order', error);
    throw error;
  }
}
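
Synchronous calls between services fail transiently more often than in-process calls, so callers commonly retry. The following is a minimal sketch of retrying with exponential backoff, not part of the original article; `withRetry`, its option names, and the default values are assumptions chosen for illustration.

```javascript
// Default delay helper based on setTimeout.
function defaultSleep(ms) {
  return new Promise(resolve => setTimeout(resolve, ms));
}

// Retry an async operation with exponential backoff.
// `sleep` is injectable so the helper can be tested without real delays.
async function withRetry(operation, { retries = 3, baseDelayMs = 100, sleep = defaultSleep } = {}) {
  let lastError;
  for (let attempt = 0; attempt <= retries; attempt++) {
    try {
      return await operation(attempt);
    } catch (error) {
      lastError = error;
      if (attempt < retries) {
        await sleep(baseDelayMs * 2 ** attempt); // 100 ms, 200 ms, 400 ms, ...
      }
    }
  }
  throw lastError; // all attempts failed
}
```

A call such as `withRetry(() => getOrderDetails('123'))` would wrap the request shown above. Retrying is only safe for idempotent operations such as GET requests.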

gRPC

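A high-performance RPC framework:

// order.proto
syntax = "proto3"; // required: without it the file is parsed as proto2 and the unlabeled fields below are rejected

service OrderService {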
  rpc GetOrder (OrderRequest) returns (OrderResponse);
}

message OrderRequest {
  string id = 1;
}

message OrderResponse {
  string id = 1;
  string status = 2;
}

// Client implementation (Node.js)
const grpc = require('@grpc/grpc-js');
const protoLoader = require('@grpc/proto-loader');

const packageDefinition = protoLoader.loadSync('order.proto');
const orderProto = grpc.loadPackageDefinition(packageDefinition);

const client = new orderProto.OrderService(
  'localhost:50051',
  grpc.credentials.createInsecure()
);

client.GetOrder({ id: '123' }, (err, response) => {
  if (err) {
    console.error('GetOrder failed:', err.message);
    return;
  }
  console.log('Order:', response);
});

Message Queues (RabbitMQ/Kafka)

An example of asynchronous communication, using RabbitMQ through amqplib:

const amqp = require('amqplib');

async function consumeOrders() {
  const conn = await amqp.connect('amqp://localhost');
  const channel = await conn.createChannel();
  
  const queue = 'orders';
  await channel.assertQueue(queue, { durable: true });
  
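  channel.consume(queue, async (msg) => {
    if (msg === null) return; // the consumer was cancelled by the broker
    try {
      const order = JSON.parse(msg.content.toString());
      await processOrder(order);
      channel.ack(msg);
    } catch (err) {
      console.error('Failed to process order', err);
      channel.nack(msg, false, false); // reject without requeueing to avoid a poison-message loop
    }
  });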
}
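
Because the consumer acknowledges a message only after processing it, a crash between processing and acknowledgement makes the broker deliver the same message again. Handlers therefore usually need to be idempotent. The sketch below is one simple way to show the idea, assuming each order carries a unique `id` field; `makeIdempotent` is a hypothetical helper, and the in-memory `Set` would be replaced by a shared store in a real deployment.

```javascript
// Wrap a handler so that each message id is processed at most once.
// The in-memory Set is for illustration only: it is lost on restart
// and is not shared between service instances.
function makeIdempotent(handler, getId = order => order.id) {
  const seen = new Set();
  return function handle(order) {
    const id = getId(order);
    if (seen.has(id)) {
      return false; // duplicate delivery, skip it
    }
    handler(order);
    seen.add(id); // mark as done only after the handler succeeded
    return true;
  };
}

// Usage with a stand-in handler.
const handleOrder = makeIdempotent(order => console.log('processing', order.id));
handleOrder({ id: 'A1' }); // processed
handleOrder({ id: 'A1' }); // skipped as a duplicate
```

The id is recorded only after the handler returns, so a handler that throws leaves the message eligible for another attempt.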

Service Discovery and Load Balancing

Consul Integration

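// Written against the callback API of the consul package 0.x; check the API of the version you install
const consul = require('consul')({ host: 'consul-server' });

// Register this service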
consul.agent.service.register({
  name: 'product-service',
  address: 'localhost',
  port: 3000,
  check: {
    http: 'http://localhost:3000/health',
    interval: '10s'
  }
}, (err) => {
  if (err) throw err;
});

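// Discover instances of another service
consul.catalog.service.nodes('order-service', (err, result) => {
  if (err) {
    console.error('Service discovery failed', err);
    return;
  }
  // ServiceAddress is empty when the service registered without an address; fall back to the node address
  const instances = result.map(instance => `http://${instance.ServiceAddress || instance.Address}:${instance.ServicePort}`);
  console.log('order-service instances:', instances);
});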
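
Once a list of instances is known, the caller still has to pick one per request. As a minimal sketch of client-side load balancing, the helper below cycles through the instances in round-robin order. `createRoundRobin` and the example addresses are assumptions for illustration; real clients typically also drop unhealthy instances and refresh the list periodically.

```javascript
// Create a selector that cycles through the given instance URLs.
function createRoundRobin(instances) {
  if (!Array.isArray(instances) || instances.length === 0) {
    throw new Error('At least one instance is required');
  }
  let index = 0;
  return function next() {
    const instance = instances[index];
    index = (index + 1) % instances.length; // wrap around at the end
    return instance;
  };
}

// Example addresses standing in for the result of a discovery lookup.
const nextOrderService = createRoundRobin([
  'http://10.0.0.1:3000',
  'http://10.0.0.2:3000'
]);
console.log(nextOrderService());
console.log(nextOrderService());
console.log(nextOrderService());
```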

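Kubernetes Service Discovery

In a Kubernetes cluster, services are reachable through cluster DNS:

const dns = require('dns');

// For a regular ClusterIP Service this returns the single virtual IP of the Service.
// Only a headless Service (clusterIP: None) returns one address per pod.
dns.resolve('order-service.default.svc.cluster.local', (err, addresses) => {
  if (err) {
    console.error('DNS lookup failed', err);
    return;
  }
  console.log('Resolved order service addresses:', addresses);
});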

Configuration Management

Using a ConfigMap

const config = {
  db: {
    host: process.env.DB_HOST || 'localhost',
    // Environment variables are always strings, so convert the port to a number
    port: Number(process.env.DB_PORT) || 5432
  },
  redis: {
    url: process.env.REDIS_URL || 'redis://localhost:6379'
  }
};
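
Configuration read from the environment is easier to trust if it is validated once at startup. The following sketch builds the same shape of object as above from an environment map and fails fast on an invalid port; `loadConfig` is a hypothetical helper, and the validation rules are assumptions for this example.

```javascript
// Build a config object from an environment map and validate it.
function loadConfig(env = process.env) {
  const port = Number(env.DB_PORT || 5432);
  if (!Number.isInteger(port) || port <= 0 || port > 65535) {
    throw new Error(`Invalid DB_PORT: ${env.DB_PORT}`);
  }
  // Shallow freeze: guards against accidental reassignment of top-level keys.
  return Object.freeze({
    db: { host: env.DB_HOST || 'localhost', port },
    redis: { url: env.REDIS_URL || 'redis://localhost:6379' }
  });
}

console.log(loadConfig({ DB_HOST: 'db.internal', DB_PORT: '6543' }));
```

Passing the environment in as a parameter keeps the function easy to test without touching `process.env`.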

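Configuration Center (e.g. Apollo)

// Note: the npm package apollo-client is the Apollo GraphQL client, not a client for the Apollo configuration center.
// This sketch reads configuration through the config service's cached HTTP endpoint instead.
const axios = require('axios');

const configServerUrl = 'http://config-server:8080';
const appId = 'product-service';
const clusterName = 'default';
const namespace = 'application'; // assumption: Apollo's default namespace

async function getConfig(key) {
  const url = `${configServerUrl}/configfiles/json/${appId}/${clusterName}/${namespace}`;
  const { data } = await axios.get(url, { timeout: 3000 });
  return data[key];
}

getConfig('db.url')
  .then(url => console.log('Database URL:', url))
  .catch(err => console.error('Failed to load config:', err.message));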

Circuit Breaking and Fault Tolerance

Using the Hystrix Pattern

const CircuitBreaker = require('opossum');
const axios = require('axios');

const breaker = new CircuitBreaker(async (url) => {
  const response = await axios.get(url);
  return response.data;
}, {
  timeout: 3000,
  errorThresholdPercentage: 50,
  resetTimeout: 30000
});

breaker.fallback(() => 'Service unavailable, please try later');

// Call the service through the circuit breaker
breaker.fire('http://order-service/orders')
  .then(console.log)
  .catch(console.error);
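
To make the breaker's behaviour concrete, here is a deliberately simplified state machine with the three usual states: closed, open, and half-open. It is a sketch for explanation only and is not how opossum is implemented: it counts consecutive failures rather than an error percentage, and `SimpleBreaker` with its option names is hypothetical.

```javascript
// Minimal circuit breaker:
//   CLOSED    -> OPEN      after `failureThreshold` consecutive failures
//   OPEN      -> HALF_OPEN once `resetTimeout` ms have passed
//   HALF_OPEN -> CLOSED    on success, or back to OPEN on failure
class SimpleBreaker {
  constructor({ failureThreshold = 3, resetTimeout = 30000, now = Date.now } = {}) {
    this.failureThreshold = failureThreshold;
    this.resetTimeout = resetTimeout;
    this.now = now; // injectable clock, useful in tests
    this.state = 'CLOSED';
    this.failures = 0;
    this.openedAt = 0;
  }

  async fire(action) {
    if (this.state === 'OPEN') {
      if (this.now() - this.openedAt < this.resetTimeout) {
        throw new Error('Circuit is open'); // fail fast without calling the service
      }
      this.state = 'HALF_OPEN'; // allow a trial call
    }
    try {
      const result = await action();
      this.state = 'CLOSED';
      this.failures = 0;
      return result;
    } catch (error) {
      this.failures += 1;
      if (this.state === 'HALF_OPEN' || this.failures >= this.failureThreshold) {
        this.state = 'OPEN';
        this.openedAt = this.now();
      }
      throw error;
    }
  }
}
```

While the circuit is open, callers get an immediate error instead of waiting on a failing dependency, which is the point of the pattern.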

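Distributed Tracing

Jaeger Integration

// Note: the jaeger-client package has been deprecated in favor of OpenTelemetry; it is shown here as in the original article
const { initTracer } = require('jaeger-client');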

const config = {
  serviceName: 'product-service',
  sampler: {
    type: 'const',
    param: 1,
  },
  reporter: {
    logSpans: true,
    agentHost: 'jaeger-agent',
  },
};

const tracer = initTracer(config);

function getProduct(ctx, id) {
  const span = tracer.startSpan('getProduct', { childOf: ctx.span });
  try {
    // ... business logic
  } finally {
    span.finish(); // always close the span, even if the business logic throws
  }
}
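
A trace only spans several services if each outgoing request carries the trace identifier along. Tracing clients handle this through their own inject and extract mechanisms; the sketch below is not Jaeger's API but a plain illustration of the idea using the W3C Trace Context `traceparent` header format (`version-traceId-spanId-flags`). The helper names are hypothetical.

```javascript
const crypto = require('crypto');

// Create a traceparent value with a fresh span id.
function createTraceparent(traceId = crypto.randomBytes(16).toString('hex'), sampled = true) {
  const spanId = crypto.randomBytes(8).toString('hex');
  return `00-${traceId}-${spanId}-${sampled ? '01' : '00'}`;
}

// Parse an incoming header; returns null if it is missing or malformed.
function parseTraceparent(header) {
  const match = /^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$/.exec(header || '');
  if (!match) return null;
  return {
    traceId: match[1],
    parentSpanId: match[2],
    sampled: (parseInt(match[3], 16) & 1) === 1
  };
}

// Continue an incoming trace, or start a new one if there is none.
function nextTraceparent(incomingHeader) {
  const parent = parseTraceparent(incomingHeader);
  return parent ? createTraceparent(parent.traceId, parent.sampled) : createTraceparent();
}

const incoming = createTraceparent();
console.log('incoming:', incoming);
console.log('outgoing:', nextTraceparent(incoming)); // same trace id, new span id
```

A service would read the header from the incoming request and attach the value from `nextTraceparent` to every downstream call.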

Containerized Deployment

Dockerfile Example

FROM node:16-alpine

WORKDIR /app
COPY package*.json ./
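# npm ci installs exactly what package-lock.json pins; --omit=dev replaces the deprecated --production flag
RUN npm ci --omit=dev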

COPY . .

EXPOSE 3000
CMD ["node", "server.js"]

Kubernetes Deployment Manifest

apiVersion: apps/v1
kind: Deployment
metadata:
  name: product-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: product-service
  template:
    metadata:
      labels:
        app: product-service
    spec:
      containers:
      - name: product
        image: product-service:1.0.0
        ports:
        - containerPort: 3000
        env:
        - name: DB_HOST
          valueFrom:
            configMapKeyRef:
              name: app-config
              key: db.host

Monitoring and Logging

Prometheus Metrics

const prometheus = require('prom-client');

const httpRequestDuration = new prometheus.Histogram({
  name: 'http_request_duration_seconds',
  help: 'Duration of HTTP requests in seconds',
  labelNames: ['method', 'route', 'code'],
  // startTimer() records seconds, so the buckets are in seconds as well
  buckets: [0.005, 0.015, 0.05, 0.1, 0.2, 0.3, 0.4, 0.5, 1]
});

app.use((req, res, next) => {
  const end = httpRequestDuration.startTimer();
  res.on('finish', () => {
    // req.route is undefined when no route matched (for example on a 404)
    const route = req.route ? req.route.path : 'unmatched';
    end({ method: req.method, route, code: res.statusCode });
  });
  next();
});
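
The `buckets` option is easier to choose once it is clear how a histogram stores data: each bucket counts every observation less than or equal to its upper bound, so the counts are cumulative. The following sketch reproduces that bookkeeping in plain JavaScript. It is an illustration, not prom-client code; `bucketCounts` and the sample values are made up for the example.

```javascript
// Count observations into cumulative "less than or equal" buckets,
// the way Prometheus histograms are exposed.
function bucketCounts(observations, upperBounds) {
  const bounds = [...upperBounds].sort((a, b) => a - b);
  const counts = bounds.map(le => ({
    le,
    count: observations.filter(value => value <= le).length
  }));
  // The implicit +Inf bucket always contains every observation.
  counts.push({ le: '+Inf', count: observations.length });
  return counts;
}

console.log(bucketCounts([3, 12, 40, 250], [5, 15, 50, 100]));
```

If most observations land in the `+Inf` bucket, the upper bounds are too small for the latencies actually seen, and percentile estimates become unreliable.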

Log Collection with ELK

const { createLogger, transports } = require('winston');
const { ElasticsearchTransport } = require('winston-elasticsearch');

const esTransport = new ElasticsearchTransport({
  level: 'info',
  clientOpts: { node: 'http://elasticsearch:9200' }
});

const logger = createLogger({
  transports: [esTransport]
});

logger.info('Service started', { timestamp: new Date() });

Testing Strategy

Unit Test Example

const { getProduct } = require('./product-service');
const mockDb = require('./mock-db');

describe('Product Service', () => {
  beforeAll(() => {
    mockDb.setup();
  });

  it('should return product by id', async () => {
    const product = await getProduct('123');
    expect(product.id).toBe('123');
    expect(product.name).toBeDefined();
  });
});

Contract Testing (Pact)

const axios = require('axios');
const { Pact } = require('@pact-foundation/pact');

describe('Product Service', () => {
  const provider = new Pact({
    consumer: 'OrderService',
    provider: 'ProductService',
    port: 1234
  });

  beforeAll(() => provider.setup());
  afterEach(() => provider.verify());
  afterAll(() => provider.finalize());

  describe('GET /products/123', () => {
    beforeAll(() => {
      return provider.addInteraction({
        state: 'product 123 exists',
        uponReceiving: 'a request for product 123',
        withRequest: {
          method: 'GET',
          path: '/products/123'
        },
        willRespondWith: {
          status: 200,
          body: {
            id: '123',
            name: 'Test Product'
          }
        }
      });
    });

    it('should return product 123', async () => {
      const response = await axios.get(`${provider.mockService.baseUrl}/products/123`);
      expect(response.data.id).toBe('123');
    });
  });
});

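Security Considerations

JWT Verification

// express-jwt 7+ exports a named function; versions up to 6 exported the middleware as the default export
const { expressjwt: jwt } = require('express-jwt');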
const jwks = require('jwks-rsa');

const jwtCheck = jwt({
  secret: jwks.expressJwtSecret({
    cache: true,
    rateLimit: true,
    jwksRequestsPerMinute: 5,
    jwksUri: 'https://auth.example.com/.well-known/jwks.json'
  }),
  audience: 'product-service',
  issuer: 'https://auth.example.com/',
  algorithms: ['RS256']
});

app.use(jwtCheck);

Rate Limiting

const rateLimit = require('express-rate-limit');

const limiter = rateLimit({
  windowMs: 15 * 60 * 1000, // 15 minutes
  max: 100 // at most 100 requests per IP in each window
});

app.use('/api/', limiter);
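
The two options above, a window length and a maximum count, describe a fixed-window limiter. To show what that means, here is a small self-contained sketch of the algorithm. It is not the implementation used by express-rate-limit; `createLimiter` is a hypothetical helper, and its in-memory map never evicts old keys.

```javascript
// Fixed-window limiter: at most `max` hits per key in each `windowMs` window.
function createLimiter({ windowMs, max, now = Date.now }) {
  const windows = new Map(); // key -> { start, count }
  return function allow(key) {
    const t = now();
    const current = windows.get(key);
    if (!current || t - current.start >= windowMs) {
      // First hit for this key, or the previous window has ended.
      windows.set(key, { start: t, count: 1 });
      return true;
    }
    current.count += 1;
    return current.count <= max;
  };
}

// Usage: three requests from one IP against a limit of two.
const allow = createLimiter({ windowMs: 15 * 60 * 1000, max: 2 });
console.log(allow('203.0.113.7'), allow('203.0.113.7'), allow('203.0.113.7'));
```

A fixed window can let through up to twice the limit around a window boundary; sliding-window or token-bucket variants smooth that out at the cost of more bookkeeping.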

Performance Optimization

Caching Strategy

const NodeCache = require('node-cache');
const cache = new NodeCache({ stdTTL: 600 });

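app.get('/products/:id', async (req, res, next) => {
  const cacheKey = `product_${req.params.id}`;
  try {
    let product = cache.get(cacheKey); // undefined on a miss or after the TTL expires

    if (!product) {
      product = await productService.get(req.params.id);
      if (product) cache.set(cacheKey, product); // do not cache missing products
    }

    res.json(product);
  } catch (err) {
    next(err); // pass failures to the Express error handler
  }
});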
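
A plain cache-aside lookup like the one above has one weak spot: when a popular key expires, every concurrent request misses and calls the backing service at once. One common remedy, sketched here under assumed names, is to share a single in-flight load per key. `createCachedLoader` is hypothetical and uses a `Map` rather than node-cache to stay self-contained.

```javascript
// Cache-aside helper with a TTL that shares one in-flight load per key,
// so concurrent misses trigger only a single call to `load`.
function createCachedLoader(load, { ttlMs = 600000, now = Date.now } = {}) {
  const entries = new Map();  // key -> { value, expiresAt }
  const inFlight = new Map(); // key -> Promise of the pending load

  return async function get(key) {
    const hit = entries.get(key);
    if (hit && hit.expiresAt > now()) return hit.value;
    if (inFlight.has(key)) return inFlight.get(key);

    const promise = Promise.resolve()
      .then(() => load(key))
      .then(value => {
        entries.set(key, { value, expiresAt: now() + ttlMs });
        return value;
      })
      .finally(() => inFlight.delete(key)); // clear the marker on success and failure
    inFlight.set(key, promise);
    return promise;
  };
}
```

In the route handler this would be used as `const getProduct = createCachedLoader(id => productService.get(id));`, followed by `await getProduct(req.params.id)`.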

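Connection Pool Management

const { Pool } = require('pg');
const pool = new Pool({
  max: 20, // maximum number of connections in the pool
  idleTimeoutMillis: 30000, // close connections that have been idle for 30 s
  connectionTimeoutMillis: 2000 // fail if no connection is available within 2 s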
});

async function query(text, params) {
  const client = await pool.connect();
  try {
    return await client.query(text, params);
  } finally {
    client.release();
  }
}

Continuous Integration and Delivery

GitHub Actions Example

name: Node.js CI

on: [push]

jobs:
  build:
    runs-on: ubuntu-latest
    
    steps:
    - uses: actions/checkout@v4
    - name: Use Node.js
      uses: actions/setup-node@v4
      with:
        node-version: '16'
    - run: npm ci
    - run: npm test
    - run: npm run build
    
  deploy:
    needs: build
    runs-on: ubuntu-latest
    steps:
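    - uses: actions/checkout@v4
    # Assumes the KUBE_CONFIG secret holds the full contents of a kubeconfig file.
    # kubectl does not read a KUBE_CONFIG variable, so write it to a file first.
    - run: |
        echo "$KUBE_CONFIG" > "$RUNNER_TEMP/kubeconfig"
        kubectl --kubeconfig "$RUNNER_TEMP/kubeconfig" apply -f k8s/
      env:
        KUBE_CONFIG: ${{ secrets.KUBE_CONFIG }}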
