阿里云主机折上折
  • 微信号
您当前的位置:网站首页 > Express在物联网中的应用

Express在物联网中的应用

作者:陈川 阅读数:64146人阅读 分类: Node.js

Express作为Node.js的一个轻量级Web框架,以其简洁、灵活的特性在物联网(IoT)领域展现出强大的适配能力。无论是构建设备管理后台、处理传感器数据流,还是实现设备间的实时通信,Express都能通过中间件、路由和RESTful API等机制高效完成任务。

Express与物联网架构的适配性

物联网系统通常采用分层架构,包括设备层、网关层、云平台层和应用层。Express主要作用于网关层和云平台层,承担协议转换、数据聚合和API暴露等关键功能。其非阻塞I/O模型特别适合处理高并发的设备连接请求。

例如,一个智能农业系统可能同时有数千个传感器上报数据。通过Express构建的网关服务可以这样处理:

const express = require('express');
const bodyParser = require('body-parser');
const app = express();

app.use(bodyParser.json());

// 处理传感器数据上报
app.post('/api/sensor-data', (req, res) => {
  const { deviceId, temperature, humidity } = req.body;
  
  // 数据预处理
  const normalizedData = {
    deviceId,
    timestamp: Date.now(),
    metrics: { temperature: parseFloat(temperature), humidity: parseFloat(humidity) }
  };

  // 转发到消息队列
  messageQueue.publish('sensor-data', normalizedData);
  
  res.status(202).json({ status: 'accepted' });
});

app.listen(3000, () => console.log('Gateway running on port 3000'));

设备认证与安全控制

物联网设备的安全认证是核心需求。Express结合Passport.js等中间件可以实现多种认证方案:

  1. 设备证书认证
const passport = require('passport');
const ClientCertStrategy = require('passport-client-cert');

passport.use(new ClientCertStrategy((cert, done) => {
  DeviceModel.findByThumbprint(cert.thumbprint)
    .then(device => done(null, device || false));
}));

app.post('/api/telemetry', 
  passport.authenticate('client-cert', { session: false }),
  (req, res) => {
    // 处理已认证设备的数据
  }
);
  1. JWT令牌验证
const jwt = require('express-jwt');
const jwks = require('jwks-rsa');

const jwtCheck = jwt({
  secret: jwks.expressJwtSecret({
    jwksUri: 'https://your-domain/.well-known/jwks.json'
  }),
  algorithms: ['RS256']
});

app.use('/api/commands', jwtCheck);

实时通信方案

Express与WebSocket结合可以构建设备控制通道:

const WebSocket = require('ws');
const express = require('express');

const app = express();
const server = app.listen(3001);
const wss = new WebSocket.Server({ server });

wss.on('connection', (ws, req) => {
  const deviceId = req.url.split('=')[1];
  
  ws.on('message', (message) => {
    console.log(`Received from ${deviceId}: ${message}`);
    // 广播到控制台
    wss.clients.forEach(client => {
      if (client !== ws && client.readyState === WebSocket.OPEN) {
        client.send(`${deviceId}: ${message}`);
      }
    });
  });
});

// HTTP接口与WebSocket联动
app.get('/api/devices/:id/alert', (req, res) => {
  wss.clients.forEach(client => {
    if (client.deviceId === req.params.id) {
      client.send('ALERT: Check your sensors immediately');
    }
  });
  res.sendStatus(200);
});

数据流处理中间件

针对高频传感器数据,可以设计专用中间件:

const sensorDataMiddleware = (options = {}) => {
  return (req, res, next) => {
    const buffer = [];
    let lastFlush = Date.now();
    
    req.on('data', chunk => {
      buffer.push(chunk);
      
      // 达到批量处理条件
      if (buffer.length >= options.batchSize || 
          Date.now() - lastFlush > options.timeout) {
        processBatch(buffer);
        buffer.length = 0;
        lastFlush = Date.now();
      }
    });
    
    req.on('end', () => {
      if (buffer.length > 0) {
        processBatch(buffer);
      }
      next();
    });
  };
  
  function processBatch(batch) {
    // 数据批处理逻辑
  }
};

app.post('/sensor-stream', sensorDataMiddleware({ batchSize: 100, timeout: 1000 }), (req, res) => {
  res.status(200).send('Data stream processed');
});

设备影子服务实现

设备影子是物联网中的常见模式,Express可实现影子同步:

const deviceShadow = new Map();

// 设备上报状态
app.put('/shadow/:deviceId', (req, res) => {
  const { state } = req.body;
  deviceShadow.set(req.params.deviceId, {
    reported: state,
    metadata: { updated: Date.now() }
  });
  res.sendStatus(200);
});

// 应用查询影子状态
app.get('/shadow/:deviceId', (req, res) => {
  const shadow = deviceShadow.get(req.params.deviceId) || {};
  res.json({
    state: {
      desired: shadow.desired || {},
      reported: shadow.reported || {}
    },
    metadata: shadow.metadata || {}
  });
});

// 应用更新期望状态
app.patch('/shadow/:deviceId', (req, res) => {
  const shadow = deviceShadow.get(req.params.deviceId) || {};
  shadow.desired = req.body.state;
  shadow.metadata = { desiredUpdated: Date.now() };
  deviceShadow.set(req.params.deviceId, shadow);
  
  // 触发状态同步事件
  eventEmitter.emit('shadow-update', req.params.deviceId);
  res.sendStatus(200);
});

协议转换网关

Express可处理不同协议到HTTP的转换:

const mqtt = require('mqtt');
const coap = require('coap');

// MQTT到HTTP桥接
const mqttClient = mqtt.connect('mqtt://broker');
mqttClient.subscribe('devices/+/data');

mqttClient.on('message', (topic, payload) => {
  const deviceId = topic.split('/')[1];
  fetch('http://localhost:3000/api/ingest', {
    method: 'POST',
    body: JSON.stringify({ deviceId, data: JSON.parse(payload) })
  });
});

// CoAP到HTTP代理
app.coap = coap.createServer();
app.coap.on('request', (req, res) => {
  const url = `http://localhost:3000${req.url}`;
  fetch(url, {
    method: req.method,
    headers: req.headers
  }).then(proxyRes => {
    res.setOption('Content-Format', 'application/json');
    res.end(JSON.stringify(proxyRes.body));
  });
});
app.coap.listen(5683);

边缘计算场景

在边缘节点上运行Express处理本地计算:

const tf = require('@tensorflow/tfjs-node');

// 本地模型推理端点
app.post('/infer', async (req, res) => {
  const { sensorReadings } = req.body;
  
  const model = await tf.loadLayersModel('file://./edge-model/model.json');
  const input = tf.tensor2d([sensorReadings]);
  const output = model.predict(input);
  
  res.json({
    anomalyScore: output.dataSync()[0],
    processingTime: Date.now() - req.startTime
  });
});

// 添加请求计时中间件
app.use((req, res, next) => {
  req.startTime = Date.now();
  next();
});

大规模部署优化

针对海量设备连接需要优化Express配置:

const cluster = require('cluster');
const os = require('os');

if (cluster.isMaster) {
  // 启动工作进程集群
  os.cpus().forEach(() => cluster.fork());
} else {
  const app = express();
  
  // 调整事件循环监控
  const monitor = require('express-status-monitor');
  app.use(monitor({
    healthChecks: [{
      protocol: 'http',
      path: '/health',
      port: '3000'
    }]
  }));
  
  // 连接池管理
  const pool = new (require('pg').Pool)({
    max: 20, // 控制数据库连接数
    idleTimeoutMillis: 30000
  });
  
  app.get('/device/:id', async (req, res) => {
    const client = await pool.connect();
    try {
      const result = await client.query('SELECT * FROM devices WHERE id=$1', [req.params.id]);
      res.json(result.rows[0]);
    } finally {
      client.release();
    }
  });
  
  app.listen(3000);
}

设备管理API设计

完整的设备CRUD接口示例:

const express = require('express');
const router = express.Router();

router.route('/devices')
  .get(async (req, res) => {
    try {
      const { page = 1, limit = 50 } = req.query;
      const devices = await Device.find()
        .skip((page - 1) * limit)
        .limit(limit);
      res.json(devices);
    } catch (err) {
      res.status(500).json({ error: err.message });
    }
  })
  .post(async (req, res) => {
    try {
      const device = new Device(req.body);
      await device.save();
      res.status(201).json(device);
    } catch (err) {
      res.status(400).json({ error: err.message });
    }
  });

router.route('/devices/:id')
  .get(async (req, res) => {
    try {
      const device = await Device.findById(req.params.id);
      if (!device) return res.status(404).end();
      res.json(device);
    } catch (err) {
      res.status(500).json({ error: err.message });
    }
  })
  .put(async (req, res) => {
    try {
      const device = await Device.findByIdAndUpdate(
        req.params.id,
        req.body,
        { new: true }
      );
      if (!device) return res.status(404).end();
      res.json(device);
    } catch (err) {
      res.status(400).json({ error: err.message });
    }
  })
  .delete(async (req, res) => {
    try {
      await Device.findByIdAndDelete(req.params.id);
      res.status(204).end();
    } catch (err) {
      res.status(500).json({ error: err.message });
    }
  });

// 设备命令接口
router.post('/devices/:id/commands', async (req, res) => {
  const command = {
    type: req.body.command,
    parameters: req.body.params || {},
    issuedAt: new Date()
  };
  
  await CommandQueue.publish({
    deviceId: req.params.id,
    command
  });
  
  res.status(202).json({
    commandId: command._id,
    status: 'queued'
  });
});

性能监控与日志

集成监控工具链的配置示例:

const { createLogger, transports } = require('winston');
const { ElasticsearchTransport } = require('winston-elasticsearch');

// 创建ELK日志收集器
const esTransport = new ElasticsearchTransport({
  level: 'info',
  clientOpts: { node: 'http://elastic:9200' }
});

const logger = createLogger({
  transports: [
    new transports.Console(),
    esTransport
  ]
});

// 请求日志中间件
app.use((req, res, next) => {
  const start = Date.now();
  res.on('finish', () => {
    logger.info({
      method: req.method,
      url: req.url,
      status: res.statusCode,
      responseTime: Date.now() - start,
      deviceId: req.headers['x-device-id']
    });
  });
  next();
});

// 异常处理
app.use((err, req, res, next) => {
  logger.error({
    error: err.stack,
    request: {
      method: req.method,
      url: req.url,
      headers: req.headers
    }
  });
  
  res.status(500).json({
    error: 'Internal Server Error',
    requestId: req.id
  });
});

固件OTA更新服务

实现设备固件空中升级接口:

const multer = require('multer');
const upload = multer({ storage: multer.memoryStorage() });

// 上传新固件
app.post('/firmware', upload.single('firmware'), async (req, res) => {
  const { version, deviceType } = req.body;
  
  const firmware = new Firmware({
    version,
    deviceType,
    binary: req.file.buffer,
    checksum: crypto.createHash('sha256').update(req.file.buffer).digest('hex')
  });
  
  await firmware.save();
  
  // 触发设备更新通知
  mqttClient.publish(`ota/${deviceType}`, JSON.stringify({
    version,
    url: `https://your-api/firmware/${firmware._id}/download`,
    checksum: firmware.checksum
  }));
  
  res.status(201).json(firmware);
});

// 设备查询更新
app.get('/ota/:deviceType', async (req, res) => {
  const latest = await Firmware.findOne({ deviceType: req.params.deviceType })
    .sort('-version')
    .limit(1);
  
  if (!latest) return res.status(404).end();
  
  res.json({
    version: latest.version,
    url: `https://your-api/firmware/${latest._id}/download`,
    checksum: latest.checksum,
    size: latest.binary.length
  });
});

// 固件下载
app.get('/firmware/:id/download', async (req, res) => {
  const firmware = await Firmware.findById(req.params.id);
  if (!firmware) return res.status(404).end();
  
  res.set({
    'Content-Type': 'application/octet-stream',
    'Content-Disposition': `attachment; filename=${firmware.deviceType}_v${firmware.version}.bin`,
    'Content-Length': firmware.binary.length,
    'X-Checksum-SHA256': firmware.checksum
  });
  
  res.end(firmware.binary);
});

地理位置数据处理

处理带有地理坐标的设备数据:

const { Point } = require('geojson');

app.post('/telemetry', async (req, res) => {
  const { deviceId, coords, ...metrics } = req.body;
  
  const point = new Point([coords.longitude, coords.latitude]);
  
  await Telemetry.create({
    deviceId,
    location: point,
    metrics,
    timestamp: new Date()
  });
  
  // 空间查询示例
  const nearbyDevices = await Telemetry.find({
    location: {
      $near: {
        $geometry: point,
        $maxDistance: 1000 // 1公里范围内
      }
    },
    timestamp: { $gte: new Date(Date.now() - 3600000) }
  }).limit(10);
  
  res.json({ nearbyDevices });
});

// 地理围栏触发
app.post('/geofences', async (req, res) => {
  const { deviceId, geofence } = req.body;
  
  const fence = new Geofence({
    deviceId,
    geometry: geofence,
    created: new Date()
  });
  
  await fence.save();
  
  // 启动围栏监控
  geofenceMonitor.watch(deviceId, geofence, (event) => {
    websocketServer.emitToDevice(deviceId, 'geofence', event);
  });
  
  res.status(201).json(fence);
});

时序数据库集成

连接InfluxDB等时序数据库的示例:

const { InfluxDB, Point } = require('@influxdata/influxdb-client');

const influx = new InfluxDB({
  url: process.env.INFLUX_URL,
  token: process.env.INFLUX_TOKEN
});

const writeApi = influx.getWriteApi('iot', 'sensors');

app.post('/sensor-data', (req, res) => {
  const point = new Point('sensor_reading')
    .tag('device_id', req.body.deviceId)
    .tag('sensor_type', req.body.sensorType)
    .floatField('value', req.body.value)
    .timestamp(new Date(req.body.timestamp));
  
  writeApi.writePoint(point);
  
  res.sendStatus(202);
});

// 查询时序数据
app.get('/sensor-history', async (req, res) => {
  const queryApi = influx.getQueryApi('iot');
  const fluxQuery = `
    from(bucket: "sensors")
      |> range(start: -1h)
      |> filter(fn: (r) => r.device_id == "${req.query.deviceId}")
      |> aggregateWindow(every: 1m, fn: mean)
  `;
  
  const results = [];
  for await (const { values, tableMeta } of queryApi.iterateRows(fluxQuery)) {
    results.push(tableMeta.toObject(values));
  }
  
  res.json(results);
});

规则引擎集成

实现基于规则的设备自动化:

const { Engine } = require('json-rules-engine');

// 创建规则引擎实例
const engine = new Engine();

// 添加温度告警规则
engine.addRule({
  conditions: {
    all: [{
      fact: 'temperature',
      operator: 'greaterThan',
      value: 30
    }, {
      fact: 'deviceType',
      operator: 'equal',
      value: 'freezer'
    }]
  },
  event: {
    type: 'temperature-alert',
    params: {
      message: 'Freezer temperature too high!'
    }
  }
});

// 规则评估端点
app.post('/evaluate-rules', async (req, res) => {
  const { deviceId, facts } = req.body;
  
  // 获取设备元数据
  const device = await Device.findById(deviceId);
  facts.deviceType = device.type;
  
  // 执行规则
  const { events } = await engine.run(f

本站部分内容来自互联网,一切版权均归源网站或源作者所有。

如果侵犯了你的权益请来信告知我们删除。邮箱:cc@cccx.cn

前端川

前端川,陈川的代码茶馆🍵,专治各种不服的Bug退散符💻,日常贩卖秃头警告级的开发心得🛠️,附赠一行代码笑十年的摸鱼宝典🐟,偶尔掉落咖啡杯里泡开的像素级浪漫☕。‌