发布/订阅模式
发布/订阅模式的核心概念
发布/订阅模式(Pub/Sub)是一种消息传递范式,消息发送者(发布者)不直接将消息发送给特定接收者(订阅者),而是将消息分类发布到特定频道。订阅者可以订阅一个或多个频道,只接收感兴趣的消息。这种模式实现了发布者和订阅者的完全解耦,发布者无需知道订阅者的存在,订阅者也无需关心消息来源。
在Node.js中,发布/订阅模式常用于事件处理、实时通信和微服务架构。典型的实现包括:
- 进程内事件总线(如EventEmitter)
- 分布式消息队列(如Redis Pub/Sub)
- WebSocket实时通信
Node.js中的EventEmitter实现
Node.js内置的events
模块提供了基础的发布/订阅实现。以下是一个完整示例:
const EventEmitter = require('events');
class OrderService extends EventEmitter {
constructor() {
super();
// 模拟订单创建
this.createOrder = (orderData) => {
// 业务逻辑...
this.emit('order_created', orderData);
};
}
}
const orderService = new OrderService();
// 订阅订单创建事件
orderService.on('order_created', (order) => {
console.log(`[邮件服务] 订单创建通知: ${order.id}`);
});
// 另一个订阅者
orderService.on('order_created', (order) => {
console.log(`[库存服务] 扣减库存: ${order.items}`);
});
// 触发事件
orderService.createOrder({
id: '1001',
items: ['商品A', '商品B'],
total: 299.99
});
这种实现方式的特性包括:
- 同步事件触发:emit()会同步调用所有监听器
- 支持一次性监听器:使用once()方法
- 可以获取监听器数量:listenerCount()
- 支持错误事件特殊处理:'error'事件
Redis的Pub/Sub实现
对于分布式系统,可以使用Redis的发布订阅功能:
const redis = require('redis');
const publisher = redis.createClient();
const subscriber = redis.createClient();
// 订阅者
subscriber.on('message', (channel, message) => {
console.log(`收到 ${channel} 频道的消息: ${message}`);
});
subscriber.subscribe('notifications');
// 发布者
publisher.publish('notifications', '系统将于今晚升级');
Redis Pub/Sub的特点:
- 跨进程/跨服务器通信
- 频道支持模式匹配(PSUBSCRIBE)
- 消息不会持久化
- 适合广播场景
高级模式实现
更复杂的发布订阅系统可以实现以下特性:
class AdvancedPubSub {
constructor() {
this.channels = new Map();
}
subscribe(channel, callback) {
if (!this.channels.has(channel)) {
this.channels.set(channel, new Set());
}
this.channels.get(channel).add(callback);
return () => this.unsubscribe(channel, callback);
}
unsubscribe(channel, callback) {
if (this.channels.has(channel)) {
this.channels.get(channel).delete(callback);
}
}
publish(channel, data) {
if (this.channels.has(channel)) {
this.channels.get(channel).forEach(callback => {
try {
callback(data);
} catch (e) {
console.error(`回调执行错误: ${e}`);
}
});
}
}
}
// 使用示例
const pubsub = new AdvancedPubSub();
const unsubscribe = pubsub.subscribe('news', (data) => {
console.log(`收到新闻: ${data.title}`);
});
pubsub.publish('news', { title: 'Node.js发布新版本' });
unsubscribe(); // 取消订阅
实际应用场景
- 微服务通信:服务间通过消息总线通信
// 订单服务
eventBus.publish('order_created', {
orderId: '123',
userId: 'user456'
});
// 用户服务
eventBus.subscribe('order_created', (data) => {
updateUserOrderHistory(data.userId, data.orderId);
});
- 实时通知系统:
// WebSocket服务
socket.on('connection', (client) => {
const userId = getUserId(client);
eventBus.subscribe(`user_${userId}`, (message) => {
client.send(JSON.stringify(message));
});
});
// 业务代码
eventBus.publish(`user_123`, {
type: 'MESSAGE',
content: '您有新消息'
});
- 插件系统事件钩子:
// 核心系统
class CoreSystem {
constructor() {
this.hooks = {
beforeSave: new AsyncSeriesHook(['data']),
afterSave: new SyncHook(['savedData'])
};
}
}
// 插件
coreSystem.hooks.beforeSave.tap('validationPlugin', (data) => {
if (!data.valid) throw new Error('数据无效');
});
性能考量与最佳实践
- 避免内存泄漏:
// 错误示范
class LeakyClass {
constructor(eventEmitter) {
this.handler = (data) => { /*...*/ };
eventEmitter.on('event', this.handler);
}
}
// 正确做法
class SafeClass {
constructor(eventEmitter) {
this._cleanup = () => eventEmitter.off('event', this.handler);
this.handler = (data) => { /*...*/ };
eventEmitter.on('event', this.handler);
}
destroy() {
this._cleanup();
}
}
- 错误处理策略:
// 全局错误处理
eventBus.subscribe('error', (err) => {
monitoringService.report(err);
});
// 带错误传播的发布方法
publishWithErrorHandling(channel, data) {
try {
this.publish(channel, data);
} catch (e) {
this.publish('error', e);
}
}
- 性能优化技巧:
// 使用Set代替数组存储监听器
this.listeners = new Set();
// 批量发布优化
batchPublish(events) {
const channels = new Set();
events.forEach(event => channels.add(event.channel));
channels.forEach(channel => {
const relatedEvents = events.filter(e => e.channel === channel);
this.publish(channel, relatedEvents);
});
}
与其他模式的对比
- 与观察者模式的区别:
// 观察者模式需要明确知道观察目标
class Subject {
constructor() {
this.observers = [];
}
addObserver(observer) {
this.observers.push(observer);
}
notify(data) {
this.observers.forEach(observer => observer.update(data));
}
}
// 发布订阅模式完全解耦
eventBus.subscribe('event', callback);
eventBus.publish('event', data);
- 与中间件模式的结合:
// 带中间件的发布订阅系统
publish(channel, data) {
const middlewarePipeline = this.middlewares[channel] || [];
const runner = (index) => {
if (index >= middlewarePipeline.length) {
return this._realPublish(channel, data);
}
middlewarePipeline[index](data, () => runner(index + 1));
};
runner(0);
}
现代JavaScript中的变体
- React Context中的发布订阅:
const EventContext = createContext();
function EventProvider({ children }) {
const listeners = useRef({});
const subscribe = (event, callback) => {
listeners.current[event] = listeners.current[event] || [];
listeners.current[event].push(callback);
};
const publish = (event, data) => {
(listeners.current[event] || []).forEach(cb => cb(data));
};
return (
<EventContext.Provider value={{ subscribe, publish }}>
{children}
</EventContext.Provider>
);
}
- RxJS中的实现:
const subject = new Subject();
// 订阅
const subscription = subject.subscribe({
next: (v) => console.log(`收到: ${v}`)
});
// 发布
subject.next('消息1');
subject.next('消息2');
// 取消订阅
subscription.unsubscribe();
消息协议的扩展
- 支持元数据的消息格式:
{
"id": "msg_123456",
"timestamp": 1620000000,
"channel": "user_updates",
"payload": {
"userId": "usr_789",
"action": "profile_update"
},
"metadata": {
"source": "auth_service",
"priority": "high"
}
}
- 带确认机制的发布:
async publishWithAck(channel, message, timeout = 5000) {
return new Promise((resolve, reject) => {
const ackChannel = `${channel}.ack.${message.id}`;
let timer;
const cleanup = () => {
this.unsubscribe(ackChannel, handler);
clearTimeout(timer);
};
const handler = (ack) => {
cleanup();
resolve(ack);
};
this.subscribe(ackChannel, handler);
this.publish(channel, message);
timer = setTimeout(() => {
cleanup();
reject(new Error('确认超时'));
}, timeout);
});
}
测试策略
- 单元测试示例:
describe('PubSub系统', () => {
let pubsub;
beforeEach(() => {
pubsub = new PubSub();
});
it('应该接收已订阅频道的消息', () => {
const mockCallback = jest.fn();
pubsub.subscribe('test', mockCallback);
pubsub.publish('test', 'message');
expect(mockCallback).toHaveBeenCalledWith('message');
});
it('不应接收未订阅频道的消息', () => {
const mockCallback = jest.fn();
pubsub.subscribe('test1', mockCallback);
pubsub.publish('test2', 'message');
expect(mockCallback).not.toHaveBeenCalled();
});
});
- 性能测试方案:
const { performance } = require('perf_hooks');
function runBenchmark() {
const pubsub = new PubSub();
const start = performance.now();
const count = 100000;
// 注册大量监听器
for (let i = 0; i < count; i++) {
pubsub.subscribe(`channel_${i % 10}`, () => {});
}
// 触发事件
const publishStart = performance.now();
for (let i = 0; i < count; i++) {
pubsub.publish(`channel_${i % 10}`, { data: i });
}
console.log(`注册耗时: ${publishStart - start}ms`);
console.log(`发布耗时: ${performance.now() - publishStart}ms`);
}
浏览器环境中的实现
- 基于CustomEvent的实现:
class BrowserPubSub {
constructor() {
this.target = new EventTarget();
}
subscribe(event, callback) {
const handler = (e) => callback(e.detail);
this.target.addEventListener(event, handler);
return () => this.target.removeEventListener(event, handler);
}
publish(event, data) {
this.target.dispatchEvent(new CustomEvent(event, { detail: data }));
}
}
// 使用示例
const pubsub = new BrowserPubSub();
const unsubscribe = pubsub.subscribe('click', (data) => {
console.log('点击事件:', data);
});
document.body.addEventListener('click', () => {
pubsub.publish('click', { x: 10, y: 20 });
});
- 跨标签页通信:
// 主页面
const channel = new BroadcastChannel('app_events');
channel.postMessage({
type: 'DATA_UPDATE',
payload: { /*...*/ }
});
// 其他标签页
const channel = new BroadcastChannel('app_events');
channel.onmessage = (event) => {
if (event.data.type === 'DATA_UPDATE') {
updateUI(event.data.payload);
}
};
本站部分内容来自互联网,一切版权均归源网站或源作者所有。
如果侵犯了你的权益请来信告知我们删除。邮箱:cc@cccx.cn