После обдумывания результатов нашей перестрелки из
очереди сообщений мы решили работать с
Rabbit MQ . Кролик помечает все флажки, он поддерживается (в конечном итоге Spring Source, а затем и VMware), масштабируется и обладает необходимыми функциями и производительностью.
RabbitMQ.Client обеспечивается Spring Source тонкая оболочка , которая довольно точно выставляет
AMQP протокол, поэтому он ожидает сообщения в виде байтовых массивов.
Для тестов перестрелки было хорошо разбрызгивать массивы байтов, но в реальном мире мы хотим, чтобы наши сообщения имели тип .NET. Я также хотел предоставить разработчикам очень простой API, который абстрагирует модель AMQP Exchange / Binding / Queue и вместо этого предоставляет простую модель публикации / подписки и запроса / ответа. Моим вдохновением была отличная работа, проделанная Дрю Селлерсом и Крисом Паттерсоном с MassTransit (новая бета-версия V2.0 только что вышла).
Код находится на GitHub здесь:
https://github.com/mikehadlow/EasyNetQ
API сосредоточен вокруг интерфейса IBus, который выглядит следующим образом:
/// <summary> /// Provides a simple Publish/Subscribe and Request/Response API for a message bus. /// </summary> public interface IBus : IDisposable { /// <summary> /// Publishes a message. /// </summary> /// <typeparam name="T">The message type</typeparam> /// <param name="message">The message to publish</param> void Publish<T>(T message); /// <summary> /// Subscribes to a stream of messages that match a .NET type. /// </summary> /// <typeparam name="T">The type to subscribe to</typeparam> /// <param name="subscriptionId"> /// A unique identifier for the subscription. Two subscriptions with the same subscriptionId /// and type will get messages delivered in turn. This is useful if you want multiple subscribers /// to load balance a subscription in a round-robin fashion. /// </param> /// <param name="onMessage"> /// The action to run when a message arrives. /// </param> void Subscribe<T>(string subscriptionId, Action<T> onMessage); /// <summary> /// Makes an RPC style asynchronous request. /// </summary> /// <typeparam name="TRequest">The request type.</typeparam> /// <typeparam name="TResponse">The response type.</typeparam> /// <param name="request">The request message.</param> /// <param name="onResponse">The action to run when the response is received.</param> void Request<TRequest, TResponse>(TRequest request, Action<TResponse> onResponse); /// <summary> /// Responds to an RPC request. /// </summary> /// <typeparam name="TRequest">The request type.</typeparam> /// <typeparam name="TResponse">The response type.</typeparam> /// <param name="responder"> /// A function to run when the request is received. It should return the response. /// </param> void Respond<TRequest, TResponse>(Func<TRequest, TResponse> responder); }
Чтобы создать автобус, просто используйте RabbitHutch, извините, я не смог устоять перед этим ?
var bus = RabbitHutch.CreateRabbitBus("localhost");
Вы можете просто передать имя сервера для использования виртуального хоста Rabbit по умолчанию ‘/’, или вы можете указать именованный виртуальный хост следующим образом:
var bus = RabbitHutch.CreateRabbitBus("localhost/myVirtualHost");
Первым шаблоном обмена сообщениями, который я хотел поддержать, была публикация / подписка. Получив экземпляр автобуса, вы можете опубликовать следующее сообщение:
var message = new MyMessage {Text = "Hello!"}; bus.Publish(message);
Это публикует сообщение для обмена, названного типом сообщения.
Вы подписываетесь на сообщение, подобное этому:
bus.Subscribe<MyMessage>("test", message => Console.WriteLine(message.Text));
Это создает очередь с именем «test_ <тип сообщения>» и связывает ее с обменом типа сообщения. Когда сообщение получено, оно передается делегату Action <T>. Если у одного и того же типа сообщений с именем «test» более одного подписчика, Rabbit раздаст сообщения в циклическом порядке, поэтому вы получите простую балансировку нагрузки из коробки. Подписчики одного и того же типа сообщения, но с разными именами, каждый получит копию сообщения, как и следовало ожидать.
Второй шаблон обмена сообщениями — это асинхронный RPC. Вы можете вызвать удаленный сервис, например так:
var request = new TestRequestMessage {Text = "Hello from the client! "}; bus.Request<TestRequestMessage, TestResponseMessage>(request, response => Console.WriteLine("Got response: '{0}'", response.Text));
Сначала создается новая временная очередь для TestResponseMessage. Затем он публикует TestRequestMessage с обратным адресом во временной очереди. Когда TestResponseMessage получен, он передает его делегату Action <T>. RabbitMQ успешно создает временные очереди и предоставляет заголовок обратного адреса, так что это было очень легко реализовать.
Написать сервер RPC. Просто используйте метод Respond следующим образом:
bus.Respond<TestRequestMessage, TestResponseMessage>(request => new TestResponseMessage { Text = request.Text + " all done!" });
Это создает подписку для TestRequestMessage. Когда сообщение получено, делегат Func <TRequest, TResponse> передает запрос и возвращает ответ. Ответное сообщение затем публикуется во временную очередь клиента.
Еще раз, масштабирование серверов RPC — это просто вопрос запуска новых экземпляров. Кролик автоматически раздает им сообщения.
Функции AMQP (и Rabbit) делают создание этого вида API легким. Проверьте это и дайте мне знать, что вы думаете.