obniz

obnizから使いやすいMQTTブローカーとライブラリ

この記事はobniz Advent Calendar 2019の17日目の記事です。

obnizと他のシステムを連携動作させたいとき、MQTTを使うと割と楽にプログラムを作れます。

MQTTを使う場合

  • どのブローカーを使うか
  • どのライブラリを使うか

を決めなければなりません。私は今まで自分では調査せずにネット記事のコピペでその場凌ぎを続けてきました。今回一念発起して、どのようなブローカーやライブラリがあるか調べてみました。その中で、obnizで使いやすいものを紹介します。

  • ブローカー:
    自分で運用するブローカー: mosquittomosca
    クラウドサービス:mosquittoshiftr.io
  • ライブラリ:
    ブラウザ用:MQTT.jspaho
    node.js用:MQTT.js

MQTT ブローカーとMQTTライブラリの紹介なので、obnizのコードは登場しません。あしからず。

MQTTの概要

本題に入る前に簡単にMQTTの概要を説明します。既にご存知の方は読み飛ばしてください。

登場人物と役割

MQTTによる通信は、

  • メッセージを中継するbroker
  • メッセージを送信あるいは受信するclient

で行われます。clientは、送信側と受信側を区別するためにpublisher、subscriberと呼ばれる場合があります。

  • メッセージを送信するpublisher
  • メッセージを受信するsubscriber

メッセージの配信は次のようなステップで行われます。

  1. subscriberが、メッセージを受信したいトピックをsubscribeする
  2. publisherがトピックにメッセージをpublishする
  3. subscriberがメッセージを受け取る

②と③を繰り返すことで複数のメッセージを送信できます。また、1つのプログラムがpublisherとsubscriberを兼ねられるので、双方向の通信を実現できます。

メッセージ配信の確実さの種類

subscriberが停止していたりネットワークに障害が発生した場合、メッセージの配信がうまくいかない場合があります。QoS(サービス品質)を指定することでメッセージ配信の確実さを指定することができます。

  • QoS 0:At most once:メッセージが配信されるかもしれないし、ネットワーク障害などで配信されないかもしれません。配信される場合は最大1回です。受信確認や再送のための処理が不要なので実装が軽量になります。
  • QoS 1:At least once メッセージは必ず配信されます。受信が確認されるまで再送を繰り返すのでメッセージが2回以上配信される場合があります。
  • QoS 2:Exactly once メッセージは必ず1回だけ配信されます。

これ以外に、

  • publisher側が切断した時に事前に決めておいたメッセージを配信する「Last Will and Testament」。「サーバがダウンしました」といったメッセージを流すのによく用いられます。
  • メッセージをpublishした後からsubscribeした場合にsubscribe前のメッセージを配信できるように最後のメッセージを保存しておく「Retain

などがあります。

本記事ではこれらの確実さの仕組みについては扱いません。

通信路の種類

clientとbrokerの間で通信に用いられる通信路は、TCPとWebSocketの2種類があります。WebSocketは、publisherやsubscriberがWebブラウザ上で動作する場合に用いられます。

MQTTの仕様そのものでは通信路を規定していないので他の通信路を使ってMQTT通信することも可能ですが、現時点で簡単に利用できるものはTCPとWebSocketの2種類です。

通信内容が外部に漏れないように、TLSで暗号化することもできます。

利用者認証

誰がMQTTで通信できるかを限定する仕組みがあります。

  • anonymous(認証なし。誰でも通信できる)
  • ID, PASSWORDで認証
  • Client証明書で認証(TLS暗号化と組み合わせて使用)

MQTTのバージョン

MQTTにはいくつかバージョンがあります。

  • MQTT v3.1 2013年にIBMが仕様を公開した
  • MQTT v3.1.1 2014年にOASIS標準として公開された
  • MQTT v5.0  2019年にOASIS標準として公開された

MQTT v5.0は標準が規定されてから日が浅いため、対応していないMQTT BrokerやMQTTライブラリが多くあります。

obnizでMQTTを使うのに必要なもの

例えば、obnizとM5StackをMQTT経由で通信させたい場合、構成はこうなります。

用意しなければいけないものは、

  • MQTT Broker
  • obnizで使うMQTT Client Library
  • M5Stackで使うMQTT Client Library (本記事では扱わない)

です。

MQTT Brokerは、Clientに依存していないのでMQTTの仕様を満たすものであればなんでも使えます。

MQTT Client Libraryは、obnizのプログラムがどの言語で書かれていて、何で実行されるのかによって選択肢が変わってきます。本記事ではブラウザで実行されるJavaScriptの場合と、node.jsで実行されるJavaScriptの場合を考えます。pythonなどの他の言語の場合は扱いません。

 

MQTT Broker

MQTT Brokerには、

  • 自分でBrokerを運用する方法
  • クラウドサービスを使う方法

の2種類があります。

自分で運用する方法だと、全てを自分でコントロールすることが可能ですが、上手く動かない時にBrokerの設定が間違っているのかClientの書き方が間違っているのかの切り分けが難しくなります。

クラウドサービスを使う場合は、Broker側に問題がある可能性は低いのでclientのデバッグに集中すれば良くなります。

サクッとMQTTを使ってみるだけならクラウドサービスを利用することをお勧めします。

MQTT Brokerの自主運用

使い方の情報がインタネット上に豊富あるMQTT Brokerには、

があります。

変わり種としてNode-RED内で1つのノードとして動作するMQTT Brokerがあります。実態はmoscaのようです。

mosquitto

mosquittoはECLIPSEのプロジェクトの1つで、MQTTのリファレンス実装の役割を担っているようです。そのため最新の仕様への対応が早いようです。

対応しているMQTTバージョン、QoS、通信路は次の通りです。(注:調査が甘いので間違っているかもしれませ)

MQTTバージョン v3.1、v3.1.1、v5.0
QoS 0:at most once 対応
QoS 1:at least once 対応
QoS2:exactly once 対応
TCP 対応
TCP+TLS 対応
TCP+TLS+Client証明書 対応
WebSocket 対応
WebSocket+TLS 対応

mosquittoのソースコードはこちらから入手できます。

mosquittoのサイト:https://mosquitto.org/files/source

github:https://github.com/eclipse/mosquitto

バイナリーパッケージも用意されているので、お使いの環境に応じてインストール方法を選択してください。例えば、

macでhomebrewを使う場合: brew install mosquitto

Raspberry Piの場合:sudo apt-get install mosquitto

となります。インストール方法を紹介した記事がネット上にたくさんあるのでググって参考にしてみてください。

mosquittoのインストールは簡単ですが、設定は少し難しいです。特に、複数の通信路に対応できるように設定するところでつまずきやすいです。ネット上の記事も誤っているものが散見されます。

自分で色々試してみた結果、

  • anonymous接続
  • ID/PW接続
  • TCP接続
  • WebSocket接続

を全て対応してくれるように設定するには、configファイルに下記を記入すれば良いことがわかりました。正しい設定方法かどうか自信がありませんが、とりあえず動作しています。

allow_anonymous true
password_file /usr/local/opt/mosquitto/etc/mosquitto/passwordfiletest

listener 1883

listener 9090 127.0.0.1
protocol websockets

(注:パスワードファイルのファイル名、パスは上記でなくてもなんでも良いです)

TLSに対応するにはサーバ証明書を用意する必要があり、設定が面倒なので今回は試していません。TLSを試してみるときは、自分で運用しているBrokerを使うよりもCloudサービスを使う方がずっと簡単なので、そちらをお勧めします。

mosca

moscaはnode.js上で動作するMQTT Brokerです。npmで簡単にインストールできます。また、単独のMQTTサーバとして動作させることも、プログラム内のライブラリとして動作させることもできます。MQTT Brokerを手軽に動かしてみたいときは、mosquittoよりもmoscaをお勧めします。

moscaを単独のMQTTサーバとして動作させたい場合は、こうします。詳しくはこちらを参照してください。

$ nmp install mosca -g
$ mosca -v

詳しい設定はコマンドラインオプションかconfigファイルで行います。QoS 1を使うにはメッセージを保存する場所が必要になります。moscaではRedis、MongoDBなどをbackendとして使う設計になっています。これらの指定もconfigファイルで行います。

node.jsのアプリケーション内にライブラリとして組み込むときは、下記のような書き方をします。この例ではbackendとしてRedisを使っています。詳しくはこちらを参照してください。

var mosca = require('mosca')

var ascoltatore = {
  type: 'redis',
  redis: require('redis'),
  db: 12,
  port: 6379,
  return_buffers: true, // to handle binary payloads
  host: "localhost"
};

var moscaSettings = {
  port: 1883,
  backend: ascoltatore,
  persistence: {
    factory: mosca.persistence.Redis
  }
};

var server = new mosca.Server(moscaSettings);
server.on('ready', setup);

server.on('clientConnected', function(client) {
  console.log('client connected', client.id);		
});

// fired when a message is received
server.on('published', function(packet, client) {
  console.log('Published', packet.topic, packet.payload);
});

// fired when the mqtt server is ready
function setup() {
  console.log('Mosca server is up and running')
}

moscaが対応しているMQTTバージョン、QoS、通信路は次の通りです。(注:調査が甘いので間違っているかもしれません)

MQTTバージョン v3.1、v3.1.1
QoS 0:at most once 対応
QoS 1:at least once 対応
QoS 2:exactly once 非対応
TCP 対応
TCP+TLS 対応
TCP+TLS+Client証明書 対応
WebSocket 対応
WebSocket+TLS 対応

QoS 2に対応していませんが、単純なシステムだとQoS 2が必要になる場面は稀なのでmoscaで十分です。

クラウドサービスのMQTT Brokerの利用

MQTTのCloudサービスはたくさんあります。例えばよく聞くサービスにはこんなものがあります。

AWSやGoogle Cloudを日頃から使い慣れている方は、それぞれのMQTT Brokerを使うのが手っ取り早いと思います。使ったことがない人はAWSやGoogle Cloudを使い始めるまでの手間が大きいのでお勧めしません。

とりあえずちょっとMQTTを試してみるだけなら、mosquittoやshiftr.ioがお勧めです。

mosquitto

MQTT Brokerソフトであるmosquittoをクラウドサービスとして提供しているサイトです。テスト目的のサイトなので無料です。

対応している通信路が豊富なのが特徴です。

ユーザ登録なしに使えるサービスなので、anonymous接続のみです。

利用の上で注意すべき点は、テスト目的のBrokerなんで一時的に機能の一部が使えなくなっている場合があります。この点に関してWebページに注意書きがあります。

したがって、クライアントプログラムがうまく動作しない時、クライアントが悪いのか、単にBrokerのその機能が止まっているだけなのか区別がつきません。この点を頭に置いて使うのであればとても便利なサービスです。

shiftr.io

最近人気のMQTTサービスです。MQTT v3.1.1に準拠しています。通信路として下記が使えます。

ユーザ登録してtoken(ID, PASSWORDペア)を作成してBrokerを使う仕組みなので、anonymous接続はできません。ID,PW接続になります。

shiftr.ioの大きな特徴は、Brokerに接続しているクライアントや送受されているメッセージがリアルタイムにアニメーションで見られるところです。

デバッグするときに

  • クライアントがちゃんとBrokerに接続できているか
  • 正しいトピックをsubscribeしているか
  • メッセージが思った通りに配信されているか

を目で見てリアルタイムに確認できるのでとても便利です。

MQTT Client Library

obnizのプログラムを書く場合、ブラウザ用のJavaScriptで書くか、node.js用のJavaScriptで書くかがほとんどなので、この2つのケースを考えます。

ブラウザ用のJavaScriptで使えるMQTT Client Library

ネット上に情報が豊富な代表的ライブラリは

です。

MQTT.js

MQTT.jsはブラウザとnode.jsの両方に対応しています。プログラムのわずかな変更でどちらでも動作するのでプログラムを書くときの負担が少なく便利なライブラリです。

MQTT v3.1.1に準拠しています。実験的にv5.0に対応している部分もありますが、利用は推奨されていません。

shiftr.ioにpublish, subscribeするプログラムを書いてみた例がこれです。

<!DOCTYPE html>
<html>
<head>
  <meta charset="UTF-8">
  <meta name="viewport" content="width=device-width, initial-scale=1">
  <script src="https://code.jquery.com/jquery-3.2.1.min.js"></script>
  <script src="https://unpkg.com/mqtt/dist/mqtt.min.js"></script>
</head>

<body>
<h2 id="clientIdHeader"></h2>
<hr/>
<h3>Send data</h3>
<div>
  <input type="text" id="text" value="Hello World"></input>
  <button id="send">SEND</button>
</div>
<hr/>
<h3>Received data from topic "irtest" </h3>
<div id="received">
  <ul id="list">
  </ul>
</div>
<hr>

<script>
  let mqtturl = 'mqtt://ID記入:PW記入@broker.shiftr.io';
  let options = {
    clientId: 'Browser-'+Math.floor(Math.random() * 100) // ID Example: "Browser-23"
  }
  let topic = 'testtopic';

  $("#clientIdHeader").text('Client ID: '+options.clientId);

  console.log('clientId: ' + options.clientId);
  console.log('connecting to: ' + mqtturl);
  let client = mqtt.connect(mqtturl, options);

  client.on('connect', function(){
    console.log('connected.');
    client.subscribe(topic, function(err, granted) {
      if (err) {
        console.log('subscribe failed:', err);
      } else {
        console.log('subscribe succeeded');
      }
    });
  });

  // message受信時の動作:
  client.on('message', function(topic, message){
      console.log('subscriber.on.message', 'topic:', topic, 'message:', message.toString());
      $("#list").prepend("<li>"+message.toString()+"</li>");
  });

  // ボタンクリック時の動作: 
  $("#send").on("click", function(){
    client.publish(topic, $("#text").val());
  });
</script>
</body>
</html>

 

上記の例では、ID,PWの指定をURL内で行なっています。

let mqtturl = 'mqtt://ID文字列:PW文字列@broker.shiftr.io';

 

optionとして外出しする方法もあります。

let mqtturl = 'mqtt://broker.shiftr.io';
let options = {
  username: 'ID文字列',
  password: 'PW文字列',
  clientId: 'Browser-'+Math.floor(Math.random() * 100) // ID Example: "Browser-23"
};

 

paho

pahoはpython用のライブラリとして有名ですが、JavaScript版もあります。サポートしている機能は下図の通りです。 pahoのサイトに、使用例としてこのコードが記載されています。

// Create a client instance
client = new Paho.MQTT.Client(location.hostname, Number(location.port), "clientId");

// set callback handlers
client.onConnectionLost = onConnectionLost;
client.onMessageArrived = onMessageArrived;

// connect the client
client.connect({onSuccess:onConnect});


// called when the client connects
function onConnect() {
  // Once a connection has been made, make a subscription and send a message.
  console.log("onConnect");
  client.subscribe("World");
  message = new Paho.MQTT.Message("Hello");
  message.destinationName = "World";
  client.send(message);
}

// called when the client loses its connection
function onConnectionLost(responseObject) {
  if (responseObject.errorCode !== 0) {
    console.log("onConnectionLost:"+responseObject.errorMessage);
  }
}

// called when a message arrives
function onMessageArrived(message) {
  console.log("onMessageArrived:"+message.payloadString);
}

 

これを参考にしてshiftr.ioに接続するコードを書いてみたのがこちらです。shiftr.ioではID,PWによる認証が必要ですが、URLに埋め込む方式はpahoが受け付けてくれないので、optionとして渡しています。

<html>
<head>
  <meta charset="utf-8">
  <script src="https://cdnjs.cloudflare.com/ajax/libs/paho-mqtt/1.0.1/mqttws31.js" type="text/javascript"></script>
  <meta name="viewport" content="width=device-width, initial-scale=1">
</head>
<body>

<h1>paho test</h1>

<script>
  // MQTTクライアント生成
  var client = new Paho.MQTT.Client(
    "broker.shiftr.io",
    Number(80),
    '',
    "web_" + parseInt(Math.random() * 100, 10));

  // コールバックの定義
  client.onConnectionLost = onConnectionLost; // 接続が切れたとき
  client.onMessageArrived = onMessageArrived; // メッセージ受信したとき

  // 接続
  var options = {
    userName: "IDを記入",
    password: "PWを記入",
    useSSL: false,
    onSuccess:onConnect, // 接続したときのコールバック
    onFailure:doFail // 失敗したときのコールバック
  }
  client.connect(options);

  // 接続したとき
  function onConnect() {
    console.log("onConnect");
    
    // トピックを購読する
    client.subscribe("sampletopic");
    
    // トピックにメッセージを発行してみる
    message = new Paho.MQTT.Message("Hello, World!");
    message.destinationName = "sampletopic";
    client.send(message);
  }
  
  // 失敗したとき
  function doFail(e){
    console.log(e);
  }

  // 接続が切れたとき
  function onConnectionLost(responseObject) {
    if (responseObject.errorCode !== 0) {
      console.log("onConnectionLost:"+responseObject.errorMessage);
    }
  }

  // メッセージを受信したとき
  function onMessageArrived(message) {
    console.log("onMessageArrived:"+message.payloadString);
  }
</script>
</body>
</html>

 

node.jsのJavaScriptで使えるMQTT Client Library

いくつか種類があるようですが、MQTT.jsが一番使いやすいと思います。

MQTT.js

サポートしているMQTTの機能は、JavaScript版と同じです。

下記のコード例は、shiftr.ioのトピックをsubscribeするコード例です。

var mqtt = require('mqtt');
var mqtturl = 'mqtt://IDを記載:PWを記載@broker.shiftr.io';
console.log('connecting to: '+mqtturl);
var client = mqtt.connect(mqtturl, {cliendId: 'nodejs_sub'});

client.on('connect', function(){
    console.log('subscriber connected.');
});

client.subscribe('topic0', function(err, granted){
    if (err) {
        console.log('subscriber subscribe failed:', err);
    } else {
        console.log('subscriber subscribed.');
    }
});

client.on('message', function(topic, message){
    console.log('subscriber on.message', 'topic:', topic, 'message:', message.toString());
});

 

下記のコード例は、shiftr.ioのトピックにpublishするコード例です。

var mqtt = require('mqtt');
var mqtturl = 'mqtt://IDを記載:PWを記載@broker.shiftr.io';
console.log('connecting to: '+mqtturl);
var client = mqtt.connect(mqtturl, {cliendId: 'nodejs_pub'});

client.on('connect', function(){
    console.log('publisher connected.');
});

setInterval(function(){
    var message = new Date().toString();
    client.publish('topic0', message);
    console.log('publisher published:', message);
}, 5000);