Статьи

DeferredResult — асинхронная обработка в Spring MVC

DeferredResult — это контейнер для, возможно, еще не законченных вычислений, который будет доступен в будущем. Spring MVC использует его для представления асинхронных вычислений и использования преимуществ асинхронной обработки запросов Servlet 3.0 AsyncContext . Просто чтобы быстро понять, как это работает:

01
02
03
04
05
06
07
08
09
10
11
12
@RequestMapping("/")
@ResponseBody
public DeferredResult<String> square() throws JMSException {
    final DeferredResult<String> deferredResult = new DeferredResult<>();
    runInOtherThread(deferredResult);
    return deferredResult;
}
  
private void runInOtherThread(DeferredResult<String> deferredResult) {
    //seconds later in other thread...
    deferredResult.setResult("HTTP response is: 42");
}

Обычно после выхода из контроллера обработчик запроса метода выполняется. Но не с DeferredResult . Spring MVC (с использованием возможностей Servlet 3.0) будет удерживать ответ, поддерживая бездействующее HTTP-соединение. Рабочий поток HTTP больше не используется, но соединение HTTP все еще открыто. Позже какой-то другой поток разрешит DeferredResult , присвоив ему какое-то значение. Spring MVC немедленно заберет это событие и отправит ответ (в данном примере «HTTP-ответ: 42» ), завершив обработку запроса.

Вы можете увидеть некоторое концептуальное сходство между Future<V> и DeferredResult — они оба представляют вычисления с результатами, доступными когда-то в будущем. Вы можете спросить, почему Spring MVC не позволяет нам просто возвращать Future<V> а вместо этого вводит новую проприетарную абстракцию? Причина проста и еще раз показывает недостатки Future<V> . Весь смысл асинхронной обработки — избежать блокирования потоков. Стандартный java.util.concurrent.Future не позволяет регистрировать обратные вызовы после завершения вычислений, поэтому вам нужно либо выделить один поток для блокировки до завершения будущего, либо использовать один поток для периодического опроса нескольких фьючерсов. Однако последний вариант потребляет больше ресурсов процессора и вносит задержку. Но превосходное ListenableFuture<V> из Гуавы кажется подходящим? Да, но Spring не зависит от Guava, к счастью, соединение этих двух API довольно просто.

Но сначала взглянем на предыдущую часть о реализации пользовательского java.util.concurrent.Future<V> . Правда, это было не так просто, как можно было ожидать. Очистка, обработка прерываний, блокировка и синхронизация, поддержание состояния. Много общего, когда все, что нам нужно, так же просто, как получить сообщение и вернуть его из get() . Давайте попробуем модифицировать предыдущую реализацию JmsReplyFuture чтобы также реализовать более мощный ListenableFuture — чтобы мы могли использовать его позже в Spring MVC.

ListenableFuture просто расширяет стандарт Future добавляя возможность регистрировать обратные вызовы (слушатели). Так что энергичный разработчик просто сядет и добавит список слушателей Runnable в существующую реализацию:

01
02
03
04
05
06
07
08
09
10
public class JmsReplyFuture<T extends Serializable> implements ListenableFuture<T>, MessageListener {
  
    private final List<Runnable> listeners = new ArrayList<Runnable>();
  
    @Override
    public void addListener(Runnable listener, Executor executor) {
        listeners.add(listener);
    }
  
    //...

Но это сильно упрощено. Конечно, мы должны перебрать всех слушателей, когда будущее закончится или произойдет исключение. Если будущее уже разрешено, когда мы добавляем слушателя, мы должны немедленно вызвать этого слушателя. Более того, мы игнорируем executor — в соответствии с API каждый слушатель может использовать другой пул потоков, предоставленный для addListener() поэтому мы должны хранить пары: Runnable + Executor . И последнее, но не менее addListener() не является поточно- addListener() . Стремительный разработчик исправит все это в течение часа или двух. И потратьте еще два часа, чтобы исправить ошибки, внесенные в это время. И еще несколько часов спустя, когда на производстве появляется еще одна «невозможная» ошибка. Я не жажду На самом деле, мне лень писать даже простейшую реализацию выше. Но я достаточно отчаянно стараюсь нажать Ctrl + H (представление подтипов в IntelliJ IDEA) на ListenableFuture и сканировать доступное дерево реализаций скелета. AbstractFuture<V> — Бинго!

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class JmsReplyListenableFuture<T extends Serializable> extends AbstractFuture<T> implements MessageListener {
  
    private final Connection connection;
    private final Session session;
    private final MessageConsumer replyConsumer;
  
    public JmsReplyListenableFuture(Connection connection, Session session, Queue replyQueue) throws JMSException {
        this.connection = connection;
        this.session = session;
        this.replyConsumer = session.createConsumer(replyQueue);
        this.replyConsumer.setMessageListener(this);
    }
  
    @Override
    public void onMessage(Message message) {
        try {
            final ObjectMessage objectMessage = (ObjectMessage) message;
            final Serializable object = objectMessage.getObject();
            set((T) object);
            cleanUp();
        } catch (Exception e) {
            setException(e);
        }
    }
  
    @Override
    protected void interruptTask() {
        cleanUp();
    }
  
    private void cleanUp() {
        try {
            replyConsumer.close();
            session.close();
            connection.close();
        } catch (Exception e) {
            Throwables.propagate(e);
        }
    }
}

Вот и все, все, скомпилируйте и запустите. Почти в 2 раза меньше кода по сравнению с первоначальной реализацией, и мы получаем намного более мощный ListenableFuture . Большая часть кода настроена и очищена. AbstractFuture уже реализует addListener() , блокировку и обработку состояний для нас. Все, что нам нужно сделать, это вызвать метод set() когда будущее разрешено (в нашем случае приходит ответ JMS). Более того, мы наконец поддерживаем исключения должным образом. Ранее мы просто игнорировали / перебрасывали их, а теперь они правильно упакованы и выброшены из get() при доступе. Даже если мы не были заинтересованы в возможностях ListenableFuture , AbstractFuture прежнему нам очень помогает. И мы получаем ListenableFuture бесплатно.

Хорошие программисты любят писать код. Лучшие любят удалять его . Меньше поддерживать, меньше проверять, меньше ломать. Я иногда поражаюсь, насколько полезной может быть гуава. В прошлый раз я работал с большим количеством итераторов. Данные генерировались динамически, и итераторы могли легко создавать миллионы элементов, поэтому у меня не было выбора. Ограниченный API итератора вместе с довольно сложной бизнес-логикой — это рецепт для бесконечного количества кода. А потом я нашел вспомогательный класс Iterators и он спас мне жизнь. Я предлагаю вам открыть JavaDoc Guava и пройти все пакеты, класс за классом. Ты поблагодаришь меня позже.

Как только у нас будет наш собственный ListenableFuture (очевидно, вы можете использовать любую реализацию), мы можем попытаться интегрировать его с Spring MVC. Вот чего мы хотим достичь:

  1. HTTP-запрос приходит
  2. Отправляем запрос в очередь JMS
  3. Рабочий поток HTTP больше не используется, он может обслуживать другие запросы
  4. Слушатель JMS асинхронно ожидает ответа во временной очереди
  5. Как только приходит ответ, мы немедленно отправляем его как HTTP-ответ, и соединение установлено.

Первая наивная реализация с использованием блокировки Future :

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Controller
public class JmsController {
  
    private final ConnectionFactory connectionFactory;
  
    public JmsController(ConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }
  
    @RequestMapping("/square/{value}")
    @ResponseBody
    public String square(@PathVariable double value) throws JMSException, ExecutionException, InterruptedException {
        final ListenableFuture<Double> responseFuture = request(value);
        return responseFuture.get().toString();
    }
  
    //JMS API boilerplate
    private <T extends Serializable> ListenableFuture<T> request(Serializable request) throws JMSException {
        Connection connection = this.connectionFactory.createConnection();
        connection.start();
        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final Queue tempReplyQueue = session.createTemporaryQueue();
        final ObjectMessage requestMsg = session.createObjectMessage(request);
        requestMsg.setJMSReplyTo(tempReplyQueue);
        sendRequest(session.createQueue("square"), session, requestMsg);
        return new JmsReplyListenableFuture<T>(connection, session, tempReplyQueue);
    }
  
    private void sendRequest(Queue queue, Session session, ObjectMessage requestMsg) throws JMSException {
        final MessageProducer producer = session.createProducer(queue);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        producer.send(requestMsg);
        producer.close();
    }
  
}

Эта реализация не очень удачна. На самом деле Future нам вообще не нужен, так как мы едва блокируем get() , синхронно ожидая ответа. Давайте попробуем с DeferredResult :

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
@RequestMapping("/square/{value}")
@ResponseBody
public DeferredResult<String> square(@PathVariable double value) throws JMSException {
    final DeferredResult<String> deferredResult = new DeferredResult<>();
    final ListenableFuture<Double> responseFuture = request(value);
    Futures.addCallback(responseFuture, new FutureCallback<Double>() {
        @Override
        public void onSuccess(Double result) {
            deferredResult.setResult(result.toString());
        }
  
        @Override
        public void onFailure(Throwable t) {
            deferredResult.setErrorResult(t);
        }
    });
    return deferredResult;
}

Гораздо сложнее, но также будет гораздо более масштабируемым. Этот метод почти не требует времени для выполнения, и рабочий поток HTTP вскоре после того, как готов обработать другой запрос. Самое большое замечание — это то, что onSuccess() и onFailure() выполняются другим потоком спустя секунды или даже минуты. Но пул рабочих потоков HTTP не исчерпан, и приложение остается отзывчивым.

Это был пример школьной книги, но можем ли мы сделать лучше? Первая попытка — написать универсальный адаптер из ListenableFuture в DeferredResult . Эти две абстракции представляют собой одно и то же, но с разными API. Это довольно просто:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
public class ListenableFutureAdapter<T> extends DeferredResult<String> {
  
    public ListenableFutureAdapter(final ListenableFuture<T> target) {
        Futures.addCallback(target, new FutureCallback<T>() {
            @Override
            public void onSuccess(T result) {
                setResult(result.toString());
            }
  
            @Override
            public void onFailure(Throwable t) {
                setErrorResult(t);
            }
        });
    }
}

Мы просто расширяем DeferredResult и уведомляем его с помощью обратных вызовов ListenableFuture . Использование простое:

1
2
3
4
5
6
@RequestMapping("/square/{value}")
@ResponseBody
public DeferredResult<String> square(@PathVariable double value) throws JMSException {
    final ListenableFuture<Double> responseFuture = request(value);
    return new ListenableFutureAdapter<>(responseFuture);
}

Но мы можем сделать еще лучше! Если ListenableFuture и DeferredResult очень похожи, почему бы просто не вернуть ListenableFuture из метода обработчика контроллера?

1
2
3
4
5
6
@RequestMapping("/square/{value}")
@ResponseBody
public ListenableFuture<Double> square2(@PathVariable double value) throws JMSException {
    final ListenableFuture<Double> responseFuture = request(value);
    return responseFuture;
}

Ну, это не сработает, потому что Spring не понимает ListenableFuture и просто взорвется. К счастью, Spring MVC очень гибок и позволяет нам легко регистрировать новые так называемые HandlerMethodReturnValueHandler . Существует 12 таких встроенных обработчиков, и каждый раз, когда мы возвращаем некоторый объект из контроллера, Spring MVC проверяет их в предопределенном порядке и выбирает первый, который может обрабатывать данный тип. Одним из таких обработчиков является DeferredResultHandler (имя говорит само за себя), который мы будем использовать в качестве ссылки:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class ListenableFutureReturnValueHandler implements HandlerMethodReturnValueHandler {
  
    public boolean supportsReturnType(MethodParameter returnType) {
        Class<?> paramType = returnType.getParameterType();
        return ListenableFuture.class.isAssignableFrom(paramType);
    }
  
    public void handleReturnValue(Object returnValue,
                                  MethodParameter returnType, ModelAndViewContainer mavContainer,
                                  NativeWebRequest webRequest) throws Exception {
  
        if (returnValue == null) {
            mavContainer.setRequestHandled(true);
            return;
        }
  
        final DeferredResult<Object> deferredResult = new DeferredResult<>();
        Futures.addCallback((ListenableFuture<?>) returnValue, new FutureCallback<Object>() {
            @Override
            public void onSuccess(Object result) {
                deferredResult.setResult(result.toString());
            }
  
            @Override
            public void onFailure(Throwable t) {
                deferredResult.setErrorResult(t);
            }
        });
        WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(deferredResult, mavContainer);
    }
  
}

Закончив карму, установка этого обработчика не так проста, как я надеялся. Технически существует WebMvcConfigurerAdapter.addReturnValueHandlers() который мы можем легко переопределить, если использовать конфигурацию Java для Spring MVC. Но этот метод добавляет пользовательский обработчик возвращаемого значения в конце цепочки обработчиков, и по причинам, выходящим за рамки этой статьи, нам нужно добавить его в начале (более высокий приоритет). К счастью, немного взломав, мы также можем достичь этого:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
@Configuration
@EnableWebMvc
public class SpringConfig extends WebMvcConfigurerAdapter {
  
    @Resource
    private RequestMappingHandlerAdapter requestMappingHandlerAdapter;
  
    @PostConstruct
    public void init() {
        final List<HandlerMethodReturnValueHandler> originalHandlers = new ArrayList<>(requestMappingHandlerAdapter.getReturnValueHandlers().getHandlers());
        originalHandlers.add(0, listenableFutureReturnValueHandler());
        requestMappingHandlerAdapter.setReturnValueHandlers(originalHandlers);
    }
  
    @Bean
    public HandlerMethodReturnValueHandler listenableFutureReturnValueHandler() {
        return new ListenableFutureReturnValueHandler();
    }
  
}

Резюме

В этой статье мы познакомились с другим воплощением абстракции будущего / обещания под названием DeferredResult . Он используется для того, чтобы отложить обработку HTTP-запроса до завершения какой-либо асинхронной задачи. Таким образом, DeferredResult отлично подходит для веб-интерфейсов, построенных на основе управляемых событиями систем, брокеров сообщений и т. Д. Он не такой мощный, как API Servlet 3.0. Например, мы не можем передавать несколько событий по мере их поступления (например, новые твиты) в длительное HTTP-соединение — Spring MVC больше ориентирован на шаблон запроса-ответа.

Мы также настроили Spring MVC, чтобы позволить возвращать ListenableFuture из Guava непосредственно из метода контроллера. Это делает наш код намного чище и выразительнее.

Ссылка: DeferredResult — асинхронная обработка в Spring MVC от нашего партнера по JCG Томаша Нуркевича в блоге NoBlogDefFound .