Azure Dynamic Queue Monitoring

Prerequisites

This article assumes you already have Visual Studio installed (I’m using VS 2017), an Azure account, and a queue ready to be monitored.

What’s the problem?

I have these WebJobs that are asynchronously doing all this work, pulling data off queues, but I don’t know when they are really “done.”

Background

So, we’ve got this awesome website, hosted on Azure, that pulls URLs from SQL Server and needs to do some web scraping. After the pages are scraped, the site needs to analyze the content and do some more magic. To do this on a grand scale, we’re leveraging Azure queues and scalable instances of WebJobs to crank through the work. As the data is processed, the results are stored in table storage. Eventually, the results will be exported in JSON format.

Problem Detail

Unsurprisingly, the client wants this end-to-end processing to happen as quickly as possible. In addition, we want to know the queue is empty as soon as possible so we can perform the final step—the export to JSON. We could open up the Azure Storage Explorer, find the queue, and keep hitting refresh. But why would we do that when we have computers to do the work?

Requirements

We want the solution to:

  • Leverage the same website
  • Monitor the Queues
  • Do so without requiring manual refresh

Bonus Points!
When the queue is empty:

  • Write a record to the SQL database
  • Send an Email
  • Send an SMS Text Message

Step by Step

Let’s first create a brand new, C# Console Application called QueueWatcher.

Add some NuGet goodness:

  • WindowsAzure.Storage (and all the accoutrements)
  • Microsoft.WindowsAzure.ConfigurationManager

Add the Azure Storage connection string to the App.config file

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
    <startup>
       <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.6.1" />
    </startup>
    <appSettings>
        <add key="StorageConnectionString"
        value="DefaultEndpointsProtocol=https;AccountName=[YourAccountName];AccountKey=[YourAccountKey];EndpointSuffix=core.windows.net" />                
    </appSettings>
</configuration>
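Before going further, it can help to confirm that the connection string is actually being picked up and parses cleanly. The following is a minimal sketch, assuming the two NuGet packages above are installed and the setting is named StorageConnectionString as in the App.config shown. The class name ConnectionStringCheck and the method TryGetAccount are hypothetical helpers, not part of the original project.

```csharp
using System;
using Microsoft.Azure;
using Microsoft.WindowsAzure.Storage;

namespace QueueWatcher
{
    // Hypothetical helper (an assumption for illustration, not from the article).
    public static class ConnectionStringCheck
    {
        // Returns true and sets 'account' when the App.config setting exists and parses.
        public static bool TryGetAccount( out CloudStorageAccount account )
        {
            account = null;

            // GetSetting returns null when the key is not configured.
            string setting = CloudConfigurationManager.GetSetting( "StorageConnectionString" );
            if ( string.IsNullOrWhiteSpace( setting ) )
            {
                Console.WriteLine( "StorageConnectionString is missing from App.config." );
                return false;
            }

            // TryParse avoids the exception that Parse throws on a malformed string.
            if ( !CloudStorageAccount.TryParse( setting, out account ) )
            {
                Console.WriteLine( "StorageConnectionString could not be parsed." );
                return false;
            }

            Console.WriteLine( "Queue endpoint: {0}", account.QueueEndpoint );
            return true;
        }
    }
}
```

Calling ConnectionStringCheck.TryGetAccount at the top of Main is one way to fail fast with a readable message. Note that it only checks the format of the string; it does not contact Azure or verify the account key.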

Write some code (the fun part!)
Modify Program.cs:

An instance of the CloudQueueWatcher class (explained below) is created.  An asynchronous task is started that periodically invokes the OnTick function.  The OnTick function calls the CheckQueueCount function and writes the result to the console.

using System;
using System.Threading;
using System.Threading.Tasks;

namespace QueueWatcher
{
    class Program
    {
        private static CloudQueueWatcher cqw;

        static void Main( string[] args )
        {
            try
            {
                string queueName = "urlqueue";
                cqw = new CloudQueueWatcher( queueName );

                // Make sure the queue exists. GetQueueReference never returns null,
                // so we also have to ask the storage service.
                if ( cqw.Queue == null || !cqw.Queue.Exists() )
                {
                    Console.WriteLine( "Queue {0} not found", queueName );
                    return;
                }
                Console.WriteLine( "Queue {0} found!", queueName );

                Console.WriteLine( "Enter q to quit" );
                using ( var cts = new CancellationTokenSource() )
                {
                    Task t = RunPeriodicAsync(
                        OnTick, TimeSpan.FromSeconds( 5 ), TimeSpan.FromSeconds( 5 ), cts.Token );

                    // Block on keyboard input; the polling continues in the background.
                    while ( Console.ReadKey().KeyChar != 'q' )
                    {
                    }

                    // Stop the polling loop and surface any error it hit.
                    cts.Cancel();
                    try
                    {
                        t.GetAwaiter().GetResult();
                    }
                    catch ( OperationCanceledException )
                    {
                        // Expected when the delay is cancelled on quit.
                    }
                }
            }
            catch ( Exception ex )
            {
                Console.WriteLine( "An error occurred: {0}", ex.Message );
            }
            finally
            {
                // Keep the console window open until Enter is pressed.
                Console.ReadLine();
            }
        }

        // The 'onTick' method will be called periodically unless cancelled.
        private static async Task RunPeriodicAsync(
            Action onTick, TimeSpan dueTime, TimeSpan interval, CancellationToken token )
        {
            // Initial wait time before we begin the periodic loop.
            if ( dueTime > TimeSpan.Zero )
                await Task.Delay( dueTime, token );

            // Repeat this loop until cancelled.
            while ( !token.IsCancellationRequested )
            {
                // Call our onTick function.
                onTick?.Invoke();

                // Wait to repeat again.
                if ( interval > TimeSpan.Zero )
                    await Task.Delay( interval, token );
            }
        }

        private static void OnTick()
        {
            Console.WriteLine( "Queue Count = {0}", cqw.CheckQueueCount() );
        }
    }
}

Add a new class called CloudQueueWatcher.cs

This creates a reference to the Azure Storage Account, a client to interact with the account, and a reference to the queue we want to track. The FetchAttributes method will grab info from the queue, including the approximate count.

using Microsoft.Azure;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Queue;

namespace QueueWatcher
{
    public class CloudQueueWatcher
    {
        public CloudStorageAccount StorageAccount { get; set; }
        public CloudQueueClient QueueClient { get; set; }
        public CloudQueue Queue { get; set; }

        public CloudQueueWatcher()
        {
        }

        public CloudQueueWatcher( string queueName )
        {
            // Parse the connection string stored in App.config.
            StorageAccount = CloudStorageAccount.Parse(
                CloudConfigurationManager.GetSetting( "StorageConnectionString" ) );

            // Create the queue client and keep it on the property.
            QueueClient = StorageAccount.CreateCloudQueueClient();

            // Retrieve a reference to a queue.
            Queue = QueueClient.GetQueueReference( queueName );
        }

        public int CheckQueueCount()
        {
            // FetchAttributes refreshes ApproximateMessageCount from the service.
            Queue.FetchAttributes();
            return Queue.ApproximateMessageCount ?? 0;
        }
    }
}

To test, a web application loads the queue, and an Azure function takes messages off the queue. Now I can run the CQW to track progress.
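Tracking progress is one thing; deciding that the work is really “done” is another. The count is only approximate, and a single zero reading may simply mean the web application has not finished loading the queue yet. One possible approach, sketched here as an assumption rather than as part of the actual CQW, is to treat the queue as empty only after several consecutive zero readings. The class EmptyQueueDetector, its Observe method, and the threshold of 3 are all hypothetical names and values chosen for illustration.

```csharp
using System;

// Hypothetical helper (not from the article): reports "empty" only after
// a run of consecutive zero readings, and only once per drain.
public class EmptyQueueDetector
{
    private readonly int requiredZeroReadings;
    private int consecutiveZeros;
    private bool notified;

    public EmptyQueueDetector( int requiredZeroReadings )
    {
        if ( requiredZeroReadings < 1 )
            throw new ArgumentOutOfRangeException( nameof( requiredZeroReadings ) );
        this.requiredZeroReadings = requiredZeroReadings;
    }

    // Feed one reading per tick. Returns true on the tick where the
    // zero streak first reaches the threshold, and false otherwise.
    public bool Observe( int approximateCount )
    {
        if ( approximateCount > 0 )
        {
            // Work showed up again: reset and allow a future notification.
            consecutiveZeros = 0;
            notified = false;
            return false;
        }

        consecutiveZeros++;
        if ( consecutiveZeros >= requiredZeroReadings && !notified )
        {
            notified = true;
            return true;
        }
        return false;
    }
}

public static class Demo
{
    public static void Main()
    {
        // Simulated readings stand in for CheckQueueCount() results.
        var detector = new EmptyQueueDetector( 3 );
        int[] readings = { 5, 2, 0, 0, 1, 0, 0, 0, 0 };

        foreach ( int reading in readings )
        {
            if ( detector.Observe( reading ) )
                Console.WriteLine( "Queue looks empty; run the export step here." );
        }
    }
}
```

In the watcher itself, OnTick could pass the value returned by CheckQueueCount to Observe and kick off the JSON export (or a notification) when it returns true. The right threshold depends on the polling interval and on how bursty the producers are, so treat 3 as a placeholder.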

That’s all there is to it!
For my next trick, I will investigate enhancing CQW to write/update SQL and send notifications when the queue is empty. And if your organization needs help with any Azure issue, please don’t hesitate to reach out to Anexinet. We’d love to help you clear things up.
 
