MQTT中文站
  • 首页
  • MQTT 学习
    • MQTT 入门
    • MQTT 进阶
    • MQTT 编程
    • MQTT 实例
    • MQTT 要点
    • MQTT5 要点
    • MQTT 工具
    • MQTT 客户端库
    • MQTT 服务器
    • Zigbee2MQTT
    • Sparkplug
    • Home Assistant
    • Node-RED
      • Node-RED 安装部署
      • Node-RED 用户指南
      • Node-RED 创建节点
      • Node-RED 示例教程
      • Node-RED 开发流程
      • Node-RED 接口参考
      • Node-RED 配置模板
      • Node-RED 常见问题
  • MQTT 规范
    • MQTT 5 规范
    • MQTT 3.1.1 规范
    • MQTT 3.1 规范
    • MQTT-SN v1.2规范
    • Sparkplug® v3.0.0规范
  • 产品中心
  • 解决方案
    • 环境监测
    • 工业制造
    • 智慧水利
    • 水利管网
    • 积水监测
    • 综合管廊
    • 档案库房
    • 交通物流
    • 智慧城市
    • 智慧农业
    • 智慧养殖
    • 能源电力
    • 石油石化
    • 智能家居
    • 物联网
    • 汽车与出行
  • 使用文档
  • MQTT 云平台
  • 登录
  • 注册

实现智能家居中Modbus与MQTT的无缝集成:modbus2mqtt项目详解

1 年前 • MQTT 编程

摘要:
在构建智能家居系统时,实现不同通信协议之间的互操作性是一项关键任务。modbus2mqtt项目通过桥接Modbus和MQTT协议,为此提供了一个高效的解决方案。本文深入探讨了modbus2mqtt的实现细节,包括其依赖、功能、和实际应用。

引言:
在智能家居和工业自动化领域,Modbus协议常用于传感器和执行器的通信,而MQTT作为一种轻量级的消息传递协议,适用于物联网(IoT)通信。modbus2mqtt是一个Python脚本,它实现了Modbus与MQTT之间的数据转换和传输,从而在异构的智能家居环境中提供了一个集中式消息总线解决方案。

1. 技术背景与依赖性:
该项目依赖于两个关键的Python库:

  • Eclipse Paho:用于实现MQTT客户端功能,实现与MQTT代理的通信。
  • modbus-tk:用于处理Modbus通信,包括读取和写入Modbus从设备的寄存器。
#
# modbus2mqtt - Modbus master with MQTT publishing
#
# Written and (C) 2015 by Oliver Wagner <owagner@tellerulam.com>
# Provided under the terms of the MIT license
#
# Requires:
# - Eclipse Paho for Python - http://www.eclipse.org/paho/clients/python/
# - modbus-tk for Modbus communication - https://github.com/ljean/modbus-tk/
#

import argparse
import logging
import logging.handlers
import time
import socket
import paho.mqtt.client as mqtt
import serial
import io
import sys
import csv
import signal

import modbus_tk
import modbus_tk.defines as cst
from modbus_tk import modbus_rtu
from modbus_tk import modbus_tcp

version="0.5"
    
parser = argparse.ArgumentParser(description='Bridge between ModBus and MQTT')
parser.add_argument('--mqtt-host', default='localhost', help='MQTT server address. Defaults to "localhost"')
parser.add_argument('--mqtt-port', default='1883', type=int, help='MQTT server port. Defaults to 1883')
parser.add_argument('--mqtt-topic', default='modbus/', help='Topic prefix to be used for subscribing/publishing. Defaults to "modbus/"')
parser.add_argument('--clientid', default='modbus2mqtt', help='Client ID prefix for MQTT connection')
parser.add_argument('--rtu', help='pyserial URL (or port name) for RTU serial port')
parser.add_argument('--rtu-baud', default='19200', type=int, help='Baud rate for serial port. Defaults to 19200')
parser.add_argument('--rtu-parity', default='even', choices=['even','odd','none'], help='Parity for serial port. Defaults to even')
parser.add_argument('--tcp', help='Act as a Modbus TCP master, connecting to host TCP')
parser.add_argument('--tcp-port', default='502', type=int, help='Port for Modbus TCP. Defaults to 502')
parser.add_argument('--registers', required=True, help='Register definition file. Required!')
parser.add_argument('--log', help='set log level to the specified value. Defaults to WARNING. Use DEBUG for maximum detail')
parser.add_argument('--syslog', action='store_true', help='enable logging to syslog')
parser.add_argument('--force', default='0',type=int, help='publish values after "force" seconds since publish regardless of change. Defaults to 0 (change only)')
args=parser.parse_args()

if args.log:
    logging.getLogger().setLevel(args.log)
if args.syslog:
    logging.getLogger().addHandler(logging.handlers.SysLogHandler())
else:
    logging.getLogger().addHandler(logging.StreamHandler(sys.stdout))

topic=args.mqtt_topic
if not topic.endswith("/"):
    topic+="/"

logging.info('Starting modbus2mqtt V%s with topic prefix \"%s\"' %(version, topic))

def signal_handler(signal, frame):
        print('Exiting ' + sys.argv[0])
        sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)

class Register:
    def __init__(self,topic,frequency,slaveid,functioncode,register,size,format):
        self.topic=topic
        self.frequency=int(frequency)
        self.slaveid=int(slaveid)
        self.functioncode=int(functioncode)
        self.register=int(register)
        self.size=int(size)
        self.format=format.split(":",2)
        self.next_due=0
        self.lastval=None
        self.last = None

    def checkpoll(self):
        if self.next_due<time.time():
            self.poll()
            self.next_due=time.time()+self.frequency

    def poll(self):
        try:
            res=master.execute(self.slaveid,self.functioncode,self.register,self.size,data_format=self.format[0])
            r=res[0]
            if self.format[1]:
                r=self.format[1] % r
            if r!=self.lastval or (args.force and (time.time() - self.last) > int(args.force)):
                self.lastval=r
                fulltopic=topic+"status/"+self.topic
                logging.info("Publishing " + fulltopic)
                mqc.publish(fulltopic,self.lastval,qos=0,retain=True)
                self.last = time.time()
        except modbus_tk.modbus.ModbusError as exc:
            logging.error("Error reading "+self.topic+": Slave returned %s - %s", exc, exc.get_exception_code())
        except Exception as exc:
            logging.error("Error reading "+self.topic+": %s", exc)
            

registers=[]

# Now lets read the register definition
with open(args.registers,"r") as csvfile:
    dialect=csv.Sniffer().sniff(csvfile.read(8192))
    csvfile.seek(0)
    defaultrow={"Size":1,"Format":">H","Frequency":60,"Slave":1,"FunctionCode":4}
    reader=csv.DictReader(csvfile,fieldnames=["Topic","Register","Size","Format","Frequency","Slave","FunctionCode"],dialect=dialect)
    for row in reader:
        # Skip header row
        if row["Frequency"]=="Frequency":
            continue
        # Comment?
        if row["Topic"][0]=="#":
            continue
        if row["Topic"]=="DEFAULT":
            temp=dict((k,v) for k,v in row.iteritems() if v is not None and v!="")
            defaultrow.update(temp)
            continue
        freq=row["Frequency"]
        if freq is None or freq=="":
            freq=defaultrow["Frequency"]
        slave=row["Slave"]
        if slave is None or slave=="":
            slave=defaultrow["Slave"]
        fc=row["FunctionCode"]
        if fc is None or fc=="":
            fc=defaultrow["FunctionCode"]
        fmt=row["Format"]
        if fmt is None or fmt=="":
            fmt=defaultrow["Format"]
        size=row["Size"]
        if size is None or size=="":
            size=defaultrow["Size"]
        r=Register(row["Topic"],freq,slave,fc,row["Register"],size,fmt)
        registers.append(r)

logging.info('Read %u valid register definitions from \"%s\"' %(len(registers), args.registers))


def messagehandler(mqc,userdata,msg):

    try:
        (prefix,function,slaveid,functioncode,register) = msg.topic.split("/")
        if function != 'set':
            return
        if int(slaveid) not in range(0,255):
            logging.warning("on message - invalid slaveid " + msg.topic)
            return

        if not (int(register) >= 0 and int(register) < sys.maxint):
            logging.warning("on message - invalid register " + msg.topic)
            return

        if functioncode == str(cst.WRITE_SINGLE_COIL):
            logging.info("Writing single coil " + register)
        elif functioncode == str(cst.WRITE_SINGLE_REGISTER):
            logging.info("Writing single register " + register)
        else:
            logging.error("Error attempting to write - invalid function code " + msg.topic)
            return
                     
        res=master.execute(int(slaveid),int(functioncode),int(register),output_value=int(msg.payload))
        
    except Exception as e:
        logging.error("Error on message " + msg.topic + " :" + str(e))
    
def connecthandler(mqc,userdata,rc):
    logging.info("Connected to MQTT broker with rc=%d" % (rc))
    mqc.subscribe(topic+"set/+/"+str(cst.WRITE_SINGLE_REGISTER)+"/+")
    mqc.subscribe(topic+"set/+/"+str(cst.WRITE_SINGLE_COIL)+"/+")
    mqc.publish(topic+"connected",2,qos=1,retain=True)

def disconnecthandler(mqc,userdata,rc):
    logging.warning("Disconnected from MQTT broker with rc=%d" % (rc))

try:
    clientid=args.clientid + "-" + str(time.time())
    mqc=mqtt.Client(client_id=clientid)
    mqc.on_connect=connecthandler
    mqc.on_message=messagehandler
    mqc.on_disconnect=disconnecthandler
    mqc.will_set(topic+"connected",0,qos=2,retain=True)
    mqc.disconnected =True
    mqc.connect(args.mqtt_host,args.mqtt_port,60)
    mqc.loop_start()
    
    if args.rtu:
        master=modbus_rtu.RtuMaster(serial.serial_for_url(args.rtu,baudrate=args.rtu_baud,parity=args.rtu_parity[0].upper()))
    elif args.tcp:
        master=modbus_tcp.TcpMaster(args.tcp,args.tcp_port)
    else:
        logging.error("You must specify a modbus access method, either --rtu or --tcp")
        sys.exit(1)

    master.set_verbose(True)
    master.set_timeout(5.0)
    
    while True:
        for r in registers:
            r.checkpoll()
        time.sleep(1)

except Exception as e:
    logging.error("Unhandled error [" + str(e) + "]")
    sys.exit(1)
    

2. 功能概述:
modbus2mqtt作为Modbus主设备运行,持续轮询连接的Modbus从设备,并将读取的数据通过MQTT发布。它支持自定义的轮询频率和寄存器地址,同时也能处理Modbus的写操作。

3. 命令行参数:
脚本通过命令行参数提供了高度的定制性,包括设置MQTT服务器的地址和端口、定义MQTT主题前缀、配置Modbus RTU串口或TCP连接参数,以及指定寄存器定义文件。

4. 寄存器定义与消息发布:
寄存器的定义存储在CSV文件中,包含寄存器地址、大小、格式和轮询频率等信息。脚本根据这些定义轮询Modbus寄存器,并将结果发布到相应的MQTT主题。

5. 高级特性:
除了基础的读功能,modbus2mqtt还支持写入Modbus线圈和寄存器,允许通过MQTT消息对Modbus设备进行控制。此外,脚本支持强制定时发布功能,确保即使数据未变化也能定期更新状态。

6. 应用场景:
modbus2mqtt适用于需要将Modbus设备集成进基于MQTT的智能家居或工业自动化系统的场景。例如,在智能家居中,可以通过MQTT控制和监视基于Modbus的温度传感器、开关等设备。

结论:
modbus2mqtt弥合了Modbus和MQTT两大主流通信协议的差异,为智能家居和工业自动化系统提供了一种有效的数据集成方案。通过灵活的配置和强大的功能,它能够满足各种复杂场景的需求。

打赏赞(1)微海报分享
mqtt topic

在C#中使用M2Mqtt.Net库实现Mqtt客户端断线重连机制

MQTT协议:实现物联网高可用性和性能的秘密武器

猜你喜欢

改善基础设施:HiveMQ如何推动智能城市发展

改善基础设施:HiveMQ如何推动智能城市发展

08/07
2024
为什么企业选择全托管HiveMQ云进行MQTT部署

为什么企业选择全托管HiveMQ云进行MQTT部署

07/01
2024
MQTT 赋能工业 PLC 数据采集与应用

MQTT 赋能工业 PLC 数据采集与应用

06/30
2024

回复

抢沙发咯
  • 解决方案
    • 智能家居
    • 汽车与出行
    • 工业制造
    • 能源电力
    • 石油石化
    • 交通物流
    • 零售
  • 学习
    • MQTT 规范
    • MQTT 教程
    • MQTT 软件
    • MQTT 客户端库
    • MQTT 服务器
    • 工具和应用程序
  • 关于我们
    • 了解创科慧仁
    • 加入创科慧仁
    • 投资者关系
    • 新闻动态
    • 合作伙伴
    • 联系我们
  • 友情链接
    • Modbus中文网
    • 跳动符号官网
    • 物联网世界
    • RFID世界网
    • 深圳物联网协会
    • isoftstone软通动力
    • 中国发展战略学研究会
    • B.P商业伙伴
  • 在线客服
  • 全国客户服务热线
    4006909885
  • 官方公众号
  • 联系邮箱
    contact@mqtt.cn
Copyright © 2025 MQTT中文站. All rights reserved.Designed by nicetheme. 京ICP备20029519号
在线客服

微信咨询

微信咨询

4006909885

服务热线 7*24小时

电话咨询
  • 首页
  • MQTT 学习
    • MQTT 入门
    • MQTT 进阶
    • MQTT 编程
    • MQTT 实例
    • MQTT 要点
    • MQTT5 要点
    • MQTT 工具
    • MQTT 客户端库
    • MQTT 服务器
    • Zigbee2MQTT
    • Sparkplug
    • Home Assistant
    • Node-RED
  • MQTT 规范
    • MQTT 5 规范
    • MQTT 3.1.1 规范
    • MQTT 3.1 规范
    • MQTT-SN v1.2规范
    • Sparkplug® v3.0.0规范
  • 产品中心
  • 解决方案
    • 环境监测
    • 工业制造
    • 智慧水利
    • 水利管网
    • 积水监测
    • 综合管廊
    • 档案库房
    • 交通物流
    • 智慧城市
    • 智慧农业
    • 智慧养殖
    • 能源电力
    • 石油石化
    • 智能家居
    • 物联网
    • 汽车与出行
  • 使用文档
  • MQTT 云平台
  • 登录
  • 注册
string(5) "2.0.0"