Статьи

Простая очередь сообщений с использованием Redis

В этих постах мы будем использовать Redis как простую очередь сообщений, используя команды списка.

Допустим, у нас есть приложение, которое позволяет пользователям загружать фотографии. Затем в приложении мы показываем фотографию в разных размерах, таких как Thumb, Medium и Large.

В первой реализации у нас может быть задача обработки загруженного изображения в том же запросе. Поскольку это дорогостоящее задание, наши запросы будут медленными.

Возможное решение для этого состоит в том, чтобы сделать эту обработку асинхронной с помощью очереди сообщений (MQ), существует множество хорошо известных MQ, таких как ActiveMQ, RabbitMQ, IBM MQ и другие. В приведенных ниже примерах мы будем использовать Redis в качестве очереди сообщений, используя структуру LIST.

Идея состоит в том, чтобы иметь СПИСОК, в который производитель помещает сообщения для обработки, а некоторые потребители будут просматривать СПИСОК для обработки отправленных сообщений.

По сути, производители добавляют сообщения в конец списка с помощью « сообщения очереди RPUSH », а потребители читают сообщения в начале списка с помощью « очереди LPOP », настраивая обработку FIFO.

Клиент всегда будет искать новое сообщение, для этого мы будем использовать команду BLPOP, которая является блокирующей версией команды LPOP. По сути, будет цикл while, вызывающий BLPOP, чтобы получить новое сообщение для обработки.

Рассматривая пример загрузки изображения, скажем, у нас есть класс ImageUploader, который отвечает за загрузку изображения на сервер, он добавит новое сообщение в очередь, указывающее, что изображение обрабатывается, сообщение может быть строкой JSON. как это:

1
{“imagePath”:”/path/to/image”, “user”:”userid”}

Класс ImageUploder может быть таким:

01
02
03
04
05
06
07
08
09
10
11
12
public class ImageUploader {
 
  public void uploadImage(HttpServletRequest request){
 
    String imagePath = saveImage(request);
    String jsonPayload = createJsonPayload(request, imagePath);
    jedis.rpush("queue", jsonPayload);
    //... keep with the processing
  }
 
  //.... other methods in the class
}

Это просто пример того, как продюсер будет работать. В этом случае мы отсоединили обработку изображений от класса ImageUploader. Мы просто создаем новое сообщение в очереди, чтобы потребители обрабатывали его.

Потребитель сообщения может выглядеть примерно так:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package br.com.xicojunior.redistest;
 
import java.util.List;
 
import redis.clients.jedis.Jedis;
 
public class MessageConsumer
{
    public static void main( String[] args )
    {
        Jedis jedis = new Jedis("localhost");  
        List<String> messages = null;
        while(true){
          System.out.println("Waiting for a message in the queue");
          messages = jedis.blpop(0,"queue");
          System.out.println("Got the message");
          System.out.println("KEY:" + messages.get(0) + " VALUE:" + messages.get(1));
          String payload = messages.get(1);
          //Do some processing with the payload
          System.out.println("Message received:" + payload);
        }
 
    }
}

Этот потребительский код может выполняться в другом процессе или даже на другом компьютере. Этот потребительский код работает, мы можем скомпилировать его и запустить с помощью команды eclipse или java.

Для этого кода мы использовали метод jedis.blpop , он возвращает список с 2 строками, (0) — ключ, (1) — возвращаемое значение. Метод также получает целое число, оно указывает время ожидания. Мы пропустили 0, указывая, что таймаута не будет

Когда мы запускаем этот код и в списке нет значения, в консоли мы увидим только сообщение

1
"Waiting for a message in the queue".

Тогда, если клиент добавляет элемент в список «очередь», наш потребительский класс получит его значение. Мы можем смоделировать тест, используя redis-cli или даже другой класс, который будет добавлять элементы в очередь, как показано ниже:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
package br.com.xicojunior.redistest;
 
import redis.clients.jedis.Jedis;
 
public class MessageProducer {
 
  public static void main(String[] args) {
    Jedis jedis = new Jedis("localhost");
 
    jedis.rpush("queue", "Value 1");
    jedis.rpush("queue", "Value 2");
    jedis.rpush("queue", "Value 3");
 
  }
 
}

Если мы запустим класс MessageProducer после того, как класс MessageConsumer уже запущен, мы увидим этот вывод в консоли:

01
02
03
04
05
06
07
08
09
10
11
12
13
Waiting for a message in the queue
Got the message
KEY:queue VALUE:Value 1
Message received:Value 1
Waiting for a message in the queue
Got the message
KEY:queue VALUE:Value 2
Message received:Value 2
Waiting for a message in the queue
Got the message
KEY:queue VALUE:Value 3
Message received:Value 3
Waiting for a message in the queue

Таким образом, очередь сообщений будет еще одним возможным вариантом использования Redis. Есть некоторые очереди, построенные поверх redis, такие как RestMQ , Resque — Job Queue и другие.

Ссылка: Простая очередь сообщений с использованием Redis от нашего партнера JCG Франсиско Рибейру Младшего в блоге XICO JUNIOR’S WEBLOG .