Oracle AQ: Oracle Database Advanced Queuing . Одна из самых удивительных функций базы данных Oracle . AQ API реализует полноценную систему транзакционных сообщений непосредственно в базе данных.
В классической архитектуре, где база данных находится в центре вашей системы, когда несколько приложений (некоторые из которых написаны на Java, другие написаны на Perl или PL / SQL и т. Д.) Обращаются к одной и той же базе данных, используя AQ для межпроцессного взаимодействия просто замечательно Если вы больше разбираетесь в Java EE, вы можете приобрести решение MQ на основе Java и поместить эту шину / промежуточное ПО в центр архитектуры вашей системы. Но почему бы не использовать базу данных вместо этого ?
Как использовать PL / SQL AQ API с jOOQ
API PL / SQL для постановки и извлечения сообщений AQ довольно прост, и к нему очень легко получить доступ из Java с OracleDSL.DBMS_AQ API OracleDSL.DBMS_AQ OracleDSL.DBMS_AQ .
Используемая здесь конфигурация очереди будет выглядеть примерно так:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
CREATE OR REPLACE TYPE message_t AS OBJECT ( ID NUMBER(7), title VARCHAR2(100 CHAR))/BEGIN DBMS_AQADM.CREATE_QUEUE_TABLE( queue_table => 'message_aq_t', queue_payload_type => 'message_t' ); DBMS_AQADM.CREATE_QUEUE( queue_name => 'message_q', queue_table => 'message_aq_t' ); DBMS_AQADM.START_QUEUE( queue_name => 'message_q' ); COMMIT;END;/ |
И генератор кода jOOQ будет генерировать полезные классы со всей информацией о типах, непосредственно связанной с ними (упрощенный пример):
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
|
class Queues { static final Queue<MessageTRecord> MESSAGE_Q = new QueueImpl<>("NEW_AUTHOR_AQ", MESSAGE_T);}class MessageTRecord { void setId(Integer id) { ... } Integer getId() { ... } void setTitle(String title) { ... } String getTitle() { ... } MessageTRecord( Integer id, String title ) { ... }} |
Эти классы могут затем использоваться для постановки в очередь и безопасного удаления типов сообщений непосредственно в сгенерированных ссылках очереди:
|
1
2
3
4
5
6
7
8
9
|
// The jOOQ configurationConfiguration c = ...// Enqueue a messageDBMS_AQ.enqueue(c, MESSAGE_Q, new MessageTRecord(1, "test"));// Dequeue it againMessageTRecord message = DBMS_AQ.dequeue(c, MESSAGE_Q); |
Легко, не правда ли?
Теперь давайте использовать возможности Java 8
Очередь сообщений — это не что иное, как бесконечный (блокирующий) поток сообщений. Начиная с Java 8, у нас есть потрясающий API для таких потоков сообщений, Stream API.
Вот почему мы добавили (для готовящегося к выпуску jOOQ 3.8) новый API, который объединяет существующий API jOOQ AQ с потоками Java 8:
|
1
2
3
4
5
6
|
// The jOOQ configurationConfiguration c = ...DBMS_AQ.dequeueStream(c, MESSAGE_Q) .filter(m -> "test".equals(m.getTitle())) .forEach(System.out::println); |
Вышеупомянутый потоковый конвейер будет прослушивать очередь MESSAGE_Q , MESSAGE_Q все сообщения, отфильтровывать сообщения, не содержащие "test" , и распечатывать оставшиеся сообщения.
Блокировка потоков
Интересно то, что это блокирующий бесконечный поток. Пока в очереди нет новых сообщений, обработка потокового конвейера будет просто блокировать очередь, ожидая новых сообщений. Это не проблема для последовательных потоков, но когда вызывается Stream.parallel() , что происходит потом?
jOOQ будет использовать каждое сообщение в транзакции. Транзакция jOOQ 3.8 выполняется в ForkJoinPool.ManagedBlocker :
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
static <T> Supplier<T> blocking(Supplier<T> supplier) { return new Supplier<T>() { volatile T result; @Override public T get() { try { ForkJoinPool.managedBlock(new ManagedBlocker() { @Override public boolean block() { result = supplier.get(); return true; } @Override public boolean isReleasable() { return result != null; } }); } catch (InterruptedException e) { throw new RuntimeException(e); } return asyncResult; } };} |
Это не много магии. ManagedBlocker запускает некоторый специальный код, когда он запускается с помощью ForkJoinWorkerThread , следя за тем, чтобы поток ForkJoinPool не страдал от исчерпания потока и, следовательно, от взаимоблокировок. Для получения дополнительной информации, прочитайте эту интересную статью здесь: http://zeroturnaround.com/rebellabs/java-parallel-streams-are-bad-for-your-health
Или этот ответ переполнения стека: http://stackoverflow.com/a/35272153/521799
Итак, если вам нужен сверхбыстрый параллельный процесс снятия с очереди AQ, просто запустите:
|
1
2
3
4
5
6
7
8
|
// The jOOQ configuration. Make sure its referenced// ConnectionPool has enough connectionsConfiguration c = ...DBMS_AQ.dequeueStream(c, MESSAGE_Q) .parallel() .filter(m -> "test".equals(m.getTitle())) .forEach(System.out::println); |
И у вас будет несколько потоков, которые будут удалять сообщения параллельно.
Не хотите ждать JOOQ 3.8?
Нет проблем. Используйте текущую версию и оберните операцию dequeue в свой собственный Stream :
|
1
2
3
4
5
|
Stream<MessageTRecord> stream = Stream.generate(() -> DSL.using(config).transactionResult(c -> dequeue(c, MESSAGE_Q) )); |
Готово.
Бонус: асинхронное снятие очереди
Пока мы занимались этим, еще одна очень приятная особенность систем массового обслуживания — это их асинхронность. В Java 8 очень полезным типом для моделирования (и составления) асинхронных алгоритмов является CompletionStage , а его реализацией по умолчанию является CompletableFuture , которая снова выполняет задачи в ForkJoinPool .
Используя jOOQ 3.8, вы можете снова просто позвонить
|
1
2
3
4
5
6
7
8
|
// The jOOQ configuration. Make sure its referenced// ConnectionPool has enough connectionsConfiguration c = ...CompletionStage<MessageTRecord> stage =DBMS_AQ.dequeueAsync(c, MESSAGE_Q) .thenCompose(m -> ...) ...; |
Следите за следующей статьей в блоге jOOQ , где мы рассмотрим более сложные сценарии использования для асинхронных блокировок операторов SQL с помощью jOOQ 3.8 и Java 8
| Ссылка: | Использование Oracle AQ через Java 8 Streams от нашего партнера по JCG Лукаса Эдера из блога JAVA, SQL и AND JOOQ . |