Using IoT Hub Routing, reading data from Blob Storage

While working through some prototypes, I needed to monitor the data arriving in IoT Hub over several days. Instead of watching the messages minute by minute, I simply configured an IoT Hub route to land the data in Azure Blob Storage. The next step was to read that data back.

In the past, I have written about reading AVRO data here: Using Azure Data Lake Analytics to query AVRO data from an IoT Hub Route. This time, we will use the newer JSON format, but we still have to overcome two challenges:

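  1. The data is spread across many blob files in the Azure Storage account.
  2. The Body of each message is Base64-encoded (IoT Hub writes the body as plain JSON only when the device sets the message content type to application/json and the content encoding to UTF-8).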
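To make challenge 2 concrete: the Body field carries the device payload as Base64 text, so reading it takes two steps, Base64 to bytes and bytes to a UTF-8 string. A minimal sketch, assuming the device sent UTF-8 text; BodyDecoder and DecodeBody are hypothetical names, not part of any SDK:

```csharp
using System;
using System.Text;

public static class BodyDecoder
{
    // Turns the Base64 "Body" field of a routed IoT Hub message back into text.
    // Assumes the device encoded its payload as UTF-8.
    public static string DecodeBody(string base64Body)
    {
        byte[] raw = Convert.FromBase64String(base64Body);
        return Encoding.UTF8.GetString(raw);
    }
}

public static class Program
{
    public static void Main()
    {
        // {"booting":"True"} encoded as Base64.
        string body = "eyJib290aW5nIjoiVHJ1ZSJ9";
        Console.WriteLine(BodyDecoder.DecodeBody(body)); // prints {"booting":"True"}
    }
}
```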

To solve this, I used the following .NET Core application to iterate over every blob in the container and print each message that contains a "booting" property, such as {"booting":"True"}.

using System;
using System.Threading.Tasks;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
using Newtonsoft.Json.Linq;

namespace checkStorage
{
    class Program
    {
        // Placeholder: paste your storage account's connection string here.
        static string BlobStorageConnectionString = "DefaultEndpointsProtocol=https;AccountName=<account-name>;AccountKey=<account-key>;EndpointSuffix=core.windows.net";
        static string BlobStorageContainer = "iothub";
        static async Task Main(string[] args)
        {
            BlobContinuationToken continuationToken = null;
            CloudStorageAccount mycloudStorageAccount = CloudStorageAccount.Parse(BlobStorageConnectionString);
            CloudBlobClient blobClient = mycloudStorageAccount.CreateCloudBlobClient();
            CloudBlobContainer container = blobClient.GetContainerReference(BlobStorageContainer);
            do
            {
                BlobResultSegment resultSegment = await container.ListBlobsSegmentedAsync(prefix:string.Empty,
                                    true, BlobListingDetails.Metadata, maxResults:1000, continuationToken, null, null);

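                // With a flat listing every item is a blob; IoT Hub writes block blobs.
                foreach (CloudBlob blobItem in resultSegment.Results)
                {
                    try
                    {
                        string blobContents = await container.GetBlockBlobReference(blobItem.Name).DownloadTextAsync();

                        // Each routed message is one JSON object per line. Splitting on '\r' and '\n'
                        // and dropping empty entries avoids a spurious parse error on a trailing newline.
                        foreach (string entry in blobContents.Split(new[] { '\r', '\n' }, StringSplitOptions.RemoveEmptyEntries))
                        {
                            JObject entryJSON = JObject.Parse(entry);

                            // The explicit conversion works whether Json.NET parsed the timestamp
                            // as a date token or left it as a string.
                            DateTime enqueuedTimeUtc = (DateTime)entryJSON["EnqueuedTimeUtc"];
                            string deviceId = entryJSON["SystemProperties"]["connectionDeviceId"].ToString();

                            // Body is Base64: decode it to the UTF-8 text the device sent, then turn
                            // single quotes into double quotes so it parses as standard JSON.
                            string message = System.Text.Encoding.UTF8.GetString(Convert.FromBase64String(entryJSON["Body"].ToString())).Replace("'", "\"");
                            JObject messageJSON = JObject.Parse(message);
                            if (messageJSON.ContainsKey("booting"))
                            {
                                Console.WriteLine(deviceId + " " + enqueuedTimeUtc.ToString() + " " + message);
                            }
                        }
                    }
                    catch (Exception er)
                    {
                        // Report the failing blob and keep scanning the rest of the container.
                        Console.WriteLine(DateTime.Now + " Error in " + blobItem.Name + ": " + er.ToString());
                    }
                }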
                continuationToken = resultSegment.ContinuationToken;

            } while (continuationToken != null);

        }
    }
}
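If you would rather not depend on Newtonsoft.Json, the per-line logic can be written against System.Text.Json, which ships with .NET Core 3.0 and later. This swaps in a different parser from the one the application above uses, and it is a sketch under assumptions: each line holds one message with EnqueuedTimeUtc, SystemProperties.connectionDeviceId and a Base64 Body (the fields the application reads), and the devices send single-quoted JSON, as the quote replacement in the application suggests. RoutedMessageParser and TryParseBootMessage are hypothetical names, and the sample line in Main is made up.

```csharp
using System;
using System.Globalization;
using System.Text;
using System.Text.Json;

public static class RoutedMessageParser
{
    // Parses one line of a JSON-encoded IoT Hub routing blob. Returns true when the
    // decoded body is a JSON object with a "booting" property.
    public static bool TryParseBootMessage(string line, out string deviceId,
        out DateTime enqueuedTimeUtc, out string message)
    {
        deviceId = null;
        enqueuedTimeUtc = default;
        message = null;
        if (string.IsNullOrWhiteSpace(line))
            return false; // e.g. the empty string left by a trailing newline

        using JsonDocument entry = JsonDocument.Parse(line);
        JsonElement root = entry.RootElement;
        deviceId = root.GetProperty("SystemProperties").GetProperty("connectionDeviceId").GetString();
        enqueuedTimeUtc = DateTime.Parse(root.GetProperty("EnqueuedTimeUtc").GetString(),
            CultureInfo.InvariantCulture,
            DateTimeStyles.AssumeUniversal | DateTimeStyles.AdjustToUniversal);

        // Body is Base64: decode to UTF-8 text, then normalize single quotes to double
        // quotes, because System.Text.Json only accepts double-quoted JSON.
        string body = Encoding.UTF8.GetString(Convert.FromBase64String(root.GetProperty("Body").GetString()));
        message = body.Replace("'", "\"");

        using JsonDocument payload = JsonDocument.Parse(message);
        return payload.RootElement.ValueKind == JsonValueKind.Object
            && payload.RootElement.TryGetProperty("booting", out _);
    }
}

public static class Program
{
    public static void Main()
    {
        // Made-up sample line; "tub3" is one of the devices from this post.
        string line = "{\"EnqueuedTimeUtc\":\"2019-05-14T18:03:37.1680000Z\",\"SystemProperties\":{\"connectionDeviceId\":\"tub3\"},\"Body\":\"eyJib290aW5nIjoiVHJ1ZSJ9\"}";
        if (RoutedMessageParser.TryParseBootMessage(line, out string deviceId, out DateTime enqueued, out string message))
            Console.WriteLine(deviceId + " " + enqueued.ToString("o") + " " + message);
    }
}
```

Catching exceptions per line (rather than per blob, as the application does) is left to the caller here, so a malformed line surfaces as a JsonException or FormatException.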

The output is shown below. Because routing organizes the stored blobs by IoT Hub partition by default, the results are in time order only within each partition (tub3 is on partition 0, tub2 on partition 1).

[Image: console output of the application]
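Because the results come back grouped by partition, the printed lines are chronological only within a partition. One way to get a single timeline is to buffer the matches and sort them once by EnqueuedTimeUtc before printing. A minimal sketch with hypothetical names (BootEvent, Timeline.Chronological) and made-up timestamps; it assumes the number of matches is small enough to hold in memory:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// One matching message (hypothetical type; the application above prints matches directly).
public sealed class BootEvent
{
    public BootEvent(string deviceId, DateTime enqueuedTimeUtc)
    {
        DeviceId = deviceId;
        EnqueuedTimeUtc = enqueuedTimeUtc;
    }

    public string DeviceId { get; }
    public DateTime EnqueuedTimeUtc { get; }
}

public static class Timeline
{
    // Sorts events from all partitions into one chronological list.
    // ThenBy makes the order deterministic when two events share a timestamp.
    public static List<BootEvent> Chronological(IEnumerable<BootEvent> events) =>
        events.OrderBy(e => e.EnqueuedTimeUtc)
              .ThenBy(e => e.DeviceId, StringComparer.Ordinal)
              .ToList();
}

public static class Program
{
    public static void Main()
    {
        // In listing order: partition 0 (tub3) first, then partition 1 (tub2).
        var listed = new List<BootEvent>
        {
            new BootEvent("tub3", new DateTime(2019, 5, 14, 10, 0, 0, DateTimeKind.Utc)),
            new BootEvent("tub3", new DateTime(2019, 5, 14, 10, 30, 0, DateTimeKind.Utc)),
            new BootEvent("tub2", new DateTime(2019, 5, 14, 10, 15, 0, DateTimeKind.Utc)),
        };

        // Prints tub3 (10:00), tub2 (10:15), tub3 (10:30).
        foreach (BootEvent ev in Timeline.Chronological(listed))
            Console.WriteLine(ev.DeviceId + " " + ev.EnqueuedTimeUtc.ToString("o"));
    }
}
```

For very large result sets, a k-way merge of the already-sorted per-partition streams would avoid buffering everything, at the cost of more code.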

 
