Статьи

Почему (и как) заменить Amazon SQS на MongoDB

Что такое Amazon SQS?

Amazon SQS (Simple Queue Service) — это надежная служба очереди сообщений, размещенная в облаке Amazon. Этот сервис идеально подходит для отправки сообщений между серверами, которым необходимо подтвердить, что обработка завершена. Когда сообщение извлекается из очереди, оно не удаляется, а помечается клиентом, который сделал запрос. Затем клиент отвечает за указание SQS удалить сообщение из очереди. Если клиент не удаляет сообщение, которое он вытолкнул в течение определенного периода времени, клиент теряет право собственности на сообщение, и оно становится доступным для других клиентов.

Как я это использую?

Одной из систем, для которой я использую SQS, является служба распределенной доставки электронной почты (с использованием SMTP). Поскольку для Java не существует асинхронного SMTP-клиента (о котором я знаю), я использую JavaMail для доставки сообщений. Отправка сообщений с помощью JavaMail довольно медленная и может занимать несколько секунд на каждое сообщение, причем поток обрабатывается для каждого отправленного сообщения. Чтобы отправлять множество сообщений параллельно, я решил поставить в очередь исходящие сообщения и запустить много экземпляров приложения SMTP. Этот подход очень прост и прекрасно масштабируется без необходимости реализации собственного асинхронного SMTP-клиента.

Так что не так с Amazon SQS?

Основная проблема с использованием SQS в вышеприведенном сценарии заключается в том, что я не могу поместить целое сообщение электронной почты в очередь SQS, поскольку каждое сообщение SQS ограничено 8 КБ данных. Чтобы обойти это, я сохраняю сообщение в MongoDB, а затем помещаю идентификатор сообщения в очередь в SQS. Затем каждому клиенту необходимо извлечь сообщение из очереди, а затем найти сообщение в Mongo. В этом подходе нет ничего плохого, но его можно сделать лучше, быстрее, проще и дешевле. Amazon взимает с меня долю цента за каждую операцию, которую я выполняю в SQS. Это не так много, но если у меня есть 10 SMTP-приложений, опрашивающих SQS 4 раза в секунду каждый день каждый день, независимо от того, есть ли новые сообщения для отправки, это может сложиться. Кроме того, у меня есть диагностические приложения, которые следят за размером очереди, чтобы узнать, нужно ли мне увеличивать количество экземпляров или уменьшать их.Даже если это составляет до 10 долларов в день, это все равно 3650 долларов в год только для рассылки электронной почты. Это слишком много для стартапа без финансовой поддержки!

Подход

Я уже давно пользуюсь MongoDB и восхищаюсь тем, что он может сделать. Я знаю, что он может хранить много данных без схемы в блоках по 4 МБ (документ ограничен 4 МБ) и может хранить большие файлы с помощью GridFS. Я знаю, что это молниеносно (почти memcached скорость) для индексированных поисков и может обрабатывать тысячи операций в секунду, даже не нагружая процессор более чем на 10%. Я знаю, что я уже плачу за процессор и место на жестком диске в Amazon EC2 и полностью наслаждаюсь минимизацией моих ежемесячных, еженедельных и даже ежедневных затрат. Мля. Мля. Мля. Я хочу реализовать это в Монго!

С введением серверного javascript и команды findAndModify использование MongoDB для очереди, доступ к которой может получить любой клиентский язык (из которых существует тонна!), Становится очень простым. Ниже приведен код, который я использую в своих проектах.

Код

sqs.js
function sqsQueueExists(name) {
    return db.queue[name].count() != 0;
};
 
function sqsQueueMessageCount(name) {
    return db.queue[name].count({
        alive: true,
        expires: {$lt: new Date}
    });
};
 
function sqsDeleteQueue(name) {
    db.queue[name].drop();
};
 
function sqsListQueues(prefix) {
    var regex;
    if (prefix)
        regex = new RegExp('^[^.]+\.queue\.' + prefix + '[^$]*$');
    else
        regex = /^[^.]+\.queue\.[^$]+$/;
 
    return db.system.namespaces.find({
        name: regex
    }).map(function (x) {
        return x.name.substring(x.name.indexOf('.') + 7);
    });
};
 
function sqsPushMessage(queue, message) {
    var _push = function(queue, message) {
        db.queue[queue].save({
            alive: true,
            expires: new Date(0),
            owner: new ObjectId('000000000000000000000000'),
            body: message
        });
    };
 
    if (message instanceof Array) {
        message.forEach(function(m) {
            _push(queue, m);
        });
    } else {
        _push(queue, message);
    }
};
 
function sqsPopMessage(queue, owner, count) {
    var now = new Date;
    // 10 second expiration, change this to what you want
    var expires = new Date(now.getTime() + 10000);
 
    if (!count) {
        count = 1;
    }
    var result = [];
    for (var i = 0; i < count; ++i) {
        var item = db.queue[queue].findAndModify({
            query: {
                alive: true,
                expires: {$lt: now}
            },
            update: {
                $set: {
                    expires: expires,
                    owner: owner
                }
            },
            new: true
        });
        if (friendlyEqual({}, item))
            break;
        result.push(item);
    }
    return result;
};
 
function sqsDeleteMessage(queue, owner, item_ids) {
    if (item_ids instanceof ObjectId)
        item_ids = [item_ids];
    db.queue[queue].update({
        alive: true,
        expires: {$gte: new Date},
        owner: owner,
        _id: {$in: item_ids}
    }, {
        $set: {
            alive: false
        }
    },
    false,
    true);
};

Если вы скопируете код в файл sqs.js, вы сможете запустить приведенный ниже скрипт, чтобы создать хранимые процедуры в выбранной вами базе данных. Альтернатива — написать код в драйвере MongoDB по вашему выбору.

удар

$ for function in sqsQueueExists sqsQueueMessageCount sqsDeleteQueue sqsListQueues sqsPushMessage
	sqsPopMessage sqsDeleteMessage; do
	echo "db.system.js.save({_id: '$function', value: $function})" |
		mongo [db name] --quiet --shell sqs.js
done

MongoDB Shell

> use [db name];
> load('sqs.js');
> [sqsQueueExists, sqsQueueMessageCount, sqsDeleteQueue, sqsListQueues, sqsPushMessage,
	sqsPopMessage, sqsDeleteMessage].forEach(function (x) {
	var name = x.toString().match(/^function\s(\w+)/)[1];
	db.system.js.save({_id: name, value: x});
});

Есть части API SQS, которые я упустил из этой реализации для простоты. Они основаны на предоставленных опциях, таких как изменение времени ожидания видимости очереди или отдельного сообщения. Это было бы довольно просто добавить в случае необходимости. Однако, если вы не используете эти функции, исключение их из кода только сделает код быстрее. Чтобы добавить параметры в очередь, просто добавьте другую коллекцию под названием queue, которая содержит все имена очередей (в поле _id) и соответствующие параметры. Затем просто сделайте запрос к коллекции очередей, когда понадобятся параметры (поле _id автоматически индексируется, так что это будет быстро).

Я надеюсь, что это помогает вам!

Источник: http://www.mattinsler.com/why-and-how-i-replaced-amazon-sqs-with-mongodb/