Использование очередей в Redis
Redis — это мощный инструмент, который поддерживает различные типы структур данных, от строк и списков до карт и потоков. Разработчики используют Redis в качестве базы данных, кэша и посредника сообщений.
Как и любой брокер сообщений, Redis должен отправлять сообщения в правильном порядке. Сообщения могут быть отправлены в соответствии с их возрастом или в соответствии с некоторыми другими предварительно определенными приоритетами.
Для хранения этих ожидающих сообщений разработчикам Redis нужна структура данных очереди. Redisson — это инфраструктура для распределенного программирования с Redis и Java, которая обеспечивает реализацию многих распределенных структур данных, включая очереди.
Redisson облегчает разработку Redis, предоставляя Java API. Вместо того, чтобы требовать от разработчиков изучения команд Redis, Redisson включает в себя все известные интерфейсы Java, такие как Queue и BlockingQueue. Redisson также выполняет утомительную работу в Redis, такую как управление соединениями, обработка отказов и сериализация данных.
Вам также может понравиться:
Создание микросервисов с помощью Redis .
Распределенные Java-очереди на основе Redis
Redisson предоставляет несколько реализаций базовой структуры данных в Java на основе Redis, каждая из которых имеет различные функциональные возможности. Это позволяет вам выбрать тип очереди, который лучше всего подходит для ваших целей.
Ниже мы обсудим шесть различных типов распределенных очередей на основе Redis с использованием инфраструктуры Redisson Java.
Очередь
RQueue
Объект в Redisson реализует java.util.Queue интерфейс. Очереди используются для ситуаций, в которых элементы должны обрабатываться, начиная с самых старых, сначала (также известные как «первым пришел, первым вышел» или FIFO).
Как и в простой Java, первый элемент RQueue
можно проверить с помощью peek()
метода или изучить и удалить с помощью poll()
метода:
Джава
xxxxxxxxxx
1
RQueue<SomeObject> queue = redisson.getQueue("anyQueue");
2
queue.add(new SomeObject());
3
SomeObject obj = queue.peek();
4
SomeObject someObj = queue.poll();
BlockingQueue
RBlockingQueue
Объект в Redisson реализует java.util.BlockingQueue интерфейс.
BlockingQueues — это очереди, блокирующие поток, пытающийся выполнить опрос из пустой очереди или вставляющий элемент в очередь, которая заполнена. Поток блокируется до тех пор, пока другой поток не вставит элемент в пустую очередь или не выполнит опрос первого элемента из полной очереди.
Пример кода ниже демонстрирует правильную реализацию и использование RBlockingQueue
. В частности, вы можете вызвать poll()
метод с аргументами, которые определяют, как долго поток будет ждать, пока элемент станет доступным:
Джава
xxxxxxxxxx
1
RBlockingQueue<SomeObject> queue = redisson.getBlockingQueue("anyQueue");
2
queue.offer(new SomeObject());
3
SomeObject obj = queue.peek();
4
SomeObject someObj = queue.poll();
5
SomeObject ob = queue.poll(10, TimeUnit.MINUTES);
Во время восстановления после сбоев или повторного подключения к серверу Redis, то poll()
, pollFromAny()
, pollLastAndOfferFirstTo()
и take()
методы Java являются Возобновлены подпиской автоматически.
BoundedBlockingQueue
RBoundedBlockingQueue
Объект в Redisson реализует ограниченную блокирующую структуру очереди. Ограниченные очереди блокирования — это блокирующие очереди, емкость которых ограничена, т.е. ограничена.
Приведенный ниже код демонстрирует, как создавать экземпляры и использовать их RBoundedBlockingQueue
в Redisson. trySetCapacity()
Метод используется , чтобы попытаться установить емкость очереди блокировки. trySetCapacity()
возвращает логическое значение «true» или «false» в зависимости от того, была ли емкость успешно установлена или уже была установлена:
Джава
xxxxxxxxxx
1
RBoundedBlockingQueue<SomeObject> queue = redisson.getBoundedBlockingQueue("anyQueue");
2
queue.trySetCapacity(2);
3
queue.offer(new SomeObject(1));
4
queue.offer(new SomeObject(2));
5
// will be blocked until free space available in queue
6
queue.put(new SomeObject());
7
SomeObject obj = queue.peek();
8
SomeObject someObj = queue.poll();
9
SomeObject ob = queue.poll(10, TimeUnit.MINUTES);
DelayedQueue
RDelayedQueue
Объект Redisson позволяет реализовать задержанные очереди в Redis. Это может быть полезно при доставке сообщений потребителям с использованием такой стратегии, как экспоненциальный откат. После каждой неудачной попытки доставить сообщение время между попытками будет экспоненциально увеличиваться.
Каждый элемент в очереди с задержкой будет перенесен в очередь назначения после задержки, указанной вместе с элементом. Эта очередь назначения может быть любой очередью, которая реализует RQueue
интерфейс, такой как RBlockingQueue
или RBoundedBlockingQueue
.
Джава
xxxxxxxxxx
1
RQueue<String> destinationQueue = redisson.getQueue("anyQueue");
2
RDelayedQueue<String> delayedQueue = getDelayedQueue(destinationQueue);
3
// move object to destinationQueue in 10 seconds
4
delayedQueue.offer("msg1", 10, TimeUnit.SECONDS);
5
// move object to destinationQueue in 1 minute
6
delayedQueue.offer("msg2", 1, TimeUnit.MINUTES);
Рекомендуется уничтожить отложенную очередь с помощью метода destroy () после того, как очередь больше не нужна. Однако в этом нет необходимости, если вы выключаете Redisson.
PriorityQueue
RPriorityQueue
Объект в Redisson реализует java.util.Queue интерфейс. Приоритетные очереди — это очереди, которые сортируются не по возрасту элемента, а по приоритету, связанному с каждым элементом.
Как показано в примере кода ниже, RPriorityQueue
использует Comparator для сортировки элементов в очереди:
Джава
xxxxxxxxxx
1
RPriorityQueue<Integer> queue = redisson.getPriorityQueue("anyQueue");
2
queue.trySetComparator(new MyComparator()); // set object comparator
3
queue.add(3);
4
queue.add(1);
5
queue.add(2);
6
queue.removeAsync(0);
7
queue.addAsync(5);
8
queue.poll();
PriorityBlockingQueue
RPriorityBlockingQueue
Объект Redisson сочетает в себе функции RPriorityQueue
и RBlockingQueue
. Мол RPriorityQueue
, RPriorityBlockingQueue
использует Comparator
для сортировки элементов в очереди.
Джава
xxxxxxxxxx
1
RPriorityBlockingQueue<Integer> queue = redisson.getPriorityBlockingQueue("anyQueue");
2
queue.trySetComparator(new MyComparator()); // set object comparator
3
queue.add(3);
4
queue.add(1);
5
queue.add(2);
6
queue.removeAsync(0);
7
queue.addAsync(5);
8
queue.take();
Во время восстановления после сбоев или повторного подключения к серверу Redis, то poll()
, pollLastAndOfferFirstTo()
и take()
методы Java являются Возобновлены подпиской автоматически.