MQTT中文站
  • 首页
  • MQTT 学习
    • MQTT 入门
    • MQTT 进阶
    • MQTT 编程
    • MQTT 实例
    • MQTT 要点
    • MQTT5 要点
    • MQTT 工具
    • MQTT 客户端库
    • MQTT 服务器
    • Zigbee2MQTT
    • Sparkplug
    • Home Assistant
    • Node-RED
      • Node-RED 安装部署
      • Node-RED 用户指南
      • Node-RED 创建节点
      • Node-RED 示例教程
      • Node-RED 开发流程
      • Node-RED 接口参考
      • Node-RED 配置模板
      • Node-RED 常见问题
  • MQTT 规范
    • MQTT 5 规范
    • MQTT 3.1.1 规范
    • MQTT 3.1 规范
    • MQTT-SN v1.2规范
    • Sparkplug® v3.0.0规范
  • 产品中心
  • 解决方案
    • 环境监测
    • 工业制造
    • 智慧水利
    • 水利管网
    • 积水监测
    • 综合管廊
    • 档案库房
    • 交通物流
    • 智慧城市
    • 智慧农业
    • 智慧养殖
    • 能源电力
    • 石油石化
    • 智能家居
    • 物联网
    • 汽车与出行
  • 使用文档
  • MQTT 云平台
  • 登录
  • 注册

利用MQTT、NiFi和InfluxDB构建IoT数据流

2 年前 • MQTT 客户端库

架构

介绍 在本教程中,我们将学习创建物联网应用程序的数据流处理所需的内容。 在这个过程中,我们将了解物联网架构的特点,并了解如何利用MQTT代理、NiFi和InfluxDB等不同工具来构建高度可扩展的物联网应用程序的数据流处理。

物联网及其架构 首先,让我们学习一些基本概念,了解物联网应用程序的一般架构。

2.1. 什么是物联网?

物联网(IoT)广泛指的是物理对象的网络,称为“物”。例如,这些物可以包括从普通家用物品(如灯泡)到复杂的工业设备等任何物。通过这个网络,我们可以连接各种传感器和执行器以交换数据。

利用MQTT、NiFi和InfluxDB构建IoT数据流-MQTT中文站

现在,我们可以在非常不同的环境中部署这些物 - 例如,环境可以是我们的家,也可以是一辆行驶中的货车。然而,我们不能对这些物将可以使用的电源和网络的质量进行任何假设。因此,这为物联网应用程序提出了独特的要求。

2.2. 物联网架构简介

典型的物联网架构通常分为四个不同的层次。让我们了解数据如何在这些层次之间流动:

利用MQTT、NiFi和InfluxDB构建IoT数据流-MQTT中文站

首先,感知层主要由从环境中收集测量数据的传感器组成。然后,网络层帮助聚合原始数据并通过互联网进行传输。进一步,数据处理层会过滤原始数据并生成早期分析。最后,应用层使用强大的数据处理能力来执行更深入的数据分析和管理。

MQTT、NiFi和InfluxDB简介 现在,让我们来看看今天在物联网设置中广泛使用的一些产品。这些产品都提供了一些独特的特性,使它们适用于物联网应用程序的数据需求。

3.1. MQTT

MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布-订阅网络协议。它现在是OASIS和ISO标准。IBM最初开发它用于在设备之间传输消息。MQTT适用于内存、网络带宽和电源供应有限的受限环境。

MQTT遵循客户端-服务器模型,不同组件可以充当客户端并通过TCP连接到服务器。我们将此服务器称为MQTT代理。客户端可以将消息发布到称为主题的地址。它们还可以订阅主题并接收发布到它的所有消息。

在典型的物联网设置中,传感器可以将测量数据(如温度)发布到MQTT代理,而上游数据处理系统可以订阅这些主题以接收数据:

利用MQTT、NiFi和InfluxDB构建IoT数据流-MQTT中文站

正如我们所见,MQTT中的主题是分层的。系统可以通过使用通配符轻松订阅整个主题层次结构。

MQTT支持三个不同的服务质量(QoS)级别。它们分别是“至多传递一次”、“至少传递一次”和“确保仅传递一次”。QoS定义了客户端与服务器之间的协议级别。每个客户端可以选择适合其环境的服务级别。

客户端还可以在发布时请求代理保留消息。在某些设置中,MQTT代理可能需要从客户端那里进行用户名和密码验证以建立连接。此外,出于隐私考虑,TCP连接可能会使用SSL/TLS进行加密。

有几个MQTT代理实现和客户端库可供使用,例如HiveMQ、Mosquitto和Paho MQTT。在本教程中,我们将使用Mosquitto作为示例。Mosquitto是Eclipse基金会的一部分,我们可以轻松地将其安装在树莓派或Arduino等开发板上。

3.2. Apache NiFi

Apache NiFi最初由美国国家安全局(NSA)开发,是一种自动化和数据流管理工具,基于基于流的编程模型,将应用程序定义为黑盒进程网络。

首先,让我们先了解一些基本概念。在NiFi中,系统中移动的对象称为FlowFile。FlowFile处理器实际上执行诸如路由、转换和调解FlowFiles等有用工作。FlowFile处理器通过连接与连接一起使用。

处理组是一种将组件组合在一起以组织NiFi数据流的机制。处理组可以通过输入端口接收数据并通过输出端口发送数据。远程处理组(RPG)提供了一种从远程NiFi实例发送数据或接收数据的机制。

现在,有了这些知识,让我们了解NiFi架构:

利用MQTT、NiFi和InfluxDB构建IoT数据流-MQTT中文站

NiFi是一个基于Java的程序,运行在JVM中的多个组件。Web服务器是托管命令和控制API的组件。流控制器是NiFi的核心组件,负责管理扩展何时接收资源以执行。扩展允许NiFi具有可扩展性,并支持与不同系统的集成。

NiFi通过FlowFile存储库跟踪FlowFile的状态。FlowFile的实际内容字节存储在内容存储库中。与FlowFile相关的可信事件数据存储在可信存储库中。

利用MQTT、NiFi和InfluxDB构建IoT数据流-MQTT中文站

由于数据在源头的收集可能需要较小的占用空间和低资源消耗,NiFi有一个名为MiNiFi的子项目。MiNiFi提供了一种补充的数据收集方法,可以通过Site-to-Site(S2S)协议轻松集成到NiFi中。

此外,它通过MiNiFi命令和控制(C2)协议实现了对代理的集中管理。此外,它有助于建立数据溯源,生成完整的链式保管信息。

3.3. InfluxDB

InfluxDB是由InfluxData开发的,使用Go编写的时序数据库。它专为快速和高可用性的时序数据存储和检索而设计。这在处理应用程序指标、物联网传感器数据和实时分析方面特别合适。

首先,InfluxDB中的数据是按时序组织的。时序可以包含零个或多个数据点。数据点代表具有四个组成部分的单个数据记录,包括测量、标签集、字段集和时间戳:

利用MQTT、NiFi和InfluxDB构建IoT数据流-MQTT中文站

首先,时间戳显示与特定数据点关联的UTC日期和时间。字段集由一个或多个字段键和字段值对组成。它们捕获了点的实际数据,并附带标签。标签集也由标签键和标签值对组成,但它们是可选的。它们基本上充当点的元数据,并可用于加快查询响应。

测量充当标签集、字段集和时间戳的容器。此外,InfluxDB中的每个数据点都可以与其关联的保留策略。保留策略描述了InfluxDB将保留数据的时间以及通过复制将创建多少副本。

最后,数据库充当用户、保留策略、连续查询和时序数据的逻辑容器。我们可以将InfluxDB中的数据库理解为与传统关系数据库 loosly 相似。

此外,InfluxDB是InfluxData平台的一部分,提供了多种其他产品来高效处理时序数据。InfluxData现在将其作为InfluxDB OSS 2.0(开源平台)和InfluxDB Cloud(商业产品)提供:

利用MQTT、NiFi和InfluxDB构建IoT数据流-MQTT中文站

除了InfluxDB之外,该平台还包括Chronograf,它为InfluxData平台提供了完整的界面。此外,它包括Telegraf,用于收集和报告度量和事件的代

理。最后,还有Kapacitor,一个实时流式数据处理引擎。

实践物联网数据管道

现在,我们已经掌握了足够的知识,可以将这些产品一起使用,为我们的物联网应用程序创建数据管道。在本教程中,我们假设我们正在从多个城市的多个观测站收集与空气质量相关的测量数据。例如,这些测量包括地面臭氧、一氧化碳、二氧化硫、二氧化氮和气溶胶等。

4.1. 基础设施设置

首先,我们假设每个城市的气象站都配备了所有感应设备。此外,这些传感器已连接到类似Raspberry Pi的开发板,用于收集模拟数据并将其数字化。该板通过无线连接发送原始测量数据:

利用MQTT、NiFi和InfluxDB构建IoT数据流-MQTT中文站

物联网基础设施设置
一个区域控制站收集来自城市中所有气象站的数据。我们可以汇总并将此数据提供给一些本地分析引擎,以获得更快的见解。来自所有区域控制中心的经过筛选的数据被发送到中央指挥中心,该中央指挥中心通常托管在云中。

4.2. 创建物联网架构

现在,我们准备为我们的简单空气质量应用程序设计物联网架构。在这里,我们将使用MQTT代理、MiNiFi Java代理、NiFi和InfluxDB:

利用MQTT、NiFi和InfluxDB构建IoT数据流-MQTT中文站


正如我们所看到的,我们在气象站点使用了Mosquitto MQTT代理和MiNiFi Java代理。在区域控制中心,我们使用NiFi服务器来汇总和路由数据。最后,我们使用InfluxDB来存储中央指挥中心级别的测量数据。

4.3. 执行安装

在像Raspberry Pi这样的开发板上安装Mosquitto MQTT代理和MiNiFi Java代理非常容易。但是,对于本教程,我们将在本地计算机上安装它们。

Eclipse Mosquito的官方下载页面提供了多个平台的二进制文件。一旦安装完成,可以从安装目录轻松启动Mosquitto:

net start mosquitto

此外,NiFi二进制文件也可以从其官方网站下载。我们必须在合适的目录中提取下载的存档。由于MiNiFi将使用站点到站点协议连接到NiFi,因此必须在/conf/nifi.properties中指定站点到站点输入套接字端口:

# Site to Site properties
nifi.remote.input.host=
nifi.remote.input.secure=false
nifi.remote.input.socket.port=1026
nifi.remote.input.http.enabled=true
nifi.remote.input.http.transaction.ttl=30 sec

然后,我们可以启动NiFi:

<NIFI_HOME>/bin/run-nifi.bat

同样,可以从官方网站下载Java或C++ MiNiFi代理和工具包二进制文件。同样,我们必须将存档提取到适当的目录中。

默认情况下,MiNiFi附带一组非常少的处理器。因为我们将从MQTT中获取数据,所以必须将MQTT处理器复制到/lib目录。这些处理器捆绑为NiFi Archive (NAR)文件,并位于/lib目录中:

COPY <NIFI_HOME>/lib/nifi-mqtt-nar-x.x.x.nar <MINIFI_HOME>/lib/nifi-mqtt-nar-x.x.x.nar

然后,我们可以启动MiNiFi代理:

<MINIFI_HOME>/bin/run-minifi.bat

最后,可以从其官方网站下载InfluxDB的开源版本。与以前一样,可以提取存档,并使用简单的命令启动InfluxDB:

<INFLUXDB_HOME>/influxd.exe

对于本教程,我们应该保留所有其他配置,包括端口,以默认值。这完成了我们在本地计算机上的安装和设置。

4.4. 定义NiFi数据流

现在,我们已经准备好定义我们的数据流。NiFi提供了一个易于使用的界面,用于创建和监控数据流。这可通过URL http://localhost:8080/nifi 访问。

首先,我们将定义将在NiFi服务器上运行的主数据流:

利用MQTT、NiFi和InfluxDB构建IoT数据流-MQTT中文站

如我们所见,我们定义了一个输入端口,该端口将从MiNiFi代理接收数据。它通过连接将数据发送到PutInfluxDB处理器,该处理器负责将数据存储在InfluxDB中。在此处理器的配置中,我们定义了InfluxDB的连接URL以及要发送数据的数据库名称。

4.5. 定义MiNiFi数据流

接下来,我们将定义在MiNiFi代理上运行的数据流。我们将使用NiFi的相同用户界面,并将数据流导出为模板,然后在MiNiFi代理中进行配置。让我们定义MiNiFi代理的数据流:

利用MQTT、NiFi和InfluxDB构建IoT数据流-MQTT中文站

在这里,我们定义了ConsumeMQTT处理器,负责从MQTT代理获取数据。我们在属性中提供了代理URI以及主题过滤器。我们正在从层次结构"air-quality"下定义的所有主题中获取数据。

我们还定义了一个远程处理组,并将其连接到ConcumeMQTT处理器。远程处理组负责通过站点到站点协议将数据推送到NiFi。

我们可以将此数据流保存为模板,并将其下载为XML文件。让我们将此文件命名为config.xml。现在,我们可以使用转换工具包将此模板从XML转换为MiNiFi代理使用的YAML格式:

<MINIFI_TOOLKIT_HOME>/bin/config.bat transform config.xml config.yml

这将生成config.yml文件,其中我们必须手动添加NiFi服务器的主机和端口:

  Input Ports:
  - id: 19442f9d-aead-3569-b94c-1ad397e8291c
    name: From MiNiFi
    comment: ''
    max concurrent tasks: 1
    use compression: false
    Properties: # Deviates from spec and will later be removed when this is autonegotiated      
      Port: 1026      
      Host Name: localhost

现在,我们可以将此文件放入/conf目录,替换可能已经存在的文件。之后,我们需要重新启动MiNiFi代理。

在这里,我们需要手动进行大量工作来创建数据流并在MiNiFi代理中进行配置。这在实际情况下是不切实际的,因为遥远的地点可能存在数百个代理。然而,正如我们之前所看到的,我们可以使用MiNiFi C2服务器来自动化此过程。但这超出了本教程的范围。

4.6. 测试数据管道

最后,我们准备测试我们的数据管道!由于我们无法使用真实传感器,我们将创建一个小型模拟。我们将使用一个小的Java程序生成传感器数据:

class Sensor implements Callable<Boolean> {
    String city;
    String station;
    String pollutant;
    String topic;
    Sensor(String city, String station, String pollutant, String topic) {
        this.city = city;
        this.station = station;
        this.pollutant = pollutant;
        this.topic = topic;
    }

    @Override
    public Boolean call() throws Exception {
        MqttClient publisher = new MqttClient(
          "tcp://localhost:1883", UUID.randomUUID().toString());
        MqttConnectOptions options = new MqttConnectOptions();
        options.setAutomaticReconnect(true);
        options.setCleanSession(true);
        options.setConnectionTimeout(10);
        publisher.connect(options);
        IntStream.range(0, 10).forEach(i -> {
            String payload = String.format("%1$s,city=%2$s,station=%3$s value=%4$04.2f",
              pollutant,
              city,
              station,
              ThreadLocalRandom.current().nextDouble(0, 100));
            MqttMessage message = new MqttMessage(payload.getBytes());
            message.setQos(0);
            message.setRetained(true);
            try {
                publisher.publish(topic, message);
                Thread.sleep(1000);
            } catch (MqttException | InterruptedException e) {
                e.printStackTrace();
            }
        });
        return true;
    }
}

在这里,我们使用Eclipse Paho Java客户端生成消息并发送到MQTT代理。我们可以添加尽可能多的传感器以创建我们的模拟:

ExecutorService executorService = Executors.newCachedThreadPool();
List<Callable<Boolean>> sensors = Arrays.asList(
  new Simulation.Sensor("london", "central", "ozone", "air-quality/ozone"),
  new Simulation.Sensor("london", "central", "co", "air-quality/co"),
  new Simulation.Sensor("london", "central", "so2", "air-quality/so2"),
  new Simulation.Sensor("london", "central", "no2", "air-quality/no2"),
  new Simulation.Sensor("london", "central", "aerosols", "air-quality/aerosols"));
List<Future<Boolean>> futures = executorService.invokeAll(sensors);

如果一切正常,我们将能够在InfluxDB数据库中查询我们的数据:

利用MQTT、NiFi和InfluxDB构建IoT数据流-MQTT中文站

例如,我们可以查看属于“ozone”测量的所有数据点,这些数据存储在数据库“airquality”中。

结论

总之,本教程涵盖了一个基本的IoT用例。我们还了解了如何使用MQTT、NiFi和InfluxDB等工具来构建可扩展的数据管道。当然,这并不涵盖IoT应用程序的全部范围,扩展数据分析管道的可能性是无限的。

此外,本教程中选择的示例仅供演示目的。实际的IoT应用程序的基础架构和架构可以非常多样化和复杂。此外,我们可以通过将可操作的见解作为命令向后推送来完成反馈循环。

打赏赞微海报分享
influxdb mqtt nifi

Node.js和MQTT:构建高性能IoT解决方案

深入理解Home Assistant MQTT:配置、自动化、发布等

猜你喜欢

改善基础设施:HiveMQ如何推动智能城市发展

改善基础设施:HiveMQ如何推动智能城市发展

08/07
2024
为什么企业选择全托管HiveMQ云进行MQTT部署

为什么企业选择全托管HiveMQ云进行MQTT部署

07/01
2024
MQTT 赋能工业 PLC 数据采集与应用

MQTT 赋能工业 PLC 数据采集与应用

06/30
2024

回复

抢沙发咯
  • 解决方案
    • 智能家居
    • 汽车与出行
    • 工业制造
    • 能源电力
    • 石油石化
    • 交通物流
    • 零售
  • 学习
    • MQTT 规范
    • MQTT 教程
    • MQTT 软件
    • MQTT 客户端库
    • MQTT 服务器
    • 工具和应用程序
  • 关于我们
    • 了解创科慧仁
    • 加入创科慧仁
    • 投资者关系
    • 新闻动态
    • 合作伙伴
    • 联系我们
  • 友情链接
    • Modbus中文网
    • 跳动符号官网
    • 物联网世界
    • RFID世界网
    • 深圳物联网协会
    • isoftstone软通动力
    • 中国发展战略学研究会
    • B.P商业伙伴
  • 在线客服
  • 全国客户服务热线
    4006909885
  • 官方公众号
  • 联系邮箱
    contact@mqtt.cn
Copyright © 2025 MQTT中文站. All rights reserved.Designed by nicetheme. 京ICP备20029519号
在线客服

微信咨询

微信咨询

4006909885

服务热线 7*24小时

电话咨询
  • 首页
  • MQTT 学习
    • MQTT 入门
    • MQTT 进阶
    • MQTT 编程
    • MQTT 实例
    • MQTT 要点
    • MQTT5 要点
    • MQTT 工具
    • MQTT 客户端库
    • MQTT 服务器
    • Zigbee2MQTT
    • Sparkplug
    • Home Assistant
    • Node-RED
  • MQTT 规范
    • MQTT 5 规范
    • MQTT 3.1.1 规范
    • MQTT 3.1 规范
    • MQTT-SN v1.2规范
    • Sparkplug® v3.0.0规范
  • 产品中心
  • 解决方案
    • 环境监测
    • 工业制造
    • 智慧水利
    • 水利管网
    • 积水监测
    • 综合管廊
    • 档案库房
    • 交通物流
    • 智慧城市
    • 智慧农业
    • 智慧养殖
    • 能源电力
    • 石油石化
    • 智能家居
    • 物联网
    • 汽车与出行
  • 使用文档
  • MQTT 云平台
  • 登录
  • 注册
 

正在加载评论...
 

您必须登录才能发表评论。

    string(5) "2.0.0"