В части 1 мы рассмотрели сценарий, использующий служебную шину в качестве конечной точки ретрансляции WCF, которая позволяла клиентам в сети взаимодействовать со службой WCF, размещенной на моем локальном компьютере. Рабочий процесс выглядит примерно так, как показано ниже, где общедоступная конечная точка (простое приложение ASP.NET в Windows Azure) выполняет вызовы моей службы, размещается в приложении WPF самостоятельно и работает на моей локальной машине за брандмауэром, но подключается к конечная точка реле в служебной шине.
Реализация ретрансляции служебной шины довольно проста, хорошо для любого, кто занимался программированием WCF, поскольку основным отличием от локального сценария является использование пространства имен служебной шины для размещения конечной точки и набора привязок ретрансляции, большинство из которых есть аналоги в локальном мире (мы использовали BasicHttpRelayBinding в предыдущем посте ).
Все это прекрасно работает, но в подходе есть существенный недостаток: обе стороны должны быть одновременно активны, иначе связь не работает. Конечно, это происходит и на месте, и вы можете встроить логику повторов, но разве не будет проще, если все это будет сделано для вас? Вот где приходят очереди служебной шины. Очередь предоставляет место для сообщений, если нет доступных клиентов для их обработки, возможно, из-за того, что клиент не работает или слишком занят. Как прямой результат этой «временной развязки», теперь отправители и получатели могут работать с разными скоростями; Кроме того, несколько потребителей, которые работают в одной и той же очереди, могут обрабатывать сообщения с независимой скоростью. Это модель «конкурирующего потребителя» того же типа, что и очередь в банке, где вас ждет следующий кассир.
В качестве основной темы этого поста я возьму существующий пример, который я прошел в предыдущем посте блога, и изменю его для работы с очередями служебной шины. Как и в предыдущем примере, вы можете загрузить полный исходный код с GitHub , хотя вам нужно будет изменить настройки приложения (в App.config и web.config ), чтобы использовать идентификаторы служебной шины и ключи, которые вы установили для предыдущего примера ,
Создание очереди
Первым шагом является создание очереди служебной шины. Вы можете сделать это программно или через портал Windows Azure непосредственно в пространстве имен служебной шины, которое вы создали в первой части . Здесь я создал очередь с именем thequeue и оставил все свойства по умолчанию:
- Время жизни сообщения по умолчанию (TTL) указывает, как долго сообщение будет находиться в очереди, прежде чем оно будет удалено. Кстати, этот любопытный номер по умолчанию существует чуть более 29000 лет! Если вы также установите флажок Включить мертвые буквы по истечении срока действия сообщения , сообщение будет перемещено в специальную очередь недоставленных сообщений с конечной точкой в этом случае sb: //heyjim.servicebus.windows.net/thequeue/$DeadLetterQueue
- Очереди также поддерживают обнаружение повторяющихся сообщений, где уникальность сообщения определяется свойством MessageId . Чтобы обнаружить дубликаты, вы должны установить флажок « Требуется обнаружение дубликатов» и установить временное окно, в течение которого вы хотите, чтобы дубликаты обнаруживались (« Временное окно истории обнаружения дубликатов» ). По умолчанию установлено значение 10 минут, что означает, что второе сообщение с тем же MessageId, которое поступит в течение 10 минут после первого появления этого сообщения, будет автоматически удалено.
- Длительность блокировки определяет продолжительность времени (не более пяти минут), в течение которого сообщение скрывается для других потребителей, когда к очереди обращаются в режиме PeekLock (по сравнению с режимом ReceiveAndDelete ).
- Максимальный размер очереди может быть задано с шагом 1 ГБ до максимум 5 Гб; каждое сообщение может иметь размер до 256 КБ.
- Если установлен флажок Требуется сеанс , могут обрабатываться сообщения, которые должны обрабатываться одним и тем же потребителем. Сеанс определяется во время создания сообщения с использованием свойства SessionId . Установка этого свойства требует, чтобы клиенты использовали SessionReceiver для получения сообщений из очереди.
Все эти свойства могут быть установлены при программном создании очереди (с помощью метода CreateQueue через идентификатор служебной шины, который имеет утверждение Manage ). На самом деле есть два дополнительных свойства ( QueueDescription ), которые не устанавливаются через портал:
- EnableBatchedOperations указывает, что запросы Send и Complete к очереди могут быть пакетированы (только при использовании асинхронных методов управляемого клиента .NET), что может повысить эффективность; по умолчанию пакетные операции включены.
- MaxDeliveryCount (значение по умолчанию: 10) указывает максимальное количество раз, когда сообщение может быть прочитано из очереди. Этот параметр применяется только в том случае, если MessageReceiver находится в режиме PeekLock , в котором сообщение блокируется на период времени ( LockDuration ), в течение которого оно должно быть помечено как завершенное , или оно снова будет доступно для обработки. Затем MaxDeliveryCount становится стратегией обработки вредоносных сообщений, поскольку обреченное сообщение никогда не будет помечено как завершенное и в противном случае вновь появится в очереди до бесконечности .
очередями Windows Azure и очередями служебной шины Windows Azure — для сравнения и сравнения .
Кодирование потребителя
В нашем примере потребителем очереди сообщений является клиентское приложение WPF. В локальном сценарии, описанном в моем предыдущем посте , приложение WPF самостоятельно размещало конечную точку службы WCF через служебную шину с использованием BasicHttpRelayBinding . С очередями вы также можете использовать семантику WCF с помощью новой привязки, NetMessagingBinding , и Том Холландер рассматривает этот подход в своем блоге . В качестве альтернативы вы можете использовать REST API из любого HTTP-клиента или, как я расскажу здесь, .NET API.
Код для обработки сообщений запускается в отдельном потоке от простого пользовательского интерфейса окна WPF. Полный код приведен ниже, более выделенные части выделены и объяснены ниже.
Строки 5-10 настраивают MessagingFactory, которая устанавливает конечную точку служебной шины и учетные данные для доступа ( предполагается, что пользователь wpfsample представляет заявку на отправку, как было установлено в моем предыдущем сообщении в блоге ). В строке 11 , MessageReceiver конкретизируется , указывающий на очередь, мы явно созданными ранее с помощью портала Windows Azure.
Вызов на получение в строке 15 возвращает следующее сообщение в очереди или время ожидания истекает через пять секунд. Если в течение этого периода времени в очереди не появляется никаких сообщений, результирующее сообщение в строке 16 равно нулю. Включающий цикл while будет повторяться и ожидать следующего сообщения. ( Флаг isProcessing — это переменная уровня класса, которая позволяет пользователю приложения WPF останавливать и начинать прослушивание очереди; он устанавливается с помощью кнопки команды в пользовательском интерфейсе).
Возвращаемое сообщение имеет тип BrokeredServiceMessage , с помощью которого вы можете получить доступ к информации заголовка в пакете свойств и получить саму полезную нагрузку сообщения с помощью метода GetBody ( строки 25-28 ). Если сообщение не содержит ожидаемого содержимого, например, заголовки не установлены, необходимо предпринять некоторые действия. Здесь исправление ( строки 30ff ) состоит в том, чтобы просто отобразить сообщение об ошибке тем же методом, что и для законного уведомления, но мы могли бы использовать другой подход и переместить его в очередь недоставленных сообщений , с помощью метко названного метода DeadLetter , где некоторые другие Процесс может проверять и пересылать сообщения для вмешательства человека или диагностики.
После обработки сообщения последний шаг — пометить его как завершенное ( строка 40 ). Это должно быть подсказкой, что я использую (по умолчанию) семантику PeekLock в очереди. Если бы я установил theQueue.Mode = ReceiveMode.ReceiveAndDelete; сообщение будет автоматически удалено; тем не менее, я бы рискнул потерять это сообщение, если бы произошел сбой или ошибка службы между извлечением сообщения и завершением обработки сообщения.
Другие возможности для обработки сообщения включают в себя:
отказ от него , и в этом случае принудительная блокировка сообщения немедленно снимается, и сообщение снова становится видимым для другого потребителя,
переместить его в очередь мертвых писем, как упоминалось ранее, или
отложить сообщение и перейти к следующему в очереди. Здесь вы должны сохранить SequenceNumber и вызвать перегруженный метод Receive, передавая этот номер в тот момент, когда вы действительно хотите обработать сообщение.
Кодирование производителя
В этом случае источником сообщений является веб-приложение ASP.NET. Он может быть размещен где угодно, например, в Windows Azure, но если вы просто тестируете его, вы можете запустить его из эмулятора Azure на локальном компьютере или даже через веб-сервер разработки (Cassini) из Visual Studio. Код следует:
protected void btnSend_Click(object sender, EventArgs e) { if (txtMessage.Text.Trim().Length == 0) return; String userName = (txtUser.Text.Trim().Length == 0) ? "guest" : txtUser.Text; // create and format the message BrokeredMessage message = new BrokeredMessage(txtMessage.Text); message.Properties["Sender"] = txtUser.Text; message.Properties["Color"] = ddlColors.SelectedItem.Text; // send the message MessagingFactory factory = MessagingFactory.Create( ServiceBusEnvironment.CreateServiceUri("sb", ConfigurationManager.AppSettings["SBNamespace"], String.Empty), TokenProvider.CreateSharedSecretTokenProvider( userName, ConfigurationManager.AppSettings["SBGuestCredentials"])); factory.CreateMessageSender("thequeue").Send(message); }
В строках 7-9 новое сообщение создается из текста, представленного в веб-форме, и устанавливаются два свойства, одно из которых соответствует имени отправителя, а другое соответствует выбранному цвету.
Затем в строках 12-18 также создается экземпляр MessagingFactory ; однако здесь для идентификации служебной шины (гостевой по умолчанию) требуется утверждение « Отправить против прослушивания» .
В строке 19 сообщение отправляется через экземпляр MessageSender . Здесь это делается синхронно, но асинхронные версии Send и других операций над дополнительным классом MessageReceiver также доступны (и предпочтительны в большинстве случаев из-за масштабируемости и опыта пользователя).
Возьми код!
Я добавил код для примера обмена сообщениями в пример кода ретрансляции уже на GitHub ( лицензия MS-LPL ), чтобы вы могли экспериментировать самостоятельно. В следующий раз мы рассмотрим более сложный сценарий публикации / подписки с использованием тем и подписок шины интеграции.