用NodeJS、C#和浏览器测试通过MQTT进行通信的功能

我有一段时间没理会它,结果积累了很多待整理的素材。

当涉及到大量设备的通信,像物联网设备一样,或者在网络质量不高或者有限电池的情况下,这个方法就非常有效。可以在这里查看详细说明。

关于服务器,似乎有一个名为beebotte的服务很有名,但也可以在node.js上搭建本地代理。
有一个叫mosca的项目,但似乎在后续项目aedes中进行了优化加速。
当然,AWS也有一个叫AWS IoT Core的服务存在。
这次我们选择在node.js上搭建。

客户端如果是Python,有一个叫做paho-mqtt的库。如果是Node.js,则有一个叫做mqtt的库。
根据这个链接,看起来C#可以使用MQTTnet。
对于树莓派来说,可以使用Python,但这次我们也想考虑到Unity,所以想要使用C#进行确认。
另外,还想要确认一下从浏览器到MQTT over WebSocket的情况。
这是一个贪婪的设定。

使用经纪人和Node.js进行信息传输和接收。

使用npm安装aedes库。

我几乎完全复制了这个脚本,参考了这里的内容。

const aedes = require('aedes')();

// 各アクションに対する反応
aedes.on('clientError', function (client, err) {
    console.log('client error', client.id, err.message, err.stack)
});

aedes.on('connectionError', function (client, err) {
    console.log('client error', client, err.message, err.stack)
});

aedes.on('publish', function (packet, client) {
    if (client) {
        console.log('message from client', client.id)
    }
});

aedes.on('subscribe', function (subscriptions, client) {
    if (client) {
        console.log('subscribe from client', subscriptions, client.id)
    }
});

aedes.on('unsubscribe', function (subscriptions, client) {
    if (client) {
        console.log('unsubscribe from client', subscriptions, client.id)
    }
});

aedes.on('client', function (client) {
    console.log('new client', client.id)
});

aedes.on('clientReady', function (client) {
    console.log('client is ready', client.id)
});

aedes.on('clientDisconnect', function (client) {
    console.log('client disconnected', client.id)
});

// 実際のサーバ立ち上げ
const server = require('net').createServer(aedes.handle);
const port = 1883;

server.listen(port, function () {
    console.log(`Server running at mqtt://localhost:${port}/`);
});

将每个行动的内容复制并做好备忘。

アクション説明client新規クライアントが登録されたときclientReadyクライアントの初期化完了し、通信待機状態になったときclientDisconnectクライアントの通信が正常に切断されたときclientError通信中のクライアントに異常が発生した場合connectionErrorクライアントの初期化に失敗した場合publishパブリッシャーがメッセージをブローカーに送信した時の処理で、第一引数にメッセージパケットを含めるsubscribeサブスクライバーが購読をリクエストした時の処理で、第一引数にトピックを含むオブジェクトを指定unsubscribeサブスクライバーが購読を取り止める時に処理で、第一引数にトピックを含むオブジェクトを指定closedサーバーが終了した場合

首先,因为我们想要从node.js上轻松确认连接,所以我们需要通过npm install mqtt安装客户端模块,然后通过以下方式进行确认。

据说,发送消息的一方被称为发布者,接收消息的一方被称为订阅者。为了更易于理解,我们可以称之为发送方和接收方。

首先需要创建接收方。

const mqtt = require('mqtt');
const client = mqtt.connect('mqtt://localhost:1883');
const topic = 'mqtt/test';

client.on('connect', function() {
    console.log('subscriber connected.');
});

client.subscribe(topic, function(err, granted) {
    console.log('subscriber subscribed.');
});

client.on('message', function(topic_, message) {
    console.log('subscriber received topic:', topic_, 'message:', message.toString());
});

送信方和接收方需要设置相同的Topic。

const mqtt = require('mqtt');
const client = mqtt.connect('mqtt://localhost:1883');

const message = 'SAY HELLO TO MQTT';
const topic = 'mqtt/test'

client.on('connect', function() {
    console.log('publisher connected.');
    client.publish(topic, message);
    console.log('send topic:', topic, ', message:', message);
});

只需按照经纪人、接收方和发送方的顺序启动,就能发送消息。

最初から贫弱的网络连接是预期的,但重新连接的考虑并未经过确认,是否需要考虑并不确定。

在C#中的数据传输

从GitHub上下载并使用MQTTnet。许可证是MIT。因为自己编译很麻烦,所以尝试下载Windows版本v3.0.16。似乎在Unity中使用有版本限制。

使用解压后的zip文件中的MQTTnet.dll。
各种设置等在官方wiki上都有介绍。

编译将继续使用.net的csc(需要参考dll选项)。在.NET 4系列中进行编译。错误CS0012:类型’System.IDisposable’在未被引用的程序集中定义。您必须添加对程序集’System.Runtime, Version=5.0.0.0’的引用。出现这个错误是因为仅有MQTTnet.dll不够,以下是出错的部分列举:

C:\Windows\Microsoft.NET\Framework\v4.0.30319\csc.exe /r:System.Runtime.dll,MQTTnet.dll,System.Private.CoreLib.dll -warn:0 publisher.cs

依赖关系错误已消除(虽然有很多警告但忽略)
然而,出现了类重复问题,如’The type ‘System.TimeSpan’ exists in both…’,以及CS1660和CS4010与委托相关的lambda表达式错误。

云々的错误已经消失了,因为我强制使用选项从指定的DLL中而不是.NET标准中加载了System。

不能在catch子句的主体中等待发生的错误是由C#的版本所致,因为我没有给错误加上延迟。

经过各种尝试,最终编译成功的方法如下。

using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Options;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Receiving;
using MQTTnet.Client.Disconnecting;
using System;
using System.Threading;
using System.Threading.Tasks;

class publisher {
    static IMqttClient mqttClient = new MqttFactory().CreateMqttClient();
    static IMqttClientOptions options;
    static string topic = "mqtt/test";

    static void Main(string[] args) {
        try{
            options = new MqttClientOptionsBuilder()
            .WithTcpServer("localhost",1883)
            .WithCleanSession()
            .Build();
            // SecureTPC connection = .WithTls()

            OnStarted();
            while(true){
                Thread.Sleep(1000);
                PubMessage();
            }
        }catch(Exception ex){
            Console.WriteLine(ex);
        }
    }


    static void OnStarted(){
        mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic(topic).Build());

        mqttClient.UseConnectedHandler(async  (e) =>
        {
            Console.WriteLine("### SUBSCRIBED ###");
        });

        // 送信
        mqttClient.UseDisconnectedHandler(async (e) => {
            Console.WriteLine("### DISCONNECTED FROM SERVER ###");
            await Task.Delay(TimeSpan.FromSeconds(3));
            try{ Connect(); }
            catch { Console.WriteLine("### RECONNECTING FAILED ###"); }
        });

        // 受信
        mqttClient.UseApplicationMessageReceivedHandler( (e) => {
            Console.WriteLine(System.Text.Encoding.UTF8.GetString(e.ApplicationMessage.Payload));
        });
    }

    static async void Connect(){
        var retry = 0;
        while (!mqttClient.IsConnected && retry < 5)
        {
            try
            {
                mqttClient.ConnectAsync(options).Wait(); //CancellationToken.Noneはエラーになった
            }
            catch
            {
                // 最近のC#ならこれができるので再接続トライになる
                //await Task.Delay(TimeSpan.FromSeconds(1));
            }
            retry++;
        }
    }

    static void PubMessage() {
       Console.WriteLine("Send");

       var message = new MqttApplicationMessageBuilder()
           .WithTopic(topic)
           .WithPayload("Hello World")
           .WithExactlyOnceQoS()
           .WithRetainFlag()
           .Build();

       mqttClient.PublishAsync(message); // awaitできなかった
   }
}}

因为编译成功了,所以执行时出现了 System.TypeLoadException: 方法 ‘Main’ 没有实现 (没有 RVA) 的错误。
我知道在 main 的 MqttClientOptionsBuilder 的 WithTcpServer 的部分出了问题,但是不知道如何解决,
所以暂时先认为这次编译通过了就好。
从 Visual Studio 编译能行吗?是不是需要更高版本的 .NET?我对 C# 不熟悉,不清楚,只能放弃了。

从浏览器发出的MQTT

这里有一个正在使用MQTT的示例,可以尝试参考一下。

由于无法普通连接,所以需要使用WebSocket,这意味着服务器端也需要接收WebSocket,所以仅仅启动的nodejs代理无法正常工作。

因此,根据这里的参考问题,可以将MQTT方面的handle指定给websocket-stream的handle。

npm install websocket-stream

在WebSocket上进行安装,并将其追加到aedes.handle中,如下所示。

// over WebScoekt
const wsPort = 3000;
const httpServer = require('http').createServer();
const ws = require('websocket-stream');
ws.createServer({ server: httpServer }, aedes.handle);

httpServer.listen(wsPort, function () {
    console.log('Aedes MQTT-WS listening on port: ' + wsPort)
});

HTML侧保持了样本的原貌,只是更改了地址。

<!DOCTYPE html>
<html>
<head>
    <title>MQTT.js Test</title>
    <script src="https://unpkg.com/mqtt/dist/mqtt.min.js"></script>
</head>
<body>
    <p>MQTT.js Test</p>

    <input type="button" value="Publish" onclick="OnButtonClick()"/>

    <script>
        var client = mqtt.connect('ws://localhost:3000');

        client.on('connect', () => {
            console.log('connected');
            client.subscribe('test');
        });

        client.on('message', (topic, message) => {
            console.log(topic + ' : ' + message);
        });

        function OnButtonClick() {
            console.log('onClick');
            client.publish('mqtt/test', 'hello world!');
        }

    </script>
</body>
</html>

当在浏览器上点击以启动最初创建的node.js订阅后,将显示一条消息。

因为它多少有点用,所以结束了。

广告
将在 10 秒后关闭
bannerAds