Статьи

ListenableFuture в Гуаве

ListenableFuture в Guava — это попытка определить согласованный API для объектов Future для регистрации обратных вызовов завершения. Благодаря возможности добавить обратный вызов по завершении Future , мы можем асинхронно и эффективно реагировать на входящие события. Если ваше приложение работает с большим количеством объектов будущего, я настоятельно рекомендую использовать ListenableFuture когда вы можете. Технически ListenableFuture расширяет интерфейс Future , добавляя простые:

1
void addListener(Runnable listener, Executor executor)

метод. Вот и все. Если вы ListenableFuture вы можете зарегистрировать Runnable чтобы он выполнялся сразу после завершения рассматриваемого будущего. Вы также должны предоставить Executor ( ExecutorService расширяет его), который будет использоваться для выполнения вашего прослушивателя, чтобы продолжительные прослушиватели не занимали ваши рабочие потоки.

Давайте приведем это в действие. Мы начнем с рефакторинга нашего первого примера веб-сканера для использования ListenableFuture . К счастью, в случае пулов потоков это просто вопрос оборачивания их с помощью MoreExecutors.listeningDecorator() :

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
 
for (final URL siteUrl : topSites) {
    final ListenableFuture<String> future = pool.submit(new Callable<String>() {
        @Override
        public String call() throws Exception {
            return IOUtils.toString(siteUrl, StandardCharsets.UTF_8);
        }
    });
 
    future.addListener(new Runnable() {
        @Override
        public void run() {
            try {
                final String contents = future.get();
                //...process web site contents
            } catch (InterruptedException e) {
                log.error("Interrupted", e);
            } catch (ExecutionException e) {
                log.error("Exception in task", e.getCause());
            }
        }
    }, MoreExecutors.sameThreadExecutor());
}

Есть несколько интересных наблюдений. Прежде всего обратите внимание, как ListeningExecutorService оборачивает существующий Executor . Это похоже на подход ExecutorCompletionService . Позже мы регистрируем пользовательский Runnable чтобы получать уведомления о завершении каждой задачи. Во-вторых, обратите внимание, как выглядит ужасная обработка ошибок: мы должны обработать InterruptedException (что технически никогда не должно произойти, так как Future уже разрешено, а get() никогда его не выбросит) и ExecutionException . Мы еще не рассмотрели это, но Future<T> должен каким-то образом обрабатывать исключения, возникающие во время асинхронных вычислений. Такие исключения включаются в ExecutionException (таким образом, getCause() во время регистрации), getCause() из get() .

Наконец, обратите внимание на использование MoreExecutors.sameThreadExecutor() . Это удобная абстракция, которую вы можете использовать каждый раз, когда какой-либо API-интерфейс захочет использовать Executor / ExecutorService (предположительно, пул потоков), пока вы хорошо справляетесь с использованием текущего потока. Это особенно полезно во время модульного тестирования — даже если ваш производственный код использует асинхронные задачи, во время тестов вы можете запускать все из одного потока.

Независимо от того, насколько это удобно, весь код кажется немного загроможденным. К счастью, в фантастическом классе Futures есть простой служебный метод:

01
02
03
04
05
06
07
08
09
10
11
Futures.addCallback(future, new FutureCallback<String>() {
    @Override
    public void onSuccess(String contents) {
        //...process web site contents
    }
 
    @Override
    public void onFailure(Throwable throwable) {
        log.error("Exception in task", throwable);
    }
});

FutureCallback — намного более простая абстракция для работы, решает будущее и выполняет обработку исключений для вас. Также вы можете по-прежнему предоставлять пользовательский пул потоков для слушателей, если хотите. Если вы застряли с каким-то устаревшим API, который по-прежнему возвращает Future вы можете попробовать JdkFutureAdapters.listenInPoolThread() который представляет собой адаптер, преобразующий обычный Future<V> в ListenableFuture<V> . Но имейте в виду, что как только вы начнете использовать addListener() , каждому такому адаптеру потребуется один поток для работы, поэтому это решение вообще не масштабируется, и вам следует избегать его.

1
2
3
Future<String> future = //...
ListenableFuture<String> listenableFuture =
        JdkFutureAdapters.listenInPoolThread(future);

Как только мы поймем основы, мы сможем глубоко погрузиться в самое сильное будущее прослушивания: трансформации и цепочки . Это продвинутый материал, вы были предупреждены.

Ссылка: ListenableFuture in Guava от нашего партнера по JCG Томаша Нуркевича в блоге NoBlogDefFound .