Статьи

Расширенные возможности ListenableFuture

В прошлый раз мы познакомились с ListenableFuture . Я обещал ввести более продвинутые методы, а именно преобразования и цепочки. Давайте начнем с чего-то простого. Скажем, у нас есть ListenableFuture<String> который мы получили от некоторого асинхронного сервиса. У нас также есть простой метод:

1
Document parse(String xml) {//...

Нам не нужна String , нам нужен Document . Одним из способов было бы просто разрешить Future ( дождаться его) и выполнить обработку на String . Но гораздо более элегантное решение – применить преобразование, как только результаты станут доступны, и обработать наш метод, как если бы он всегда возвращал ListenableFuture<Document> . Это довольно просто:

1
2
3
4
5
6
7
8
final ListenableFuture<String> future = //...
  
final ListenableFuture<Document> documentFuture = Futures.transform(future, new Function<String, Document>() {
    @Override
    public Document apply(String contents) {
        return parse(contents);
    }
});

или более читаемый:

01
02
03
04
05
06
07
08
09
10
final Function<String, Document> parseFun = new Function<String, Document>() {
    @Override
    public Document apply(String contents) {
        return parse(contents);
    }
};
  
final ListenableFuture<String> future = //...
  
final ListenableFuture<Document> documentFuture = Futures.transform(future, parseFun);

Синтаксис Java немного ограничивает, но, пожалуйста, обратите внимание на то, что мы только что сделали. Futures.transform() не ожидает, когда базовый ListenableFuture<String> применит преобразование parse() . Вместо этого, под капотом, он регистрирует обратный вызов, желая получать уведомления всякий раз, когда данное будущее заканчивается. Это преобразование применяется динамично и прозрачно для нас в нужный момент. У нас еще есть Future , но на этот раз оберточный Document .

Итак, давайте сделаем еще один шаг вперед. У нас также есть асинхронный, возможно длительный метод, который вычисляет релевантность (независимо от того, что в этом контексте) данного Document :

1
ListenableFuture<double> calculateRelevance(Document pageContents) {//...</double>

Можем ли мы каким-то образом связать это с ListenableFuture<Document> у нас уже есть? Первая попытка:

01
02
03
04
05
06
07
08
09
10
final Function<Document, ListenableFuture<Double>> relevanceFun = new Function<Document, ListenableFuture<Double>>() {
    @Override
    public ListenableFuture<Double> apply(Document input) {
        return calculateRelevance(input);
    }
};
  
final ListenableFuture<String> future = //...
final ListenableFuture<Document> documentFuture = Futures.transform(future, parseFun);
final ListenableFuture<ListenableFuture<Double>> relevanceFuture = Futures.transform(documentFuture, relevanceFun);

Ой! Будущее будущего Double , это выглядит не очень хорошо. Как только мы решим внешнее будущее, нам нужно ждать и внутреннего. Определенно не элегантно. Можем ли мы сделать лучше?

01
02
03
04
05
06
07
08
09
10
final AsyncFunction<Document, Double> relevanceAsyncFun = new AsyncFunction<Document, Double>() {
    @Override
    public ListenableFuture<Double> apply(Document pageContents) throws Exception {
        return calculateRelevance(pageContents);
    }
};
  
final ListenableFuture<String> future = //comes from ListeningExecutorService
final ListenableFuture<Document> documentFuture = Futures.transform(future, parseFun);
final ListenableFuture<Double> relevanceFuture = Futures.transform(documentFuture, relevanceAsyncFun);

Пожалуйста, внимательно посмотрите на все типы и результаты. Обратите внимание на разницу между Function и AsyncFunction . Изначально мы получили асинхронный метод, возвращающий будущее String . Позже мы преобразовали его, чтобы плавно превратить String в XML- Document . Это преобразование происходит асинхронно, когда завершается внутреннее будущее. Имея будущее Document мы хотели бы вызвать метод, который требует Document и возвращает будущее Double .

Если мы вызовем relevanceFuture.get() , наш объект Future сначала будет ожидать завершения внутренней задачи, а его результат ( String -> Document ) будет ожидать внешней задачи и вернет Double . Мы также можем зарегистрировать обратные вызовы для relevanceFuture будущем, которые будут срабатывать при завершении внешней задачи ( calculateRelevance() ). Если вы все еще здесь, это еще более безумные преобразования.

Помните, что все это происходит в цикле. Для каждого веб-сайта мы получили ListenableFuture<String> который мы асинхронно преобразовали в ListenableFuture<Double> . В итоге мы работаем со List<ListenableFuture<Double>> . Это также означает, что для извлечения всех результатов мы должны либо зарегистрировать прослушиватель для каждого ListenableFuture либо ждать каждого из них. Который не прогрессирует нас вообще. Но что, если бы мы могли легко преобразовать из List<ListenableFuture<Double>> в ListenableFuture<List<Double>> ? Читайте внимательно – от списка фьючерсов до будущего списка. Другими словами, вместо того, чтобы иметь кучу маленьких фьючерсов, у нас есть одно будущее, которое завершится, когда все дочерние фьючерсы будут завершены, – и результаты будут сопоставлены один к одному с целевым списком. Угадайте, что Гуава может сделать это!

1
2
final List<ListenableFuture<Double>> relevanceFutures = //...;
final ListenableFuture<List<Double>> futureOfRelevance = Futures.allAsList(relevanceFutures);

Конечно, здесь тоже нет ожидания. Wrapper ListenableFuture<List<Double>> будет уведомляться каждый раз, когда завершается одно из его дочерних фьючерсов. В момент завершения последнего дочернего ListenableFuture<Double> завершается внешнее будущее. Все зависит от событий и полностью скрыто от вас.

Ты думаешь это все? Скажем, мы хотели бы вычислить наибольшую релевантность во всем наборе. Как вы, наверное, уже знаете, мы не будем ждать List<Double> . Вместо этого мы зарегистрируем преобразование из List<Double> в Double !

1
2
3
4
5
6
final ListenableFuture<Double> maxRelevanceFuture = Futures.transform(futureOfRelevance, new Function<List<Double>, Double>() {
    @Override
    public Double apply(List<Double> relevanceList) {
        return Collections.max(relevanceList);
    }
});

Наконец, мы можем прослушивать событие завершения maxRelevanceFuture и, например, отправлять результаты (асинхронно!) С использованием JMS. Вот полный код, если вы потеряли трек:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
private Document parse(String xml) {
    return //...
}
  
private final Function<String, Document> parseFun = new Function<String, Document>() {
    @Override
    public Document apply(String contents) {
        return parse(contents);
    }
};
  
private ListenableFuture<Double> calculateRelevance(Document pageContents) {
    return //...
}
  
final AsyncFunction<Document, Double> relevanceAsyncFun = new AsyncFunction<Document, Double>() {
    @Override
    public ListenableFuture<Double> apply(Document pageContents) throws Exception {
        return calculateRelevance(pageContents);
    }
};
  
//...
  
final ListeningExecutorService pool = MoreExecutors.listeningDecorator(
    Executors.newFixedThreadPool(10)
);
  
final List<ListenableFuture<Double>> relevanceFutures = new ArrayList<>(topSites.size());
for (final URL siteUrl : topSites) {
    final ListenableFuture<String> future = pool.submit(new Callable<String>() {
        @Override
        public String call() throws Exception {
            return IOUtils.toString(siteUrl, StandardCharsets.UTF_8);
        }
    });
    final ListenableFuture<Document> documentFuture = Futures.transform(future, parseFun);
    final ListenableFuture<Double> relevanceFuture = Futures.transform(documentFuture, relevanceAsyncFun);
    relevanceFutures.add(relevanceFuture);
}
  
final ListenableFuture<List<Double>> futureOfRelevance = Futures.allAsList(relevanceFutures);
final ListenableFuture<Double> maxRelevanceFuture = Futures.transform(futureOfRelevance, new Function<List<Double>, Double>() {
    @Override
    public Double apply(List<Double> relevanceList) {
        return Collections.max(relevanceList);
    }
});
  
Futures.addCallback(maxRelevanceFuture, new FutureCallback<Double>() {
    @Override
    public void onSuccess(Double result) {
        log.debug("Result: {}", result);
    }
  
    @Override
    public void onFailure(Throwable t) {
        log.error("Error :-(", t);
    }
});

Стоило ли? Да и нет . Да , потому что мы изучили некоторые действительно важные конструкции и примитивы, используемые вместе с фьючерсами / обещаниями: сцепление, отображение (преобразование) и сокращение. Это красивое решение с точки зрения использования процессора – не нужно ждать, блокировать и т. Д. Помните, что самая сильная сторона Node.js – это политика «не блокировать». Также в Netty фьючерсы распространены повсеместно. И последнее, но не менее важное: он очень функциональный .

С другой стороны, в основном из-за многословия синтаксиса Java и отсутствия вывода типа (да, мы скоро перейдем к Scala), код кажется очень нечитаемым, его трудно отслеживать и поддерживать. Ну, в некоторой степени это справедливо для всех систем, управляемых сообщениями. Но пока мы не изобретаем лучшие API и примитивы, мы должны научиться жить и использовать асинхронные высокопараллельные вычисления.

Если вы хотите еще больше поэкспериментировать с ListenableFuture , не забудьте прочитать официальную документацию .

Ссылка: Расширенные возможности ListenableFuture от нашего партнера по JCG Томаша Нуркевича в блоге NoBlogDefFound .