Итак, последние несколько проектов, над которыми я работал, я хотел иметь систему push-уведомлений, которую я мог бы использовать для отправки сообщений экземплярам ролей, чтобы они могли предпринимать действия. Существует несколько систем push-уведомлений, но я хотел найти несколько простых, которые будут включены в мои службы Windows Azure. Я включил версию этой концепции в несколько предложений, но на этой неделе наконец-то пришло время создать практическую демонстрацию идеи.
Для этой демонстрации я решил использовать разделы Windows Azure Service Bus. Темы, в отличие от очередей хранилища Windows Azure, дают возможность нескольким подписчикам получать копию сообщения. Это также была возможность изучить функцию Windows Azure, с которой я не работал более года. Учитывая, насколько API изменился за это время, это было разочаровывающим, но полезным занятием.
Концепция довольно проста. Сообщения отправляются в централизованную тему для распространения. Затем каждый экземпляр роли создает своего собственного подписчика с соответствующим фильтром для получения сообщений, которые ему нужны. Это решение позволяет использовать несколько издателей и подписчиков и даст мне приличный масштаб. Я слышал сообщения / слухи о проблемах, когда у вас больше нескольких сотен подписчиков, но для этой демонстрации у нас все будет хорошо.
Теперь для этой демонстрационной реализации я хочу, чтобы все было просто. Это должен быть центральный класс, который может использоваться рабочими или веб-ролями для создания своих подписок и получения уведомлений с минимальными усилиями. И чтобы сохранить эту простоту, дайте мне такой же простой способ отправлять сообщения обратно.
NotificationAgent
Мы начнем с создания библиотеки классов для нашего централизованного класса, добавив ссылки на нее для Microsoft.ServiceBus (чтобы мы могли делать наши посреднические сообщения) и Microsoft.WindowsAzure.ServiceRuntime (для доступа к среде ролей). Я также собираюсь создать свой класс NotificationTopic.
Примечание: в решении есть несколько вспомогательных классов, которые я не расскажу в этой статье. Если вам нужен полный код этого решения, вы можете скачать его здесь .
Первый метод, который мы добавим к этому, — это конструктор, который принимает параметры, которые нам понадобятся для подключения к пространству имен служебной шины, а также имя / путь к теме, которую мы будем использовать для трансляции уведомлений. Первым из них является создание менеджера пространства имен, чтобы я мог создавать темы и подписки, а также фабрику обмена сообщениями, которую я буду использовать для получения сообщений. Я немного разбил это, чтобы мой класс мог поддерживать передачу TokenProvider (я ненавижу демонстрации, в которых используется только владелец сервиса). Но вот важные строки:
TokenProvider tmpToken = TokenProvider.CreateSharedSecretTokenProvider(issuerName, issuerKey); Uri namespaceAddress = ServiceBusEnvironment.CreateServiceUri(“sb”, baseAddress, string.Empty); this.namespaceManager = new NamespaceManager(namespaceAddress, tokenProvider); this.messagingFactory = MessagingFactory.Create(namespaceAddress, tokenProvider);
Мы создаем URI и токен безопасности для использования с нашим пространством имен служебной шины. Для простоты я использую имя эмитента (владелец) и ключ администрирования сервиса. Я никогда не рекомендовал бы это для производственного решения, но это хорошо для демонстрационных целей. Мы используем их для создания NamespaceManager и MessagingFactory .
Теперь нам нужно создать тему, если она еще не существует.
try { // doesn’t always work, so wrap it if (!namespaceManager.TopicExists(topicName)) this.namespaceManager.CreateTopic(topicName); } catch (MessagingEntityAlreadyExistsException) { // ignore, timing issues could cause this }
Notice that I check to see if the topic exists, but I also trap for the exception. That’s because I don’t want to assume the operation is single threaded. With this block of code running in many role instances, its possible that between checking if it doesn’t exist and the create. So I like to wrap them in a try/catch. You can also just catch the exception, but I’ve long liked to avoid the overhead of unnecessary exceptions.
Finally, I’ll create a TopicClient that I’ll use to send messages to the topic.
So by creating an instance of this class, I can properly assume that the topic exists, and I have all the items I need to send or receive messages.
Sending Messages
Next up, I create a SendMessage method that accepts a string message payload, the type of message, and a TImeSpan value that indicates how long the message should live. In this method we first create a BrokeredMessage giving it an object that represents my notification message. We use the lifespan value that is passed in and set the type as a property. Finally, we send the message using the TopicClient we created earlier and do appropriate exception handling and cleanup.
try { bm = new BrokeredMessage(msg); bm.TimeToLive = msgLifespan; // used for filtering bm.Properties[MESSAGEPROPERTY_TYPE] = messageType.ToString(); topicClient.Send(bm); success = true; } catch (Exception) { success = false; // TODO: do something } finally { if (bm != null) // if was created successfully bm.Dispose(); }
Now the important piece here is the setting of a BrokeredMessage property. It’s this property that can be used later on to filter the messages we want to receive. So let’s not forget that. And you’ll also notice I have a TODO left to add some intelligent exception handling. Like logging the issue.
Start Receiving
This is when things get a little more complicated. Now the experts (meaning the folks I know/trust that responded to my inquiry), recommend that instead of going “old school” and having a thread that’s continually polling for responses, we instead leverage async processing. So we’re going to make use of delegates.
First we need to define a delegate for the callback method:
public delegate bool RecieverCallback(NotificationMessage mesage, NotificationMessageType type);
We then reference the new delegate in the method signature for the message receiving starter:
public void StartReceiving(RecieverCallback callback, NotificationMessageType msgType = NotificationMessageType.All)
Now inside this method we first need to create our subscriber. Since I want to have one subscriber for each role instance, I’ll need to get this from the Role Environment.
// need to parse out deployment ID string instanceId = Microsoft.WindowsAzure.ServiceRuntime.RoleEnvironment.CurrentRoleInstance.Id; subscriptionName = instanceId.Substring(instanceId.IndexOf(‘.’)+1);SubscriptionDescription tmpSub = new SubscriptionDescription(topicName, subscriptionName);
Now is the point where we’ll add the in a filter using the Property that we set on the notification when we created it.
{ Filter tmpFilter = new SqlFilter(string.Format(“{0} = ‘{1}’”, MESSAGEPROPERTY_TYPE, msgType)); subscriptionClient.AddRule(SUBFILTER, tmpFilter); }
I’m keeping it simple and using a SqlFilter using the property name we assigned when sending. So this subscription will only receive messages that match our filter criteria.
Now that all the setup is done, we’ll delete the subscription if it already exists (this gets rid of any messages and allows us to start clean) and create it new using the NameSpaceManager we instantiated in the class constructor. Then we start our async operation to retrieve messages:
asyncresult = subscriptionClient.BeginReceive(waittime, ReceiveDone, subscriptionClient);
Now in this, ReceiveDone is the callback method for the operation. This method is pretty straight forward. We make sure we’ve gotten a message (in case the operation simply timed out) and that we can get the payload. Then, using the delegate we set up earlier, And then we end by starting another async call to get another message.
if (result != null) { SubscriptionClient tmpClient = result.AsyncState as SubscriptionClient; BrokeredMessage brokeredMessage = tmpClient.EndReceive(result); //brokeredMessage.Complete(); // not really needed because your receive mode is ReceiveAndDelete if (brokeredMessage != null) { NotificationMessage tmpMessage = brokeredMessage.GetBody<NotificationMessage>(); // do some type mapping here recieverCallback(tmpMessage, tmpType); } } // do recieve for next message asyncresult = subscriptionClient.BeginReceive(ReceiveDone, subscriptionClient);
Now I’ve added two null checks in this method just to help out in case a receive operation fails. Even the, I won’t guarantee this works for all situations. In my tests, when I set the lifespan of a message to less than 5 seconds, still had some issues (sorting those out yet, but wanted to get this sample out).
Client side implementation
Whew! Lots of setup there. This is where our hard work pays off. We define a callback method we’re going to hand into our notification helper class using the delegate we defined. We’ll keep it super simple:
private bool NotificationRecieved(NotificationMessage message, NotificationMessageType type) { Console.WriteLine(“Recieved Notification”); return true; }
Now we need to instantiate our helper class and start the process of receiving messages. We can do this with a private variable to hold on our object and a couple lines into role’s OnStart.
tmpNotifier = new NotificationTopic(ServiceNamespace, IssuerName, IssuerKey, TopicName); tmpNotifier.StartReceiving(new NotificationTopic.RecieverCallback(NotificationRecieved), NotificationMessageType.All);
Now if we want to clean things up, we can also add some code to the role’s OnStop.
try { if (tmpNotifier != null) tmpNotifier.StopReceiving(); } catch (Exception e) { Console.WriteLine(“Exception during OnStop: “ + e.ToString()); }base.OnStop();
And that’s all we need.
In Closing
So that’s it for our basic implementation. I’ve uploaded the demo for you to use at your own risk. You’ll need to update the WebRole, WorkerRole, and NotifierSample project with the information about your Service Bus namespace. To run the demo, you will want to set the cloud service project as the startup project, and launch it. Then right click on the NotifierSample project and start debugging on it as well.
While this demo may work fine for certain applications, there is definitely room for enhancement. We can tweak our message lifespan, wait timeouts, and even how many messages we retrieve at one time. And it’s also not the only way to accomplish this. But I think it’s a solid starting point if you need this kind of simple, self-contained notification service.
PS – As configured, this solution will require the ability to send outbound traffic on port 9354.