Express在物联网中的应用
Express作为Node.js的一个轻量级Web框架,以其简洁、灵活的特性在物联网(IoT)领域展现出强大的适配能力。无论是构建设备管理后台、处理传感器数据流,还是实现设备间的实时通信,Express都能通过中间件、路由和RESTful API等机制高效完成任务。
Express与物联网架构的适配性
物联网系统通常采用分层架构,包括设备层、网关层、云平台层和应用层。Express主要作用于网关层和云平台层,承担协议转换、数据聚合和API暴露等关键功能。其非阻塞I/O模型特别适合处理高并发的设备连接请求。
例如,一个智能农业系统可能同时有数千个传感器上报数据。通过Express构建的网关服务可以这样处理:
const express = require('express');
const bodyParser = require('body-parser');
const app = express();
app.use(bodyParser.json());
// 处理传感器数据上报
app.post('/api/sensor-data', (req, res) => {
const { deviceId, temperature, humidity } = req.body;
// 数据预处理
const normalizedData = {
deviceId,
timestamp: Date.now(),
metrics: { temperature: parseFloat(temperature), humidity: parseFloat(humidity) }
};
// 转发到消息队列
messageQueue.publish('sensor-data', normalizedData);
res.status(202).json({ status: 'accepted' });
});
app.listen(3000, () => console.log('Gateway running on port 3000'));
设备认证与安全控制
物联网设备的安全认证是核心需求。Express结合Passport.js等中间件可以实现多种认证方案:
- 设备证书认证:
const passport = require('passport');
const ClientCertStrategy = require('passport-client-cert');
passport.use(new ClientCertStrategy((cert, done) => {
DeviceModel.findByThumbprint(cert.thumbprint)
.then(device => done(null, device || false));
}));
app.post('/api/telemetry',
passport.authenticate('client-cert', { session: false }),
(req, res) => {
// 处理已认证设备的数据
}
);
- JWT令牌验证:
const jwt = require('express-jwt');
const jwks = require('jwks-rsa');
const jwtCheck = jwt({
secret: jwks.expressJwtSecret({
jwksUri: 'https://your-domain/.well-known/jwks.json'
}),
algorithms: ['RS256']
});
app.use('/api/commands', jwtCheck);
实时通信方案
Express与WebSocket结合可以构建设备控制通道:
const WebSocket = require('ws');
const express = require('express');
const app = express();
const server = app.listen(3001);
const wss = new WebSocket.Server({ server });
wss.on('connection', (ws, req) => {
const deviceId = req.url.split('=')[1];
ws.on('message', (message) => {
console.log(`Received from ${deviceId}: ${message}`);
// 广播到控制台
wss.clients.forEach(client => {
if (client !== ws && client.readyState === WebSocket.OPEN) {
client.send(`${deviceId}: ${message}`);
}
});
});
});
// HTTP接口与WebSocket联动
app.get('/api/devices/:id/alert', (req, res) => {
wss.clients.forEach(client => {
if (client.deviceId === req.params.id) {
client.send('ALERT: Check your sensors immediately');
}
});
res.sendStatus(200);
});
数据流处理中间件
针对高频传感器数据,可以设计专用中间件:
const sensorDataMiddleware = (options = {}) => {
return (req, res, next) => {
const buffer = [];
let lastFlush = Date.now();
req.on('data', chunk => {
buffer.push(chunk);
// 达到批量处理条件
if (buffer.length >= options.batchSize ||
Date.now() - lastFlush > options.timeout) {
processBatch(buffer);
buffer.length = 0;
lastFlush = Date.now();
}
});
req.on('end', () => {
if (buffer.length > 0) {
processBatch(buffer);
}
next();
});
};
function processBatch(batch) {
// 数据批处理逻辑
}
};
app.post('/sensor-stream', sensorDataMiddleware({ batchSize: 100, timeout: 1000 }), (req, res) => {
res.status(200).send('Data stream processed');
});
设备影子服务实现
设备影子是物联网中的常见模式,Express可实现影子同步:
const deviceShadow = new Map();
// 设备上报状态
app.put('/shadow/:deviceId', (req, res) => {
const { state } = req.body;
deviceShadow.set(req.params.deviceId, {
reported: state,
metadata: { updated: Date.now() }
});
res.sendStatus(200);
});
// 应用查询影子状态
app.get('/shadow/:deviceId', (req, res) => {
const shadow = deviceShadow.get(req.params.deviceId) || {};
res.json({
state: {
desired: shadow.desired || {},
reported: shadow.reported || {}
},
metadata: shadow.metadata || {}
});
});
// 应用更新期望状态
app.patch('/shadow/:deviceId', (req, res) => {
const shadow = deviceShadow.get(req.params.deviceId) || {};
shadow.desired = req.body.state;
shadow.metadata = { desiredUpdated: Date.now() };
deviceShadow.set(req.params.deviceId, shadow);
// 触发状态同步事件
eventEmitter.emit('shadow-update', req.params.deviceId);
res.sendStatus(200);
});
协议转换网关
Express可处理不同协议到HTTP的转换:
const mqtt = require('mqtt');
const coap = require('coap');
// MQTT到HTTP桥接
const mqttClient = mqtt.connect('mqtt://broker');
mqttClient.subscribe('devices/+/data');
mqttClient.on('message', (topic, payload) => {
const deviceId = topic.split('/')[1];
fetch('http://localhost:3000/api/ingest', {
method: 'POST',
body: JSON.stringify({ deviceId, data: JSON.parse(payload) })
});
});
// CoAP到HTTP代理
app.coap = coap.createServer();
app.coap.on('request', (req, res) => {
const url = `http://localhost:3000${req.url}`;
fetch(url, {
method: req.method,
headers: req.headers
}).then(proxyRes => {
res.setOption('Content-Format', 'application/json');
res.end(JSON.stringify(proxyRes.body));
});
});
app.coap.listen(5683);
边缘计算场景
在边缘节点上运行Express处理本地计算:
const tf = require('@tensorflow/tfjs-node');
// 本地模型推理端点
app.post('/infer', async (req, res) => {
const { sensorReadings } = req.body;
const model = await tf.loadLayersModel('file://./edge-model/model.json');
const input = tf.tensor2d([sensorReadings]);
const output = model.predict(input);
res.json({
anomalyScore: output.dataSync()[0],
processingTime: Date.now() - req.startTime
});
});
// 添加请求计时中间件
app.use((req, res, next) => {
req.startTime = Date.now();
next();
});
大规模部署优化
针对海量设备连接需要优化Express配置:
const cluster = require('cluster');
const os = require('os');
if (cluster.isMaster) {
// 启动工作进程集群
os.cpus().forEach(() => cluster.fork());
} else {
const app = express();
// 调整事件循环监控
const monitor = require('express-status-monitor');
app.use(monitor({
healthChecks: [{
protocol: 'http',
path: '/health',
port: '3000'
}]
}));
// 连接池管理
const pool = new (require('pg').Pool)({
max: 20, // 控制数据库连接数
idleTimeoutMillis: 30000
});
app.get('/device/:id', async (req, res) => {
const client = await pool.connect();
try {
const result = await client.query('SELECT * FROM devices WHERE id=$1', [req.params.id]);
res.json(result.rows[0]);
} finally {
client.release();
}
});
app.listen(3000);
}
设备管理API设计
完整的设备CRUD接口示例:
const express = require('express');
const router = express.Router();
router.route('/devices')
.get(async (req, res) => {
try {
const { page = 1, limit = 50 } = req.query;
const devices = await Device.find()
.skip((page - 1) * limit)
.limit(limit);
res.json(devices);
} catch (err) {
res.status(500).json({ error: err.message });
}
})
.post(async (req, res) => {
try {
const device = new Device(req.body);
await device.save();
res.status(201).json(device);
} catch (err) {
res.status(400).json({ error: err.message });
}
});
router.route('/devices/:id')
.get(async (req, res) => {
try {
const device = await Device.findById(req.params.id);
if (!device) return res.status(404).end();
res.json(device);
} catch (err) {
res.status(500).json({ error: err.message });
}
})
.put(async (req, res) => {
try {
const device = await Device.findByIdAndUpdate(
req.params.id,
req.body,
{ new: true }
);
if (!device) return res.status(404).end();
res.json(device);
} catch (err) {
res.status(400).json({ error: err.message });
}
})
.delete(async (req, res) => {
try {
await Device.findByIdAndDelete(req.params.id);
res.status(204).end();
} catch (err) {
res.status(500).json({ error: err.message });
}
});
// 设备命令接口
router.post('/devices/:id/commands', async (req, res) => {
const command = {
type: req.body.command,
parameters: req.body.params || {},
issuedAt: new Date()
};
await CommandQueue.publish({
deviceId: req.params.id,
command
});
res.status(202).json({
commandId: command._id,
status: 'queued'
});
});
性能监控与日志
集成监控工具链的配置示例:
const { createLogger, transports } = require('winston');
const { ElasticsearchTransport } = require('winston-elasticsearch');
// 创建ELK日志收集器
const esTransport = new ElasticsearchTransport({
level: 'info',
clientOpts: { node: 'http://elastic:9200' }
});
const logger = createLogger({
transports: [
new transports.Console(),
esTransport
]
});
// 请求日志中间件
app.use((req, res, next) => {
const start = Date.now();
res.on('finish', () => {
logger.info({
method: req.method,
url: req.url,
status: res.statusCode,
responseTime: Date.now() - start,
deviceId: req.headers['x-device-id']
});
});
next();
});
// 异常处理
app.use((err, req, res, next) => {
logger.error({
error: err.stack,
request: {
method: req.method,
url: req.url,
headers: req.headers
}
});
res.status(500).json({
error: 'Internal Server Error',
requestId: req.id
});
});
固件OTA更新服务
实现设备固件空中升级接口:
const multer = require('multer');
const upload = multer({ storage: multer.memoryStorage() });
// 上传新固件
app.post('/firmware', upload.single('firmware'), async (req, res) => {
const { version, deviceType } = req.body;
const firmware = new Firmware({
version,
deviceType,
binary: req.file.buffer,
checksum: crypto.createHash('sha256').update(req.file.buffer).digest('hex')
});
await firmware.save();
// 触发设备更新通知
mqttClient.publish(`ota/${deviceType}`, JSON.stringify({
version,
url: `https://your-api/firmware/${firmware._id}/download`,
checksum: firmware.checksum
}));
res.status(201).json(firmware);
});
// 设备查询更新
app.get('/ota/:deviceType', async (req, res) => {
const latest = await Firmware.findOne({ deviceType: req.params.deviceType })
.sort('-version')
.limit(1);
if (!latest) return res.status(404).end();
res.json({
version: latest.version,
url: `https://your-api/firmware/${latest._id}/download`,
checksum: latest.checksum,
size: latest.binary.length
});
});
// 固件下载
app.get('/firmware/:id/download', async (req, res) => {
const firmware = await Firmware.findById(req.params.id);
if (!firmware) return res.status(404).end();
res.set({
'Content-Type': 'application/octet-stream',
'Content-Disposition': `attachment; filename=${firmware.deviceType}_v${firmware.version}.bin`,
'Content-Length': firmware.binary.length,
'X-Checksum-SHA256': firmware.checksum
});
res.end(firmware.binary);
});
地理位置数据处理
处理带有地理坐标的设备数据:
const { Point } = require('geojson');
app.post('/telemetry', async (req, res) => {
const { deviceId, coords, ...metrics } = req.body;
const point = new Point([coords.longitude, coords.latitude]);
await Telemetry.create({
deviceId,
location: point,
metrics,
timestamp: new Date()
});
// 空间查询示例
const nearbyDevices = await Telemetry.find({
location: {
$near: {
$geometry: point,
$maxDistance: 1000 // 1公里范围内
}
},
timestamp: { $gte: new Date(Date.now() - 3600000) }
}).limit(10);
res.json({ nearbyDevices });
});
// 地理围栏触发
app.post('/geofences', async (req, res) => {
const { deviceId, geofence } = req.body;
const fence = new Geofence({
deviceId,
geometry: geofence,
created: new Date()
});
await fence.save();
// 启动围栏监控
geofenceMonitor.watch(deviceId, geofence, (event) => {
websocketServer.emitToDevice(deviceId, 'geofence', event);
});
res.status(201).json(fence);
});
时序数据库集成
连接InfluxDB等时序数据库的示例:
const { InfluxDB, Point } = require('@influxdata/influxdb-client');
const influx = new InfluxDB({
url: process.env.INFLUX_URL,
token: process.env.INFLUX_TOKEN
});
const writeApi = influx.getWriteApi('iot', 'sensors');
app.post('/sensor-data', (req, res) => {
const point = new Point('sensor_reading')
.tag('device_id', req.body.deviceId)
.tag('sensor_type', req.body.sensorType)
.floatField('value', req.body.value)
.timestamp(new Date(req.body.timestamp));
writeApi.writePoint(point);
res.sendStatus(202);
});
// 查询时序数据
app.get('/sensor-history', async (req, res) => {
const queryApi = influx.getQueryApi('iot');
const fluxQuery = `
from(bucket: "sensors")
|> range(start: -1h)
|> filter(fn: (r) => r.device_id == "${req.query.deviceId}")
|> aggregateWindow(every: 1m, fn: mean)
`;
const results = [];
for await (const { values, tableMeta } of queryApi.iterateRows(fluxQuery)) {
results.push(tableMeta.toObject(values));
}
res.json(results);
});
规则引擎集成
实现基于规则的设备自动化:
const { Engine } = require('json-rules-engine');
// 创建规则引擎实例
const engine = new Engine();
// 添加温度告警规则
engine.addRule({
conditions: {
all: [{
fact: 'temperature',
operator: 'greaterThan',
value: 30
}, {
fact: 'deviceType',
operator: 'equal',
value: 'freezer'
}]
},
event: {
type: 'temperature-alert',
params: {
message: 'Freezer temperature too high!'
}
}
});
// 规则评估端点
app.post('/evaluate-rules', async (req, res) => {
const { deviceId, facts } = req.body;
// 获取设备元数据
const device = await Device.findById(deviceId);
facts.deviceType = device.type;
// 执行规则
const { events } = await engine.run(f
本站部分内容来自互联网,一切版权均归源网站或源作者所有。
如果侵犯了你的权益请来信告知我们删除。邮箱:cc@cccx.cn
上一篇:Express与微服务架构
下一篇:实时通信方案集成