Stream Analytics を作成し、IoTデバイスを監視する!

とめどなく、溢れ出すデータをリアルタイムに処理するニクいやつ Stream Analytics を作成してみたいと思います。IoTデバイスからのセンシングデータを処理するのにうってつけです!!

Stream Analytics は、連続して送られてくるデータを受信し、「クエリ処理」を施した後、任意のサービスに出力する事が出来ます。

例えば、データを加工(クエリ処理)し、PowerBIで可視化したり、アーカイブ目的のため受信データをBlobストレージに保存するなど

今回は、Stream Analytics を使い、接続済み IoTデバイスが切断された場合、Blobストレージに切断ログを保存する。というのを試してみたいと思います。


では、さっそく開始!!

[+新規]ー[モノのインターネット(IoT)]ー[Stream Analytics job]を選択

ジョブ名:表示名
サブスクリプション:Azureサービスの提供範囲
リソースグループ:グループ名(複数のリソースを1つにグループ化する機能)
場所:デプロイするAzureのリージョン

作成した Stream Analytics を選択し、[ロケール]ー[日本語(日本)]を選択 ※時刻合わせ

作成した Stream Analytics を選択し、[入力]ー[+追加]を選択
※入力側は、IoT Hubを指定しデバイスの切断を監視します。

入力のエイリアス:どこからの入力か、特定できる名前を推奨
ソースタイプ:データストリーム/参照データ
ソース:イベントハブ/Blobストレージ/IoTHub
サブスクリプション:現在のサブスクリプション内から上記ソースを選択/手動設定
IoT Hub:選択したソースにより変更。特定のソースを指定する。
エンドポイント:メッセージング/操作の監視
共有アクセスポリシー名:IoTHubへのアクセス権限
共有アクセスポリシーキー:自動入力
コンシューマーグループ:受信するデータをグループに分ける
イベントシリアル化形成:受信するデータのタイプ
エンコード:受信するデータのエンコードタイプ

作成した Stream Analytics を選択し、[出力]ー[+追加]を選択
※出力側は、ログ蓄積用のBlobストレージへの出力を作成

出力のエイリアス:どこへの出力か、特定できる名前を推奨
シンク:SQLデータベース/Blobストレージ/イベントハブ/テーブルストレージ/ServiceBusキュー/ServiceBusトピック/Cosmos DB/PowerBI/DataLakeStore
Azure Function
サブスクリプション:現在のサブスクリプション内から上記ソースを選択/手動設定
ストレージアカウント:保存先のストレージアカウントを指定
ストレージアカウントキー:自動入力
コンテナ:保存先のコンテナを指定
パスパターン:Blob保存時に{data}{time}での階層構造を指定できる。
日付の形式:表示形式を選択
時刻の形式:表示形式を選択
イベントシリアル化形式:受信するデータのタイプ
エンコード:受信するデータのエンコードタイプ
フォーマット:(JSONの場合)改行区切り/配列

作成した Stream Analytics を選択し、[クエリ]を選択
※input(IoTデバイス)の切断を検知するとoutput(Blob)に出力する。

ややこしそうなクエリを書いていますが、下記でもログ出力ができます。

SELECT *
INTO output
FROM input

クエリを[開始]と行きたいのですが、その前に、、IoT Hub側で下記設定を行ないます。

それでは、クエリを[開始]してみましょう!!

エラー無く実行できた場合、IoTデバイスの切断ログがBlobストレージに保存されているはずなので、確認してみましょう!
※今回は、デバイスの電源を落として、切断状態にしてます。

[error]コンテナの中に、ファイルが作成されているのが確認できます。