Skip to content

内部 MQTT 设计

Atlas 主进程内部维护了一套内联 MQTT Broker,用于在南向采集、北向转发、调度下发之间传递消息。
这套 topic 只面向 Atlas 内部模块,不直接作为外部协议接口对外承诺。

设计目标

当前内部 MQTT 设计主要解决三类问题:

  1. 南向采集完成后,把一组点位数据广播给多个北向插件;
  2. 用统一的内部指标格式承载 node/group/device_sn/device_uuid/station_id 等上下文;
  3. 为调度与控制提供统一的内部下发总线。

Topic 分类

当前实现中,内部 topic 分为 2 类主通道:

类型Topic 规则主要用途
指标行协议direct/{device_sn}/publish向内部消费者广播 Telegraf Line Protocol
控制下发/telecontrol/{node}/{group}/{tag}向目标设备的可写点位下发控制值

其中:

  • device_sn:设备序列号,来自 DatastoreDevice.sn
  • device_uuid:设备 UUID,来自 DatastoreDevice.uuid
  • node:南向设备节点名称
  • group:点位组名称
  • tag:点位名称
  • station_id:电站站号,来自 DatastoreDevice.station_id

命名规则

1. 指标行协议 topic

text
direct/{device_sn}/publish

特点:

  • 是当前内部采集总线的唯一主通道;
  • topic path 使用设备序列号 device_sn
  • group 不出现在 topic path 中,而是作为 Line Protocol tag 进入消息体;
  • device_sn 会同时出现在 topic path 和 Line Protocol tag 中;
  • device_uuid 在设备映射存在时作为 Line Protocol tag 带出;
  • 便于 MQTT、DataStorage、InfluxDBv3、eKuiper、Modbus 等北向插件统一解析;
  • 也便于复用现有 Telegraf / Line Protocol 写库链路。

示例:

text
direct/inverter-01/publish

2. 控制下发 topic

text
/telecontrol/{node}/{group}/{tag}

特点:

  • 面向设备可写点位;
  • 粒度是“单点位单命令”;
  • 由页面、API 或调度插件发布,由南向驱动消费。

示例:

text
/telecontrol/inverter-01/group1/active_power_setpoint_kw

当前发布链路

南向采集 -> 内部 MQTT

南向驱动在采集到一组数据后,调用:

go
runtime.PublishGroupRead(read)

主程序 Jelly 在 PublishGroupRead 中会向内联 MQTT 发布一条 Line Protocol 消息:

text
direct/{device_sn}/publish

也就是说,一次组采集当前只对应一条内部消息,不再额外镜像结构化 groupread

如果当前 south node 能关联到 DatastoreDevice,Jelly 会:

  • DatastoreDevice.sn 作为 topic path 中的 device_sn
  • DatastoreDevice.station_id 作为 Line Protocol tag 发布出去

如果当前 node 没有映射到 DatastoreDevice,则回退使用 node_name 作为 publish topic path,避免内部消息丢失。

调度/页面 -> 内部 MQTT -> 南向写设备

控制链路当前统一走 /telecontrol/...

  1. 页面调用 POST /api/v2/telecontrol
  2. Jelly 校验 node/group/tag 后,向内联 MQTT 发布:
    • /telecontrol/{node}/{group}/{tag}
  3. modbus_tcpmodbus_rtu 等南向插件订阅对应 topic
  4. 驱动调用 WriteOnce(...) 把值写给设备

Scheduler 插件当前不直接消费采集 topic;它主要接收 eKuiper 回传的调度结果,再统一发布到 /telecontrol/...

当前订阅链路

北向应用常见订阅

当前需要消费内部采集数据的北向插件,统一从 direct/{device_sn}/publish 读取:

插件主要订阅
MQTTdirect/{device_sn}/publish
DataStoragedirect/{device_sn}/publish
InfluxDBv3direct/{device_sn}/publish
eKuiperdirect/{device_sn}/publish
Modbusdirect/{device_sn}/publish
Scheduler不直接消费采集 topic;主要处理 eKuiper 回传结果后再发布 /telecontrol/...
modbus_tcp订阅 /telecontrol/{node}/{group}/+
modbus_rtu订阅 /telecontrol/{node}/{group}/+

注意:

  • publish topic path 使用 device_sn,不是 node
  • group 过滤由北向插件在解码 Line Protocol 后,根据 tag 中的 group 字段和自身 subscription 再做内存过滤;
  • node 仍然保留在 Line Protocol tag 中,作为 south node 上下文;
  • 这也是当前 north 插件统一采用的实现方式。

Payload 设计

1. publish Line Protocol

direct/{device_sn}/publish 的消息体是 Telegraf Line Protocol 字节流。
其中 measurement 使用当前 Jelly 的 measurement 配置,tags 至少包含:

  • node
  • group
  • device_sn
  • device_uuid
  • station_id

示意:

text
jelly,node=inverter-tcp-01,group=group1,device_sn=inverter-01,device_uuid=5c9d6c30-7cb0-4d7e-9a42-0ce9f3d1e7ae,station_id=1 active_power_kw=74.237,power_factor=0.9814 1776316164481000000

字段含义:

  • node:南向节点名
  • group:组名
  • device_sn:设备序列号;默认与 topic path 中的 {device_sn} 保持一致
  • device_uuid:设备 UUID;如果当前节点没有关联 DatastoreDevice,该 tag 会缺省
  • station_id:电站站号;如果当前节点没有关联 DatastoreDevice,该 tag 会缺省
  • fields:本次组读到的点位值
  • timestamp:采集时间

2. telecontrol JSON

/telecontrol/{node}/{group}/{tag} 的消息体当前统一为:

json
{
  "value": 85.5
}

或者布尔值:

json
{
  "value": true
}

这样设计的原因是:

  • topic 已经唯一定位到目标点位;
  • payload 只需要携带最终控制值;
  • 便于页面、Scheduler、后续其他控制源复用同一套下发格式。

通配订阅规则

当前代码里已经约定了若干常用 filter:

用途Filter
所有采集 line protocoldirect/+/publish
指定设备采集 line protocoldirect/{device_sn}/publish
单节点所有 telecontrol/telecontrol/{node}/+/+
单节点单组 telecontrol/telecontrol/{node}/{group}/+

说明:

  • direct/{node}/{group}/groupread 相关 filter 目前只保留兼容解析意义,不再作为主链路设计推荐。

QoS 与保留策略

当前内部 MQTT 消息统一采用:

  • QoS = 0
  • retained = false

原因是:

  • 这些消息主要用于进程内实时转发;
  • 不要求 Broker 做持久保留;
  • 历史数据由 DataStorage / InfluxDBv3 负责保存,不依赖 retained message。

station_id 设计约定

station_id 是当前 Atlas 在内部 MQTT 总线上新增的站点维度,用于让北向插件、规则引擎和调度逻辑能够明确区分“同一套 Atlas 内部数据属于哪个电站”。

当前约定如下:

  • 来源:DatastoreDevice.station_id
  • 映射方式:优先按 DatastoreDevice.sn = south node name 查找,必要时再按 extra_json.node_name 回查
  • 发布位置:
    • direct/{device_sn}/publish:作为 topic path
    • direct/{device_sn}/publish:同时把 device_sndevice_uuidstation_id 作为 Line Protocol tag
  • 缺省行为:
    • 若节点尚未映射到 DatastoreDevice,则不强行伪造站号
    • 此时 publish 中不带 station_id
    • topic path 回退使用 node_name

设计原则

1. publish 是内部采集主格式

当前 south -> north 的内部采集总线已经统一收敛到:

  • direct/{device_sn}/publish

原因:

  • topic path 直接按设备维度组织;
  • group/device_sn/device_uuid/station_id 都可以稳定放进 tags;
  • node 继续作为 south 上下文保留在 tags 中;
  • 所有 north 插件统一解析一种格式,减少双轨维护成本;
  • 更适合继续复用现有 Line Protocol 生态。

2. groupread 不再作为 south 发布格式

虽然代码里仍保留了少量 groupread 兼容解析能力,但当前 south 插件已经不再发布:

  • direct/{node}/{group}/groupread

这意味着:

  • 新增 north 插件不应该再依赖 groupread
  • 现有 north 插件也统一改为消费 publish

3. telecontrol 以点位为最小粒度

控制 topic 不设计成“整组批量写”,而是按单点位下发:

text
/telecontrol/{node}/{group}/{tag}

这样可以:

  • 让控制边界更明确
  • 简化驱动侧解析逻辑
  • 方便审计和问题排查

仓库边界

当前项目已经拆成两个仓库:

  • github.com/fluxionwatt/neurongo 负责运行时基础库,包括服务装配、server、内置插件、utilserrorsmigrations
  • github.com/fluxionwatt/atlas 负责 atlas 可执行程序、配置文件解析、环境变量覆盖、daemon/pidfile/stop、内置 webui Gin 路由注入,以及发布打包壳层

这样可以把可复用运行时与 CLI/发布壳层拆开,同时保持现有 HTTP API、DB 格式与 migrations 历史不变。

总结

当前内部 MQTT 设计可以概括为:

  • direct/{device_sn}/publish:唯一的 south -> north 采集主通道
  • /telecontrol/{node}/{group}/{tag}:统一控制下发通道

其中 station_id 作为 publish 的标准 tag,补齐了电站维度;而 north 插件统一消费 publish,则进一步简化了 Atlas 内部“采集 -> 转发 -> 调度 -> 控制”的总线设计。