В прошлый раз мы узнали принципы, лежащие в основе java.util.concurrent.Future<T>
. Мы также обнаружили, что Future<T>
обычно возвращается библиотеками или фреймворками. Но ничто не мешает нам реализовать все это самим, когда это имеет смысл. Это не особенно сложно и может значительно улучшить ваш дизайн. Я приложил все усилия, чтобы выбрать интересный вариант использования для нашего примера.
JMS (Java Message Service) — это стандартный Java API для отправки асинхронных сообщений. Когда мы думаем о JMS, мы сразу же видим, как клиент отправляет сообщение серверу (брокеру) в режиме « забыто» . Но в равной степени распространено использование шаблона обмена сообщениями типа « запрос-ответ» поверх JMS. Реализация довольно проста: вы отправляете сообщение запроса (конечно асинхронно) в MDB на другой стороне.
MDB обрабатывает запрос и отправляет ответ обратно либо в жестко закодированную очередь ответов, либо в произвольную очередь, выбранную клиентом и отправленную вместе с сообщением в JMSReplyTo
. Второй сценарий гораздо интереснее. Клиент может создать временную очередь и использовать ее в качестве очереди ответа при отправке запроса. Таким образом, каждая пара запрос / ответ использует различную очередь ответов, при этом не требуется идентификатор корреляции, селекторы и т. Д.
Однако есть одна проблема. Отправка сообщения брокеру JMS проста и асинхронна. Но получать ответ гораздо более громоздко. Вы можете реализовать MessageListener
для использования одного, одного сообщения или использовать блокирование MessageConsumer.receive()
. Первый подход довольно тяжелый и сложный в применении. Второй побеждает цель асинхронного обмена сообщениями. Вы также можете опросить очередь ответов с некоторым интервалом, который звучит еще хуже.
Зная абстракцию Future
, вы должны иметь некоторую дизайнерскую идею. Что если мы сможем отправить сообщение с запросом и получить обратно Future<T>
, представляющее ответное сообщение, которое еще не пришло? Эта абстракция Future
должна обрабатывать всю логику, и мы можем смело использовать ее как указатель на будущий результат. Вот код соединения, используемый для создания временной очереди и отправки запроса:
01
02
03
04
05
06
07
08
09
10
|
private <T extends Serializable> Future<T> asynchRequest(ConnectionFactory connectionFactory, Serializable request, String queue) throws JMSException { Connection connection = connectionFactory.createConnection(); connection.start(); final Session session = connection.createSession( false , Session.AUTO_ACKNOWLEDGE); final Queue tempReplyQueue = session.createTemporaryQueue(); final ObjectMessage requestMsg = session.createObjectMessage(request); requestMsg.setJMSReplyTo(tempReplyQueue); sendRequest(session.createQueue(queue), session, requestMsg); return new JmsReplyFuture<T>(connection, session, tempReplyQueue); } |
asynchRequest()
просто передает ConnectionFactory
посреднику JMS и произвольному фрагменту данных. Этот объект будет отправлен в queue
с помощью ObjectMessage
. Последняя строка имеет решающее значение — мы возвращаем наш пользовательский JmsReplyFuture<T>
который будет представлять еще не полученный ответ. Обратите внимание, как мы передаем временную очередь JMS как свойству JMSReplyTo
и нашему Future
. Реализация стороны MDB не так важна. Излишне говорить, что предполагается отправить ответ обратно в назначенную очередь:
1
2
|
final ObjectMessage reply = session.createObjectMessage(...); session.createProducer(request.getJMSReplyTo()).send(reply); |
Итак, давайте JmsReplyFuture<T>
в реализацию JmsReplyFuture<T>
. Я сделал предположение, что и запрос, и ответ являются ObjectMessage
. Не очень сложно использовать другой тип сообщения. Для начала посмотрим, как настроен прием сообщений с ответного канала:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
|
public class JmsReplyFuture<T extends Serializable> implements Future<T>, MessageListener { //... public JmsReplyFuture(Connection connection, Session session, Queue replyQueue) throws JMSException { this .connection = connection; this .session = session; replyConsumer = session.createConsumer(replyQueue); replyConsumer.setMessageListener( this ); } @Override public void onMessage(Message message) { //... } } |
Как вы можете видеть, JmsReplyFuture
реализует как Future<T>
(где T
— ожидаемый тип объекта, обернутый в ObjectMessage
), так и JMS MessageListener
. В конструкторе мы просто начинаем прослушивать replyQueue
. Из наших предположений о дизайне мы знаем, что там будет самое большее одно сообщение, потому что очередь ответа является временной отбрасываемой очередью. В предыдущей статье мы узнали, что Future.get()
должен блокироваться во время ожидания результата. С другой стороны, onMessage()
— это метод обратного вызова, вызываемый из некоторого внутреннего потока / библиотеки клиента JMS. Очевидно, нам нужна некоторая общая переменная / блокировка, чтобы ожидающий get()
знал, что ответ получен. Предпочтительно, чтобы наше решение было легким и не требовало задержек, поэтому занятие ожиданием volatile
переменной — плохая идея. Изначально я onMessage()
который я бы использовал, чтобы разблокировать get()
из onMessage()
. Но мне все еще нужна некоторая общая переменная для хранения фактического объекта ответа. Таким образом, у меня возникла идея использовать ArrayBlockingQueue
. Может показаться странным использовать очередь, когда мы знаем, что не будет больше одного элемента. Но он работает отлично, используя старый добрый шаблон «производитель-потребитель»: Future.get()
— это потребитель, блокирующий метод poll()
пустой очереди. С другой стороны, onMessage()
— производитель, помещающий ответное сообщение в эту очередь и немедленно разблокирующий потребителя. Вот как это выглядит:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
public class JmsReplyFuture<T extends Serializable> implements Future<T>, MessageListener { private final BlockingQueue<T> reply = new ArrayBlockingQueue<>( 1 ); //... @Override public T get() throws InterruptedException, ExecutionException { return this .reply.take(); } @Override public T get( long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { final T replyOrNull = reply.poll(timeout, unit); if (replyOrNull == null ) { throw new TimeoutException(); } return replyOrNull; } @Override public void onMessage(Message message) { final ObjectMessage objectMessage = (ObjectMessage) message; final Serializable object = objectMessage.getObject(); reply.put((T) object); //... } } |
Реализация еще не завершена, но она охватывает наиболее важные концепции. Обратите внимание, как хорошо метод BlockingQueue.poll(long, TimeUnit)
вписывается в Future.get(long, TimeUnit)
. К сожалению, несмотря на то, что они приходят из одного и того же пакета и были разработаны более или менее в одно и то же время, один метод возвращает значение null
по истечении времени ожидания, в то время как другой должен выдавать исключение. Легко исправить.
Также обратите внимание, как легко стало реализовывать onMessage()
. Мы просто помещаем вновь полученное сообщение в BlockingQueue reply
и коллекция выполняет всю синхронизацию за нас. Мы все еще упускаем некоторые менее значимые, но все же важные детали — отмена и очистка. Не вдаваясь в подробности, вот полная реализация:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
|
public class JmsReplyFuture<T extends Serializable> implements Future<T>, MessageListener { private static enum State {WAITING, DONE, CANCELLED} private final Connection connection; private final Session session; private final MessageConsumer replyConsumer; private final BlockingQueue<T> reply = new ArrayBlockingQueue<>( 1 ); private volatile State state = State.WAITING; public JmsReplyFuture(Connection connection, Session session, Queue replyQueue) throws JMSException { this .connection = connection; this .session = session; replyConsumer = session.createConsumer(replyQueue); replyConsumer.setMessageListener( this ); } @Override public boolean cancel( boolean mayInterruptIfRunning) { try { state = State.CANCELLED; cleanUp(); return true ; } catch (JMSException e) { throw Throwables.propagate(e); } } @Override public boolean isCancelled() { return state == State.CANCELLED; } @Override public boolean isDone() { return state == State.DONE; } @Override public T get() throws InterruptedException, ExecutionException { return this .reply.take(); } @Override public T get( long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { final T replyOrNull = reply.poll(timeout, unit); if (replyOrNull == null ) { throw new TimeoutException(); } return replyOrNull; } @Override public void onMessage(Message message) { try { final ObjectMessage objectMessage = (ObjectMessage) message; final Serializable object = objectMessage.getObject(); reply.put((T) object); state = State.DONE; cleanUp(); } catch (Exception e) { throw Throwables.propagate(e); } } private void cleanUp() throws JMSException { replyConsumer.close(); session.close(); connection.close(); } } |
Я использую специальный State
enum для хранения информации о государстве. Я считаю, что это намного более читабельно по сравнению со сложными условиями, основанными на множественных флагах, null
проверках и т. Д. Второе, что нужно иметь в виду, это отмена. К счастью, все довольно просто. Мы в основном закрываем базовую сессию / соединение. Он должен оставаться открытым в течение всего обмена сообщениями запроса / ответа, в противном случае временная очередь ответов JMS исчезает. Обратите внимание, что мы не можем легко сообщить брокеру / MDB, что мы больше не заинтересованы в ответе. Мы просто перестаём его слушать, но MDB все равно будет обрабатывать запрос и пытаться отправить ответ в уже не существующую временную очередь.
Так как же все это выглядит на практике? Скажем, у нас есть MDB, который получает число и возвращает его квадрат. Представьте, что вычисление занимает немного времени, поэтому мы начнем его заранее, сделаем некоторую работу тем временем, а затем получим результаты. Вот как может выглядеть такой дизайн:
1
2
3
|
final Future<Double> replyFuture = asynchRequest(connectionFactory, 7 , "square" ); //do some more work final double resp = replyFuture.get(); //49 |
Где "square"
— это имя очереди запросов. Если мы реорганизуем его и используем внедрение зависимостей, мы можем еще больше упростить его до чего-то вроде:
1
2
3
|
final Future<Double> replyFuture = calculator.square( 7 ); //do some more work final double resp = replyFuture.get(); //49 |
Вы знаете, что лучше в этом дизайне? Несмотря на то, что мы используем довольно продвинутые возможности JMS, здесь нет кода JMS. Более того, позже мы можем заменить calculator
другой реализацией, используя SOAP или GPU. Что касается клиентского кода, мы все еще используем абстракцию Future<Double>
. Результат вычисления, который еще не доступен. Основной механизм не имеет значения. В этом красота абстракции.
Очевидно, что эта реализация еще не готова к производству. Но что еще хуже, он пропускает некоторые важные функции. Мы все еще вызываем блокирование Future.get()
в какой-то момент. Более того, нет никакого способа составить / связать фьючерсы (например, когда ответ придет, отправить другое сообщение ) или ожидать завершения самого быстрого будущего. Потерпи!
Ссылка: Внедрение собственного будущего от нашего партнера по JCG Томаша Нуркевича в блоге NoBlogDefFound .