Appearance
内部 MQTT 设计
Atlas 主进程内部维护了一套内联 MQTT Broker,用于在南向采集、北向转发、调度下发之间传递消息。
这套 topic 只面向 Atlas 内部模块,不直接作为外部协议接口对外承诺。
设计目标
当前内部 MQTT 设计主要解决三类问题:
- 南向采集完成后,把一组点位数据广播给多个北向插件;
- 用统一的内部指标格式承载
node/group/device_sn/device_uuid/station_id等上下文; - 为调度与控制提供统一的内部下发总线。
Topic 分类
当前实现中,内部 topic 分为 2 类主通道:
| 类型 | Topic 规则 | 主要用途 |
|---|---|---|
| 指标行协议 | direct/{device_sn}/publish | 向内部消费者广播 Telegraf Line Protocol |
| 控制下发 | /telecontrol/{node}/{group}/{tag} | 向目标设备的可写点位下发控制值 |
其中:
device_sn:设备序列号,来自DatastoreDevice.sndevice_uuid:设备 UUID,来自DatastoreDevice.uuidnode:南向设备节点名称group:点位组名称tag:点位名称station_id:电站站号,来自DatastoreDevice.station_id
命名规则
1. 指标行协议 topic
text
direct/{device_sn}/publish特点:
- 是当前内部采集总线的唯一主通道;
- topic path 使用设备序列号
device_sn; group不出现在 topic path 中,而是作为 Line Protocol tag 进入消息体;device_sn会同时出现在 topic path 和 Line Protocol tag 中;device_uuid在设备映射存在时作为 Line Protocol tag 带出;- 便于 MQTT、DataStorage、InfluxDBv3、eKuiper、Modbus 等北向插件统一解析;
- 也便于复用现有 Telegraf / Line Protocol 写库链路。
示例:
text
direct/inverter-01/publish2. 控制下发 topic
text
/telecontrol/{node}/{group}/{tag}特点:
- 面向设备可写点位;
- 粒度是“单点位单命令”;
- 由页面、API 或调度插件发布,由南向驱动消费。
示例:
text
/telecontrol/inverter-01/group1/active_power_setpoint_kw当前发布链路
南向采集 -> 内部 MQTT
南向驱动在采集到一组数据后,调用:
go
runtime.PublishGroupRead(read)主程序 Jelly 在 PublishGroupRead 中会向内联 MQTT 发布一条 Line Protocol 消息:
text
direct/{device_sn}/publish也就是说,一次组采集当前只对应一条内部消息,不再额外镜像结构化 groupread。
如果当前 south node 能关联到 DatastoreDevice,Jelly 会:
- 用
DatastoreDevice.sn作为 topic path 中的device_sn - 把
DatastoreDevice.station_id作为 Line Protocol tag 发布出去
如果当前 node 没有映射到 DatastoreDevice,则回退使用 node_name 作为 publish topic path,避免内部消息丢失。
调度/页面 -> 内部 MQTT -> 南向写设备
控制链路当前统一走 /telecontrol/...:
- 页面调用
POST /api/v2/telecontrol - Jelly 校验
node/group/tag后,向内联 MQTT 发布:/telecontrol/{node}/{group}/{tag}
modbus_tcp、modbus_rtu等南向插件订阅对应 topic- 驱动调用
WriteOnce(...)把值写给设备
Scheduler 插件当前不直接消费采集 topic;它主要接收 eKuiper 回传的调度结果,再统一发布到 /telecontrol/...。
当前订阅链路
北向应用常见订阅
当前需要消费内部采集数据的北向插件,统一从 direct/{device_sn}/publish 读取:
| 插件 | 主要订阅 |
|---|---|
MQTT | direct/{device_sn}/publish |
DataStorage | direct/{device_sn}/publish |
InfluxDBv3 | direct/{device_sn}/publish |
eKuiper | direct/{device_sn}/publish |
Modbus | direct/{device_sn}/publish |
Scheduler | 不直接消费采集 topic;主要处理 eKuiper 回传结果后再发布 /telecontrol/... |
modbus_tcp | 订阅 /telecontrol/{node}/{group}/+ |
modbus_rtu | 订阅 /telecontrol/{node}/{group}/+ |
注意:
publishtopic path 使用device_sn,不是node;group过滤由北向插件在解码 Line Protocol 后,根据 tag 中的group字段和自身 subscription 再做内存过滤;node仍然保留在 Line Protocol tag 中,作为 south node 上下文;- 这也是当前 north 插件统一采用的实现方式。
Payload 设计
1. publish Line Protocol
direct/{device_sn}/publish 的消息体是 Telegraf Line Protocol 字节流。
其中 measurement 使用当前 Jelly 的 measurement 配置,tags 至少包含:
nodegroupdevice_sndevice_uuidstation_id
示意:
text
jelly,node=inverter-tcp-01,group=group1,device_sn=inverter-01,device_uuid=5c9d6c30-7cb0-4d7e-9a42-0ce9f3d1e7ae,station_id=1 active_power_kw=74.237,power_factor=0.9814 1776316164481000000字段含义:
node:南向节点名group:组名device_sn:设备序列号;默认与 topic path 中的{device_sn}保持一致device_uuid:设备 UUID;如果当前节点没有关联DatastoreDevice,该 tag 会缺省station_id:电站站号;如果当前节点没有关联DatastoreDevice,该 tag 会缺省- fields:本次组读到的点位值
- timestamp:采集时间
2. telecontrol JSON
/telecontrol/{node}/{group}/{tag} 的消息体当前统一为:
json
{
"value": 85.5
}或者布尔值:
json
{
"value": true
}这样设计的原因是:
- topic 已经唯一定位到目标点位;
- payload 只需要携带最终控制值;
- 便于页面、Scheduler、后续其他控制源复用同一套下发格式。
通配订阅规则
当前代码里已经约定了若干常用 filter:
| 用途 | Filter |
|---|---|
| 所有采集 line protocol | direct/+/publish |
| 指定设备采集 line protocol | direct/{device_sn}/publish |
| 单节点所有 telecontrol | /telecontrol/{node}/+/+ |
| 单节点单组 telecontrol | /telecontrol/{node}/{group}/+ |
说明:
direct/{node}/{group}/groupread相关 filter 目前只保留兼容解析意义,不再作为主链路设计推荐。
QoS 与保留策略
当前内部 MQTT 消息统一采用:
QoS = 0retained = false
原因是:
- 这些消息主要用于进程内实时转发;
- 不要求 Broker 做持久保留;
- 历史数据由
DataStorage/InfluxDBv3负责保存,不依赖 retained message。
station_id 设计约定
station_id 是当前 Atlas 在内部 MQTT 总线上新增的站点维度,用于让北向插件、规则引擎和调度逻辑能够明确区分“同一套 Atlas 内部数据属于哪个电站”。
当前约定如下:
- 来源:
DatastoreDevice.station_id - 映射方式:优先按
DatastoreDevice.sn = south node name查找,必要时再按extra_json.node_name回查 - 发布位置:
direct/{device_sn}/publish:作为 topic pathdirect/{device_sn}/publish:同时把device_sn、device_uuid、station_id作为 Line Protocol tag
- 缺省行为:
- 若节点尚未映射到
DatastoreDevice,则不强行伪造站号 - 此时
publish中不带station_id - topic path 回退使用
node_name
- 若节点尚未映射到
设计原则
1. publish 是内部采集主格式
当前 south -> north 的内部采集总线已经统一收敛到:
direct/{device_sn}/publish
原因:
- topic path 直接按设备维度组织;
group/device_sn/device_uuid/station_id都可以稳定放进 tags;node继续作为 south 上下文保留在 tags 中;- 所有 north 插件统一解析一种格式,减少双轨维护成本;
- 更适合继续复用现有 Line Protocol 生态。
2. groupread 不再作为 south 发布格式
虽然代码里仍保留了少量 groupread 兼容解析能力,但当前 south 插件已经不再发布:
direct/{node}/{group}/groupread
这意味着:
- 新增 north 插件不应该再依赖
groupread - 现有 north 插件也统一改为消费
publish
3. telecontrol 以点位为最小粒度
控制 topic 不设计成“整组批量写”,而是按单点位下发:
text
/telecontrol/{node}/{group}/{tag}这样可以:
- 让控制边界更明确
- 简化驱动侧解析逻辑
- 方便审计和问题排查
仓库边界
当前项目已经拆成两个仓库:
github.com/fluxionwatt/neurongo负责运行时基础库,包括服务装配、server、内置插件、utils、errors与migrationsgithub.com/fluxionwatt/atlas负责atlas可执行程序、配置文件解析、环境变量覆盖、daemon/pidfile/stop、内置webuiGin 路由注入,以及发布打包壳层
这样可以把可复用运行时与 CLI/发布壳层拆开,同时保持现有 HTTP API、DB 格式与 migrations 历史不变。
总结
当前内部 MQTT 设计可以概括为:
direct/{device_sn}/publish:唯一的 south -> north 采集主通道/telecontrol/{node}/{group}/{tag}:统一控制下发通道
其中 station_id 作为 publish 的标准 tag,补齐了电站维度;而 north 插件统一消费 publish,则进一步简化了 Atlas 内部“采集 -> 转发 -> 调度 -> 控制”的总线设计。