В предыдущих частях этого поста мы обсуждали фьючерсы и обещания . В этой последней части мы создадим Futures, используя его мощные комбинаторы.
Составление фьючерсов:
В первом посте мы увидели, как извлечь значение из Future
используя onComplete
, foreach
и в Await.result
используя Await.result
. Извлечение значения из одного Future
— это хорошо, но часто мы порождаем более одной асинхронной операции и ожидаем, пока несколько Futures достигнут окончательного результата. Еще лучше, иногда результат одного Future
будет подан в другое или цепочку фьючерсов.
Future
— это Монада . (Мне жаль, что я сбросил М-бомбу здесь, и я попытаюсь объяснить свое понимание того, что такое Monoid
, Functor
, Monad
и Applicative
позже) . Но пока давайте жить с этим грубым объяснением:
-
Future
— это контейнер значений (значений) некоторого типа (то есть он принимает тип в качестве аргумента и не может существовать без него). У вас может бытьFuture[Int]
илиFuture[String]
илиFuture[AwesomeClass]
— вы не можете просто иметь простоеFuture
. Причудливый термин для этого — конструктор типов . Для сравнения,List
— это конструктор типов (а также Monad). Список — это контейнер значений типаInt
,String
или любого другого типа. Список / Будущее без отдельного типа не существует. -
Future
есть функцииflatMap
иunit
(и, соответственно, функцияmap
).
Причина, по которой я это поднял, заключается в том, что вместо использования обратного вызова onComplete
или foreach
, мы могли бы просто map
или flatMap
результат Future точно так же, как мы сделали бы это для Option
или List
.
Теперь давайте посмотрим на map
и комбинаторы flatMap
.
Отображение фьючерсов, которые выполняются последовательно
Давайте рассмотрим эту простую задачу, которая добавляет три числа, которые вычисляются асинхронно через некоторый интервал.
Предупреждение: следующий код является грязным и выполняет Futures последовательно
Код
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
|
class FutureCombinators { def sumOfThreeNumbersSequentialMap(): Future[Int] = { Future { Thread.sleep( 1000 ) 1 }.flatMap { oneValue => Future { Thread.sleep( 2000 ) 2 }.flatMap { twoValue => Future { Thread.sleep( 3000 ) 3 }.map { thirdValue => oneValue + twoValue + thirdValue } } } } ... ... |
Первое Будущее возвращает 1 через 1 секунду, второе Будущее возвращает 2 через 2 секунды, а третье Будущее возвращает 3 через 3 секунды. Вложенный блок, наконец, вычисляет сумму трех значений и возвращает одно Future[Int]
.
Прецедент
Для вычисления времени, необходимого для вычисления значений, у нас есть небольшая служебная функция (внутри черты ConcurrentUtils
), называемая timed, которая вычисляет и печатает время, затраченное блоком.
Мы Await.result
что в результате блокировки futureCombinators.sumOfThreeNumbersSequentialMap
результатов будет futureCombinators.sumOfThreeNumbersSequentialMap
результат futureCombinators.sumOfThreeNumbersSequentialMap
. Мы также рассчитываем время выполнения и печатаем его.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
|
class FutureCombinatorsTest extends FunSpec with Matchers with ConcurrentUtils { describe( "Futures" ) { it( "could be composed using map" ) { val futureCombinators = new FutureCombinators val result = timed(Await.result(futureCombinators.sumOfThreeNumbersSequentialMap(), 7 seconds)) result shouldBe 6 } } ... ... } trait ConcurrentUtils { def timed[T](block: => T): T = { val start = System.currentTimeMillis() val result = block val duration = System.currentTimeMillis() - start println(s "Time taken : $duration" ) result } } |
Выход
1
|
Time taken : 6049 |
Выполнение функции заняло чуть более 6 секунд, что указывает на то, что фьючерсы выполняются последовательно.
Использование для понимания синтаксического сахара вместо Map
Scala предоставляет отличный способ работы с классами, которые имеют map
и flatMap
( flatMap
) — для понимания. Для понимания это просто синтаксический сахар, который flatMap
в flatMap
и map
.
Следующий код означает то же самое, что и выше, за исключением того, что расширение выполняется компилятором Scala.
Код
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
|
def sumOfThreeNumbersSequentialForComprehension(): Future[Int] = { for { localOne <- Future { Thread.sleep( 1000 ) 1 } localTwo <- Future { Thread.sleep( 2000 ) 2 } localThree <- Future { Thread.sleep( 3000 ) 3 } } yield localOne + localTwo + localThree } |
Прецедент
Это так же, как и выше.
1
2
3
4
5
|
it( "could be composed using for comprehensions" ) { val futureCombinators = new FutureCombinators val result = timed(Await.result(futureCombinators.sumOfThreeNumbersSequentialForComprehension(), 7 seconds)) result shouldBe 6 } |
Выход
1
|
Time taken : 6012 |
Выполнение фьючерсов параллельно
Как мы видели, предыдущий блок кода запускает три Futures последовательно и, следовательно, для завершения вычисления требуется всего 6 секунд. Это не хорошо Наши фьючерсы должны работать параллельно. Чтобы добиться этого, все, что нам нужно сделать, это извлечь блок Future и объявить их отдельно.
Код
01
02
03
04
05
06
07
08
09
10
11
12
13
14
|
val oneFuture: Future[Int] = Future { Thread.sleep( 1000 ) 1 } val twoFuture: Future[Int] = Future { Thread.sleep( 2000 ) 2 } val threeFuture: Future[Int] = Future { Thread.sleep( 3000 ) 3 } |
Теперь давайте воспользуемся формой для понимания значения.
1
2
3
4
5
|
def sumOfThreeNumbersParallelMapForComprehension(): Future[Int] = for { oneValue <- oneFuture twoValue <- twoFuture threeValue <- threeFuture } yield oneValue + twoValue + threeValue |
Прецедент
Давайте рассчитаем время вычисления и подтвердим правильное значение, используя следующий тестовый пример.
1
2
3
4
5
6
7
|
describe( "Futures that are executed in parallel" ) { it( "could be composed using for comprehensions" ) { val futureCombinators = new FutureCombinators val result = timed(Await.result(futureCombinators.sumOfThreeNumbersParallel(), 4 seconds)) result shouldBe 6 } } |
Выход
1
|
Time taken : 3005 |
Как мы видим, эта sumOfThreeNumbersParallel
занимает почти sumOfThreeNumbersParallel
же времени, сколько самое длинное будущее ( threeFuture
), что составляет 3 секунды.
Просто для сравнения, приведенный выше код может быть написан без использования для понимания как:
1
2
3
4
5
6
7
|
def sumOfThreeNumbersParallelMap(): Future[Int] = oneFuture.flatMap { oneValue => twoFuture.flatMap { twoValue => threeFuture.map { threeValue => oneValue + twoValue + threeValue } } } |
Охранники в зачете
Точно так же, как мы добавили защищенное условие if
в for-comp понять для List
и других коллекций (также называемых другими монадическими типами), мы могли бы также добавить охрану против генераторов Future
. Следующее, if
guard проверяет, является ли значение, возвращаемое twoFuture
, больше 1, что и есть.
1
2
3
4
5
|
def sumOfThreeNumbersParallelWithGuard(): Future[Int] = for { oneValue <- oneFuture twoValue <- twoFuture if twoValue > 1 threeValue <- threeFuture } yield oneValue + twoValue + threeValue |
Эти охранники withFilter
сахара, как и в случае с withFilter
(я на 90% уверен, что никто не хочет писать так):
1
2
3
4
5
6
7
|
def sumOfThreeNumbersMapAndFlatMapWithFilter(): Future[Int] = oneFuture.flatMap { oneValue => twoFuture.withFilter(_ > 1 ).flatMap { twoValue => threeFuture.map { threeValue => oneValue + twoValue + threeValue } } } |
Охранники в тюрьмах — дело об отказе
Если охранник оценивает как ложное, таким образом, генератор, NoSuchElementException
ошибку, NoSuchElementException
. Давайте изменим условие охраны, чтобы оценить как ложное.
1
2
3
4
5
|
def sumOfThreeNumbersParallelWithGuardAndFailure(): Future[Int] = for { oneValue <- oneFuture twoValue <- twoFuture if twoValue > 2 threeValue <- threeFuture } yield oneValue + twoValue + threeValue |
Выход
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
|
Future.filter predicate is not satisfied java.util.NoSuchElementException: Future.filter predicate is not satisfied at scala.concurrent.Future$$anonfun$filter$ 1 .apply(Future.scala: 280 ) at scala.util.Success$$anonfun$map$ 1 .apply(Try.scala: 237 ) at scala.util.Try$.apply(Try.scala: 192 ) at scala.util.Success.map(Try.scala: 237 ) at scala.concurrent.Future$$anonfun$map$ 1 .apply(Future.scala: 237 ) at scala.concurrent.Future$$anonfun$map$ 1 .apply(Future.scala: 237 ) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala: 32 ) at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala: 121 ) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java: 260 ) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java: 1253 ) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java: 1346 ) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java: 1979 ) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java: 107 ) |
Обработка исключений
Как и NoSuchElementException
создаваемое защитником, код, выполняемый асинхронно внутри Future
может NoSuchElementException
различные исключения. Хотя можно утверждать, что исключения не очень похожи на FP, есть вероятность, что с распределенным приложением или использованием библиотек Java внутри вашего Future
исключения случаются.
Код
Обе две функции ниже NoSuchElementException
исключения — первая выдает NoSuchElementException
а вторая — LegacyException
.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
|
//NoSuchElementException def throwsNoSuchElementIfGuardFails(): Future[Int] = for { oneValue <- oneFuture twoValue <- twoFuture if twoValue > 2 threeValue <- threeFuture } yield oneValue + twoValue + threeValue //LegacyException val futureCallingLegacyCode: Future[Int] = Future { Thread.sleep( 1000 ) throw new LegacyException( "Danger! Danger!" ) } def throwsExceptionFromComputation(): Future[Int] = for { oneValue <- oneFuture futureThrowingException <- futureCallingLegacyCode } yield oneValue + futureThrowingException case class LegacyException(msg: String) extends Exception(msg) |
Testcases
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
|
describe( "Futures that throw exception" ) { it( "could blow up on the caller code when guard fails" ) { val futureCombinators = new FutureCombinators intercept[NoSuchElementException] { val result = timed(Await.result(futureCombinators.throwsNoSuchElementIfGuardFails(), 4 seconds)) } } it( "could blow up on the caller code when exception comes from a computation executed inside the Future" ) { val futureCombinators = new FutureCombinators intercept[LegacyException] { val result = timed(Await.result(futureCombinators.throwsExceptionFromComputation(), 4 seconds)) } } ... ... |
Обратите внимание, что даже если одно из Futures приведет к исключению, весь результат составного вычисления приведет к распространению исключения.
Восстановление из исключения:
Использование recover
Если Future scala.util.control.NonFatal
и мы хотим иметь запасное значение по умолчанию вместо того, чтобы scala.util.control.NonFatal
ошибку вызывающей стороне, мы могли бы использовать функцию recover
. recover
очень похоже на ловушку.
Давайте throwsExceptionFromComputation
вышеупомянутую функцию throwsExceptionFromComputation
которая выбрасывает LegacyException
. Функция recover
принимает функцию PartialFunction
которая сопоставляется с Throwable
типу, который переносит Future
.
Код
В приведенном ниже коде, если futureCallingLegacyCode
выдает Exception
(что он и делает), значение, которое является результатом этого вычисления, устанавливается равным 200 . Если бы он не выдал исключение, результирующее значение будет результатом самого вычисления.
1
2
3
4
5
6
7
8
|
val futureCallingLegacyCodeWithRecover: Future[Int] = futureCallingLegacyCode.recover { case LegacyException(msg) => 200 } def recoversFromExceptionUsingRecover(): Future[Int] = for { oneValue <- oneFuture futureThrowingException <- futureCallingLegacyCodeWithRecover } yield oneValue + futureThrowingException |
Повторим, если исходное Future
дает успешное значение, блок recover
никогда не выполняется. Кроме того, если PartialFunction
внутри функции recover
не обрабатывает исходное исключение, исключение PartialFunction
вызывающей стороне.
Прецедент
oneFuture
утверждает, что результатом вычисления является сумма значений, возвращаемых oneFuture
(который равен 1) и futureCallingLegacyCodeWithRecover
(который равен 200).
1
2
3
4
5
|
it( "could be recovered with a recovery value" ) { val futureCombinators = new FutureCombinators val result = timed(Await.result(futureCombinators.recoversFromExceptionUsingRecover(), 2 seconds)) result shouldBe 201 } |
Выход
1
|
Time taken : 1004 |
Использование recoverWith
Вместо восстановления со значением, когда Future
приводит к Exception
, мы можем захотеть восстановить с помощью некоторого другого Future в некоторых обстоятельствах. Скажем, недоступность HTTP-вызова к Server1 из-за сбоя в сети может быть восстановлена с помощью HTTP-вызова к другой службе, работающей на Server2.
Подобно recover
, recoverWith
принимает функцию PartialFunction
. Однако PartialFunction
отображает Throwable
в Future
того же типа, что и исходное Future
.
Как и в случае recover
, если основное Future
для которого recoverWith
не выполняется, вызывается то Future
, которое отображается в PartialFunction
. Если второе будущее приводит к успешному значению, то возвращается новый результат.
Код
01
02
03
04
05
06
07
08
09
10
11
|
val futureCallingLegacyCodeWithRecoverWith: Future[Int] = futureCallingLegacyCode.recoverWith { case LegacyException(msg) => println( "Exception occurred. Recovering with a Future that wraps 1000" ) Thread.sleep( 2000 ) Future( 1000 ) } def recoversFromExceptionUsingRecoverWith(): Future[Int] = for { oneValue <- oneFuture futureThrowingException <- futureCallingLegacyCodeWithRecoverWith } yield oneValue + futureThrowingException |
Прецедент
oneFuture
занимает 1 секунду, а восстановление Future занимает 2 секунды. Итак, мы установили тайм-аут Await.result
на 4 секунды. Конечный результат 1001 является суммой результата oneFuture
и futureCallingLegacyCodeWithRecoverWith
.
1
2
3
4
5
|
it( "could be recovered with a recovery Future" ) { val futureCombinators = new FutureCombinators val result = timed(Await.result(futureCombinators.recoversFromExceptionUsingRecoverWith(), 4 seconds)) result shouldBe 1001 } |
Выход
1
|
Time taken : 3006 |
Обратите внимание, что так же, как и recover
, если второе будущее также не удается, то ошибка, выданная вторым будущим, передается вызывающей стороне.
Код
В следующем коде мы создаем еще одно Future
которое выдает Exception
с сообщением Dieded !! и мы восстанавливаем первое будущее с помощью этого будущего. Тестовый сценарий показал бы, что исключение из второго будущего (восстановление одного) возвращается вызывающей стороне.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
|
val anotherErrorThrowingFuture: Future[Int] = Future { Thread.sleep( 1000 ) throw new LegacyException( "Dieded!!" ) } val futureRecoveringWithAnotherErrorThrowingFuture: Future[Int] = futureCallingLegacyCode.recoverWith { case LegacyException(msg) => anotherErrorThrowingFuture } def recoversFromExceptionUsingRecoverWithThatFails(): Future[Int] = for { oneValue <- oneFuture futureThrowingException <- futureRecoveringWithAnotherErrorThrowingFuture } yield oneValue + futureThrowingException |
Прецедент
1
2
3
4
5
6
7
|
it( "when recovered with another Future that throws Exception would throw the error from the second Future" ) { val futureCombinators = new FutureCombinators val exception = intercept[LegacyException] { timed(Await.result(futureCombinators.recoversFromExceptionUsingRecoverWithThatFails(), 4 seconds)) } exception.msg shouldBe "Dieded!!" } |
Использование fallbackTo:
fallbackTo
работает так же, как recoverWith
когда дело доходит до успешного значения. Он использует значение первого Future, если он успешен или возвращается к значению второго Future. Однако, если и первое, и второе Future терпят неудачу, то ошибка, которая передается вызывающей стороне, является ошибкой первого Future, а не второго Future.
Код
Давайте использовать те же Futures, которые мы использовали в recoverWith
.
1
2
3
4
5
6
|
val futureFallingBackToAnotherErrorThrowingFuture: Future[Int] = futureCallingLegacyCode.fallbackTo (anotherErrorThrowingFuture) def recoversFromExceptionUsingFallbackTo(): Future[Int] = for { oneValue <- oneFuture futureThrowingException <- futureFallingBackToAnotherErrorThrowingFuture } yield oneValue + futureThrowingException |
Обратите внимание, что функция fallbackTo
просто принимает другое Future
а не функцию PartialFunction
как recoverWith
.
Прецедент
1
2
3
4
5
6
7
|
it( "when fallen back to another Future that throws Exception would throw the error from the first Future" ) { val futureCombinators = new FutureCombinators val exception = intercept[LegacyException] { timed(Await.result(futureCombinators.recoversFromExceptionUsingFallbackTo(), 4 seconds)) } exception.msg shouldBe "Danger! Danger!" } |
Другие интересные и полезные комбинаторы
Ниже приведен супер-краткий список других комбинаторов Future, которые я считаю очень полезными.
застежка-молния
zip
работает так же, как List.zip
. Он просто объединяет два фьючерса и дает Future
Tuple
.
1
|
def zipTwoFutures:Future[(Int,Int)]=oneFuture zip twoFuture |
firstCompletedOf
Ах! firstCompletedOf
очень удобен, когда у вас есть два эквивалентных сервиса, и вы хотите продолжить, как только самый быстрый сервис вернет значение.
1
2
|
val listOfFutures=List(oneFuture,twoFuture,threeFuture) def getFirstResult():Future[Int]=Future.firstCompletedOf(listOfFutures) |
В приведенном выше случае oneFuture
возвращает самый быстрый.
последовательность
sequence
чистая магия. Скажем, у вас есть List[Future[Int]]
точно так же, как List(oneFuture,twoFuture,threeFuture)
и вы требуете, чтобы все значения были возвращены вам в виде List[Int]
вместо каждого Int
заключенного в Future
. sequence
берет ваш List[Future[Int]]
и превращается в Future[List[Int]]
1
|
def getResultsAsList():Future[List[Int]]=Future.sequence(listOfFutures) |
В последний раз я использовал для пакетной обработки, где я выполнял логику параллельно с кусками данных и комбинировал их вместе с sequence
.
Scala-асинхронная библиотека
Библиотека Scala Async является внешним проектом и может быть добавлена в проект путем добавления зависимости в наш build.sbt.
1
|
"org.scala-lang.modules" %% "scala-async" % "0.9.6-RC2" |
Библиотека Async имеет всего две мощные функции в своем классе scala.async.Async
— async
и await
.
асинхронной
async
функция очень похожа на функцию Future.apply
. На самом деле их подписи очень похожи, и мы могли бы с комфортом заменить Future.apply
async
где бы он ни был доступен.
Future.apply
1
|
def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] |
асинхронный
1
|
def async[T](body: => T)(implicit execContext: ExecutionContext): Future[T] |
Основным преимуществом использования async
над Future.apply
, помимо общей простоты читабельности, является то, что для каждого генератора Future (при использовании с для понимания) компилятор выдает отдельный анонимный класс, в то время как с async это всего один аноним класс.
Поэтому мы могли бы переписать наше oneFuture
как
1
2
3
4
|
val oneFuture: Future[Int] = async { Thread.sleep( 1000 ) 1 } |
Ждите
Функция await
принимает Future
и возвращает результат. Но разве это не то же самое, что Await.result
который принимает Future
и также возвращает результат? Нет. Ключевое отличие заключается в том, что Await.result
блокируется и настоятельно не рекомендуется использовать в Await.result
коде, за исключением тестовых случаев. Функция await
, с другой стороны, реализована с использованием макросов Scala, и реализация состоит в том, что она возвращает результат Future, используя обратный вызов onComplete
.
Поскольку async
функция возвращает Future
, все другие механизмы обработки ошибок и восстановления остаются такими же, как и раньше.
Код
Давайте перепишем предыдущую сумму трех чисел с помощью async / await:
1
2
3
|
def sumOfThreeNumbersParallelWithAsyncAwait(): Future[Int] = async { await(oneFuture) + await(twoFuture) + await(threeFuture) } |
Прецедент
1
2
3
4
5
|
it( "could be composed using async/await" ) { val futureCombinators = new FutureCombinators val result = timed(Await.result(futureCombinators.sumOfThreeNumbersParallelWithAsyncAwait(), 4 seconds)) result shouldBe 6 } |
Как видим, код, написанный таким образом, не только асинхронный, но и выглядит естественным (на самом деле он выглядит синхронно). Мы можем утверждать, что for- flatMap
— огромный скачок от использования map
и flatMap
но async/await
делает еще один большой шаг вперед.
Код
Код и соответствующий ему тестовый пример находятся на github.
Ссылка: | Примечания Scala — Futures — 3 (Combinators and Async) от нашего партнера по JCG Аруна Маниваннана в блоге Rerun.me . |