Статьи

Расширенные возможности прослушивания в будущем

В прошлый раз мы познакомилисьListenableFuture . Я обещал ввести более продвинутые методы, а именно преобразования и цепочки. Давайте начнем с чего-то простого. Скажем, у нас есть наш, ListenableFuture<String>который мы получили от некоторого асинхронного сервиса. У нас также есть простой метод:

Document parse(String xml) {//...

Нам не нужно String, нам нужно Document. Одним из способов было бы просто решить Future( дождаться его) и выполнить обработку String. Но гораздо более элегантное решение — применить преобразование, как только результаты станут доступны, и обработать наш метод, как если бы он всегда возвращался ListenableFuture<Document>. Это довольно просто:

final ListenableFuture<String> future = //...
 
final ListenableFuture<Document> documentFuture = Futures.transform(future, new Function<String, Document>() {
    @Override
    public Document apply(String contents) {
        return parse(contents);
    }
});

или более читаемый:

final Function<String, Document> parseFun = new Function<String, Document>() {
    @Override
    public Document apply(String contents) {
        return parse(contents);
    }
};
 
final ListenableFuture<String> future = //...
 
final ListenableFuture<Document> documentFuture = Futures.transform(future, parseFun);

Синтаксис Java немного ограничивает, но, пожалуйста, обратите внимание на то, что мы только что сделали. Futures.transform()не ждет, пока не ListenableFuture<String>будет применено parse()преобразование. Вместо этого, под капотом, он регистрирует обратный вызов, желая получать уведомления всякий раз, когда данное будущее заканчивается. Это преобразование применяется динамично и прозрачно для нас в нужный момент. У нас еще есть Future, но на этот раз упаковка Document.

Итак, давайте сделаем еще один шаг вперед. У нас также есть асинхронный, возможно длительный метод, который вычисляет релевантность (независимо от того, что в этом контексте) данного Document:

ListenableFuture<Double> calculateRelevance(Document pageContents) {//...

Можем ли мы каким-то образом связать это с ListenableFuture<Document>нами уже есть? Первая попытка:

final Function<Document, ListenableFuture<Double>> relevanceFun = new Function<Document, ListenableFuture<Double>>() {
    @Override
    public ListenableFuture<Double> apply(Document input) {
        return calculateRelevance(input);
    }
};
 
final ListenableFuture<String> future = //...
final ListenableFuture<Document> documentFuture = Futures.transform(future, parseFun);
final ListenableFuture<ListenableFuture<Double>> relevanceFuture = Futures.transform(documentFuture, relevanceFun);

Ой! Будущее будущегоDouble , это выглядит не очень хорошо. Как только мы решим внешнее будущее, нам нужно ждать и внутреннего. Определенно не элегантно. Можем ли мы сделать лучше?

final AsyncFunction<Document, Double> relevanceAsyncFun = new AsyncFunction<Document, Double>() {
    @Override
    public ListenableFuture<Double> apply(Document pageContents) throws Exception {
        return calculateRelevance(pageContents);
    }
};
 
final ListenableFuture<String> future = //comes from ListeningExecutorService
final ListenableFuture<Document> documentFuture = Futures.transform(future, parseFun);
final ListenableFuture<Double> relevanceFuture = Futures.transform(documentFuture, relevanceAsyncFun);

Пожалуйста, внимательно посмотрите на все типы и результаты. Обратите внимание на разницу между Functionи AsyncFunction. Изначально мы получили асинхронный метод, возвращающий будущее String. Позже мы преобразовали это, чтобы плавно превратиться Stringв XML Document. Это преобразование происходит асинхронно, когда завершается внутреннее будущее. Имея будущее, Documentмы хотели бы вызвать метод, который требует Documentи возвращает будущее Double.

Если мы вызываем relevanceFuture.get(), наш Futureобъект сначала будет ждать завершения внутренней задачи, а результат ( String-> Document) будет ждать внешней задачи и вернуться Double. Мы также можем зарегистрировать обратные вызовы, по relevanceFutureкоторым будет срабатывать внешняя задача (calculateRelevance()) заканчивает. Если вы все еще здесь, это еще более безумные преобразования.

Помните, что все это происходит в цикле. Для каждого веб-сайта мы получили ListenableFuture<String>асинхронное преобразование ListenableFuture<Double>. Итак, в конце концов, мы работаем с List<ListenableFuture<Double>>. Это также означает, что для извлечения всех результатов мы должны либо зарегистрировать слушателя для каждого, ListenableFutureлибо ждать каждого из них. Который не прогрессирует нас вообще. Но что, если бы мы могли легко трансформироваться из List<ListenableFuture<Double>>в ListenableFuture<List<Double>>? Читайте внимательно — от списка фьючерсов до будущего списка. Другими словами, вместо того, чтобы иметь кучу маленьких фьючерсов, у нас есть одно будущее, которое завершится, когда все дочерние фьючерсы завершатся — и результаты будут сопоставлены один к одному с целевым списком. Угадайте, что Гуава может сделать это!

final List<ListenableFuture<Double>> relevanceFutures = //...;
final ListenableFuture<List<Double>> futureOfRelevance = Futures.allAsList(relevanceFutures);

Конечно, здесь тоже нет ожидания. Оболочка ListenableFuture<List<Double>>будет уведомляться каждый раз, когда завершается одно из ее дочерних фьючерсов. В тот момент, когда последний ребенок ListenableFuture<Double>завершает, внешнее будущее также завершается. Все зависит от событий и полностью скрыто от вас.

Ты думаешь это все? Скажем, мы хотели бы вычислить наибольшую релевантность во всем наборе. Как вы, наверное, уже знаете, мы не будем ждать List<Double>. Вместо этого мы зарегистрируем преобразование из List<Double>в Double!

final ListenableFuture<Double> maxRelevanceFuture = Futures.transform(futureOfRelevance, new Function<List<Double>, Double>() {
    @Override
    public Double apply(List<Double> relevanceList) {
        return Collections.max(relevanceList);
    }
});

Наконец, мы можем прослушивать событие завершения maxRelevanceFutureи, например, отправлять результаты (асинхронно!), Используя JMS. Вот полный код, если вы потеряли трек:

private Document parse(String xml) {
    return //...
}
 
private final Function<String, Document> parseFun = new Function<String, Document>() {
    @Override
    public Document apply(String contents) {
        return parse(contents);
    }
};
 
private ListenableFuture<Double> calculateRelevance(Document pageContents) {
    return //...
}
 
final AsyncFunction<Document, Double> relevanceAsyncFun = new AsyncFunction<Document, Double>() {
    @Override
    public ListenableFuture<Double> apply(Document pageContents) throws Exception {
        return calculateRelevance(pageContents);
    }
};
 
//...
 
final ListeningExecutorService pool = MoreExecutors.listeningDecorator(
    Executors.newFixedThreadPool(10)
);
 
final List<ListenableFuture<Double>> relevanceFutures = new ArrayList<>(topSites.size());
for (final URL siteUrl : topSites) {
    final ListenableFuture<String> future = pool.submit(new Callable<String>() {
        @Override
        public String call() throws Exception {
            return IOUtils.toString(siteUrl, StandardCharsets.UTF_8);
        }
    });
    final ListenableFuture<Document> documentFuture = Futures.transform(future, parseFun);
    final ListenableFuture<Double> relevanceFuture = Futures.transform(documentFuture, relevanceAsyncFun);
    relevanceFutures.add(relevanceFuture);
}
 
final ListenableFuture<List<Double>> futureOfRelevance = Futures.allAsList(relevanceFutures);
final ListenableFuture<Double> maxRelevanceFuture = Futures.transform(futureOfRelevance, new Function<List<Double>, Double>() {
    @Override
    public Double apply(List<Double> relevanceList) {
        return Collections.max(relevanceList);
    }
});
 
Futures.addCallback(maxRelevanceFuture, new FutureCallback<Double>() {
    @Override
    public void onSuccess(Double result) {
        log.debug("Result: {}", result);
    }
 
    @Override
    public void onFailure(Throwable t) {
        log.error("Error :-(", t);
    }
});

Стоило ли это? Да и нет . Да , потому что мы изучили некоторые действительно важные конструкции и примитивы, используемые вместе с фьючерсами / обещаниями: сцепление, отображение (преобразование) и сокращение. Это прекрасное решение с точки зрения использования процессора — не нужно ждать, блокировать и т. Д. Помните, что самая сильная сторона Node.js — это политика «не блокировать». Также в Netty фьючерсы распространены повсеместно. И последнее, но не менее важное: он очень функциональный .

С другой стороны, в основном из-за многословия синтаксиса Java и отсутствия вывода типа (да, мы скоро перейдем к Scala), код кажется очень нечитаемым, его трудно отслеживать и поддерживать. Ну, в некоторой степени это справедливо для всех систем, управляемых сообщениями. Но пока мы не изобретаем лучшие API и примитивы, мы должны научиться жить и использовать асинхронные высокопараллельные вычисления.

Если вы хотите ListenableFutureеще больше поэкспериментировать , не забудьте прочитать официальную документацию .