摘要:
在构建智能家居系统时,实现不同通信协议之间的互操作性是一项关键任务。modbus2mqtt
项目通过桥接Modbus和MQTT协议,为此提供了一个高效的解决方案。本文深入探讨了modbus2mqtt
的实现细节,包括其依赖、功能、和实际应用。
引言:
在智能家居和工业自动化领域,Modbus协议常用于传感器和执行器的通信,而MQTT作为一种轻量级的消息传递协议,适用于物联网(IoT)通信。modbus2mqtt
是一个Python脚本,它实现了Modbus与MQTT之间的数据转换和传输,从而在异构的智能家居环境中提供了一个集中式消息总线解决方案。
1. 技术背景与依赖性:
该项目依赖于两个关键的Python库:
- Eclipse Paho:用于实现MQTT客户端功能,实现与MQTT代理的通信。
- modbus-tk:用于处理Modbus通信,包括读取和写入Modbus从设备的寄存器。
#
# modbus2mqtt - Modbus master with MQTT publishing
#
# Written and (C) 2015 by Oliver Wagner <owagner@tellerulam.com>
# Provided under the terms of the MIT license
#
# Requires:
# - Eclipse Paho for Python - http://www.eclipse.org/paho/clients/python/
# - modbus-tk for Modbus communication - https://github.com/ljean/modbus-tk/
#
import argparse
import logging
import logging.handlers
import time
import socket
import paho.mqtt.client as mqtt
import serial
import io
import sys
import csv
import signal
import modbus_tk
import modbus_tk.defines as cst
from modbus_tk import modbus_rtu
from modbus_tk import modbus_tcp
version="0.5"
parser = argparse.ArgumentParser(description='Bridge between ModBus and MQTT')
parser.add_argument('--mqtt-host', default='localhost', help='MQTT server address. Defaults to "localhost"')
parser.add_argument('--mqtt-port', default='1883', type=int, help='MQTT server port. Defaults to 1883')
parser.add_argument('--mqtt-topic', default='modbus/', help='Topic prefix to be used for subscribing/publishing. Defaults to "modbus/"')
parser.add_argument('--clientid', default='modbus2mqtt', help='Client ID prefix for MQTT connection')
parser.add_argument('--rtu', help='pyserial URL (or port name) for RTU serial port')
parser.add_argument('--rtu-baud', default='19200', type=int, help='Baud rate for serial port. Defaults to 19200')
parser.add_argument('--rtu-parity', default='even', choices=['even','odd','none'], help='Parity for serial port. Defaults to even')
parser.add_argument('--tcp', help='Act as a Modbus TCP master, connecting to host TCP')
parser.add_argument('--tcp-port', default='502', type=int, help='Port for Modbus TCP. Defaults to 502')
parser.add_argument('--registers', required=True, help='Register definition file. Required!')
parser.add_argument('--log', help='set log level to the specified value. Defaults to WARNING. Use DEBUG for maximum detail')
parser.add_argument('--syslog', action='store_true', help='enable logging to syslog')
parser.add_argument('--force', default='0',type=int, help='publish values after "force" seconds since publish regardless of change. Defaults to 0 (change only)')
args=parser.parse_args()
if args.log:
logging.getLogger().setLevel(args.log)
if args.syslog:
logging.getLogger().addHandler(logging.handlers.SysLogHandler())
else:
logging.getLogger().addHandler(logging.StreamHandler(sys.stdout))
topic=args.mqtt_topic
if not topic.endswith("/"):
topic+="/"
logging.info('Starting modbus2mqtt V%s with topic prefix \"%s\"' %(version, topic))
def signal_handler(signal, frame):
print('Exiting ' + sys.argv[0])
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
class Register:
def __init__(self,topic,frequency,slaveid,functioncode,register,size,format):
self.topic=topic
self.frequency=int(frequency)
self.slaveid=int(slaveid)
self.functioncode=int(functioncode)
self.register=int(register)
self.size=int(size)
self.format=format.split(":",2)
self.next_due=0
self.lastval=None
self.last = None
def checkpoll(self):
if self.next_due<time.time():
self.poll()
self.next_due=time.time()+self.frequency
def poll(self):
try:
res=master.execute(self.slaveid,self.functioncode,self.register,self.size,data_format=self.format[0])
r=res[0]
if self.format[1]:
r=self.format[1] % r
if r!=self.lastval or (args.force and (time.time() - self.last) > int(args.force)):
self.lastval=r
fulltopic=topic+"status/"+self.topic
logging.info("Publishing " + fulltopic)
mqc.publish(fulltopic,self.lastval,qos=0,retain=True)
self.last = time.time()
except modbus_tk.modbus.ModbusError as exc:
logging.error("Error reading "+self.topic+": Slave returned %s - %s", exc, exc.get_exception_code())
except Exception as exc:
logging.error("Error reading "+self.topic+": %s", exc)
registers=[]
# Now lets read the register definition
with open(args.registers,"r") as csvfile:
dialect=csv.Sniffer().sniff(csvfile.read(8192))
csvfile.seek(0)
defaultrow={"Size":1,"Format":">H","Frequency":60,"Slave":1,"FunctionCode":4}
reader=csv.DictReader(csvfile,fieldnames=["Topic","Register","Size","Format","Frequency","Slave","FunctionCode"],dialect=dialect)
for row in reader:
# Skip header row
if row["Frequency"]=="Frequency":
continue
# Comment?
if row["Topic"][0]=="#":
continue
if row["Topic"]=="DEFAULT":
temp=dict((k,v) for k,v in row.iteritems() if v is not None and v!="")
defaultrow.update(temp)
continue
freq=row["Frequency"]
if freq is None or freq=="":
freq=defaultrow["Frequency"]
slave=row["Slave"]
if slave is None or slave=="":
slave=defaultrow["Slave"]
fc=row["FunctionCode"]
if fc is None or fc=="":
fc=defaultrow["FunctionCode"]
fmt=row["Format"]
if fmt is None or fmt=="":
fmt=defaultrow["Format"]
size=row["Size"]
if size is None or size=="":
size=defaultrow["Size"]
r=Register(row["Topic"],freq,slave,fc,row["Register"],size,fmt)
registers.append(r)
logging.info('Read %u valid register definitions from \"%s\"' %(len(registers), args.registers))
def messagehandler(mqc,userdata,msg):
try:
(prefix,function,slaveid,functioncode,register) = msg.topic.split("/")
if function != 'set':
return
if int(slaveid) not in range(0,255):
logging.warning("on message - invalid slaveid " + msg.topic)
return
if not (int(register) >= 0 and int(register) < sys.maxint):
logging.warning("on message - invalid register " + msg.topic)
return
if functioncode == str(cst.WRITE_SINGLE_COIL):
logging.info("Writing single coil " + register)
elif functioncode == str(cst.WRITE_SINGLE_REGISTER):
logging.info("Writing single register " + register)
else:
logging.error("Error attempting to write - invalid function code " + msg.topic)
return
res=master.execute(int(slaveid),int(functioncode),int(register),output_value=int(msg.payload))
except Exception as e:
logging.error("Error on message " + msg.topic + " :" + str(e))
def connecthandler(mqc,userdata,rc):
logging.info("Connected to MQTT broker with rc=%d" % (rc))
mqc.subscribe(topic+"set/+/"+str(cst.WRITE_SINGLE_REGISTER)+"/+")
mqc.subscribe(topic+"set/+/"+str(cst.WRITE_SINGLE_COIL)+"/+")
mqc.publish(topic+"connected",2,qos=1,retain=True)
def disconnecthandler(mqc,userdata,rc):
logging.warning("Disconnected from MQTT broker with rc=%d" % (rc))
try:
clientid=args.clientid + "-" + str(time.time())
mqc=mqtt.Client(client_id=clientid)
mqc.on_connect=connecthandler
mqc.on_message=messagehandler
mqc.on_disconnect=disconnecthandler
mqc.will_set(topic+"connected",0,qos=2,retain=True)
mqc.disconnected =True
mqc.connect(args.mqtt_host,args.mqtt_port,60)
mqc.loop_start()
if args.rtu:
master=modbus_rtu.RtuMaster(serial.serial_for_url(args.rtu,baudrate=args.rtu_baud,parity=args.rtu_parity[0].upper()))
elif args.tcp:
master=modbus_tcp.TcpMaster(args.tcp,args.tcp_port)
else:
logging.error("You must specify a modbus access method, either --rtu or --tcp")
sys.exit(1)
master.set_verbose(True)
master.set_timeout(5.0)
while True:
for r in registers:
r.checkpoll()
time.sleep(1)
except Exception as e:
logging.error("Unhandled error [" + str(e) + "]")
sys.exit(1)
2. 功能概述:modbus2mqtt
作为Modbus主设备运行,持续轮询连接的Modbus从设备,并将读取的数据通过MQTT发布。它支持自定义的轮询频率和寄存器地址,同时也能处理Modbus的写操作。
3. 命令行参数:
脚本通过命令行参数提供了高度的定制性,包括设置MQTT服务器的地址和端口、定义MQTT主题前缀、配置Modbus RTU串口或TCP连接参数,以及指定寄存器定义文件。
4. 寄存器定义与消息发布:
寄存器的定义存储在CSV文件中,包含寄存器地址、大小、格式和轮询频率等信息。脚本根据这些定义轮询Modbus寄存器,并将结果发布到相应的MQTT主题。
5. 高级特性:
除了基础的读功能,modbus2mqtt
还支持写入Modbus线圈和寄存器,允许通过MQTT消息对Modbus设备进行控制。此外,脚本支持强制定时发布功能,确保即使数据未变化也能定期更新状态。
6. 应用场景:modbus2mqtt
适用于需要将Modbus设备集成进基于MQTT的智能家居或工业自动化系统的场景。例如,在智能家居中,可以通过MQTT控制和监视基于Modbus的温度传感器、开关等设备。
结论:modbus2mqtt
弥合了Modbus和MQTT两大主流通信协议的差异,为智能家居和工业自动化系统提供了一种有效的数据集成方案。通过灵活的配置和强大的功能,它能够满足各种复杂场景的需求。
回复