Что такое Amazon SQS?
Amazon SQS (Simple Queue Service) — это надежная служба очереди сообщений, размещенная в облаке Amazon. Этот сервис идеально подходит для отправки сообщений между серверами, которым необходимо подтвердить, что обработка завершена. Когда сообщение извлекается из очереди, оно не удаляется, а помечается клиентом, который сделал запрос. Затем клиент отвечает за указание SQS удалить сообщение из очереди. Если клиент не удаляет сообщение, которое он вытолкнул в течение определенного периода времени, клиент теряет право собственности на сообщение, и оно становится доступным для других клиентов.
Как я это использую?
Одной из систем, для которой я использую SQS, является служба распределенной доставки электронной почты (с использованием SMTP). Поскольку для Java не существует асинхронного SMTP-клиента (о котором я знаю), я использую JavaMail для доставки сообщений. Отправка сообщений с помощью JavaMail довольно медленная и может занимать несколько секунд на каждое сообщение, причем поток обрабатывается для каждого отправленного сообщения. Чтобы отправлять множество сообщений параллельно, я решил поставить в очередь исходящие сообщения и запустить много экземпляров приложения SMTP. Этот подход очень прост и прекрасно масштабируется без необходимости реализации собственного асинхронного SMTP-клиента.
Так что не так с Amazon SQS?
Основная проблема с использованием SQS в вышеприведенном сценарии заключается в том, что я не могу поместить целое сообщение электронной почты в очередь SQS, поскольку каждое сообщение SQS ограничено 8 КБ данных. Чтобы обойти это, я сохраняю сообщение в MongoDB, а затем помещаю идентификатор сообщения в очередь в SQS. Затем каждому клиенту необходимо извлечь сообщение из очереди, а затем найти сообщение в Mongo. В этом подходе нет ничего плохого, но его можно сделать лучше, быстрее, проще и дешевле. Amazon взимает с меня долю цента за каждую операцию, которую я выполняю в SQS. Это не так много, но если у меня есть 10 SMTP-приложений, опрашивающих SQS 4 раза в секунду каждый день каждый день, независимо от того, есть ли новые сообщения для отправки, это может сложиться. Кроме того, у меня есть диагностические приложения, которые следят за размером очереди, чтобы узнать, нужно ли мне увеличивать количество экземпляров или уменьшать их.Даже если это составляет до 10 долларов в день, это все равно 3650 долларов в год только для рассылки электронной почты. Это слишком много для стартапа без финансовой поддержки!
Подход
Я уже давно пользуюсь MongoDB и восхищаюсь тем, что он может сделать. Я знаю, что он может хранить много данных без схемы в блоках по 4 МБ (документ ограничен 4 МБ) и может хранить большие файлы с помощью GridFS. Я знаю, что это молниеносно (почти memcached скорость) для индексированных поисков и может обрабатывать тысячи операций в секунду, даже не нагружая процессор более чем на 10%. Я знаю, что я уже плачу за процессор и место на жестком диске в Amazon EC2 и полностью наслаждаюсь минимизацией моих ежемесячных, еженедельных и даже ежедневных затрат. Мля. Мля. Мля. Я хочу реализовать это в Монго!
С введением серверного javascript и команды findAndModify использование MongoDB для очереди, доступ к которой может получить любой клиентский язык (из которых существует тонна!), Становится очень простым. Ниже приведен код, который я использую в своих проектах.
Код
sqs.js
function sqsQueueExists(name) {
return db.queue[name].count() != 0;
};
function sqsQueueMessageCount(name) {
return db.queue[name].count({
alive: true,
expires: {$lt: new Date}
});
};
function sqsDeleteQueue(name) {
db.queue[name].drop();
};
function sqsListQueues(prefix) {
var regex;
if (prefix)
regex = new RegExp('^[^.]+\.queue\.' + prefix + '[^$]*$');
else
regex = /^[^.]+\.queue\.[^$]+$/;
return db.system.namespaces.find({
name: regex
}).map(function (x) {
return x.name.substring(x.name.indexOf('.') + 7);
});
};
function sqsPushMessage(queue, message) {
var _push = function(queue, message) {
db.queue[queue].save({
alive: true,
expires: new Date(0),
owner: new ObjectId('000000000000000000000000'),
body: message
});
};
if (message instanceof Array) {
message.forEach(function(m) {
_push(queue, m);
});
} else {
_push(queue, message);
}
};
function sqsPopMessage(queue, owner, count) {
var now = new Date;
// 10 second expiration, change this to what you want
var expires = new Date(now.getTime() + 10000);
if (!count) {
count = 1;
}
var result = [];
for (var i = 0; i < count; ++i) {
var item = db.queue[queue].findAndModify({
query: {
alive: true,
expires: {$lt: now}
},
update: {
$set: {
expires: expires,
owner: owner
}
},
new: true
});
if (friendlyEqual({}, item))
break;
result.push(item);
}
return result;
};
function sqsDeleteMessage(queue, owner, item_ids) {
if (item_ids instanceof ObjectId)
item_ids = [item_ids];
db.queue[queue].update({
alive: true,
expires: {$gte: new Date},
owner: owner,
_id: {$in: item_ids}
}, {
$set: {
alive: false
}
},
false,
true);
};
Если вы скопируете код в файл sqs.js, вы сможете запустить приведенный ниже скрипт, чтобы создать хранимые процедуры в выбранной вами базе данных. Альтернатива — написать код в драйвере MongoDB по вашему выбору.
удар
$ for function in sqsQueueExists sqsQueueMessageCount sqsDeleteQueue sqsListQueues sqsPushMessage
sqsPopMessage sqsDeleteMessage; do
echo "db.system.js.save({_id: '$function', value: $function})" |
mongo [db name] --quiet --shell sqs.js
done
MongoDB Shell
> use [db name];
> load('sqs.js');
> [sqsQueueExists, sqsQueueMessageCount, sqsDeleteQueue, sqsListQueues, sqsPushMessage,
sqsPopMessage, sqsDeleteMessage].forEach(function (x) {
var name = x.toString().match(/^function\s(\w+)/)[1];
db.system.js.save({_id: name, value: x});
});
Есть части API SQS, которые я упустил из этой реализации для простоты. Они основаны на предоставленных опциях, таких как изменение времени ожидания видимости очереди или отдельного сообщения. Это было бы довольно просто добавить в случае необходимости. Однако, если вы не используете эти функции, исключение их из кода только сделает код быстрее. Чтобы добавить параметры в очередь, просто добавьте другую коллекцию под названием queue, которая содержит все имена очередей (в поле _id) и соответствующие параметры. Затем просто сделайте запрос к коллекции очередей, когда понадобятся параметры (поле _id автоматически индексируется, так что это будет быстро).
Я надеюсь, что это помогает вам!
Источник: http://www.mattinsler.com/why-and-how-i-replaced-amazon-sqs-with-mongodb/