とめどなく、溢れ出すデータをリアルタイムに処理するニクいやつ Stream Analytics を作成してみたいと思います。IoTデバイスからのセンシングデータを処理するのにうってつけです!!
Stream Analytics は、連続して送られてくるデータを受信し、「クエリ処理」を施した後、任意のサービスに出力する事が出来ます。
例えば、データを加工(クエリ処理)し、PowerBIで可視化したり、アーカイブ目的のため受信データをBlobストレージに保存するなど
今回は、Stream Analytics を使い、接続済み IoTデバイスが切断された場合、Blobストレージに切断ログを保存する。というのを試してみたいと思います。
では、さっそく開始!!
[+新規]ー[モノのインターネット(IoT)]ー[Stream Analytics job]を選択
ジョブ名:表示名
サブスクリプション:Azureサービスの提供範囲
リソースグループ:グループ名(複数のリソースを1つにグループ化する機能)
場所:デプロイするAzureのリージョン
作成した Stream Analytics を選択し、[ロケール]ー[日本語(日本)]を選択 ※時刻合わせ
作成した Stream Analytics を選択し、[入力]ー[+追加]を選択
※入力側は、IoT Hubを指定しデバイスの切断を監視します。
入力のエイリアス:どこからの入力か、特定できる名前を推奨
ソースタイプ:データストリーム/参照データ
ソース:イベントハブ/Blobストレージ/IoTHub
サブスクリプション:現在のサブスクリプション内から上記ソースを選択/手動設定
IoT Hub:選択したソースにより変更。特定のソースを指定する。
エンドポイント:メッセージング/操作の監視
共有アクセスポリシー名:IoTHubへのアクセス権限
共有アクセスポリシーキー:自動入力
コンシューマーグループ:受信するデータをグループに分ける
イベントシリアル化形成:受信するデータのタイプ
エンコード:受信するデータのエンコードタイプ
作成した Stream Analytics を選択し、[出力]ー[+追加]を選択
※出力側は、ログ蓄積用のBlobストレージへの出力を作成
出力のエイリアス:どこへの出力か、特定できる名前を推奨
シンク:SQLデータベース/Blobストレージ/イベントハブ/テーブルストレージ/ServiceBusキュー/ServiceBusトピック/Cosmos DB/PowerBI/DataLakeStore
Azure Function
サブスクリプション:現在のサブスクリプション内から上記ソースを選択/手動設定
ストレージアカウント:保存先のストレージアカウントを指定
ストレージアカウントキー:自動入力
コンテナ:保存先のコンテナを指定
パスパターン:Blob保存時に{data}{time}での階層構造を指定できる。
日付の形式:表示形式を選択
時刻の形式:表示形式を選択
イベントシリアル化形式:受信するデータのタイプ
エンコード:受信するデータのエンコードタイプ
フォーマット:(JSONの場合)改行区切り/配列
作成した Stream Analytics を選択し、[クエリ]を選択
※input(IoTデバイス)の切断を検知するとoutput(Blob)に出力する。
ややこしそうなクエリを書いていますが、下記でもログ出力ができます。
SELECT *
INTO output
FROM input
クエリを[開始]と行きたいのですが、その前に、、IoT Hub側で下記設定を行ないます。
それでは、クエリを[開始]してみましょう!!
エラー無く実行できた場合、IoTデバイスの切断ログがBlobストレージに保存されているはずなので、確認してみましょう!
※今回は、デバイスの電源を落として、切断状態にしてます。
[error]コンテナの中に、ファイルが作成されているのが確認できます。