Статьи

Использование Oracle AQ через Java 8 Streams

Oracle AQ: Oracle Database Advanced Queuing . Одна из самых удивительных функций базы данных Oracle . AQ API реализует полноценную систему транзакционных сообщений непосредственно в базе данных.

В классической архитектуре, где база данных находится в центре вашей системы, когда несколько приложений (некоторые из которых написаны на Java, другие написаны на Perl или PL / SQL и т. Д.) Обращаются к одной и той же базе данных, используя AQ для межпроцессного взаимодействия просто замечательно Если вы больше разбираетесь в Java EE, вы можете приобрести решение MQ на основе Java и поместить эту шину / промежуточное ПО в центр архитектуры вашей системы. Но почему бы не использовать базу данных вместо этого ?

Как использовать PL / SQL AQ API с jOOQ

API PL / SQL для постановки и извлечения сообщений AQ довольно прост, и к нему очень легко получить доступ из Java с OracleDSL.DBMS_AQ API OracleDSL.DBMS_AQ OracleDSL.DBMS_AQ .

Используемая здесь конфигурация очереди будет выглядеть примерно так:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
CREATE OR REPLACE TYPE message_t AS OBJECT (
  ID         NUMBER(7),
  title      VARCHAR2(100 CHAR)
)
/
 
BEGIN
  DBMS_AQADM.CREATE_QUEUE_TABLE(
    queue_table => 'message_aq_t',
    queue_payload_type => 'message_t'
  );
 
  DBMS_AQADM.CREATE_QUEUE(
    queue_name => 'message_q',
    queue_table => 'message_aq_t'
  );
 
  DBMS_AQADM.START_QUEUE(
    queue_name => 'message_q'
  );
  COMMIT;
END;
/

И генератор кода jOOQ будет генерировать полезные классы со всей информацией о типах, непосредственно связанной с ними (упрощенный пример):

01
02
03
04
05
06
07
08
09
10
11
12
13
14
class Queues {
    static final Queue<MessageTRecord> MESSAGE_Q =
        new QueueImpl<>("NEW_AUTHOR_AQ", MESSAGE_T);
}
 
class MessageTRecord {
    void setId(Integer id) { ... }
    Integer getId() { ... }
    void setTitle(String title) { ... }
    String getTitle() { ... }
    MessageTRecord(
        Integer id, String title
    ) { ... }
}

Эти классы могут затем использоваться для постановки в очередь и безопасного удаления типов сообщений непосредственно в сгенерированных ссылках очереди:

1
2
3
4
5
6
7
8
9
// The jOOQ configuration
Configuration c = ...
 
// Enqueue a message
DBMS_AQ.enqueue(c, MESSAGE_Q,
    new MessageTRecord(1, "test"));
 
// Dequeue it again
MessageTRecord message = DBMS_AQ.dequeue(c, MESSAGE_Q);

Легко, не правда ли?

Теперь давайте использовать возможности Java 8

Очередь сообщений — это не что иное, как бесконечный (блокирующий) поток сообщений. Начиная с Java 8, у нас есть потрясающий API для таких потоков сообщений, Stream API.

Вот почему мы добавили (для готовящегося к выпуску jOOQ 3.8) новый API, который объединяет существующий API jOOQ AQ с потоками Java 8:

1
2
3
4
5
6
// The jOOQ configuration
Configuration c = ...
 
DBMS_AQ.dequeueStream(c, MESSAGE_Q)
       .filter(m -> "test".equals(m.getTitle()))
       .forEach(System.out::println);

Вышеупомянутый потоковый конвейер будет прослушивать очередь MESSAGE_Q , MESSAGE_Q все сообщения, отфильтровывать сообщения, не содержащие "test" , и распечатывать оставшиеся сообщения.

Блокировка потоков

Интересно то, что это блокирующий бесконечный поток. Пока в очереди нет новых сообщений, обработка потокового конвейера будет просто блокировать очередь, ожидая новых сообщений. Это не проблема для последовательных потоков, но когда вызывается Stream.parallel() , что происходит потом?

jOOQ будет использовать каждое сообщение в транзакции. Транзакция jOOQ 3.8 выполняется в ForkJoinPool.ManagedBlocker :

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
static <T> Supplier<T> blocking(Supplier<T> supplier) {
    return new Supplier<T>() {
        volatile T result;
 
        @Override
        public T get() {
            try {
                ForkJoinPool.managedBlock(new ManagedBlocker() {
                    @Override
                    public boolean block() {
                        result = supplier.get();
                        return true;
                    }
 
                    @Override
                    public boolean isReleasable() {
                        return result != null;
                    }
                });
            }
            catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
 
            return asyncResult;
        }
    };
}

Это не много магии. ManagedBlocker запускает некоторый специальный код, когда он запускается с помощью ForkJoinWorkerThread , следя за тем, чтобы поток ForkJoinPool не страдал от исчерпания потока и, следовательно, от взаимоблокировок. Для получения дополнительной информации, прочитайте эту интересную статью здесь: http://zeroturnaround.com/rebellabs/java-parallel-streams-are-bad-for-your-health

Или этот ответ переполнения стека: http://stackoverflow.com/a/35272153/521799

Итак, если вам нужен сверхбыстрый параллельный процесс снятия с очереди AQ, просто запустите:

1
2
3
4
5
6
7
8
// The jOOQ configuration. Make sure its referenced
// ConnectionPool has enough connections
Configuration c = ...
 
DBMS_AQ.dequeueStream(c, MESSAGE_Q)
       .parallel()
       .filter(m -> "test".equals(m.getTitle()))
       .forEach(System.out::println);

И у вас будет несколько потоков, которые будут удалять сообщения параллельно.

Не хотите ждать JOOQ 3.8?

Нет проблем. Используйте текущую версию и оберните операцию dequeue в свой собственный Stream :

1
2
3
4
5
Stream<MessageTRecord> stream = Stream.generate(() ->
    DSL.using(config).transactionResult(c ->
        dequeue(c, MESSAGE_Q)
    )
);

Готово.

Бонус: асинхронное снятие очереди

Пока мы занимались этим, еще одна очень приятная особенность систем массового обслуживания — это их асинхронность. В Java 8 очень полезным типом для моделирования (и составления) асинхронных алгоритмов является CompletionStage , а его реализацией по умолчанию является CompletableFuture , которая снова выполняет задачи в ForkJoinPool .

Используя jOOQ 3.8, вы можете снова просто позвонить

1
2
3
4
5
6
7
8
// The jOOQ configuration. Make sure its referenced
// ConnectionPool has enough connections
Configuration c = ...
 
CompletionStage<MessageTRecord> stage =
DBMS_AQ.dequeueAsync(c, MESSAGE_Q)
       .thenCompose(m -> ...)
       ...;

Следите за следующей статьей в блоге jOOQ , где мы рассмотрим более сложные сценарии использования для асинхронных блокировок операторов SQL с помощью jOOQ 3.8 и Java 8

Ссылка: Использование Oracle AQ через Java 8 Streams от нашего партнера по JCG Лукаса Эдера из блога JAVA, SQL и AND JOOQ .