用NodeJS、C#和浏览器测试通过MQTT进行通信的功能
我有一段时间没理会它,结果积累了很多待整理的素材。
当涉及到大量设备的通信,像物联网设备一样,或者在网络质量不高或者有限电池的情况下,这个方法就非常有效。可以在这里查看详细说明。
关于服务器,似乎有一个名为beebotte的服务很有名,但也可以在node.js上搭建本地代理。
有一个叫mosca的项目,但似乎在后续项目aedes中进行了优化加速。
当然,AWS也有一个叫AWS IoT Core的服务存在。
这次我们选择在node.js上搭建。
客户端如果是Python,有一个叫做paho-mqtt的库。如果是Node.js,则有一个叫做mqtt的库。
根据这个链接,看起来C#可以使用MQTTnet。
对于树莓派来说,可以使用Python,但这次我们也想考虑到Unity,所以想要使用C#进行确认。
另外,还想要确认一下从浏览器到MQTT over WebSocket的情况。
这是一个贪婪的设定。
使用经纪人和Node.js进行信息传输和接收。
使用npm安装aedes库。
我几乎完全复制了这个脚本,参考了这里的内容。
const aedes = require('aedes')();
// 各アクションに対する反応
aedes.on('clientError', function (client, err) {
console.log('client error', client.id, err.message, err.stack)
});
aedes.on('connectionError', function (client, err) {
console.log('client error', client, err.message, err.stack)
});
aedes.on('publish', function (packet, client) {
if (client) {
console.log('message from client', client.id)
}
});
aedes.on('subscribe', function (subscriptions, client) {
if (client) {
console.log('subscribe from client', subscriptions, client.id)
}
});
aedes.on('unsubscribe', function (subscriptions, client) {
if (client) {
console.log('unsubscribe from client', subscriptions, client.id)
}
});
aedes.on('client', function (client) {
console.log('new client', client.id)
});
aedes.on('clientReady', function (client) {
console.log('client is ready', client.id)
});
aedes.on('clientDisconnect', function (client) {
console.log('client disconnected', client.id)
});
// 実際のサーバ立ち上げ
const server = require('net').createServer(aedes.handle);
const port = 1883;
server.listen(port, function () {
console.log(`Server running at mqtt://localhost:${port}/`);
});
将每个行动的内容复制并做好备忘。
首先,因为我们想要从node.js上轻松确认连接,所以我们需要通过npm install mqtt安装客户端模块,然后通过以下方式进行确认。
据说,发送消息的一方被称为发布者,接收消息的一方被称为订阅者。为了更易于理解,我们可以称之为发送方和接收方。
首先需要创建接收方。
const mqtt = require('mqtt');
const client = mqtt.connect('mqtt://localhost:1883');
const topic = 'mqtt/test';
client.on('connect', function() {
console.log('subscriber connected.');
});
client.subscribe(topic, function(err, granted) {
console.log('subscriber subscribed.');
});
client.on('message', function(topic_, message) {
console.log('subscriber received topic:', topic_, 'message:', message.toString());
});
送信方和接收方需要设置相同的Topic。
const mqtt = require('mqtt');
const client = mqtt.connect('mqtt://localhost:1883');
const message = 'SAY HELLO TO MQTT';
const topic = 'mqtt/test'
client.on('connect', function() {
console.log('publisher connected.');
client.publish(topic, message);
console.log('send topic:', topic, ', message:', message);
});
只需按照经纪人、接收方和发送方的顺序启动,就能发送消息。
最初から贫弱的网络连接是预期的,但重新连接的考虑并未经过确认,是否需要考虑并不确定。
在C#中的数据传输
从GitHub上下载并使用MQTTnet。许可证是MIT。因为自己编译很麻烦,所以尝试下载Windows版本v3.0.16。似乎在Unity中使用有版本限制。
使用解压后的zip文件中的MQTTnet.dll。
各种设置等在官方wiki上都有介绍。
编译将继续使用.net的csc(需要参考dll选项)。在.NET 4系列中进行编译。错误CS0012:类型’System.IDisposable’在未被引用的程序集中定义。您必须添加对程序集’System.Runtime, Version=5.0.0.0’的引用。出现这个错误是因为仅有MQTTnet.dll不够,以下是出错的部分列举:
C:\Windows\Microsoft.NET\Framework\v4.0.30319\csc.exe /r:System.Runtime.dll,MQTTnet.dll,System.Private.CoreLib.dll -warn:0 publisher.cs
依赖关系错误已消除(虽然有很多警告但忽略)
然而,出现了类重复问题,如’The type ‘System.TimeSpan’ exists in both…’,以及CS1660和CS4010与委托相关的lambda表达式错误。
云々的错误已经消失了,因为我强制使用选项从指定的DLL中而不是.NET标准中加载了System。
不能在catch子句的主体中等待发生的错误是由C#的版本所致,因为我没有给错误加上延迟。
经过各种尝试,最终编译成功的方法如下。
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Options;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Receiving;
using MQTTnet.Client.Disconnecting;
using System;
using System.Threading;
using System.Threading.Tasks;
class publisher {
static IMqttClient mqttClient = new MqttFactory().CreateMqttClient();
static IMqttClientOptions options;
static string topic = "mqtt/test";
static void Main(string[] args) {
try{
options = new MqttClientOptionsBuilder()
.WithTcpServer("localhost",1883)
.WithCleanSession()
.Build();
// SecureTPC connection = .WithTls()
OnStarted();
while(true){
Thread.Sleep(1000);
PubMessage();
}
}catch(Exception ex){
Console.WriteLine(ex);
}
}
static void OnStarted(){
mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic(topic).Build());
mqttClient.UseConnectedHandler(async (e) =>
{
Console.WriteLine("### SUBSCRIBED ###");
});
// 送信
mqttClient.UseDisconnectedHandler(async (e) => {
Console.WriteLine("### DISCONNECTED FROM SERVER ###");
await Task.Delay(TimeSpan.FromSeconds(3));
try{ Connect(); }
catch { Console.WriteLine("### RECONNECTING FAILED ###"); }
});
// 受信
mqttClient.UseApplicationMessageReceivedHandler( (e) => {
Console.WriteLine(System.Text.Encoding.UTF8.GetString(e.ApplicationMessage.Payload));
});
}
static async void Connect(){
var retry = 0;
while (!mqttClient.IsConnected && retry < 5)
{
try
{
mqttClient.ConnectAsync(options).Wait(); //CancellationToken.Noneはエラーになった
}
catch
{
// 最近のC#ならこれができるので再接続トライになる
//await Task.Delay(TimeSpan.FromSeconds(1));
}
retry++;
}
}
static void PubMessage() {
Console.WriteLine("Send");
var message = new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload("Hello World")
.WithExactlyOnceQoS()
.WithRetainFlag()
.Build();
mqttClient.PublishAsync(message); // awaitできなかった
}
}}
因为编译成功了,所以执行时出现了 System.TypeLoadException: 方法 ‘Main’ 没有实现 (没有 RVA) 的错误。
我知道在 main 的 MqttClientOptionsBuilder 的 WithTcpServer 的部分出了问题,但是不知道如何解决,
所以暂时先认为这次编译通过了就好。
从 Visual Studio 编译能行吗?是不是需要更高版本的 .NET?我对 C# 不熟悉,不清楚,只能放弃了。
从浏览器发出的MQTT
这里有一个正在使用MQTT的示例,可以尝试参考一下。
由于无法普通连接,所以需要使用WebSocket,这意味着服务器端也需要接收WebSocket,所以仅仅启动的nodejs代理无法正常工作。
因此,根据这里的参考问题,可以将MQTT方面的handle指定给websocket-stream的handle。
npm install websocket-stream
在WebSocket上进行安装,并将其追加到aedes.handle中,如下所示。
// over WebScoekt
const wsPort = 3000;
const httpServer = require('http').createServer();
const ws = require('websocket-stream');
ws.createServer({ server: httpServer }, aedes.handle);
httpServer.listen(wsPort, function () {
console.log('Aedes MQTT-WS listening on port: ' + wsPort)
});
HTML侧保持了样本的原貌,只是更改了地址。
<!DOCTYPE html>
<html>
<head>
<title>MQTT.js Test</title>
<script src="https://unpkg.com/mqtt/dist/mqtt.min.js"></script>
</head>
<body>
<p>MQTT.js Test</p>
<input type="button" value="Publish" onclick="OnButtonClick()"/>
<script>
var client = mqtt.connect('ws://localhost:3000');
client.on('connect', () => {
console.log('connected');
client.subscribe('test');
});
client.on('message', (topic, message) => {
console.log(topic + ' : ' + message);
});
function OnButtonClick() {
console.log('onClick');
client.publish('mqtt/test', 'hello world!');
}
</script>
</body>
</html>
当在浏览器上点击以启动最初创建的node.js订阅后,将显示一条消息。
因为它多少有点用,所以结束了。