Что такое Amazon SQS?
Amazon SQS (Simple Queue Service) — это надежная служба очереди сообщений, размещенная в облаке Amazon. Этот сервис идеально подходит для отправки сообщений между серверами, которым необходимо подтвердить, что обработка завершена. Когда сообщение извлекается из очереди, оно не удаляется, а помечается клиентом, который сделал запрос. Затем клиент отвечает за указание SQS удалить сообщение из очереди. Если клиент не удаляет сообщение, которое он вытолкнул в течение определенного периода времени, клиент теряет право собственности на сообщение, и оно становится доступным для других клиентов.
Как я это использую?
Одной из систем, для которой я использую SQS, является служба распределенной доставки электронной почты (с использованием SMTP). Поскольку для Java не существует асинхронного SMTP-клиента (о котором я знаю), я использую JavaMail для доставки сообщений. Отправка сообщений с помощью JavaMail довольно медленная и может занимать несколько секунд на каждое сообщение, причем поток обрабатывается для каждого отправленного сообщения. Чтобы отправлять множество сообщений параллельно, я решил поставить в очередь исходящие сообщения и запустить много экземпляров приложения SMTP. Этот подход очень прост и прекрасно масштабируется без необходимости реализации собственного асинхронного SMTP-клиента.
Так что не так с Amazon SQS?
Основная проблема с использованием SQS в вышеприведенном сценарии заключается в том, что я не могу поместить целое сообщение электронной почты в очередь SQS, поскольку каждое сообщение SQS ограничено 8 КБ данных. Чтобы обойти это, я сохраняю сообщение в MongoDB, а затем помещаю идентификатор сообщения в очередь в SQS. Затем каждому клиенту необходимо извлечь сообщение из очереди, а затем найти сообщение в Mongo. В этом подходе нет ничего плохого, но его можно сделать лучше, быстрее, проще и дешевле. Amazon взимает с меня долю цента за каждую операцию, которую я выполняю в SQS. Это не так много, но если у меня есть 10 SMTP-приложений, опрашивающих SQS 4 раза в секунду каждый день каждый день, независимо от того, есть ли новые сообщения для отправки, это может сложиться. Кроме того, у меня есть диагностические приложения, которые следят за размером очереди, чтобы узнать, нужно ли мне увеличивать количество экземпляров или уменьшать их.Даже если это составляет до 10 долларов в день, это все равно 3650 долларов в год только для рассылки электронной почты. Это слишком много для стартапа без финансовой поддержки!
Подход
Я уже давно пользуюсь MongoDB и восхищаюсь тем, что он может сделать. Я знаю, что он может хранить много данных без схемы в блоках по 4 МБ (документ ограничен 4 МБ) и может хранить большие файлы с помощью GridFS. Я знаю, что это молниеносно (почти memcached скорость) для индексированных поисков и может обрабатывать тысячи операций в секунду, даже не нагружая процессор более чем на 10%. Я знаю, что я уже плачу за процессор и место на жестком диске в Amazon EC2 и полностью наслаждаюсь минимизацией моих ежемесячных, еженедельных и даже ежедневных затрат. Мля. Мля. Мля. Я хочу реализовать это в Монго!
С введением серверного javascript и команды findAndModify использование MongoDB для очереди, доступ к которой может получить любой клиентский язык (из которых существует тонна!), Становится очень простым. Ниже приведен код, который я использую в своих проектах.
Код
sqs.js function sqsQueueExists(name) { return db.queue[name].count() != 0; }; function sqsQueueMessageCount(name) { return db.queue[name].count({ alive: true, expires: {$lt: new Date} }); }; function sqsDeleteQueue(name) { db.queue[name].drop(); }; function sqsListQueues(prefix) { var regex; if (prefix) regex = new RegExp('^[^.]+\.queue\.' + prefix + '[^$]*$'); else regex = /^[^.]+\.queue\.[^$]+$/; return db.system.namespaces.find({ name: regex }).map(function (x) { return x.name.substring(x.name.indexOf('.') + 7); }); }; function sqsPushMessage(queue, message) { var _push = function(queue, message) { db.queue[queue].save({ alive: true, expires: new Date(0), owner: new ObjectId('000000000000000000000000'), body: message }); }; if (message instanceof Array) { message.forEach(function(m) { _push(queue, m); }); } else { _push(queue, message); } }; function sqsPopMessage(queue, owner, count) { var now = new Date; // 10 second expiration, change this to what you want var expires = new Date(now.getTime() + 10000); if (!count) { count = 1; } var result = []; for (var i = 0; i < count; ++i) { var item = db.queue[queue].findAndModify({ query: { alive: true, expires: {$lt: now} }, update: { $set: { expires: expires, owner: owner } }, new: true }); if (friendlyEqual({}, item)) break; result.push(item); } return result; }; function sqsDeleteMessage(queue, owner, item_ids) { if (item_ids instanceof ObjectId) item_ids = [item_ids]; db.queue[queue].update({ alive: true, expires: {$gte: new Date}, owner: owner, _id: {$in: item_ids} }, { $set: { alive: false } }, false, true); };
Если вы скопируете код в файл sqs.js, вы сможете запустить приведенный ниже скрипт, чтобы создать хранимые процедуры в выбранной вами базе данных. Альтернатива — написать код в драйвере MongoDB по вашему выбору.
удар
$ for function in sqsQueueExists sqsQueueMessageCount sqsDeleteQueue sqsListQueues sqsPushMessage sqsPopMessage sqsDeleteMessage; do echo "db.system.js.save({_id: '$function', value: $function})" | mongo [db name] --quiet --shell sqs.js done
MongoDB Shell
> use [db name]; > load('sqs.js'); > [sqsQueueExists, sqsQueueMessageCount, sqsDeleteQueue, sqsListQueues, sqsPushMessage, sqsPopMessage, sqsDeleteMessage].forEach(function (x) { var name = x.toString().match(/^function\s(\w+)/)[1]; db.system.js.save({_id: name, value: x}); });
Есть части API SQS, которые я упустил из этой реализации для простоты. Они основаны на предоставленных опциях, таких как изменение времени ожидания видимости очереди или отдельного сообщения. Это было бы довольно просто добавить в случае необходимости. Однако, если вы не используете эти функции, исключение их из кода только сделает код быстрее. Чтобы добавить параметры в очередь, просто добавьте другую коллекцию под названием queue, которая содержит все имена очередей (в поле _id) и соответствующие параметры. Затем просто сделайте запрос к коллекции очередей, когда понадобятся параметры (поле _id автоматически индексируется, так что это будет быстро).
Я надеюсь, что это помогает вам!
Источник: http://www.mattinsler.com/why-and-how-i-replaced-amazon-sqs-with-mongodb/