Когда дело доходит до очередей, независимо от того, реализованы ли они в виде JMS , таблиц базы данных (то есть, что Ruby использует Delayed :: Job для очереди) или даже SQS Amazon , наиболее распространенным показателем, используемым для оценки состояния очереди, является его длина. , По сути, получают показатель эффективности на основе того, сколько сообщений находится в очереди в любой момент времени. Если сообщений всего несколько, очередь работает эффективно. Если есть многочисленные сообщения, вещи неэффективны, и тревоги должны быть озвучены.
Но что, если вы находитесь в постоянно занятой среде с экстремальными всплесками, где очереди имеют тенденцию быстро заполняться? Если у вас уже работает достаточно рабочих, чтобы справиться с этим взрывом, вам нужно больше разжигать?
Вы можете уволить больше работников, но это может стоить вам. То есть вам, возможно, придется предоставить новые рабочие экземпляры, такие как рабочие док-станции Heroku или AMI AMS, которые в конечном итоге будут стоить вам ощутимых денег. И иногда для запуска этих рабочих экземпляров требуется несколько минут, и когда они работают, всплеск активности заканчивается, и очередь возвращается в нормальное состояние — первоначально доступные рабочие справились с нагрузкой адекватно.
Оказывается, длина очереди была запаздывающим индикатором. Вы раскручивали ненужные ресурсы. Ложная сигнализация!
Если у вас уже есть достаточная емкость для обработки потока сообщений в очереди, то мониторинг длины очереди не слишком полезен. На самом деле это вводящий в заблуждение показатель, который может привести к ненужным действиям.
Следовательно, длина очереди не является показателем эффективности системы, когда там уже достаточно рабочих. Скорее, показатель, который означает что-то в среде с высокой пропускной способностью, показывает, как долго сообщение находится в очереди . Это действенный показатель: если сообщения помещаются в очередь, ожидающую обработки, вам нужно больше процессоров!
Moo по длине очереди и пусть время ожидания в очереди
По умолчанию SQS от Amazon не позволяет запрашивать, как долго сообщение находилось в очереди. Поэтому я написал Му .
Moo предоставляет клиентам интерфейс для получения и выполнения действий над временем сообщения в метрике очереди. Это делается путем добавления сообщения SQS к отметке времени. Эта отметка времени затем проверяется, когда сообщение извлекается из очереди SQS. Если пороговая разница превышена, вызывается обратный вызов.
Пользователи Moo найдут его использование похожим на Ahoy! , который является фасадом с асинхронным обратным вызовом поверх Java SDK AWS . На самом деле, Му использует Ахой! внизу, с добавленной функцией присоединения асинхронного обратного вызова «максимальное время в очереди».
Moo поддерживает множественное время в порогах очереди, а установка максимального времени в пороге очереди выполняется следующим образом:
Добавление максимального порога времени в очереди
1
2
3
4
5
6
7
|
//adds a 1 second max threshold sqs.addQueueWaitTimeCallback( 1000 , new QueueWaitTimeCallback() { public void onThresholdExceeded( long waitTime) { //waitTime is the actual time in queue //do something... like fire off a web hook, etc } }); |
Обратите внимание, что метод addQueueWaitTimeCallback
занимает максимальное время в миллисекундах в значении очереди и сопровождающую QueueWaitTimeCallback
обратного вызова QueueWaitTimeCallback
. Метод onThresholdExceeded
будет вызываться асинхронно во время получения сообщения, если превышено максимальное пороговое значение; более того, onThresholdExceeded
получит в качестве параметра фактическое время ожидания очереди.
Покажи мне Му
Чтобы запустить экземпляр Moo, у вас есть несколько вариантов, включая настройку экземпляра AmazonSQS
AWS или просто передачу ключа, секрета и имени очереди следующим образом:
Добавление максимального порога времени в очереди
1
|
SQS sqs = new SQS(System.getProperty( "key" ), System.getProperty( "secret" ), System.getProperty( "queue" )); |
Затем вы можете присоединить ноль ко многим экземплярам QueueWaitTimeCallback
например так:
Добавление максимального порога времени в очереди
1
2
3
4
5
|
sqs.addQueueWaitTimeCallback( 600000 , new QueueWaitTimeCallback() { public void onThresholdExceeded( long actualWaitTime) { //do something -- fire off SNS message? } }); |
В этом случае я добавил функцию обратного вызова, которая будет вызываться, если сообщения находятся в очереди более 10 минут. Обратите внимание, что эти QueueWaitTimeCallback
вызовы QueueWaitTimeCallback
экземпляром средства чтения очереди ; соответственно, QueueWaitTimeCallback
может, конечно, QueueWaitTimeCallback
больше экземпляров себя.
Вот пример документа JSON, который вы можете добавить в очередь SQS:
Добавление максимального порога времени в очереди
1
2
3
4
5
|
{ "employees" :[ { "firstName" : "John" , "lastName" : "Doe" }, { "firstName" : "Anna" , "lastName" : "Smith" }, { "firstName" : "Peter" , "lastName" : "Jones" } ]} |
Отправка и получение этого сообщения точно так же, как если бы вы использовали Ahoy !. Например, чтобы отправить сообщение, просто передайте String
в метод send
:
Добавление максимального порога времени в очереди
1
2
3
4
5
|
sqs.send(json, new SendCallback() { public void onSend(String messageId) { //messageId is from SQS } }); |
Обратите внимание, что метод send
принимает необязательный SendCallback
.
Получение сообщения осуществляется с помощью метода receive
, который принимает обязательный ReceiveCallback
— этот обратный вызов будет вызываться асинхронно для каждого сообщения, полученного из очереди. Каждый экземпляр получит сообщение, помещенное в очередь, и идентификатор SQS сообщения.
Добавление максимального порога времени в очереди
1
2
3
4
5
|
sqs.receive( new ReceiveCallback() { public void onReceive(String messageId, String message) { //do something w/the message -- in this case it's JSON } }); |
Обратите внимание, что при получении сообщения Moo замечает, что сообщение ожидает в очереди более максимального порогового значения ожидания очереди, настроенного для соответствующего QueueWaitTimeCallback
, Moo вызовет его. Обратите внимание, Moo может вызывать более одного экземпляра; таким образом, вы можете настроить цепочку для выполнения различных действий по мере увеличения времени.
Помните, что длина очереди обычно является запаздывающим индикатором. Метрика, которая на самом деле означает что-то, — это то, как долго сообщение находится в очереди. Это действенный показатель, и Му дает вам возможность что-то с этим сделать! Вы можете копать это?