Когда дело доходит до очередей, независимо от того, реализованы ли они в виде JMS , таблиц базы данных (то есть, что Ruby использует Delayed :: Job для очереди) или даже SQS Amazon , наиболее распространенным показателем, используемым для оценки состояния очереди, является его длина. , По сути, получают показатель эффективности на основе того, сколько сообщений находится в очереди в любой момент времени. Если сообщений всего несколько, очередь работает эффективно. Если есть многочисленные сообщения, вещи неэффективны, и тревоги должны быть озвучены. 
Но что, если вы находитесь в постоянно занятой среде с экстремальными всплесками, где очереди имеют тенденцию быстро заполняться? Если у вас уже работает достаточно рабочих, чтобы справиться с этим взрывом, вам нужно больше разжигать?
Вы можете уволить больше работников, но это может стоить вам. То есть вам, возможно, придется предоставить новые рабочие экземпляры, такие как рабочие док-станции Heroku или AMI AMS, которые в конечном итоге будут стоить вам ощутимых денег. И иногда для запуска этих рабочих экземпляров требуется несколько минут, и когда они работают, всплеск активности заканчивается, и очередь возвращается в нормальное состояние — первоначально доступные рабочие справились с нагрузкой адекватно.
Оказывается, длина очереди была запаздывающим индикатором. Вы раскручивали ненужные ресурсы. Ложная сигнализация!
Если у вас уже есть достаточная емкость для обработки потока сообщений в очереди, то мониторинг длины очереди не слишком полезен. На самом деле это вводящий в заблуждение показатель, который может привести к ненужным действиям.
Следовательно, длина очереди не является показателем эффективности системы, когда там уже достаточно рабочих. Скорее, показатель, который означает что-то в среде с высокой пропускной способностью, показывает, как долго сообщение находится в очереди . Это действенный показатель: если сообщения помещаются в очередь, ожидающую обработки, вам нужно больше процессоров!
Moo по длине очереди и пусть время ожидания в очереди
По умолчанию SQS от Amazon не позволяет запрашивать, как долго сообщение находилось в очереди. Поэтому я написал Му .
Moo предоставляет клиентам интерфейс для получения и выполнения действий над временем сообщения в метрике очереди. Это делается путем добавления сообщения SQS к отметке времени. Эта отметка времени затем проверяется, когда сообщение извлекается из очереди SQS. Если пороговая разница превышена, вызывается обратный вызов.
Пользователи Moo найдут его использование похожим на Ahoy! , который является фасадом с асинхронным обратным вызовом поверх Java SDK AWS . На самом деле, Му использует Ахой! внизу, с добавленной функцией присоединения асинхронного обратного вызова «максимальное время в очереди».
Moo поддерживает множественное время в порогах очереди, а установка максимального времени в пороге очереди выполняется следующим образом:
Добавление максимального порога времени в очереди
|
1
2
3
4
5
6
7
|
//adds a 1 second max thresholdsqs.addQueueWaitTimeCallback(1000, new QueueWaitTimeCallback() { public void onThresholdExceeded(long waitTime) { //waitTime is the actual time in queue //do something... like fire off a web hook, etc }}); |
Обратите внимание, что метод addQueueWaitTimeCallback занимает максимальное время в миллисекундах в значении очереди и сопровождающую QueueWaitTimeCallback обратного вызова QueueWaitTimeCallback . Метод onThresholdExceeded будет вызываться асинхронно во время получения сообщения, если превышено максимальное пороговое значение; более того, onThresholdExceeded получит в качестве параметра фактическое время ожидания очереди.
Покажи мне Му
Чтобы запустить экземпляр Moo, у вас есть несколько вариантов, включая настройку экземпляра AmazonSQS AWS или просто передачу ключа, секрета и имени очереди следующим образом:
Добавление максимального порога времени в очереди
|
1
|
SQS sqs = new SQS(System.getProperty("key"), System.getProperty("secret"), System.getProperty("queue")); |
Затем вы можете присоединить ноль ко многим экземплярам QueueWaitTimeCallback например так:
Добавление максимального порога времени в очереди
|
1
2
3
4
5
|
sqs.addQueueWaitTimeCallback(600000, new QueueWaitTimeCallback() { public void onThresholdExceeded(long actualWaitTime) { //do something -- fire off SNS message? }}); |
В этом случае я добавил функцию обратного вызова, которая будет вызываться, если сообщения находятся в очереди более 10 минут. Обратите внимание, что эти QueueWaitTimeCallback вызовы QueueWaitTimeCallback экземпляром средства чтения очереди ; соответственно, QueueWaitTimeCallback может, конечно, QueueWaitTimeCallback больше экземпляров себя.
Вот пример документа JSON, который вы можете добавить в очередь SQS:
Добавление максимального порога времени в очереди
|
1
2
3
4
5
|
{ "employees":[ { "firstName":"John", "lastName":"Doe" }, { "firstName":"Anna", "lastName":"Smith" }, { "firstName":"Peter", "lastName":"Jones" }]} |
Отправка и получение этого сообщения точно так же, как если бы вы использовали Ahoy !. Например, чтобы отправить сообщение, просто передайте String в метод send :
Добавление максимального порога времени в очереди
|
1
2
3
4
5
|
sqs.send(json, new SendCallback() { public void onSend(String messageId) { //messageId is from SQS }}); |
Обратите внимание, что метод send принимает необязательный SendCallback .
Получение сообщения осуществляется с помощью метода receive , который принимает обязательный ReceiveCallback — этот обратный вызов будет вызываться асинхронно для каждого сообщения, полученного из очереди. Каждый экземпляр получит сообщение, помещенное в очередь, и идентификатор SQS сообщения.
Добавление максимального порога времени в очереди
|
1
2
3
4
5
|
sqs.receive(new ReceiveCallback() { public void onReceive(String messageId, String message) { //do something w/the message -- in this case it's JSON }}); |
Обратите внимание, что при получении сообщения Moo замечает, что сообщение ожидает в очереди более максимального порогового значения ожидания очереди, настроенного для соответствующего QueueWaitTimeCallback , Moo вызовет его. Обратите внимание, Moo может вызывать более одного экземпляра; таким образом, вы можете настроить цепочку для выполнения различных действий по мере увеличения времени.
Помните, что длина очереди обычно является запаздывающим индикатором. Метрика, которая на самом деле означает что-то, — это то, как долго сообщение находится в очереди. Это действенный показатель, и Му дает вам возможность что-то с этим сделать! Вы можете копать это?