Статьи

RabbitMQ, серверы подписки и подпрыгивания в EasyNetQ


Если вы регулярно читаете мой блог, вы знаете, что в настоящее время я работаю над дружественным .NET API для RabbitMQ,
EasyNetQ . EasyNetQ — программное обеспечение. Он устраняет большую часть сложности AMQP и заменяет его простым интерфейсом, который использует систему типов .NET для маршрутизации сообщений.

Одна из вещей, которую я хочу удалить из «пространства приложения» и протолкнуть в API, — это все, что нужно для создания отчетов и обработки ошибок. Одной из сторон этого является предоставление инфраструктуры для записи и обработки исключений, создаваемых приложениями, использующими EasyNetQ. Я расскажу об этом в следующем посте. Другое соображение, которое я хочу рассмотреть в этом посте, заключается в том, как EasyNetQ должен корректно обрабатывать сетевое соединение или сбой сервера.

В Заблуждения распределенных вычислений не говорят нам , что, независимо от того , каким образом может быть надежным RabbitMQ и платформа Erlang, еще будет время , когда сервер RabbitMQ уйдет по какой — либо причине.

Одной из проблем программирования в системе обмена сообщениями по сравнению с реляционной базой данных является длительность времени, в течение которого приложение поддерживает соединения открытыми. Типичное соединение с базой данных открывается, над ним выполняется какая-то операция — выбор, вставка, обновление и т. Д. — и затем оно закрывается. Подписки системы обмена сообщениями, однако, требуют, чтобы клиент или подписчик имели открытое соединение в течение всего срока службы приложения.

Если вы просто запрограммируете использование низкоуровневого API CQ AMQP, предоставляемого RabbitHQ, для создания простой подписки, вы заметите, что после отказа сервера RabbitMQ подписка больше не работает. Это связано с тем, что канал, который вы открыли для подписки на очередь, и циклы потребления, связанные с ними, больше не действительны. Вам необходимо обнаружить закрытый канал и затем попытаться восстановить подписку, как только сервер снова станет доступен.

Отличный RabbitMQ в действии Videla и Williams описывает, как это сделать, в главе 6 «Написание кода, который выживает после сбоя». Вот их пример кода Python:

rabbit_mq_in_action_failure_detecting_subscriber

EasyNetQ должен сделать нечто подобное, но в качестве общего решения, чтобы все подписчики автоматически получали повторную подписку после отказа сервера.

Вот как это работает.

Во-первых, все подписки создаются в закрытии:

public void Subscribe<T>(string subscriptionId, Action<T> onMessage)
{
    if (onMessage == null)
    {
        throw new ArgumentNullException("onMessage");
    }

    var typeName = serializeType(typeof(T));
    var subscriptionQueue = string.Format("{0}_{1}", subscriptionId, typeName);

    Action subscribeAction = () =>
    {
        var channel = connection.CreateModel();
        DeclarePublishExchange(channel, typeName);

        var queue = channel.QueueDeclare(
            subscriptionQueue,  // queue
            true,               // durable
            false,              // exclusive
            false,              // autoDelete
            null);              // arguments

        channel.QueueBind(queue, typeName, typeName);  

        var consumer = consumerFactory.CreateConsumer(channel, 
            (consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body) =>
            {
                var message = serializer.BytesToMessage<T>(body);
                onMessage(message);
            });

        channel.BasicConsume(
            subscriptionQueue,      // queue
            true,                   // noAck 
            consumer.ConsumerTag,   // consumerTag
            consumer);              // consumer
    };

    connection.AddSubscriptionAction(subscribeAction);
}

Строка connection.AddSubscriptionAction (subscribeAction) передает замыкание в класс PersistentConnection, который оборачивает соединение AMQP и предоставляет весь код обнаружения отключения и повторной подписки. Вот действие AddSubscriptionAction:

public void AddSubscriptionAction(Action subscriptionAction)
{
    if (IsConnected) subscriptionAction();
    subscribeActions.Add(subscriptionAction);
}

Если есть открытое соединение, он сразу запускает подписку. Он также сохраняет закрытие подписки в списке <Action>.

Когда соединение по какой-либо причине закрывается, возникает событие AMQP ConnectionShutdown, которое запускает метод OnConnectionShutdown:

void OnConnectionShutdown(IConnection _, ShutdownEventArgs reason)
{
    if (disposed) return;
    if (Disconnected != null) Disconnected();

    Thread.Sleep(100);
    TryToConnect();
}

Мы немного подождем, а затем попробуем восстановить соединение:

void TryToConnect()
{
    ThreadPool.QueueUserWorkItem(state =>
    {
        while (connection == null || !connection.IsOpen)
        {
            try
            {
                connection = connectionFactory.CreateConnection();
                connection.ConnectionShutdown += OnConnectionShutdown;

                if (Connected != null) Connected();
            }
            catch (RabbitMQ.Client.Exceptions.BrokerUnreachableException)
            {
                Thread.Sleep(100);
            }
        }
        foreach (var subscribeAction in subscribeActions)
        {
            subscribeAction();
        }
    });
}

Это раскручивает поток, который просто зацикливается при попытке подключиться к серверу. Как только соединение установлено, оно запускает все сохраненные закрытия подписки (subscribeActions).

В моих тестах это решение работало очень хорошо. Мои клиенты автоматически повторно подписываются на те же очереди и продолжают получать сообщения. Однако одним из основных мотивов написания этого поста была попытка получить обратную связь, поэтому, если вы использовали RabbitMQ с .NET, я хотел бы услышать о ваших впечатлениях и особенно о комментариях по поводу моего кода или о том, как вы решили Эта проблема.

Код EasyNetQ доступен на GitHub . Это все еще очень рано и никоим образом не готово к производству. Вы были предупреждены.