Статьи

Все, что нужно знать о службе обмена сообщениями Azure Service Bus (часть 2)

Это вторая часть Azure Service Bus Brokered Messaging, и вы можете найти первую часть  здесь . В первой части мы сосредоточились исключительно на том, как использовать Service Bus Queues. Продолжая в части 2, из-за огромного объема информации, наше основное внимание будет сосредоточено на особенностях и преимуществах разделов и подписок служебной шины, которые к настоящему времени, как вы должны знать, отличаются от очередей.

темы

topic_message_full

Сейчас действительно трудно говорить о темах без подписок из-за их тесной связи, и почти во всех примерах это именно то, что вы увидите. Но, несмотря на различия, темы имеют много общего с очередью служебной шины. Таким образом, с этой точки зрения, поскольку вы уже знакомы с очередями служебной шины, мы поговорим о темах, а затем представим подписку.

Темы, подобные очереди, предоставляют адресат для отправки сообщений. Но Темы в сочетании с Подписками обеспечивают шаблон публикации и подписки для распространения сообщений. Это не похоже на остров в очереди, где очередь является и получателем, и хранителем полученных сообщений.

Проще говоря, Темы обеспечивают издательскую сторону шаблона публикации / подписки. С высоты 30 000 футов они не сильно отличаются от очередей, поскольку они обеспечивают пункт назначения для отправки сообщений. Тем не менее, темы, как и очереди, должны быть созданы, обработаны и отправлены сообщения.

Многие операции по управлению Темами используются и выполняются так же, как и Очереди, и мы можем использовать то же,  NamespaceManager что мы видели в  части 1,  для облегчения наших операций CRUD.

Создание тем

Создать тему можно так же просто, как дать ей имя:

NamespaceManager namespaceManager = NamespaceManager.CreateFromConnectionString(AccountInfo.ConnectionString);TopicDescription newTopicDescription = await _namespaceManager.CreateTopicAsync(topicName);

Или тема может быть создана из того,  TopicDescription где мы определяем характеристики темы.

TopicDescription description = new TopicDescription(topicPath)
{
    SupportOrdering = true,
    AutoDeleteOnIdle = TimeSpan.FromHours(8),
    DefaultMessageTimeToLive = new TimeSpan(0, 4, 0, 0)
};

TopicDescription newTopicDescription = null;

//If not exists, we'll create it
if (!await namespaceManager.TopicExistsAsync(description.Path))
{
    newTopicDescription = await namespaceManager.CreateTopicAsync(description);
}

//Or we'll retrieve it if it already exists
TopicDescription topicDescription = newTopicDescription ?? await namespaceManager.GetTopicAsync(description.Path);

С более надуманным примером мы можем продемонстрировать некоторые из утилитарных методов, доступных для проверки, существует ли тема, а также для извлечения подробностей о существующей теме.

Обновление Тем

Как и в случае с очередями, мы также можем обновить свойства существующей темы, предоставив  TopicDescription. Однако есть определенные свойства, которые нельзя изменить после создания, такие как RequiresDuplicateDetection:

TopicDescription description = new TopicDescription(topicPath)
{
    SupportOrdering = true,
    AutoDeleteOnIdle = TimeSpan.FromHours(8),
    DefaultMessageTimeToLive = new TimeSpan(0, 4, 0, 0)
};

await namespaceManager.UpdateTopicAsync(description);

Удаление тем

Когда мы удаляем тему, которой не существует, она не выдает исключение:

await namespaceManager.DeleteTopicAsync(topicName);


Отправка сообщений в темы

Теперь любой, кто хоть немного знаком с тем, как работают Темы и Подписки, будет удивляться, с какой стати я хочу поговорить об отправке сообщений по темам, прежде чем я даже введу Подписки. Его просто, вплоть до отправки сообщений по темам, не нужно много менять, и легко почувствовать темы служебной шины, не увязая в том, что отличает их. Оставайтесь со мной, мы будем там на мгновение.

Вы, вероятно, не помните, что я уже говорил в  части 1,  что  MessageSender абстракция для  QueueClientи использование абстракции будет иметь смысл? Ну, это также абстракция для  TopicClient . TopicClient — прямой клиент для отправки сообщений в тему. Поэтому, если вы можете использовать  MessageSenderвместо соответствующего клиента, тогда вашему приложению не нужно заботиться о том, отправляет ли оно сообщения в очередь или в тему, и в этом заключается отдача от использования  MessageSender.

Вы можете вернуться к нашему примеру с очередью создания  MessageSender  для отправки сообщений, и отправить его так же просто, как и для отправки в очередь.

await messageSender.SendAsync(newBrokeredMessage("SimpleMessage"));

Однако, если вы проверяете это, вас ждет большой сюрприз, поэтому я избавлю вас от выпадения волос (что я умею терять!). Если у темы нет зарегистрированной подписки, сообщение автоматически удаляется. Таким образом, похоже, что это подходящее время для введения подписок.

Подписки

Напоминаем, что темы и подписки вместе представляют шаблон публикации и подписки (см. Предыдущую тему / схему подписки). В то время как тема представляет издателя, подписка представляет подписчика. Когда создается подписка, предоставляется тема, на которую она будет подписана для получения сообщений, в которых заинтересована подписка. В простейшей форме сообщения, поступающие в тему, копируются и направляются в каждую подписку, которая является политикой подписки (мы называем правило) доволен.

Мне нравится приравнивать эти отношения между темами и подписками к сценарию прибытия в аэропорт. Представьте, если хотите, тему, представленную аэропортом. Вы, как сообщение, прибываете в аэропорт и проследуете через пункт выдачи багажа и прибываете там, где водители лимузина стоят с табличками с именами разных людей. Эти драйверы представляют политики подписки (правила). Предполагая, что вы найдете водителя лимузина, на котором стоит табличка с вашим именем, он / она сопроводит вас до своего лимузина (по подписке), где вы будете проживать в лимузине и провожать его в какой-то замечательный отпуск по системе «все включено», и больше не будете проживать в аэропорт. Ну, а если вы не нашли водителя с табличкой с вашим именем на нем?

Вы когда-нибудь видели фильм  Терминал  с Томом Хэнксом? Где Виктор Наворский (Том Хэнкс) оказался в ловушке в международном аэропорту имени Джона Кеннеди, потому что ему отказано во въезде в США и в то же время он не может вернуться в свою родную страну из-за революции? Что ж, в отличие от Виктора, у которого есть место для отдыха (Терминал), когда сообщение приходит в тему, которая не удовлетворяет правилу какой-либо связанной подписки, сообщение отбрасывается.

Итак, давайте подробнее рассмотрим, что означают все эти характеристики подписок и как они связаны в теме.

Создание подписки

Когда мы создаем подписку, мы как минимум связываем ее с темой:

SubscriptionDescription switftSubscription = await namespaceManager.SubscriptionExistsAsync(“singingtelegrams”, “TaylorSwift”);

Кроме того, мы можем установить некоторые свойства подписки, передав 
SubscriptionDescription:

SubscriptionDescription subscription = new SubscriptionDescription((“singingtelegrams”, “TaylorSwift”)
{
    EnableDeadLetteringOnFilterEvaluationExceptions = true,
    EnableDeadLetteringOnMessageExpiration = true,
    EnableBatchedOperations = true,
};

SubscriptionDescription bieberSubscription = await namespaceManager.CreateSubscriptionAsync(subscription);


В приведенных выше примерах мы указываем путь к существующей теме, а также название создаваемой подписки. Таким образом, эти подписки не имеют явного правила, определяющего, какие сообщения они должны получать. Поэтому они подписались на получение всех сообщений, отправленных на «певческие телеграммы». Когда какое-либо сообщение отправляется в тему «singingtelegram», 
 подписки TaylorSwift
 и 
JustinBieber
получат копию в своей виртуальной очереди.

В настоящее время ограничение на количество подписок, которые подписаны на тему, составляет 2000.

Удаление подписки

Единственная небольшая разница с удалением очереди и подписки заключается в том, что мы также должны указать путь к теме, с которой связана подписка, а также имя подписки.

await namespaceManager.DeleteSubscriptionAsync(topicPath, subscriptionName);

Обновить подписку

Опять же, как и в случае с очередями и темами, существуют определенные свойства, которые должны быть установлены во время создания, например  RequireSession, но мы можем предоставить  SubscriptionDescription экземпляр для обновления свойств нашей подписки:

SubscriptionDescription subscription = new SubscriptionDescription(topicPath, subscriptionName)
{
    DefaultMessageTimeToLive = TimeSpan.FromHours(8),
    LockDuration = TimeSpan.FromSeconds(90)
};

SubscriptionDescription updatedSubscription = await namespaceManager.UpdateSubscriptionAsync(subscription);

Напомним, в первой части я уже говорил, что в конце концов мы все еще имеем дело с очередями? Ну, Microsoft определяет подписку как виртуальную очередь. Копии сообщений, отправляемых в подписку, находятся там и извлекаются из очереди служебной шины.

Получение сообщений по подписке

Как и в случае использования реферата  MessageSender, если мы используем реферат,  MessageReceiver нам не нужно беспокоиться о том, является ли это очередью служебной шины или подпиской, из которой мы получаем сообщения. Ниже приведен точный код  получения сообщений из очередей в части 1,  за исключением одного параметра, переданного для  CreateMessageReceiverAsync метода:

MessageReceiver messageReceiver =  await _messagingFactory.CreateMessageReceiverAsync("singingtelegram/subscriptions/TaylorSwift");

try
{
    while (!cancellationToken.IsCancellationRequested)
    {
        var msg = await messageReceiver.ReceiveAsync(TimeSpan.FromSeconds(30)).ConfigureAwait(false);
        if (msg.LockedUntilUtc == DateTime.UtcNow) await msg.RenewLockAsync();
        await ProcessAndReleaseMessageAsync(msg);
        await Task.Delay(TimeSpan.FromSeconds(30), cancellationToken);
    }
}
catch (Exception ex)
{
    //log error;
    throw;
}

private async Task ProcessAndReleaseMessageAsync(BrokeredMessage message)
{
    MessageProcessingAction action = MessageProcessingAction.Abandon;

    try
    {
        //Process message
        action = MessageProcessingAction.Complete;
    }
    catch (Exception ex)
    {
        //log
    }
    finally
    {
        //if something fails update with abandon
        //C# 6.0 allows await calls in a finally blocks
        UpdateMessageState(message, action);
    }
}

private async Task UpdateMessageState(BrokeredMessage message, MessageProcessingAction action)
{
    switch (action)
    {
        case MessageProcessingAction.Complete:
            await message.CompleteAsync();
            break;
        case MessageProcessingAction.Abandon:
            await message.AbandonAsync();
            break;
        case MessageProcessingAction.Deadletter:
            await message.DeadLetterAsync();
            break;
        default:
            await message.AbandonAsync();
            break;
    }
}


То, о чем мы должны заботиться, — это строка, которую мы предоставляем  MessagingFactory.CreateMessageReceiverAsync(entityPath) методу. В нашем примере с Queue мы просто предоставляем строку пути к очереди. Но в этом случае нам нужен наш путь подписки в виде <Путь к теме> / подписки / <имя подписки>, как показано в «singingtelegram / subscription / TaylorSwift».

Получив любые сообщения, мы можем следовать той же процедуре, что и в нашем примере с очередью , для обновления сообщения путем вызова  AbondonComplete  или  DeadLetter. Я позволю вам просмотреть детали получения сообщения еще в первой  части .

До настоящего времени единственной убедительной причиной для использования тем и подписок поверх очередей была возможность предоставлять копии одного сообщения нескольким заинтересованным сторонам (подпискам). Но это только одна неотразимая особенность тем и подписок. Другая неотъемлемая функция — это маршрутизация сообщений и возможность контролировать, какие сообщения направляются на какие подписки.

Мы можем обеспечить маршрутизацию сообщений через Правила. Теперь, то, что мы не определили правило для подписок, которые мы уже создали, не означает, что правило не существует. Действительно, все подписки как минимум имеют одно правило.

Правила, фильтры и действия Oh My!

Именно здесь мы достигли зенита того, что делает Темы и Подписки такими мощными. Правило в мире подписок определяет, какие сообщения следует копировать в конкретную подписку. Каждая подписка имеет доступ ко всем сообщениям, полученным Темой, и может индивидуально отфильтровать, какие сообщения принадлежат ей. Эта фильтрация выполняется через фильтр правил, о котором мы поговорим подробнее.

В дополнение к фильтрации, когда сообщение отфильтровывается по подписке, поскольку сообщение удовлетворяет условию правила, у нас есть возможность выполнить действия с сообщением. Эти действия включают добавление, удаление и обновление пользовательских свойств и изменение значений системных свойств. Это все определяется действием правила. Следовательно, это действительно фильтр и действия, которые составляют правило.

С субъективной точки зрения, правило больше напоминает политику, чем правило. Там, где правило определяет не только условие (я) (фильтры), но и то, что должно происходить, когда условие выполняется, пахнет больше политикой. Я только утверждаю, что в случае, если кому-то легче понять концепцию как политику.

Другой способ изложить подробное объяснение, приведенное выше, состоит в том, что подписка отфильтровывает сообщения, которые соответствуют условиям правила, и при необходимости применяет действия со свойствами сообщения, когда отфильтрованное сообщение копируется в подписку. Итак, давайте посмотрим примеры всех этих:

Фильтр правил

В ландшафте .NET правило подписки имеет форму a  RuleDescription и определяет правила  Filter и  Action. Когда мы создали наши подписки, доступно доступное переопределение для передачи  RuleDescription :

RuleDescription rule = new RuleDescription
{
    Name = "ForwardToTaylorSwift",
    Filter = new SqlFilter("RequestedSinger = 'Justin Bieber’ OR RequestedSinger = 'Taylor Swift’")
};

SubscriptionDescription subscription = new SubscriptionDescription(topicPath, subscriptionName);
SubscriptionDescription description = await namespaceManager.CreateOrRetrieveSubscriptionAsync(subscription, rule);


Итак, вот наше первое введение в правило и его фильтр. Как следует из типа фильтра, он позволяет определить стандартное выражение SQL92 в своем конструкторе, который будет определять, какие сообщения отфильтровываются. Мы можем прочитать это как все сообщения, полученные в теме «singingtelegram», которые имеют пользовательское свойство «RequestedSinger» со значением «Джастин Бибер» или «Taylor Swift», будут отфильтрованы в подписку «TaylorSwift».

Так что, если мы отправим a,  BrokeredMessage который соответствует вышеуказанному правилу, в нашу тему «singingtelegram»:

BrokeredMessage message = new BrokeredMessage();
message.Properties.Add("RequestedSinger", "Justin Bieber");

await messageSender.SendAsync(message);

Но это также не исключает возможность получения этого сообщения другими подписками. Напомним, что я упоминал ранее, что у всех подписок есть правило. Когда мы явно не определяем правило, как это было в наших исходных примерах создания подписки, для нас предоставляется правило по умолчанию, которое имеет фильтр по умолчанию. Поскольку условие правила определено в фильтре правил, давайте посмотрим, какие готовые фильтры доступны:

·  SQLFilter  — фильтр, который ряд других фильтров извлекает из таких TrueFilter и FalseFilter.

·  TrueFilter  — это фильтр по умолчанию, предоставляемый с помощью правила по умолчанию, который создается для нас, когда правило или фильтр не предоставляются явно при создании подписки. В конечном итоге это создает выражение SQL92 1 = 1 и подписывается на получение всех сообщений, связанных с темой.

·  FalseFilter  — антитеза TrueFilter, который генерирует выражение SQL92, равное 1 = 0; подписка с этим фильтром никогда не будет подписываться ни на какие сообщения соответствующей темы.

·  CorrelationFilter  — этот фильтр подписывает подписку на все сообщения определенного свойства CorrelationId сообщения.


Имейте в виду, что значения сравнения в выражении SQL чувствительны к регистру, а имена свойств — нет (например, «RequestedSinger = ‘Taylor Swift’» не совпадает с «RequestedSinger = ‘taylor swift’»)

Правила добавления

Подписки не ограничены одним правилом как бы то ни было. Мы можем добавить дополнительные правила к существующей подписке. Тем не менее, это прекрасный пример того, как  MessageReceiver класс абстракции не может удовлетворить наши потребности, и нам нужно обратиться к  SubscriptionClient.

SubscriptionDescription subscription = await _operations.CreateOrRetrieveSubscriptionAsync(topicPath, subscriptionName);

//MessagingFactory as demo'd in part 1
SubscriptionClient subscriptionClient = messagingFactory.CreateSubscriptionClient(subscription.TopicPath, subscription.Name);

RuleDescription ruleDescription = new RuleDescription("NeverGrowUpSongRule", new SqlFilter("RequestedSong = 'Never Grow Up'"));

await subscriptionClient.AddRuleAsync(ruleDescription);

Первая операция — это просто личная оболочка, которая возвращает проверенное  SubscriptionDescription. После проверки мы можем получить  MessagingFactory (см. «  Отправка сообщений»  в части 1), которая позволит нам создать папку,  SubscriptionClient которая, в свою очередь, позволит нам добавить правило к существующей подписке.


Поймите, что каждое удовлетворенное правило создаст копию сообщения.
Это означает, что если подписка имеет 3 отдельных правила, одно сообщение, удовлетворяющее всем 3 условиям правила, получит 3 отдельные копии сообщения. Если вы хотите одно сообщение, основанное на различных условиях, эти условия должны быть указаны в фильтре одного правила (например, «RequestedSinger = ‘Justin Bieber’ ИЛИ ​​RequestedSinger = ‘TaylorSwift’»).

Краткое примечание о CoorelationFilter

Это  CoorelationFilter специальный фильтр для конкретного использования. Это один из доступных способов реализации шаблона взаимодействия, при котором подписка может получать все сообщения, связанные с одним и тем же идентификатором CoorelationId сообщения. Мы рассмотрим это в третьей части, когда рассмотрим шаблоны обмена сообщениями.

Правила действия

Каждое правило должно иметь фильтр; это то, что определяет условие правила. Но правило не должно иметь действия. Как упоминалось ранее, действие правила позволяет нам манипулировать свойствами (как системными, так и пользовательскими) сообщения, которое соответствует условию правила.

Как мы уже говорили ранее, правило является контейнером для фильтра и действия, и мы можем определить действие в нашем  Action свойстве нашего  RuleDescription:

SubscriptionDescription subscription = await _operations.CreateOrRetrieveSubscriptionAsync(topicPath, subscriptionName);

//MessagingFactory as demo'd in part 1
SubscriptionClient subscriptionClient = messagingFactory.CreateSubscriptionClient(subscription.TopicPath, subscription.Name);

RuleDescription ruleDescription = new RuleDescription("NeverGrowUpSongRule", new SqlFilter("RequestedSong = 'Never Grow Up'"));

await subscriptionClient.AddRuleAsync(ruleDescription);

Здесь мы создаем объект  RuleDescription , который отфильтровывает все сообщения с  RequestedSinger  со значением «Джастин Бибер» и меняет значение этого же свойства на «Taylor Swift», как определено в «SET RequestedSinger = ‘Taylor Swift» ». действие.

Если мы добавим это правило или как показано ниже, создайте подписку с этим правилом:

SubscriptionDescription newSubscriptionDescription = await _namespaceManager.CreateSubscriptionAsync(topicPath, subscriptionName, ruleDescription );


Затем отправьте сообщение, удовлетворяющее условию подписки (например, Filter = new SqlFilter («RequestedSinger = ‘Justin Bieber’»)

BrokeredMessage message = new BrokeredMessage();
message.Properties.Add("RequestedSinger", "Justin Bieber");

await messageSender.SendAsync(message);


Когда мы получим сообщение, мы обнаружим, что значение  пользовательского свойства RequestedSinger  больше не будет «Джастин Бибер», а «TaylorSwift»

BrokeredMessage msg = await messageReceiver.ReceiveAsync(TimeSpan.FromSeconds(30)).ConfigureAwait(false);
message.Properties["RequestedSinger"] // ="Taylor Swift"

Управление пользовательскими свойствами — это только одна из доступных функций Action. Добавление пользовательских свойств или изменение системных свойств также возможно. Представьте, что вам нужна подписка на все сообщения, которая перехватывает любые сообщения без определенного  RequestedSinger,  создает   свойство RequestedSinger и обновляет системное свойство  ContentType.

var ruleDescription = new RuleDescription("ChangeRequestedSinger")
{
    Filter = new SqlFilter("RequestedSinger IS NULL"),
    Action = new SqlRuleAction("SET sys.ContentType = 'audio/mpeg'; SET RequestedSinger = 'Taylor Swift'")
};

При наличии подписки с вышеуказанным  RuleDescriptionвсе отфильтрованные сообщения будут иметь свойство System,  ContentTypeобновленное на  audio / mpeg,  и создать новое настраиваемое свойство  RequestedSinger. 


Обратите внимание, что префикс «sys» в SQLRuleAction необходим для установки области действия свойства System.
По умолчанию для области задаются свойства пользователя (пользовательские свойства), которые обозначаются пользователем. <Имя_свойства>

Итак, как вы можете видеть, в действиях правил много силы. Вы можете узнать больше о синтаксисе и доступных опциях для действий на странице Microsoft  MSDN .

Вывод

Как вы можете ясно видеть, Темы — это большая «тема», которая неотделима от Подписок. Мы узнали, что две из самых мощных функций в разделах «Темы и подписки» — это возможность рассылать сообщения нескольким заинтересованным сторонам (подпискам), и эти стороны могут отфильтровывать, какие сообщения им особенно интересны. освещать тему Azure Service Bus. В заключительной части мы рассмотрим некоторые из более сложных функций, шаблонов, безопасности и лучших практик.