MQTT中文站
  • 首页
  • MQTT 学习
    • MQTT 入门
    • MQTT 进阶
    • MQTT 编程
    • MQTT 实例
    • MQTT 要点
    • MQTT5 要点
    • MQTT 工具
    • MQTT 客户端库
    • MQTT 服务器
    • Zigbee2MQTT
    • Sparkplug
    • Home Assistant
    • Node-RED
      • Node-RED 安装部署
      • Node-RED 用户指南
      • Node-RED 创建节点
      • Node-RED 示例教程
      • Node-RED 开发流程
      • Node-RED 接口参考
      • Node-RED 配置模板
      • Node-RED 常见问题
  • MQTT 规范
    • MQTT 5 规范
    • MQTT 3.1.1 规范
    • MQTT 3.1 规范
    • MQTT-SN v1.2规范
    • Sparkplug® v3.0.0规范
  • 产品中心
  • 解决方案
    • 环境监测
    • 工业制造
    • 智慧水利
    • 水利管网
    • 积水监测
    • 综合管廊
    • 档案库房
    • 交通物流
    • 智慧城市
    • 智慧农业
    • 智慧养殖
    • 能源电力
    • 石油石化
    • 智能家居
    • 物联网
    • 汽车与出行
  • 使用文档
  • MQTT 云平台
  • 登录
  • 注册

Ascoltatori:支持 MQTT 及多种代理/协议的消息发布/订阅库

2 年前 • MQTT 客户端库

"Ascoltatori" 是一个简单的发布/订阅库,支持以下代理/协议:

  • Redis,由 @antirez 创建的键/值存储。
  • MongoDB,可扩展、高性能的文档型数据库。
  • Mosquitto 和所有 MQTT 协议的实现。
  • RabbitMQ 和所有 AMQP 协议的实现。
  • ZeroMQ,用于在 P2P 模式下使用 Ascoltatori。
  • QlobberFSQ,共享文件系统队列。
  • Apache Kafka,高吞吐量的分布式消息系统。

安装:

通过 npm 安装库:

$ npm install ascoltatori --save

通过 Git 安装:

$ git clone git://github.com/mcollina/ascoltatori.git
$ cd ascoltatori
$ npm install

入门示例(使用 Redis):

Ascoltatori 专注于为所有支持的代理提供简单且统一的抽象。以下是一个使用 Redis 的简单示例:

var ascoltatori = require('ascoltatori');

ascoltatori.build(function (err, ascoltatore) {
  // 订阅主题
  ascoltatore.subscribe('hello', function() {
    console.log(arguments);
    // { '0': 'hello', '1': 'a message' }
  });

  // 发布消息到主题 'hello'
  ascoltatore.publish('hello', 'a message', function() {
    console.log('message published');
  });
});

通配符支持:

所有 ascoltatori 都支持通配符的使用,因此应该在每个代理上都能正常工作。您可能会发现一些差异,如果是这种情况,请提交错误报告以便修复。

通配符 + 匹配精确一个单词:

var ascoltatori = require('ascoltatori');

ascoltatori.build(function (err, ascoltatore) {
  ascoltatore.subscribe("hello/+/world", function() {
    // 这将打印 { '0': "hello/there/world", '1': "a message" }
    console.log(arguments);
  });

  ascoltatore.subscribe("hello/+", function() {
    // 这不会被调用
    console.log(arguments);
  });

  ascoltatore.publish("hello/there/world", "a message", function() {
    console.log("message published");
  });
});

通配符 * 匹配零个或多个单词:

var ascoltatori = require('ascoltatori');

ascoltatori.build(function (err, ascoltatore) {
  ascoltatore.subscribe("hello/*", function() {
    // 这将打印 { '0': "hello/there/world", '1': "a message" }
    console.log(arguments);
  });

  ascoltatore.subscribe("*", function() {
    // 这将打印 { '0': "hello/there/world", '1': "a message" }
    console.log(arguments);
  });

  ascoltatore.subscribe("hello/there/world/*", function() {
    // 这将打印 { '0': "hello/there/world", '1': "a message" }
    console.log(arguments);
  });

  ascoltatore.publish("hello/there/world", "a message", function() {
    console.log("message published");
  });
});

当然,您可以在同一订阅中混合使用 * 和 +:

var ascoltatori = require('ascoltatori');

ascoltatori.build(function (err, ascoltatore) {
  ascoltatore.subscribe("hello/+/world/*", function() {
    // 这将打印 { '0': "hello/foo/world/bar/42", '1': "a message" }
    console.log(arguments);
  });

  ascoltatore.publish("hello/foo/world/bar/42", "a message", function() {
    console.log("message published");
  });
});

各代理示例:

Ascoltatori 支持不同的代理。以下是如何使用每个代理的示例。

Redis:

var ascoltatori = require('ascoltatori');
var settings = {
  type: 'redis',
  redis: require('redis'),
  db: 12,
  port: 6379,
  return_buffers: true, // 用于处理二进制数据
  host: 'localhost'
};

ascoltatori.build(settings, function (err, ascoltatore) {
  // ...
});

MongoDB:

MongoDB 使用 Capped Collections 来实现发布/订阅模式。

var ascoltatori = require('ascoltatori');
var settings = {
  type: 'mongo',
  url: 'mongodb://127.0.0.1/ascoltatori',
  pubsubCollection: 'ascoltatori',
  mongo: {} // MongoDB 特定选项
};

ascoltatori.build(settings, function (err, ascoltatore) {
  // ...
});

您还可以重用现有的 MongoDB 连接:

var ascoltatori = require('ascoltatori');
var MongoClient = require('mongodb').MongoClient;

MongoClient.connect('mongodb://127.0.0.1/ascoltatori', {}, function (err, db) {
  var settings = {
    type: 'mongo',
    db: db,
    pubsubCollection: 'ascoltatori'
  };
  ascoltatori.build(settings, function (err, ascoltatore) {
    // ...
  });
})

MQTT(Mosquitto):

var ascoltatori = require('ascoltatori');
var settings = {
  type: 'mqtt',
  json: false,
  mqtt: require('mqtt'),
  url: 'mqtt://127.0.0.1:1883'
};

ascoltatori.build(settings, function (err, ascoltatore) {
  // ...
});

AMQP(RabbitMQ):

var ascoltatori = require('ascoltatori');
var settings = {
  type: 'amqp',
  json: false,
  amqp: require('amqp'),
  exchange: 'ascoltatore5672'
};

ascoltatori.build(settings, function (err, ascoltatore) {
  // ...
});

使用 amqplib:

var ascoltatori = require('ascoltatori');
var settings = {
  type: 'amqplib',
  json: false,
  amqp: require('amqplib/callback_api'),
  exchange: 'ascoltatore5672',


 queue: 'queueName',
  durableQueue: true
};

ascoltatori.build(settings, function (err, ascoltatore) {
  // ...
});

ZeroMQ:

var ascoltatori = require('ascoltatori');
var settings = {
  type: 'zmq',
  json: false,
  zmq: require("zeromq"),
  port: "tcp://127.0.0.1:33333",
  controlPort: "tcp://127.0.0.1:33334",
  delay: 10
};

ascoltatori.build(settings, function (err, ascoltatore) {
  // ...
});

QlobberFSQ:

您可以使用任何 QlobberFSQ 构造函数选项,例如:

var ascoltatori = require('ascoltatori');
var settings = {
  type: 'filesystem',
  json: false,
  qlobber_fsq: require("qlobber-fsq"),
  fsq_dir: "/shared/fsq"
};

ascoltatori.build(settings, function (err, ascoltatore) {
  // ...
});

如果不指定 fsq_dir,则消息将写入 qlobber-fsq 模块目录中名为 fsq 的目录中。

Memory:

var ascoltatori = require('ascoltatori');
ascoltatori.build(function (err, ascoltatore) {
  // ...
});

JSON:

默认情况下,由 ascoltatori.build 创建的每个 ascoltatore 将每个发布的消息包装在 JSON 格式中。可以通过传递 { json: false } 选项来禁用此行为。

require('ascoltatori').build({ json: false }, function(err, a) {
  // ...
});

Apache Kafka:

var ascoltatori = require('ascoltatori');
var settings = {
  type: 'kafka',
  json: false,
  kafka: require("kafka-node"),
  connectionString: "localhost:2181",
  clientId: "ascoltatori",
  groupId: "ascoltatori",
  defaultEncoding: "utf8",
  encodings: {
    image: "buffer"
  }
};

ascoltatori.build(settings, function (err, ascoltatore) {
  // ...
});

如果发布到不存在的 Kafka 主题,则将使用默认设置创建该主题。如果订阅不存在的 Kafka 主题,则仅在通过 Ascoltatori 发布到该主题时才会生效。

调试:

Ascoltatori 支持调试包,并根据外部环境变量触发日志记录。

$ DEBUG=ascoltatori:mqtt node examples/mqtt_topic_bridge.js

支持的调试标志包括:

  • ascoltatori:amqp
  • ascoltatori:trie
  • ascoltatori:mqtt
  • ascoltatori:prefix
  • ascoltatori:redis
  • ascoltatori:zmq
  • ascoltatori:ee2
  • ascoltatori:filesystem
  • ascoltatori:kafka

可靠性:

由于 Ascoltatori 使用各种传输方式,因此无法保证在所有传输方式上具有各种可靠性属性。但是,MQTT 和 AMQP Ascoltatori 提供至少一次语义,这意味着消息可能会被接收多次,但至少一次。

打赏赞(1)微海报分享
Ascoltatori mqtt 客户端服务器

实现百万级物联网通信:探索Mica-MQTT的潜力

使用as3MQTT:实现轻量级MQTT通信的全面指南

猜你喜欢

改善基础设施:HiveMQ如何推动智能城市发展

改善基础设施:HiveMQ如何推动智能城市发展

08/07
2024
为什么企业选择全托管HiveMQ云进行MQTT部署

为什么企业选择全托管HiveMQ云进行MQTT部署

07/01
2024
MQTT 赋能工业 PLC 数据采集与应用

MQTT 赋能工业 PLC 数据采集与应用

06/30
2024
  • 解决方案
    • 智能家居
    • 汽车与出行
    • 工业制造
    • 能源电力
    • 石油石化
    • 交通物流
    • 零售
  • 学习
    • MQTT 规范
    • MQTT 教程
    • MQTT 软件
    • MQTT 客户端库
    • MQTT 服务器
    • 工具和应用程序
  • 关于我们
    • 了解创科慧仁
    • 加入创科慧仁
    • 投资者关系
    • 新闻动态
    • 合作伙伴
    • 联系我们
  • 友情链接
    • Modbus中文网
    • 跳动符号官网
    • 物联网世界
    • RFID世界网
    • 深圳物联网协会
    • isoftstone软通动力
    • 中国发展战略学研究会
    • B.P商业伙伴
  • 在线客服
  • 全国客户服务热线
    4006909885
  • 官方公众号
  • 联系邮箱
    contact@mqtt.cn
Copyright © 2025 MQTT中文站. All rights reserved.Designed by nicetheme. 京ICP备20029519号
在线客服

微信咨询

微信咨询

4006909885

服务热线 7*24小时

电话咨询
  • 首页
  • MQTT 学习
    • MQTT 入门
    • MQTT 进阶
    • MQTT 编程
    • MQTT 实例
    • MQTT 要点
    • MQTT5 要点
    • MQTT 工具
    • MQTT 客户端库
    • MQTT 服务器
    • Zigbee2MQTT
    • Sparkplug
    • Home Assistant
    • Node-RED
  • MQTT 规范
    • MQTT 5 规范
    • MQTT 3.1.1 规范
    • MQTT 3.1 规范
    • MQTT-SN v1.2规范
    • Sparkplug® v3.0.0规范
  • 产品中心
  • 解决方案
    • 环境监测
    • 工业制造
    • 智慧水利
    • 水利管网
    • 积水监测
    • 综合管廊
    • 档案库房
    • 交通物流
    • 智慧城市
    • 智慧农业
    • 智慧养殖
    • 能源电力
    • 石油石化
    • 智能家居
    • 物联网
    • 汽车与出行
  • 使用文档
  • MQTT 云平台
  • 登录
  • 注册
string(5) "2.0.0"