Статьи

Создание чистилища: использование Node.js для создания отложенной очереди

На Segment.io мы ежедневно имеем дело с большим количеством важных пользовательских данных. Следовательно, наши главные приоритеты в том, чтобы мы не теряли данные вашего посетителя и чтобы наш входящий API оставался доступным в любое время.

Как вы можете догадаться, все входящие данные должны быть проверены по нашей базе данных. Наши API-серверы поддерживают соединение с БД, чтобы проверить, что входящие запросы действительно хороши.

Если что-либо в запросе не проверяется с БД, мы постараемся сразу же вернуть правильный код ошибки HTTP.

Мы не сразу ставим сообщение в очередь, потому что хотим помочь нашим пользователям во время интеграции. Если пользователь неправильно набирает свой ключ API, мы хотим предупредить его как можно скорее. Таким образом, они не продвигаются к производству, только чтобы узнать, что через две недели они не записали никаких данных.

Поэтому возникает вопрос: «Если Segment.io выполняет запрос по отслеживаемому событию, что происходит, когда умирает вся база данных?» Мы, очевидно, не можем перебрасывать 500s повсюду или сбрасывать данные наших клиентов из-за внутренней проблемы. Хотя мы кешируем определенные данные, мы хотим обеспечить их доступность, даже если срок действия записи из кеша истек.

Введите Чистилище

Чистилище предоставляет место для хранения сообщений, которые будут воспроизведены позже. Это небольшой модуль, который предоставляет хороший набор примитивов для публикации и использования отложенной очереди. Мы используем RabbitMQ и node-amqp , но эти принципы можно адаптировать к любой платформе.

Издательский

Чтобы увидеть, как отправлять сообщения в чистилище, давайте вернемся к нашему случаю сбоя, когда база данных выходит из строя. В случае потери связи, мы должны пойти дальше и опубликовать в чистилище. Мы даем клиенту преимущество сомнения и предполагаем, что его запрос был хорошим, возвращая 200немедленно.

exports.ingest = function (data, options, callback) {
    validate(data, function (err, result) {
        if (err) {
            // If it's a connection problem, purgatory it
            if (connection.isUnavailable(err)) {
                purgatory.publish('my_key', data);
                return callback(); // end client request
            }
    [...]
}

Всякий раз, когда связь недоступна, мы публикуем в чистилище. Мы предполагаем, что пока никто не отправляет нам плохие данные.

Подписавшись

Это все хорошо, мы можем публиковать наши сообщения в альтернативной очереди. Итак, как нам указать, как вытащить сообщения из очереди?

Самый простой способ сделать это — предоставить обработчик сообщений. Всякий раз, когда мы проверяем, может ли сообщение быть обработано повторно, вызывается обработчик и решает, как обработать сообщение. Это выглядит примерно так:

/**
 * Subscribes a message handler for the given key
 * @param  {String}   key     the queue topic key
 * @param  {Function} handler ({ message      : message,
 *                               headers      : headers,
 *                               deliveryInfo : deliveryInfo },
 *                              callback (err) });
 */
purgatory.subscribe('my_key', function (delivery, callback) {
    var data = delivery.message;
    validate(data, function (err, cleaned) {
        if (err) {
            if (connection.isUnavailable(err)) return callback(err);
            else return callback(); // don't re-queue if it's a validation error
        } else {
    [...]
});

Обработчик перезванивает с ошибкой, только если сообщение должно быть поставлено в очередь. В нашем случае мы не возвращаем ошибку, если есть проблема с проверкой — эти сообщения все равно должны быть выброшены!

Реализация

Публикация сообщений в чистилище — простая задача. В любом месте вашего кода, где вы хотите, чтобы сообщение было обработано повторно, вы можете опубликовать его на чистилище.

Наиболее интересные компоненты чистилища происходят от обработки сообщений в очереди. Для обработки сообщений модуль чистилища должен удовлетворять трем требованиям:

  1. отслеживать различные обработчики очереди по ключу
  2. обрабатывать сообщения в очереди
  3. повторно поместить сообщения в очередь, если база данных все еще не работает

Я разместил полный исходный код в гисте , но я также разбью части кода обработки сообщений здесь.

Обработчики очереди

Чтобы отслеживать обработчики очереди, мы храним ключи сопоставления объектов с их обработчиками. Функция подписки выглядит так:

var handlers = {};

exports.subscribe = function (key, handler) {

    var currentHandler = handlers[key];
    handlers[key] = handler;

    // If we are already bound and have set the new handler, return
    if (currentHandler)
        return;

    // Otherwise set up the queue
    declareQueue(rabbit, key);
};

Закрытая переменная позволяет нам объявлять новые обработчики очереди в зависимости от поведения приложения без необходимости повторного подключения к очереди.

Обработка и постановка в очередь

Чтобы увидеть, как чистилище обрабатывает эти сообщения в очереди, давайте посмотрим на declareQueueфункцию:

var declareQueue = function (client, key) {

    // Create our queue
    client.queue(queueName(key), function (queue) {

        queue.bind('purgatory', key);

        queue.subscribe({ ack : true },
                        function (message, headers, deliveryInfo) {

            var delivery = { message      : message,
                             headers      : headers,
                             deliveryInfo : deliveryInfo };

            var handlerFn = handlers[key];

            handlerFn(delivery, function (err) {

                if (err) {
                    process.nextTick(function () {
                                        requeue(queue, key, message); });
                } else {
                    process.nextTick(function () { queue.shift(); });
                }
            });
        });
    });
}; 

Вы заметите, что очередь объявляется только один раз, но каждый раз проверяет обработчики в случае изменения функции обработчика. Когда очередь подписана, мы устанавливаем ackопцию, чтобы сообщения отбирались из очереди только после их подтверждения. Это гарантирует, что функция не вращается непрерывно, пока база данных не работает.

Если бы мы хотели повысить производительность, мы могли бы даже установить окно взлома . Для простоты моя текущая реализация только подтверждает сообщения по одному.

Если обработчик обнаруживает ошибку, указывающую на проблему с соединением, сообщение помещается обратно в очередь. Затем метод Requeue устанавливает тайм-аут на 10 секунд, прежде чем получить подтверждение и попытаться снова обработать сообщение.

Если ошибки не возникает, чистилище может сразу подтвердить сообщение и пройти через все сообщения в очереди.

Следующие шаги

При использовании чистилища мы помещаем в очередь сообщения, если есть проблемы с подключением к базе данных, но этот подход может быть распространен практически на все типы услуг. Сам модуль не зависит от типов ошибок соединения, которые могут возникнуть.

Наша будущая цель состоит в том, чтобы все наши службы, выполняющие проверку, были поддержаны вспомогательными очередями. Даже если одна очередь выйдет из строя, сервер должен иметь возможность циклического перебора между другими доступными очередями, чтобы в итоге выполнить запрос.

Если будет достаточно интереса, я с удовольствием опубликую весь код в виде отдельного модуля на npm. Наш вариант использования может быть конкретным, но мы нашли модуль чистилища чрезвычайно полезным на Segment.io . Если вы нашли другие методы обеспечения отказоустойчивости между различными частями инфраструктуры, я хотел бы услышать о них.