Интерфейс TransferQueue на языке программирования Java представляет собой очередь для передачи сообщений в соответствии с шаблоном проектирования программирования производителя-потребителя . Интерфейс TransferQueue основан на интерфейсе BlockingQueue в Java с дополнительным условием, что производители могут ждать, пока потребители не получат свои сообщения.
Redis — это хранилище структуры данных в памяти с открытым исходным кодом, которое часто используется для создания распределенных баз данных ключей NoSQL. Хотя многие Java-программисты хотели бы использовать Redis с Java, Redis автоматически не совместима с конструкциями Java, такими как TransferQueue из коробки.
По этим причинам многие Java-программисты устанавливают Java-клиент Redis, например Redisson, чтобы использовать TransferQueue в Redis.
Вы также можете быть заинтересованы в:
Распределенные Java-очереди поверх Redis
TransferQueues в Redis с Redisson
Redisson — это Java-клиент Redis, который реализует многие знакомые объекты и коллекции, необходимые для распределенных приложений на Java.
Интерфейс TransferQueue в Java повторно реализован в Redisson с помощью интерфейса RTransferQueue. Ниже приведен пример того, как создать экземпляр объекта RTransferQueue:
Джава
1
RTransferQueue<String> queue = redisson.getTransferQueue("myCountDownLatch");
После создания вы можете использовать RTransferQueue для передачи сообщения от производителя:
Джава
xxxxxxxxxx
1
queue.transfer("data");
2
3
// or try transfer immediately
4
queue.tryTransfer("data");
5
6
// or try transfer up to 10 seconds
7
queue.tryTransfer("data", 10, TimeUnit.SECONDS);
Соответствующий метод в потоке потребителя или JVM:
Джава
xxxxxxxxxx
1
queue.take();
или
Джава
xxxxxxxxxx
1
queue.poll();
Redisson также включает в себя реализации TransferQueue с использованием интерфейсов Async, Reactive и RxJava2.
Ниже приведен пример TransferQueue в Redis с использованием интерфейса Async:
Джава
xxxxxxxxxx
1
RFuture<Void> future = queue.transferAsync("data");
2
// or try transfer immediately
3
RFuture<Boolean> future = queue.tryTransferAsync("data");
4
// or try transfer up to 10 seconds
5
RFuture<Boolean> future = queue.tryTransferAsync("data", 10, TimeUnit.SECONDS);
6
7
// in other thread or JVM
8
RFuture<String> future = queue.takeAsync();
9
// or
10
RFuture<String> future = queue.pollAsync();
11
12
future.whenComplete((res, exception) -> {
13
// ...
14
});
Ниже приведен пример TransferQueue в Redis с использованием интерфейса Reactive:
Джава
xxxxxxxxxx
1
RedissonReactiveClient redisson = Redisson.createReactive(config);
2
RTransferQueueReactive<String> queue = redisson.getTransferQueue("myCountDownLatch");
3
4
Mono<Void> mono = queue.transfer("data");
5
// or try transfer immediately
6
Mono<Boolean> mono = queue.tryTransfer("data");
7
// or try transfer up to 10 seconds
8
Mono<Boolean> mono = queue.tryTransfer("data", 10, TimeUnit.SECONDS);
9
10
// in other thread or JVM
11
12
Mono<String> mono = queue.take();
13
// or
14
Mono<String> mono = queue.poll();
15
16
mono.doOnNext(res -> {
17
// ...
18
}).subscribe();
Ниже приведен пример TransferQueue в Redis с использованием интерфейса RxJava2:
Джава
xxxxxxxxxx
1
RedissonRxClient redisson = Redisson.createRx(config);
2
RTransferQueueRx<String> queue = redisson.getTransferQueue("myCountDownLatch");
3
4
Completable res = queue.transfer("data");
5
// or try transfer immediately
6
Single<Boolean> resRx = queue.tryTransfer("data");
7
// or try transfer up to 10 seconds
8
Single<Boolean> resRx = queue.tryTransfer("data", 10, TimeUnit.SECONDS);
9
10
// in other thread or JVM
11
12
Single<String> resRx = queue.take();
13
// or
14
Maybe<String> resRx = queue.poll();
15
16
resRx.doOnSuccess(res -> {
17
// ...
18
}).subscribe();
Спасибо за прочтение!
Дальнейшее чтение
Зачем нам нужен Thread.currentThread (). Interrupt () в прерываемых методах?