阿里云主机折上折
  • 微信号
您当前的位置:网站首页 > 发布/订阅模式

发布/订阅模式

作者:陈川 阅读数:39178人阅读 分类: Node.js

发布/订阅模式的核心概念

发布/订阅模式(Pub/Sub)是一种消息传递范式,消息发送者(发布者)不直接将消息发送给特定接收者(订阅者),而是将消息分类发布到特定频道。订阅者可以订阅一个或多个频道,只接收感兴趣的消息。这种模式实现了发布者和订阅者的完全解耦,发布者无需知道订阅者的存在,订阅者也无需关心消息来源。

在Node.js中,发布/订阅模式常用于事件处理、实时通信和微服务架构。典型的实现包括:

  • 进程内事件总线(如EventEmitter)
  • 分布式消息队列(如Redis Pub/Sub)
  • WebSocket实时通信

Node.js中的EventEmitter实现

Node.js内置的events模块提供了基础的发布/订阅实现。以下是一个完整示例:

const EventEmitter = require('events');

class OrderService extends EventEmitter {
  constructor() {
    super();
    // 模拟订单创建
    this.createOrder = (orderData) => {
      // 业务逻辑...
      this.emit('order_created', orderData);
    };
  }
}

const orderService = new OrderService();

// 订阅订单创建事件
orderService.on('order_created', (order) => {
  console.log(`[邮件服务] 订单创建通知: ${order.id}`);
});

// 另一个订阅者
orderService.on('order_created', (order) => {
  console.log(`[库存服务] 扣减库存: ${order.items}`);
});

// 触发事件
orderService.createOrder({
  id: '1001',
  items: ['商品A', '商品B'],
  total: 299.99
});

这种实现方式的特性包括:

  • 同步事件触发:emit()会同步调用所有监听器
  • 支持一次性监听器:使用once()方法
  • 可以获取监听器数量:listenerCount()
  • 支持错误事件特殊处理:'error'事件

Redis的Pub/Sub实现

对于分布式系统,可以使用Redis的发布订阅功能:

const redis = require('redis');
const publisher = redis.createClient();
const subscriber = redis.createClient();

// 订阅者
subscriber.on('message', (channel, message) => {
  console.log(`收到 ${channel} 频道的消息: ${message}`);
});

subscriber.subscribe('notifications');

// 发布者
publisher.publish('notifications', '系统将于今晚升级');

Redis Pub/Sub的特点:

  • 跨进程/跨服务器通信
  • 频道支持模式匹配(PSUBSCRIBE)
  • 消息不会持久化
  • 适合广播场景

高级模式实现

更复杂的发布订阅系统可以实现以下特性:

class AdvancedPubSub {
  constructor() {
    this.channels = new Map();
  }

  subscribe(channel, callback) {
    if (!this.channels.has(channel)) {
      this.channels.set(channel, new Set());
    }
    this.channels.get(channel).add(callback);
    return () => this.unsubscribe(channel, callback);
  }

  unsubscribe(channel, callback) {
    if (this.channels.has(channel)) {
      this.channels.get(channel).delete(callback);
    }
  }

  publish(channel, data) {
    if (this.channels.has(channel)) {
      this.channels.get(channel).forEach(callback => {
        try {
          callback(data);
        } catch (e) {
          console.error(`回调执行错误: ${e}`);
        }
      });
    }
  }
}

// 使用示例
const pubsub = new AdvancedPubSub();
const unsubscribe = pubsub.subscribe('news', (data) => {
  console.log(`收到新闻: ${data.title}`);
});

pubsub.publish('news', { title: 'Node.js发布新版本' });
unsubscribe(); // 取消订阅

实际应用场景

  1. 微服务通信:服务间通过消息总线通信
// 订单服务
eventBus.publish('order_created', {
  orderId: '123',
  userId: 'user456'
});

// 用户服务
eventBus.subscribe('order_created', (data) => {
  updateUserOrderHistory(data.userId, data.orderId);
});
  1. 实时通知系统
// WebSocket服务
socket.on('connection', (client) => {
  const userId = getUserId(client);
  eventBus.subscribe(`user_${userId}`, (message) => {
    client.send(JSON.stringify(message));
  });
});

// 业务代码
eventBus.publish(`user_123`, {
  type: 'MESSAGE',
  content: '您有新消息'
});
  1. 插件系统事件钩子
// 核心系统
class CoreSystem {
  constructor() {
    this.hooks = {
      beforeSave: new AsyncSeriesHook(['data']),
      afterSave: new SyncHook(['savedData'])
    };
  }
}

// 插件
coreSystem.hooks.beforeSave.tap('validationPlugin', (data) => {
  if (!data.valid) throw new Error('数据无效');
});

性能考量与最佳实践

  1. 避免内存泄漏
// 错误示范
class LeakyClass {
  constructor(eventEmitter) {
    this.handler = (data) => { /*...*/ };
    eventEmitter.on('event', this.handler);
  }
}

// 正确做法
class SafeClass {
  constructor(eventEmitter) {
    this._cleanup = () => eventEmitter.off('event', this.handler);
    this.handler = (data) => { /*...*/ };
    eventEmitter.on('event', this.handler);
  }
  
  destroy() {
    this._cleanup();
  }
}
  1. 错误处理策略
// 全局错误处理
eventBus.subscribe('error', (err) => {
  monitoringService.report(err);
});

// 带错误传播的发布方法
publishWithErrorHandling(channel, data) {
  try {
    this.publish(channel, data);
  } catch (e) {
    this.publish('error', e);
  }
}
  1. 性能优化技巧
// 使用Set代替数组存储监听器
this.listeners = new Set();

// 批量发布优化
batchPublish(events) {
  const channels = new Set();
  events.forEach(event => channels.add(event.channel));
  
  channels.forEach(channel => {
    const relatedEvents = events.filter(e => e.channel === channel);
    this.publish(channel, relatedEvents);
  });
}

与其他模式的对比

  1. 与观察者模式的区别
// 观察者模式需要明确知道观察目标
class Subject {
  constructor() {
    this.observers = [];
  }
  
  addObserver(observer) {
    this.observers.push(observer);
  }
  
  notify(data) {
    this.observers.forEach(observer => observer.update(data));
  }
}

// 发布订阅模式完全解耦
eventBus.subscribe('event', callback);
eventBus.publish('event', data);
  1. 与中间件模式的结合
// 带中间件的发布订阅系统
publish(channel, data) {
  const middlewarePipeline = this.middlewares[channel] || [];
  
  const runner = (index) => {
    if (index >= middlewarePipeline.length) {
      return this._realPublish(channel, data);
    }
    
    middlewarePipeline[index](data, () => runner(index + 1));
  };
  
  runner(0);
}

现代JavaScript中的变体

  1. React Context中的发布订阅
const EventContext = createContext();

function EventProvider({ children }) {
  const listeners = useRef({});
  
  const subscribe = (event, callback) => {
    listeners.current[event] = listeners.current[event] || [];
    listeners.current[event].push(callback);
  };
  
  const publish = (event, data) => {
    (listeners.current[event] || []).forEach(cb => cb(data));
  };
  
  return (
    <EventContext.Provider value={{ subscribe, publish }}>
      {children}
    </EventContext.Provider>
  );
}
  1. RxJS中的实现
const subject = new Subject();

// 订阅
const subscription = subject.subscribe({
  next: (v) => console.log(`收到: ${v}`)
});

// 发布
subject.next('消息1');
subject.next('消息2');

// 取消订阅
subscription.unsubscribe();

消息协议的扩展

  1. 支持元数据的消息格式
{
  "id": "msg_123456",
  "timestamp": 1620000000,
  "channel": "user_updates",
  "payload": {
    "userId": "usr_789",
    "action": "profile_update"
  },
  "metadata": {
    "source": "auth_service",
    "priority": "high"
  }
}
  1. 带确认机制的发布
async publishWithAck(channel, message, timeout = 5000) {
  return new Promise((resolve, reject) => {
    const ackChannel = `${channel}.ack.${message.id}`;
    let timer;
    
    const cleanup = () => {
      this.unsubscribe(ackChannel, handler);
      clearTimeout(timer);
    };
    
    const handler = (ack) => {
      cleanup();
      resolve(ack);
    };
    
    this.subscribe(ackChannel, handler);
    this.publish(channel, message);
    
    timer = setTimeout(() => {
      cleanup();
      reject(new Error('确认超时'));
    }, timeout);
  });
}

测试策略

  1. 单元测试示例
describe('PubSub系统', () => {
  let pubsub;
  
  beforeEach(() => {
    pubsub = new PubSub();
  });
  
  it('应该接收已订阅频道的消息', () => {
    const mockCallback = jest.fn();
    pubsub.subscribe('test', mockCallback);
    
    pubsub.publish('test', 'message');
    
    expect(mockCallback).toHaveBeenCalledWith('message');
  });
  
  it('不应接收未订阅频道的消息', () => {
    const mockCallback = jest.fn();
    pubsub.subscribe('test1', mockCallback);
    
    pubsub.publish('test2', 'message');
    
    expect(mockCallback).not.toHaveBeenCalled();
  });
});
  1. 性能测试方案
const { performance } = require('perf_hooks');

function runBenchmark() {
  const pubsub = new PubSub();
  const start = performance.now();
  const count = 100000;
  
  // 注册大量监听器
  for (let i = 0; i < count; i++) {
    pubsub.subscribe(`channel_${i % 10}`, () => {});
  }
  
  // 触发事件
  const publishStart = performance.now();
  for (let i = 0; i < count; i++) {
    pubsub.publish(`channel_${i % 10}`, { data: i });
  }
  
  console.log(`注册耗时: ${publishStart - start}ms`);
  console.log(`发布耗时: ${performance.now() - publishStart}ms`);
}

浏览器环境中的实现

  1. 基于CustomEvent的实现
class BrowserPubSub {
  constructor() {
    this.target = new EventTarget();
  }
  
  subscribe(event, callback) {
    const handler = (e) => callback(e.detail);
    this.target.addEventListener(event, handler);
    return () => this.target.removeEventListener(event, handler);
  }
  
  publish(event, data) {
    this.target.dispatchEvent(new CustomEvent(event, { detail: data }));
  }
}

// 使用示例
const pubsub = new BrowserPubSub();
const unsubscribe = pubsub.subscribe('click', (data) => {
  console.log('点击事件:', data);
});

document.body.addEventListener('click', () => {
  pubsub.publish('click', { x: 10, y: 20 });
});
  1. 跨标签页通信
// 主页面
const channel = new BroadcastChannel('app_events');
channel.postMessage({
  type: 'DATA_UPDATE',
  payload: { /*...*/ }
});

// 其他标签页
const channel = new BroadcastChannel('app_events');
channel.onmessage = (event) => {
  if (event.data.type === 'DATA_UPDATE') {
    updateUI(event.data.payload);
  }
};

本站部分内容来自互联网,一切版权均归源网站或源作者所有。

如果侵犯了你的权益请来信告知我们删除。邮箱:cc@cccx.cn

上一篇:事件发射器模式

下一篇:生成器与协程

前端川

前端川,陈川的代码茶馆🍵,专治各种不服的Bug退散符💻,日常贩卖秃头警告级的开发心得🛠️,附赠一行代码笑十年的摸鱼宝典🐟,偶尔掉落咖啡杯里泡开的像素级浪漫☕。‌