Статьи

Первый взгляд на потоки Redis и как их использовать с Java

Redis Streams превратился в unstable ветку Redis с начала этого года, и первые клиенты начинают с принятия API Redis Streams. Это отличное время, чтобы взглянуть на то, что предоставляют Redis Streams, и как вы можете использовать их с точки зрения клиента.

Отказ от ответственности: Redis Streams доступны в качестве первого черновика и еще не являются частью стабильной версии. API могут быть изменены.

Что такое Redis Stream?

Redis Stream — это структура данных, похожая на журнал / журнал, которая представляет журнал событий в последовательном порядке. Сообщения (события) могут быть добавлены в поток. Эти сообщения могут затем использоваться в автономном режиме или путем чтения в группе потребителей. Группы потребителей — это концепция, в которой несколько потребителей (таких как экземпляры приложений) могут быть сгруппированы в группу потребителей, чье смещение потока (ход чтения) сохраняется на стороне сервера. Эта концепция упрощает построение клиентов, поскольку смещение потока не требуется хранить на стороне потребителя.

Потоковое сообщение состоит из идентификатора сообщения, сгенерированного Redis при отправке, и тела, представленного в виде хэша (карты) — в основном, набора ключей и значений.
Сам поток идентифицируется ключом и содержит ноль для многих потоковых сообщений вместе с некоторыми метаданными, такими как группы потребителей.

Redis Stream API

На данный момент все команды потока имеют префикс X Поток позволяет использовать команды добавления, чтения, самоанализа и обслуживания. Наиболее распространенные команды, которые вы увидите в следующих разделах:

  • XADD key * field1 value1 [field2 value2] [fieldN valueN] : добавить (отправить) сообщение в поток Redis.
  • XREAD [BLOCK timeout] [COUNT n] STREAMS key1 [keyN] offset1 [offsetN] : чтение сообщения из потока Redis.
  • XRANGE key from to [COUNT n] : сканирование (самоанализ) потока Redis для его сообщений

Кроме того, при использовании групп потребителей в игру вступают дополнительные команды:

  • XREADGROUP GROUP name consumer [BLOCK timeout] [COUNT n] [NOACK] STREAMS key1 [keyN] offset1 [offsetN] : чтение сообщения из потока Redis в контексте потребителя и его группы.
  • XACK key group messageId1 [messageId2] [messageIdN] : квитировать сообщение после прочтения в контексте потребителя.
  • XPENDING key group [from to COUNT n] : перечислить ожидающие (неподтвержденные сообщения).
  • XGROUP и подкоманды: API для создания и удаления групп потребителей.

Примечание. Команды выше усечены для краткости. См. Документацию Redis Streams для объяснения всех возможных опций и комбинаций.

Использование Redis Stream

Давайте посмотрим, как мы можем использовать поток Redis через redis-cli применяя команды, которые мы видели ранее. Давайте добавим (и сначала создадим поток) сообщение в новый поток.

1
2
127.0.0.1:6379> XADD my-stream * key value
1527062149743-0

Мы используем XADD для добавления нового сообщения в поток my-stream с XADD ключ-значение. Обратите внимание на * (звездочку)? Это поле используется для управления генерацией идентификатора. Если вы хотите сгенерировать идентификатор сообщения сервером (что верно в 99,5% случаев, если вы не являетесь сервером Redis, который хочет реплицировать), всегда ставьте * там. Redis отвечает с идентификатором сообщения 1527062149743-0 .

Наш поток теперь содержит сообщение. Давайте читать это с XREAD .

1
2
3
4
5
127.0.0.1:6379>  XREAD COUNT 1 STREAMS my-stream 0
1) 1) "my-stream"
   2) 1) 1) 1527062149743-0
         2) 1) "key"
            2) "value"

Мы прочитали сообщение прямо сейчас и извлекли тело вдоль прочитанного. Чтение сообщения оставляет сообщение в потоке. Мы можем проверить это с помощью XRANGE :

1
2
3
4
127.0.0.1:6379> XRANGE my-stream - +
1) 1) 1527068644230-0
   2) 1) "key"
      2) "value"

Последующее чтение с тем же смещением потока вернет нам то же сообщение. У вас есть разные варианты, чтобы избежать этого поведения:

  1. Отслеживание идентификатора сообщения на стороне клиента
  2. Блокировка чтения
  3. Удаление сообщений из потока
  4. Ограничение размера потока
  5. Использование групп потребителей

Давайте внимательнее посмотрим на эти варианты.

Отслеживание MessageId

Каждая операция чтения возвращает идентификатор сообщения вместе с потоковым сообщением. Если у вас один клиент (нет одновременных чтений), вы можете сохранить ссылку на последний идентификатор сообщения в вашем приложении и использовать его повторно при последующих вызовах чтения. Давайте сделаем это для идентификатора сообщения, которое мы видели ранее 1527068644230-0 :

1
2
3
4
5
6
7
127.0.0.1:6379> XADD my-stream * key value
1527069672240-0
127.0.0.1:6379>  XREAD COUNT 1 STREAMS my-stream 1527068644230-0
1) 1) "my-stream"
   2) 1) 1) 1527069672240-0
         2) 1) "key"
            2) "value"

Мы использовали 1527068644230-0 качестве смещения потока и получили следующее добавленное сообщение. Этот подход позволяет возобновить чтение старых (вероятно, уже использованных сообщений), но требует некоторой координации на стороне клиента, чтобы не читать повторяющиеся сообщения.

Если вы не хотите отслеживать идентификатор сообщения и вас интересуют только самые последние сообщения, вы можете использовать блокировку чтения.

Блокировка чтения

Чтение через XREAD позволяет читать из потоков блокирующим образом. XREAD ведет себя аналогично BLPOP и BRPOP когда вы указываете тайм-аут, и вызов возвращается либо при наличии сообщения, либо по BLPOP времени чтения. Тем не менее, Stream API предоставляет больше возможностей. Для этого примера нам нужны две отдельные стороны: производитель и потребитель. Если вы читали с самого начала, вы видели примеры, выполненные с использованием одного клиента. Сначала мы начинаем с потребителя, в противном случае полученное сообщение поступает в поток без возможности оповестить ожидающего потребителя.

потребитель

Мы используем XREAD с BLOCK 10000 для ожидания 10000 миллисекунд (10 секунд). Обратите внимание, что мы используем символьное смещение потока $ , указывающее на начало потока.

1
127.0.0.1:6379> XREAD COUNT 1 BLOCK 10000 STREAMS my-stream $

Теперь потребитель заблокирован и ожидает прибытия сообщения.

Режиссер

1
2
127.0.0.1:6379> XADD my-stream * key value
1527070630698-0

Redis пишет сообщение в наш поток. Теперь вернемся к потребителю.

потребитель

После того, как сообщение записано в наш поток, потребитель получает сообщение и снова разблокируется. Вы можете начать обработку сообщения и, возможно, выполнить еще одно чтение.

1
2
3
4
5
1) 1) "my-stream"
   2) 1) 1) 1527070630698-0
         2) 1) "key"
            2) "value"
(1.88s)

При повторном чтении с использованием смещения потока $ снова будет ожидаться следующее сообщение, поступающее в поток. Однако использование $ оставляет нам время, в течение которого могут поступать другие сообщения, которые мы не потребляли. Чтобы избежать этих пробелов, вы должны отслеживать последний прочитанный вами идентификатор сообщения и повторно использовать его для следующего вызова XREAD .
Еще одно предостережение, чтобы не упустить параллелизм. Мы видели пример с одним потребителем. Что если вы увеличите количество потребителей?

В этом случае, если у вас есть, например, два потребителя, которые выдают блокирующее чтение, оба потребителя получают одно и то же сообщение, которое снова оставляет нам задачу координировать чтение, поэтому потоковое сообщение не обрабатывается несколько раз.

Удаление сообщений из потока

Можно удалить сообщения из потока, но это не рекомендуется. Мы еще не видели XDEL , но из названия становится очевидным, что мы можем удалять сообщения из потока:

1
2
127.0.0.1:6379> XDEL my-stream 1527070789716-0
(integer) 1

Сообщение теперь пропало. Удаление не рекомендуется, поскольку операции являются дорогостоящими: потоки используют радикальные деревья с макроузлами. Удаление является безопасной операцией, когда, но при использовании сообщения несколькими пользователями необходимо синхронизировать доступ, поскольку удаление не препятствует прочтению сообщения несколько раз.

Ограничение размера потока

Вы можете указать максимальный размер потока при добавлении сообщений в поток. Это происходит с опцией MAXLEN при XADD команды XADD .

1
2
127.0.0.1:6379> XADD my-stream MAXLEN 4 * key value
1527071269045-0

Сообщение добавляется в поток, и поток максимально урезается до предела размера. Это также означает, что старые сообщения удаляются и больше не читаются.

Потребительские группы

Последний подход к обработке дублированных сообщений использует группы потребителей. Идея групп потребителей состоит в том, чтобы отслеживать признание. Подтверждение позволяет пометить сообщение как подтвержденное потребителем. Команда XACK возвращает сообщение о том, было ли сообщение подтверждено, или предыдущий потребитель уже подтвердил сообщение.

Чтобы использовать группы потребителей, нам нужно сначала создать группу потребителей. Обратите внимание, что на момент написания этого поста поток уже должен существовать, прежде чем можно будет создать группу потребителей. Эта проблема, вероятно, будет решена с помощью https://github.com/antirez/redis/issues/4824 .

На данный момент мы можем повторно использовать наш поток my-stream если вы следовали предыдущим примерам.

Мы создаем группу потребителей с именем my-group которая действительна только для потока my-stream . Обратите внимание, что последний параметр — это смещение потока, которое используется для отслеживания прогресса чтения. Мы используем $ чтобы указать на заголовок потока.

1
2
127.0.0.1:6379> XGROUP CREATE my-stream my-group $
OK

Давайте теперь добавим сообщение в поток:

1
2
127.0.0.1:6379> XADD my-stream * key value
1527072009813-0

И выполните неблокирующее чтение через XREADGROUP :

1
2
3
4
5
127.0.0.1:6379> XREADGROUP GROUP my-group c1 COUNT 1 STREAMS my-stream >
1) 1) "my-stream"
   2) 1) 1) 1527072009813-0
         2) 1) "key"
            2) "value"

XREADGROUP принимает имя группы и имя потребителя для отслеживания прогресса чтения. Обратите внимание также на смещение потока > . Это символическое смещение потока указывает на последний идентификатор сообщения, который был прочитан группой потребителей my-group .
Вы могли заметить, что в группе есть имя потребителя. Группы потребителей предназначены для отслеживания доставки сообщений и различения потребителей. Если вы помните пример блокировки чтения сверху, вы видели, что два потребителя получили сообщение одновременно. Чтобы изменить (или сохранить) это поведение, вы можете указать имя потребителя:

  1. Операции чтения с одинаковыми именами потребителей могут получать одно и то же сообщение несколько раз.
  2. Операции чтения с разными именами потребителей не позволяют получить одно и то же сообщение несколько раз.

В зависимости от режима, в котором вы используете сообщения, вы можете захотеть возобновить обработку или использовать сообщения несколькими клиентами без создания собственного механизма синхронизации. Redis Streams позволяет вам делать это, подтверждая сообщения. По умолчанию XREADGROUP подтверждает сообщения, которые сигнализируют, что сообщение было обработано и может быть удалено. Вы можете указать NOACK чтобы не подтверждать сообщение во время чтения. После обработки сообщения подтвердите сообщение, XACK . В зависимости от команды return вы можете увидеть, подтвердили ли вы сообщение или другой клиент уже подтвердил сообщение.

Теперь давайте остановимся здесь и не будем погружаться в восстановление и более сложные темы. Веб-сайт Redis предоставляет полную документацию о Redis Streams по адресу https://redis.io/topics/streams-intro .

Использование Redis Streams с Java

Примечание. На момент написания статьи единственным Java-клиентом, поддерживающим Redis Streams, была предварительная версия Lettuce 5.1.0.M1.

Redis Streams поставляется с новым серверным API, который также требует адаптации на стороне клиента. Давайте повторим приведенные выше примеры, используя клиент Java.

Прежде всего, нам нужен экземпляр клиента для подготовки соединения. Мы будем использовать синхронный API. Однако API Redis Stream поддерживаются также асинхронными и реактивными API.

1
2
3
RedisClient client = RedisClient.create("redis://localhost");
StatefulRedisConnection<String, String> connection = client.connect();
RedisStreamCommands<String, String> streamCommands = connection.sync();

Lettuce представляет новый командный интерфейс RedisStreamCommands который объявляет методы API Redis Stream вместе с различными его типами (такими как StreamOffset , Consumer и объекты аргументов команды).

Первое, что мы хотим сделать, это добавить новое сообщение в поток:

1
2
Map<String, String> body =  Collections.singletonMap("key", "value");
String messageId = streamCommands.xadd("my-stream", body);

В этом примере используются строки в кодировке UTF-8 для представления ключей и значений. Само тело переносится как Map и выдает команду XADD my-stream * key value .

Теперь давайте прочитаем одно сообщение из нашего потока с помощью команды, соответствующей XREAD COUNT 1 STREAMS my-stream 0 :

1
2
3
4
5
6
7
8
9
List<StreamMessage<String, String>> messages = streamCommands
        .xread(XReadArgs.Builder.count(1),
               StreamOffset.from("my-stream", "0"));
 
if(messages.size() == 1) { // a message was read
     
} else { // no message was read
     
}

Метод xread(…) принимает XReadArgs и StreamOffset и возвращает список объектов StreamMessage<K, V> которые содержат идентификатор сообщения вместе с телом. Сообщения могут быть обработаны сейчас, и последующее чтение будет включать последний messageId для чтения новых сообщений:

01
02
03
04
05
06
07
08
09
10
StreamMessage<String, String> message = …;
List<StreamMessage<String, String>> messages = streamCommands
        .xread(XReadArgs.Builder.count(1),
               StreamOffset.from("my-stream", message.getId()));
 
if(messages.size() == 1) { // a message was read
     
} else { // no message was read
     
}

Блокирующие чтения требуют, чтобы в объект аргумента была передана дополнительная длительность. Добавление опции BLOCK превращает неблокирующий вызов (с точки зрения Redis) в блокирующий:

1
2
3
4
List<StreamMessage<String, String>> messages = streamCommands
        .xread(XReadArgs.Builder.count(1)
                                .block(Duration.ofSeconds(10)),
                                StreamOffset.from("my-stream", "0"));

В последнем примере давайте рассмотрим группы потребителей. RedisStreamCommands предоставляет методы для создания потребителей — на момент написания, методы для удаления потребителей и групп потребителей еще не реализованы в Redis.

1
2
3
4
5
6
streamCommands.xadd("my-stream", Collections.singletonMap("key", "value")); // add a message to create the stream data structure
 
streamCommands.xgroupCreate("my-stream", "my-group", "$"); // add a group pointing to the stream head
 
List<StreamMessage<String, String>> messages = streamCommands.xreadgroup(Consumer.from("my-group", "c1"),
        StreamOffset.lastConsumed("my-stream"));

Сообщения считываются из my-stream с использованием группы потребителей my-group и потребителя c1 . Группы потребителей и имена потребителей кодируются в байтовом коде и поэтому чувствительны к регистру при использовании строк ASCII или UTF-8.

Вывод

В этом сообщении в блоге описан первый взгляд на Redis Streams, который будет доступен с Redis 5, и как использовать Stream API с клиентом Lettuce Redis. API не полностью реализован, поэтому следует ожидать изменений.

Опубликовано на Java Code Geeks с разрешения Марка Палуха, партнера нашей программы JCG . См. Оригинальную статью здесь: первый взгляд на Redis Streams и как использовать их с Java

Мнения, высказанные участниками Java Code Geeks, являются их собственными.