Статьи

EasyNetQ, простой .NET API для RabbitMQ


После обдумывания результатов нашей
перестрелки из
очереди сообщений мы решили работать с
Rabbit MQ . Кролик помечает все флажки, он поддерживается (в конечном итоге Spring Source, а затем и VMware), масштабируется и обладает необходимыми функциями и производительностью.
RabbitMQ.Client обеспечивается Spring Source тонкая оболочка , которая довольно точно выставляет
AMQP протокол, поэтому он ожидает сообщения в виде байтовых массивов.

Для тестов перестрелки было хорошо разбрызгивать массивы байтов, но в реальном мире мы хотим, чтобы наши сообщения имели тип .NET. Я также хотел предоставить разработчикам очень простой API, который абстрагирует модель AMQP Exchange / Binding / Queue и вместо этого предоставляет простую модель публикации / подписки и запроса / ответа. Моим вдохновением была отличная работа, проделанная Дрю Селлерсом и Крисом Паттерсоном с MassTransit (новая бета-версия V2.0 только что вышла).

Код находится на GitHub здесь:

https://github.com/mikehadlow/EasyNetQ

API сосредоточен вокруг интерфейса IBus, который выглядит следующим образом:

/// <summary>
/// Provides a simple Publish/Subscribe and Request/Response API for a message bus.
/// </summary>
public interface IBus : IDisposable
{
    /// <summary>
    /// Publishes a message.
    /// </summary>
    /// <typeparam name="T">The message type</typeparam>
    /// <param name="message">The message to publish</param>
    void Publish<T>(T message);

    /// <summary>
    /// Subscribes to a stream of messages that match a .NET type.
    /// </summary>
    /// <typeparam name="T">The type to subscribe to</typeparam>
    /// <param name="subscriptionId">
    /// A unique identifier for the subscription. Two subscriptions with the same subscriptionId
    /// and type will get messages delivered in turn. This is useful if you want multiple subscribers
    /// to load balance a subscription in a round-robin fashion.
    /// </param>
    /// <param name="onMessage">
    /// The action to run when a message arrives.
    /// </param>
    void Subscribe<T>(string subscriptionId, Action<T> onMessage);

    /// <summary>
    /// Makes an RPC style asynchronous request.
    /// </summary>
    /// <typeparam name="TRequest">The request type.</typeparam>
    /// <typeparam name="TResponse">The response type.</typeparam>
    /// <param name="request">The request message.</param>
    /// <param name="onResponse">The action to run when the response is received.</param>
    void Request<TRequest, TResponse>(TRequest request, Action<TResponse> onResponse);

    /// <summary>
    /// Responds to an RPC request.
    /// </summary>
    /// <typeparam name="TRequest">The request type.</typeparam>
    /// <typeparam name="TResponse">The response type.</typeparam>
    /// <param name="responder">
    /// A function to run when the request is received. It should return the response.
    /// </param>
    void Respond<TRequest, TResponse>(Func<TRequest, TResponse> responder);
}

Чтобы создать автобус, просто используйте RabbitHutch, извините, я не смог устоять перед этим ?

var bus = RabbitHutch.CreateRabbitBus("localhost");

Вы можете просто передать имя сервера для использования виртуального хоста Rabbit по умолчанию ‘/’, или вы можете указать именованный виртуальный хост следующим образом:

var bus = RabbitHutch.CreateRabbitBus("localhost/myVirtualHost");

Первым шаблоном обмена сообщениями, который я хотел поддержать, была публикация / подписка. Получив экземпляр автобуса, вы можете опубликовать следующее сообщение:

var message = new MyMessage {Text = "Hello!"};
bus.Publish(message);

Это публикует сообщение для обмена, названного типом сообщения.

Вы подписываетесь на сообщение, подобное этому:

bus.Subscribe<MyMessage>("test", message => Console.WriteLine(message.Text));

Это создает очередь с именем «test_ <тип сообщения>» и связывает ее с обменом типа сообщения. Когда сообщение получено, оно передается делегату Action <T>. Если у одного и того же типа сообщений с именем «test» более одного подписчика, Rabbit раздаст сообщения в циклическом порядке, поэтому вы получите простую балансировку нагрузки из коробки. Подписчики одного и того же типа сообщения, но с разными именами, каждый получит копию сообщения, как и следовало ожидать.

Второй шаблон обмена сообщениями — это асинхронный RPC. Вы можете вызвать удаленный сервис, например так:

var request = new TestRequestMessage {Text = "Hello from the client! "};

bus.Request<TestRequestMessage, TestResponseMessage>(request, response => 
    Console.WriteLine("Got response: '{0}'", response.Text));

Сначала создается новая временная очередь для TestResponseMessage. Затем он публикует TestRequestMessage с обратным адресом во временной очереди. Когда TestResponseMessage получен, он передает его делегату Action <T>. RabbitMQ успешно создает временные очереди и предоставляет заголовок обратного адреса, так что это было очень легко реализовать.

Написать сервер RPC. Просто используйте метод Respond следующим образом:

bus.Respond<TestRequestMessage, TestResponseMessage>(request => 
    new TestResponseMessage { Text = request.Text + " all done!" });

Это создает подписку для TestRequestMessage. Когда сообщение получено, делегат Func <TRequest, TResponse> передает запрос и возвращает ответ. Ответное сообщение затем публикуется во временную очередь клиента.

Еще раз, масштабирование серверов RPC — это просто вопрос запуска новых экземпляров. Кролик автоматически раздает им сообщения.

Функции AMQP (и Rabbit) делают создание этого вида API легким. Проверьте это и дайте мне знать, что вы думаете.