Статьи

Неблокирующая асинхронная Java 8 и Scala’s Try / Success / Failure

Вдохновленный недавним информационным бюллетенем от Heinz Kabutz, а также Scut ‘s Futures, который я исследовал в своей недавней книге , я приступил к использованию Java 8, чтобы написать пример того, как передавать работу в службу выполнения и асинхронно отвечать на ее результаты, используя обратные вызовы, так что что нет необходимости блокировать какие-либо потоки, ожидающие результатов от службы выполнения.

Теория говорит, что вызов методов блокировки, таких как get on java.util.concurrent.Future , плох, потому что системе потребуется больше, чем оптимальное количество потоков, если она должна непрерывно выполнять работу, и это приводит к потере времени с переключением контекста .

В мире Scala фреймворки, такие как Akka, используют модели программирования, которые означают, что фреймворки никогда не будут блокироваться — единственный раз, когда поток блокируется, когда пользователь программирует что-то, что блокирует, и ему не рекомендуется это делать. Никогда не блокируя, инфраструктура может обходиться без использования одного потока на ядро, что намного меньше, чем, скажем, на стандартном сервере приложений JBoss Java EE, который имеет до 400 потоков сразу после запуска. Во многом благодаря работе фреймворка Akka, Scala 2.10 добавила Futures and Promises , но их нет (пока?) В Java.

Следующий код показывает цель, которую я имел в виду. Он состоит из трех частей. Во-первых, новые задачи добавляются в службу выполнения с использованием метода статического future найденного в классе ch.maxant.async.Future . Он возвращает Future , но не одно из пакета java.util.concurrent , а скорее его подкласс из пакета ch.maxant.async . Во-вторых, в этом Future есть метод map , который следует функциональному стилю из Scala или нового класса Java 8 Stream . Метод map позволяет зарегистрировать обратный вызов или, точнее, сопоставить (преобразовать) значение, которое содержит первое будущее, в новое значение. Отображение выполняется в другое время в будущем, после того, как первое Future завершено, и поэтому оно приводит к новому Future . В-третьих, мы используем другой метод в классе Future для регистрации обратного вызова, который будет запущен, как только все созданное нами будущее будет завершено. Ни в коем случае не используются какие-либо методы блокировки Future API!

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
final Random random = new Random();
int numTasks = 10;
List<Future<Integer>> futures = new ArrayList<>();
 
for(int i = 0; i < numTasks; i++){
    final int j = i;
    log("adding future " + i);
 
    // PART 1
    //start some work async / in the future
    Future<String> f = future(new Task<String>( () -> {
        sleep(random.nextInt(1000));
        if(j < 5){
            log("working success");
            return "20";
        }else{
            log("working failure");
            throw new Exception();
        }
    }));
 
    // PART 2
    //register a callback, to be called when the work is done
    log("adding mapping callback to future");
    final Future<Integer> f2 = f.map( (Try<String> stringNumber) -> {
        return stringNumber.map( (String s) -> {
            log("mapping '" + s + "' to int");
            return Integer.parseInt(s);
        }).recover( (Exception e) -> {
            log("recovering");
            return -10;
        }).get(); //wont throw an exception, because we provided a recovery!
    });
 
    futures.add(f2);
}
 
// PART 3
log("registering callback for final result");
Future.registerCallback(futures, (List<Try<Integer>> results) -> {
 
    Integer finalResult = results.stream().map( (Try<Integer> t) -> {
        log("mapping " + t);
        try {
            return t.get();
        } catch (Exception e) {
            return 0;
        }
    }).reduce(0, (Integer i1, Integer i2) -> {
        log("reducing " + i1 + " and " + i2);
        return i1 + i2;
    });
 
    log("final result is " + finalResult);
    Future.shutdown();
    if(finalResult != 50){
        throw new RuntimeException("FAILED");
    }else{
        log("SUCESS");
    }
});
 
System.out.println("Completed submitting all tasks on thread " + Thread.currentThread().getId());
 
//this main thread will now die, but the Future executor is still up and running.  the callback will shut it down and with it, the jvm.

Строка 11 вызывает future метод для регистрации новой Task , которая создается с использованием экземпляра Work , созданного здесь с использованием лямбда-кода Java 8. Работа немного спит, а затем либо возвращает число 20 в виде строки, либо выдает исключение, просто чтобы продемонстрировать, как обрабатываются ошибки.

Используя Future который строка 11 возвращает из службы выполнения, строка 25 отображает его значение из строки в целое число, в результате чего Future<Integer> а не Future<String> . Этот результат добавляется в список Future s в строке 35, который часть 3 использует в строке 40. Метод registerCallback гарантирует, что данный обратный вызов будет вызван после завершения последнего будущего.

Отображение в строках 25-33 выполняется с помощью лямбды, которой передается объект Try . Try немного похож на Java 8 Optional и является абстракцией (суперклассом) классов Success и Failure , которые я реализовал на основе своих знаний об их аналогах Scala. Это позволяет программистам легче обрабатывать ошибки, чем явно проверять наличие ошибок. Моя реализация интерфейса Try выглядит следующим образом:

01
02
03
04
05
06
07
08
09
10
11
12
public interface Try<T> {
 
    /** returns the value, or throws an exception if its a failure. */
    T get() throws Exception;
 
    /** converts the value using the given function, resulting in a new Try */
    <S> Try<S> map(Function1<T, S> func);
 
    /** can be used to handle recovery by converting the exception into a {@link Try} */
    Try<T> recover(Recovery<T> r);
 
}

Что происходит, так это то, что реализация Success и Failure обрабатывает ошибки. Например, если Future в строке 11 первого листинга завершается с исключением, тогда лямбда в строке 25 первого листинга передается объект Failure , и вызов метода map для Failure абсолютно ничего не делает. Никаких исключений не возникает, ничего. Чтобы компенсировать это, вы можете вызвать метод recover , например, в строке 29 первого списка, который позволяет обработать исключение и вернуть значение, с которым ваша программа может продолжить работу, например значение по умолчанию.

Класс Success с другой стороны, реализует методы map и Try интерфейса Try разному, так что вызов map приводит к вызову данной функции, но вызов recover абсолютно ничего не делает. Вместо явного кодирования блока try / catch методы map и recover допускают более приятный синтаксис, который легче проверять при чтении или просмотре кода (что чаще всего происходит с кодом, чем при его написании).

Поскольку методы map и recover обертывают результаты функций в Try s, вы можете объединять вызовы, такие как строки 26, 29 и 32. В Try API от Scala гораздо больше методов, чем в трех, которые я реализовал здесь. Обратите внимание, что я решил не использовать java.util.function.Function в моем Try API, потому что его метод apply не вызывает throw Exception что означало, что код, показанный в первом листинге, был не так хорош, как сейчас. Вместо этого я написал
Интерфейс Function1 .

Часть 3 головоломки состоит в том, как заставить программу сделать что-то полезное после того, как все Future завершено, без неприятных блокирующих вызовов, подобных вызовам метода Future#get() . Решение состоит в том, чтобы зарегистрировать обратный вызов, как показано в строке 40. Этот обратный вызов, как и все другие, показанные здесь, передается службе выполнения. Это означает, что у нас нет гарантии, какой поток его запустит, и это имеет побочный эффект, а именно то, что локальное хранилище потоков (TLS) больше не работает — некоторые фреймворки, такие как (более старые версии?) Hibernate, полагались на TLS, и они просто победили ». здесь не работает У Scala есть хороший способ решения этой проблемы с помощью implicit ключевого слова, которого у Java нет (пока …?), Поэтому необходимо использовать какой-то другой механизм. Я упоминаю об этом, просто чтобы вы знали об этом.

Таким образом, когда завершается последнее будущее, вызываются строки 40-60 и передается List Try содержащий Integer , а не Future . Метод registerCallback преобразует фьючерсы в соответствующие Success или Failure . Но как мы можем превратить их во что-то полезное? Разумеется, с помощью простого сопоставления / сокращения, к счастью, Java 8 теперь поддерживает это с классом Stream , который создается из коллекции Try в строке 42 путем вызова метода stream() . Сначала я отображаю (конвертирую) значения Try в их значения, а затем сокращаю поток до одного значения в строке 49. Вместо того, чтобы передавать собственную реализацию лямбда-выражения, суммирующего значения, я мог бы использовать
Integer::sum , например someStream.reduce(0, Integer::sum) .

В последний раз, когда я запускал программу, она выводила следующее:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
Thread-1 says: adding future 0
Thread-1 says: adding mapping callback to future
Thread-1 says: adding future 1
Thread-1 says: adding mapping callback to future
Thread-1 says: adding future 2
Thread-1 says: adding mapping callback to future
Thread-1 says: adding future 3
Thread-1 says: adding mapping callback to future
Thread-1 says: adding future 4
Thread-1 says: adding mapping callback to future
Thread-1 says: adding future 5
Thread-1 says: adding mapping callback to future
Thread-1 says: adding future 6
Thread-1 says: adding mapping callback to future
Thread-1 says: adding future 7
Thread-1 says: adding mapping callback to future
Thread-1 says: adding future 8
Thread-1 says: adding mapping callback to future
Thread-1 says: adding future 9
Thread-1 says: adding mapping callback to future
Thread-1 says: registering callback for final result
Thread-10 says: working success
Completed submitting all tasks on thread 1
Thread-14 says: working success
Thread-10 says: working failure
Thread-14 says: working failure
Thread-12 says: working success
Thread-10 says: working failure
Thread-10 says: mapping '20' to int
Thread-10 says: mapping '20' to int
Thread-10 says: recovering
Thread-10 says: recovering
Thread-10 says: mapping '20' to int
Thread-10 says: recovering
Thread-11 says: working success
Thread-11 says: mapping '20' to int
Thread-13 says: working success
Thread-10 says: mapping '20' to int
Thread-12 says: working failure
Thread-12 says: recovering
Thread-14 says: working failure
Thread-14 says: recovering
Thread-14 says: mapping Success(20)
Thread-14 says: mapping Success(20)
Thread-14 says: mapping Success(20)
Thread-14 says: mapping Success(20)
Thread-14 says: mapping Success(20)
Thread-14 says: mapping Success(-10)
Thread-14 says: mapping Success(-10)
Thread-14 says: mapping Success(-10)
Thread-14 says: mapping Success(-10)
Thread-14 says: mapping Success(-10)
Thread-14 says: final result is 50
Thread-14 says: SUCESS

Как видите, основной поток добавляет все задачи и регистрирует все функции отображения (строки 1-20). Затем он регистрирует обратный вызов (строка 21 вывода, которая соответствует строке 39 листинга) и, наконец, выводит текст из строки 63 в листинге, после чего он умирает, потому что ему больше нечего делать. Строки 22 и строки 24-42 выходных данных затем показывают различные потоки в пуле (который содержал 5 потоков), обрабатывающие работу, а также отображающие из String в Integer или восстанавливающие после исключения. Это код в частях 1 и 2 первого списка. Вы можете видеть, что он полностью асинхронный, с некоторыми сопоставлениями / восстановлениями, происходящими до того, как вся начальная работа завершена (сравните строки 38 или 40, которые являются отображением и восстановлением соответственно, со строкой 41 выходных данных, которая происходит позже и является последней начальной работы). Строки 43-52 представляют собой вывод карты / сокращения, который является частью 3 основного списка. Обратите внимание, что в журнале нет сокращения, потому что код, который я запускал и который находится на Github, использует упомянутый выше ярлык Integer::sum , а не строки 50-51 первого списка, показанного выше.

Хотя все это возможно с использованием Java 6 (или даже 5?), Например, путем получения задач, которые отправляются в пул, для отправки обратного вызова самостоятельно, после их завершения объем кода, необходимый для этого, увеличивается и сам код будет хуже, чем показано здесь. Java 8 лямбда, Future которые могут отображаться с помощью обратных вызовов, и Try API с его аккуратной обработкой ошибок — все это делает решение, показанное здесь, возможно, более легким в обслуживании.

Приведенный выше код, а также код для классов в пакете ch.maxant.async доступны под лицензией Apache License версии 2.0 и могут быть загружены из моей учетной записи Github .