ListenableFuture
в Guava — это попытка определить согласованный API для объектов Future
для регистрации обратных вызовов завершения. Благодаря возможности добавить обратный вызов по завершении Future
, мы можем асинхронно и эффективно реагировать на входящие события. Если ваше приложение работает с большим количеством объектов будущего, я настоятельно рекомендую использовать ListenableFuture
когда вы можете. Технически ListenableFuture
расширяет интерфейс Future
, добавляя простые:
1
|
void addListener(Runnable listener, Executor executor) |
метод. Вот и все. Если вы ListenableFuture
вы можете зарегистрировать Runnable
чтобы он выполнялся сразу после завершения рассматриваемого будущего. Вы также должны предоставить Executor
( ExecutorService
расширяет его), который будет использоваться для выполнения вашего прослушивателя, чтобы продолжительные прослушиватели не занимали ваши рабочие потоки.
Давайте приведем это в действие. Мы начнем с рефакторинга нашего первого примера веб-сканера для использования ListenableFuture
. К счастью, в случае пулов потоков это просто вопрос оборачивания их с помощью MoreExecutors.listeningDecorator()
:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool( 10 )); for ( final URL siteUrl : topSites) { final ListenableFuture<String> future = pool.submit( new Callable<String>() { @Override public String call() throws Exception { return IOUtils.toString(siteUrl, StandardCharsets.UTF_8); } }); future.addListener( new Runnable() { @Override public void run() { try { final String contents = future.get(); //...process web site contents } catch (InterruptedException e) { log.error( "Interrupted" , e); } catch (ExecutionException e) { log.error( "Exception in task" , e.getCause()); } } }, MoreExecutors.sameThreadExecutor()); } |
Есть несколько интересных наблюдений. Прежде всего обратите внимание, как ListeningExecutorService
оборачивает существующий Executor
. Это похоже на подход ExecutorCompletionService
. Позже мы регистрируем пользовательский Runnable
чтобы получать уведомления о завершении каждой задачи. Во-вторых, обратите внимание, как выглядит ужасная обработка ошибок: мы должны обработать InterruptedException
(что технически никогда не должно произойти, так как Future
уже разрешено, а get()
никогда его не выбросит) и ExecutionException
. Мы еще не рассмотрели это, но Future<T>
должен каким-то образом обрабатывать исключения, возникающие во время асинхронных вычислений. Такие исключения включаются в ExecutionException
(таким образом, getCause()
во время регистрации), getCause()
из get()
.
Наконец, обратите внимание на использование MoreExecutors.sameThreadExecutor()
. Это удобная абстракция, которую вы можете использовать каждый раз, когда какой-либо API-интерфейс захочет использовать Executor
/ ExecutorService
(предположительно, пул потоков), пока вы хорошо справляетесь с использованием текущего потока. Это особенно полезно во время модульного тестирования — даже если ваш производственный код использует асинхронные задачи, во время тестов вы можете запускать все из одного потока.
Независимо от того, насколько это удобно, весь код кажется немного загроможденным. К счастью, в фантастическом классе Futures
есть простой служебный метод:
01
02
03
04
05
06
07
08
09
10
11
|
Futures.addCallback(future, new FutureCallback<String>() { @Override public void onSuccess(String contents) { //...process web site contents } @Override public void onFailure(Throwable throwable) { log.error( "Exception in task" , throwable); } }); |
FutureCallback
— намного более простая абстракция для работы, решает будущее и выполняет обработку исключений для вас. Также вы можете по-прежнему предоставлять пользовательский пул потоков для слушателей, если хотите. Если вы застряли с каким-то устаревшим API, который по-прежнему возвращает Future
вы можете попробовать JdkFutureAdapters.listenInPoolThread()
который представляет собой адаптер, преобразующий обычный Future<V>
в ListenableFuture<V>
. Но имейте в виду, что как только вы начнете использовать addListener()
, каждому такому адаптеру потребуется один поток для работы, поэтому это решение вообще не масштабируется, и вам следует избегать его.
1
2
3
|
Future<String> future = //... ListenableFuture<String> listenableFuture = JdkFutureAdapters.listenInPoolThread(future); |
Как только мы поймем основы, мы сможем глубоко погрузиться в самое сильное будущее прослушивания: трансформации и цепочки . Это продвинутый материал, вы были предупреждены.
Ссылка: ListenableFuture in Guava от нашего партнера по JCG Томаша Нуркевича в блоге NoBlogDefFound .