ListenableFuture в Guava — это попытка определить согласованный API для объектов Future для регистрации обратных вызовов завершения. Благодаря возможности добавить обратный вызов по завершении Future , мы можем асинхронно и эффективно реагировать на входящие события. Если ваше приложение работает с большим количеством объектов будущего, я настоятельно рекомендую использовать ListenableFuture когда вы можете. Технически ListenableFuture расширяет интерфейс Future , добавляя простые:
|
1
|
void addListener(Runnable listener, Executor executor) |
метод. Вот и все. Если вы ListenableFuture вы можете зарегистрировать Runnable чтобы он выполнялся сразу после завершения рассматриваемого будущего. Вы также должны предоставить Executor ( ExecutorService расширяет его), который будет использоваться для выполнения вашего прослушивателя, чтобы продолжительные прослушиватели не занимали ваши рабочие потоки.
Давайте приведем это в действие. Мы начнем с рефакторинга нашего первого примера веб-сканера для использования ListenableFuture . К счастью, в случае пулов потоков это просто вопрос оборачивания их с помощью MoreExecutors.listeningDecorator() :
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));for (final URL siteUrl : topSites) { final ListenableFuture<String> future = pool.submit(new Callable<String>() { @Override public String call() throws Exception { return IOUtils.toString(siteUrl, StandardCharsets.UTF_8); } }); future.addListener(new Runnable() { @Override public void run() { try { final String contents = future.get(); //...process web site contents } catch (InterruptedException e) { log.error("Interrupted", e); } catch (ExecutionException e) { log.error("Exception in task", e.getCause()); } } }, MoreExecutors.sameThreadExecutor());} |
Есть несколько интересных наблюдений. Прежде всего обратите внимание, как ListeningExecutorService оборачивает существующий Executor . Это похоже на подход ExecutorCompletionService . Позже мы регистрируем пользовательский Runnable чтобы получать уведомления о завершении каждой задачи. Во-вторых, обратите внимание, как выглядит ужасная обработка ошибок: мы должны обработать InterruptedException (что технически никогда не должно произойти, так как Future уже разрешено, а get() никогда его не выбросит) и ExecutionException . Мы еще не рассмотрели это, но Future<T> должен каким-то образом обрабатывать исключения, возникающие во время асинхронных вычислений. Такие исключения включаются в ExecutionException (таким образом, getCause() во время регистрации), getCause() из get() .
Наконец, обратите внимание на использование MoreExecutors.sameThreadExecutor() . Это удобная абстракция, которую вы можете использовать каждый раз, когда какой-либо API-интерфейс захочет использовать Executor / ExecutorService (предположительно, пул потоков), пока вы хорошо справляетесь с использованием текущего потока. Это особенно полезно во время модульного тестирования — даже если ваш производственный код использует асинхронные задачи, во время тестов вы можете запускать все из одного потока.
Независимо от того, насколько это удобно, весь код кажется немного загроможденным. К счастью, в фантастическом классе Futures есть простой служебный метод:
|
01
02
03
04
05
06
07
08
09
10
11
|
Futures.addCallback(future, new FutureCallback<String>() { @Override public void onSuccess(String contents) { //...process web site contents } @Override public void onFailure(Throwable throwable) { log.error("Exception in task", throwable); }}); |
FutureCallback — намного более простая абстракция для работы, решает будущее и выполняет обработку исключений для вас. Также вы можете по-прежнему предоставлять пользовательский пул потоков для слушателей, если хотите. Если вы застряли с каким-то устаревшим API, который по-прежнему возвращает Future вы можете попробовать JdkFutureAdapters.listenInPoolThread() который представляет собой адаптер, преобразующий обычный Future<V> в ListenableFuture<V> . Но имейте в виду, что как только вы начнете использовать addListener() , каждому такому адаптеру потребуется один поток для работы, поэтому это решение вообще не масштабируется, и вам следует избегать его.
|
1
2
3
|
Future<String> future = //...ListenableFuture<String> listenableFuture = JdkFutureAdapters.listenInPoolThread(future); |
Как только мы поймем основы, мы сможем глубоко погрузиться в самое сильное будущее прослушивания: трансформации и цепочки . Это продвинутый материал, вы были предупреждены.
Ссылка: ListenableFuture in Guava от нашего партнера по JCG Томаша Нуркевича в блоге NoBlogDefFound .