Oracle AQ: Oracle Database Advanced Queuing . Одна из самых удивительных функций базы данных Oracle . AQ API реализует полноценную систему транзакционных сообщений непосредственно в базе данных.
В классической архитектуре, где база данных находится в центре вашей системы, когда несколько приложений (некоторые из которых написаны на Java, другие написаны на Perl или PL / SQL и т. Д.) Обращаются к одной и той же базе данных, используя AQ для межпроцессного взаимодействия просто замечательно Если вы больше разбираетесь в Java EE, вы можете приобрести решение MQ на основе Java и поместить эту шину / промежуточное ПО в центр архитектуры вашей системы. Но почему бы не использовать базу данных вместо этого ?
Как использовать PL / SQL AQ API с jOOQ
API PL / SQL для постановки и извлечения сообщений AQ довольно прост, и к нему очень легко получить доступ из Java с OracleDSL.DBMS_AQ
API OracleDSL.DBMS_AQ OracleDSL.DBMS_AQ
.
Используемая здесь конфигурация очереди будет выглядеть примерно так:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
CREATE OR REPLACE TYPE message_t AS OBJECT ( ID NUMBER(7), title VARCHAR2(100 CHAR ) ) / BEGIN DBMS_AQADM.CREATE_QUEUE_TABLE( queue_table => 'message_aq_t' , queue_payload_type => 'message_t' ); DBMS_AQADM.CREATE_QUEUE( queue_name => 'message_q' , queue_table => 'message_aq_t' ); DBMS_AQADM.START_QUEUE( queue_name => 'message_q' ); COMMIT ; END ; / |
И генератор кода jOOQ будет генерировать полезные классы со всей информацией о типах, непосредственно связанной с ними (упрощенный пример):
01
02
03
04
05
06
07
08
09
10
11
12
13
14
|
class Queues { static final Queue<MessageTRecord> MESSAGE_Q = new QueueImpl<>( "NEW_AUTHOR_AQ" , MESSAGE_T); } class MessageTRecord { void setId(Integer id) { ... } Integer getId() { ... } void setTitle(String title) { ... } String getTitle() { ... } MessageTRecord( Integer id, String title ) { ... } } |
Эти классы могут затем использоваться для постановки в очередь и безопасного удаления типов сообщений непосредственно в сгенерированных ссылках очереди:
1
2
3
4
5
6
7
8
9
|
// The jOOQ configuration Configuration c = ... // Enqueue a message DBMS_AQ.enqueue(c, MESSAGE_Q, new MessageTRecord( 1 , "test" )); // Dequeue it again MessageTRecord message = DBMS_AQ.dequeue(c, MESSAGE_Q); |
Легко, не правда ли?
Теперь давайте использовать возможности Java 8
Очередь сообщений — это не что иное, как бесконечный (блокирующий) поток сообщений. Начиная с Java 8, у нас есть потрясающий API для таких потоков сообщений, Stream API.
Вот почему мы добавили (для готовящегося к выпуску jOOQ 3.8) новый API, который объединяет существующий API jOOQ AQ с потоками Java 8:
1
2
3
4
5
6
|
// The jOOQ configuration Configuration c = ... DBMS_AQ.dequeueStream(c, MESSAGE_Q) .filter(m -> "test" .equals(m.getTitle())) .forEach(System.out::println); |
Вышеупомянутый потоковый конвейер будет прослушивать очередь MESSAGE_Q
, MESSAGE_Q
все сообщения, отфильтровывать сообщения, не содержащие "test"
, и распечатывать оставшиеся сообщения.
Блокировка потоков
Интересно то, что это блокирующий бесконечный поток. Пока в очереди нет новых сообщений, обработка потокового конвейера будет просто блокировать очередь, ожидая новых сообщений. Это не проблема для последовательных потоков, но когда вызывается Stream.parallel()
, что происходит потом?
jOOQ будет использовать каждое сообщение в транзакции. Транзакция jOOQ 3.8 выполняется в ForkJoinPool.ManagedBlocker
:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
static <T> Supplier<T> blocking(Supplier<T> supplier) { return new Supplier<T>() { volatile T result; @Override public T get() { try { ForkJoinPool.managedBlock( new ManagedBlocker() { @Override public boolean block() { result = supplier.get(); return true ; } @Override public boolean isReleasable() { return result != null ; } }); } catch (InterruptedException e) { throw new RuntimeException(e); } return asyncResult; } }; } |
Это не много магии. ManagedBlocker
запускает некоторый специальный код, когда он запускается с помощью ForkJoinWorkerThread
, следя за тем, чтобы поток ForkJoinPool
не страдал от исчерпания потока и, следовательно, от взаимоблокировок. Для получения дополнительной информации, прочитайте эту интересную статью здесь: http://zeroturnaround.com/rebellabs/java-parallel-streams-are-bad-for-your-health
Или этот ответ переполнения стека: http://stackoverflow.com/a/35272153/521799
Итак, если вам нужен сверхбыстрый параллельный процесс снятия с очереди AQ, просто запустите:
1
2
3
4
5
6
7
8
|
// The jOOQ configuration. Make sure its referenced // ConnectionPool has enough connections Configuration c = ... DBMS_AQ.dequeueStream(c, MESSAGE_Q) .parallel() .filter(m -> "test" .equals(m.getTitle())) .forEach(System.out::println); |
И у вас будет несколько потоков, которые будут удалять сообщения параллельно.
Не хотите ждать JOOQ 3.8?
Нет проблем. Используйте текущую версию и оберните операцию dequeue
в свой собственный Stream
:
1
2
3
4
5
|
Stream<MessageTRecord> stream = Stream.generate(() -> DSL.using(config).transactionResult(c -> dequeue(c, MESSAGE_Q) ) ); |
Готово.
Бонус: асинхронное снятие очереди
Пока мы занимались этим, еще одна очень приятная особенность систем массового обслуживания — это их асинхронность. В Java 8 очень полезным типом для моделирования (и составления) асинхронных алгоритмов является CompletionStage
, а его реализацией по умолчанию является CompletableFuture
, которая снова выполняет задачи в ForkJoinPool
.
Используя jOOQ 3.8, вы можете снова просто позвонить
1
2
3
4
5
6
7
8
|
// The jOOQ configuration. Make sure its referenced // ConnectionPool has enough connections Configuration c = ... CompletionStage<MessageTRecord> stage = DBMS_AQ.dequeueAsync(c, MESSAGE_Q) .thenCompose(m -> ...) ...; |
Следите за следующей статьей в блоге jOOQ , где мы рассмотрим более сложные сценарии использования для асинхронных блокировок операторов SQL с помощью jOOQ 3.8 и Java 8
Ссылка: | Использование Oracle AQ через Java 8 Streams от нашего партнера по JCG Лукаса Эдера из блога JAVA, SQL и AND JOOQ . |