Вдохновленный недавним информационным бюллетенем от Heinz Kabutz, а также Scut ‘s Futures, который я исследовал в своей недавней книге , я приступил к использованию Java 8, чтобы написать пример того, как передавать работу в службу выполнения и асинхронно отвечать на ее результаты, используя обратные вызовы, так что что нет необходимости блокировать какие-либо потоки, ожидающие результатов от службы выполнения.
Теория говорит, что вызов методов блокировки, таких как get
on java.util.concurrent.Future
, плох, потому что системе потребуется больше, чем оптимальное количество потоков, если она должна непрерывно выполнять работу, и это приводит к потере времени с переключением контекста .
В мире Scala фреймворки, такие как Akka, используют модели программирования, которые означают, что фреймворки никогда не будут блокироваться — единственный раз, когда поток блокируется, когда пользователь программирует что-то, что блокирует, и ему не рекомендуется это делать. Никогда не блокируя, инфраструктура может обходиться без использования одного потока на ядро, что намного меньше, чем, скажем, на стандартном сервере приложений JBoss Java EE, который имеет до 400 потоков сразу после запуска. Во многом благодаря работе фреймворка Akka, Scala 2.10 добавила Futures and Promises , но их нет (пока?) В Java.
Следующий код показывает цель, которую я имел в виду. Он состоит из трех частей. Во-первых, новые задачи добавляются в службу выполнения с использованием метода статического future
найденного в классе ch.maxant.async.Future
. Он возвращает Future
, но не одно из пакета java.util.concurrent
, а скорее его подкласс из пакета ch.maxant.async
. Во-вторых, в этом Future
есть метод map
, который следует функциональному стилю из Scala или нового класса Java 8 Stream
. Метод map
позволяет зарегистрировать обратный вызов или, точнее, сопоставить (преобразовать) значение, которое содержит первое будущее, в новое значение. Отображение выполняется в другое время в будущем, после того, как первое Future
завершено, и поэтому оно приводит к новому Future
. В-третьих, мы используем другой метод в классе Future
для регистрации обратного вызова, который будет запущен, как только все созданное нами будущее будет завершено. Ни в коем случае не используются какие-либо методы блокировки Future
API!
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
|
final Random random = new Random(); int numTasks = 10 ; List<Future<Integer>> futures = new ArrayList<>(); for ( int i = 0 ; i < numTasks; i++){ final int j = i; log( "adding future " + i); // PART 1 //start some work async / in the future Future<String> f = future( new Task<String>( () -> { sleep(random.nextInt( 1000 )); if (j < 5 ){ log( "working success" ); return "20" ; } else { log( "working failure" ); throw new Exception(); } })); // PART 2 //register a callback, to be called when the work is done log( "adding mapping callback to future" ); final Future<Integer> f2 = f.map( (Try<String> stringNumber) -> { return stringNumber.map( (String s) -> { log( "mapping '" + s + "' to int" ); return Integer.parseInt(s); }).recover( (Exception e) -> { log( "recovering" ); return - 10 ; }).get(); //wont throw an exception, because we provided a recovery! }); futures.add(f2); } // PART 3 log( "registering callback for final result" ); Future.registerCallback(futures, (List<Try<Integer>> results) -> { Integer finalResult = results.stream().map( (Try<Integer> t) -> { log( "mapping " + t); try { return t.get(); } catch (Exception e) { return 0 ; } }).reduce( 0 , (Integer i1, Integer i2) -> { log( "reducing " + i1 + " and " + i2); return i1 + i2; }); log( "final result is " + finalResult); Future.shutdown(); if (finalResult != 50 ){ throw new RuntimeException( "FAILED" ); } else { log( "SUCESS" ); } }); System.out.println( "Completed submitting all tasks on thread " + Thread.currentThread().getId()); //this main thread will now die, but the Future executor is still up and running. the callback will shut it down and with it, the jvm. |
Строка 11 вызывает future
метод для регистрации новой Task
, которая создается с использованием экземпляра Work
, созданного здесь с использованием лямбда-кода Java 8. Работа немного спит, а затем либо возвращает число 20 в виде строки, либо выдает исключение, просто чтобы продемонстрировать, как обрабатываются ошибки.
Используя Future
который строка 11 возвращает из службы выполнения, строка 25 отображает его значение из строки в целое число, в результате чего Future<Integer>
а не Future<String>
. Этот результат добавляется в список Future
s в строке 35, который часть 3 использует в строке 40. Метод registerCallback
гарантирует, что данный обратный вызов будет вызван после завершения последнего будущего.
Отображение в строках 25-33 выполняется с помощью лямбды, которой передается объект Try
. Try
немного похож на Java 8 Optional
и является абстракцией (суперклассом) классов Success
и Failure
, которые я реализовал на основе своих знаний об их аналогах Scala. Это позволяет программистам легче обрабатывать ошибки, чем явно проверять наличие ошибок. Моя реализация интерфейса Try
выглядит следующим образом:
01
02
03
04
05
06
07
08
09
10
11
12
|
public interface Try<T> { /** returns the value, or throws an exception if its a failure. */ T get() throws Exception; /** converts the value using the given function, resulting in a new Try */ <S> Try<S> map(Function1<T, S> func); /** can be used to handle recovery by converting the exception into a {@link Try} */ Try<T> recover(Recovery<T> r); } |
Что происходит, так это то, что реализация Success
и Failure
обрабатывает ошибки. Например, если Future
в строке 11 первого листинга завершается с исключением, тогда лямбда в строке 25 первого листинга передается объект Failure
, и вызов метода map
для Failure
абсолютно ничего не делает. Никаких исключений не возникает, ничего. Чтобы компенсировать это, вы можете вызвать метод recover
, например, в строке 29 первого списка, который позволяет обработать исключение и вернуть значение, с которым ваша программа может продолжить работу, например значение по умолчанию.
Класс Success
с другой стороны, реализует методы map
и Try
интерфейса Try
разному, так что вызов map
приводит к вызову данной функции, но вызов recover
абсолютно ничего не делает. Вместо явного кодирования блока try / catch методы map
и recover
допускают более приятный синтаксис, который легче проверять при чтении или просмотре кода (что чаще всего происходит с кодом, чем при его написании).
Поскольку методы map
и recover
обертывают результаты функций в Try
s, вы можете объединять вызовы, такие как строки 26, 29 и 32. В Try
API от Scala гораздо больше методов, чем в трех, которые я реализовал здесь. Обратите внимание, что я решил не использовать java.util.function.Function
в моем Try
API, потому что его метод apply
не вызывает throw Exception
что означало, что код, показанный в первом листинге, был не так хорош, как сейчас. Вместо этого я написал
Интерфейс Function1
.
Часть 3 головоломки состоит в том, как заставить программу сделать что-то полезное после того, как все Future
завершено, без неприятных блокирующих вызовов, подобных вызовам метода Future#get()
. Решение состоит в том, чтобы зарегистрировать обратный вызов, как показано в строке 40. Этот обратный вызов, как и все другие, показанные здесь, передается службе выполнения. Это означает, что у нас нет гарантии, какой поток его запустит, и это имеет побочный эффект, а именно то, что локальное хранилище потоков (TLS) больше не работает — некоторые фреймворки, такие как (более старые версии?) Hibernate, полагались на TLS, и они просто победили ». здесь не работает У Scala есть хороший способ решения этой проблемы с помощью implicit
ключевого слова, которого у Java нет (пока …?), Поэтому необходимо использовать какой-то другой механизм. Я упоминаю об этом, просто чтобы вы знали об этом.
Таким образом, когда завершается последнее будущее, вызываются строки 40-60 и передается List
Try
содержащий Integer
, а не Future
. Метод registerCallback
преобразует фьючерсы в соответствующие Success
или Failure
. Но как мы можем превратить их во что-то полезное? Разумеется, с помощью простого сопоставления / сокращения, к счастью, Java 8 теперь поддерживает это с классом Stream
, который создается из коллекции Try
в строке 42 путем вызова метода stream()
. Сначала я отображаю (конвертирую) значения Try
в их значения, а затем сокращаю поток до одного значения в строке 49. Вместо того, чтобы передавать собственную реализацию лямбда-выражения, суммирующего значения, я мог бы использовать
Integer::sum
, например someStream.reduce(0, Integer::sum)
.
В последний раз, когда я запускал программу, она выводила следующее:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
|
Thread- 1 says: adding future 0 Thread- 1 says: adding mapping callback to future Thread- 1 says: adding future 1 Thread- 1 says: adding mapping callback to future Thread- 1 says: adding future 2 Thread- 1 says: adding mapping callback to future Thread- 1 says: adding future 3 Thread- 1 says: adding mapping callback to future Thread- 1 says: adding future 4 Thread- 1 says: adding mapping callback to future Thread- 1 says: adding future 5 Thread- 1 says: adding mapping callback to future Thread- 1 says: adding future 6 Thread- 1 says: adding mapping callback to future Thread- 1 says: adding future 7 Thread- 1 says: adding mapping callback to future Thread- 1 says: adding future 8 Thread- 1 says: adding mapping callback to future Thread- 1 says: adding future 9 Thread- 1 says: adding mapping callback to future Thread- 1 says: registering callback for final result Thread- 10 says: working success Completed submitting all tasks on thread 1 Thread- 14 says: working success Thread- 10 says: working failure Thread- 14 says: working failure Thread- 12 says: working success Thread- 10 says: working failure Thread- 10 says: mapping '20' to int Thread- 10 says: mapping '20' to int Thread- 10 says: recovering Thread- 10 says: recovering Thread- 10 says: mapping '20' to int Thread- 10 says: recovering Thread- 11 says: working success Thread- 11 says: mapping '20' to int Thread- 13 says: working success Thread- 10 says: mapping '20' to int Thread- 12 says: working failure Thread- 12 says: recovering Thread- 14 says: working failure Thread- 14 says: recovering Thread- 14 says: mapping Success( 20 ) Thread- 14 says: mapping Success( 20 ) Thread- 14 says: mapping Success( 20 ) Thread- 14 says: mapping Success( 20 ) Thread- 14 says: mapping Success( 20 ) Thread- 14 says: mapping Success(- 10 ) Thread- 14 says: mapping Success(- 10 ) Thread- 14 says: mapping Success(- 10 ) Thread- 14 says: mapping Success(- 10 ) Thread- 14 says: mapping Success(- 10 ) Thread- 14 says: final result is 50 Thread- 14 says: SUCESS |
Как видите, основной поток добавляет все задачи и регистрирует все функции отображения (строки 1-20). Затем он регистрирует обратный вызов (строка 21 вывода, которая соответствует строке 39 листинга) и, наконец, выводит текст из строки 63 в листинге, после чего он умирает, потому что ему больше нечего делать. Строки 22 и строки 24-42 выходных данных затем показывают различные потоки в пуле (который содержал 5 потоков), обрабатывающие работу, а также отображающие из String в Integer или восстанавливающие после исключения. Это код в частях 1 и 2 первого списка. Вы можете видеть, что он полностью асинхронный, с некоторыми сопоставлениями / восстановлениями, происходящими до того, как вся начальная работа завершена (сравните строки 38 или 40, которые являются отображением и восстановлением соответственно, со строкой 41 выходных данных, которая происходит позже и является последней начальной работы). Строки 43-52 представляют собой вывод карты / сокращения, который является частью 3 основного списка. Обратите внимание, что в журнале нет сокращения, потому что код, который я запускал и который находится на Github, использует упомянутый выше ярлык Integer::sum
, а не строки 50-51 первого списка, показанного выше.
Хотя все это возможно с использованием Java 6 (или даже 5?), Например, путем получения задач, которые отправляются в пул, для отправки обратного вызова самостоятельно, после их завершения объем кода, необходимый для этого, увеличивается и сам код будет хуже, чем показано здесь. Java 8 лямбда, Future
которые могут отображаться с помощью обратных вызовов, и Try
API с его аккуратной обработкой ошибок — все это делает решение, показанное здесь, возможно, более легким в обслуживании.
Приведенный выше код, а также код для классов в пакете ch.maxant.async
доступны под лицензией Apache License версии 2.0 и могут быть загружены из моей учетной записи Github .