Статьи

Scala notes — Фьючерсы — 3 (Комбинаторы и Async)

В предыдущих частях этого поста мы обсуждали фьючерсы и обещания . В этой последней части мы создадим Futures, используя его мощные комбинаторы.

Составление фьючерсов:

В первом посте мы увидели, как извлечь значение из Future используя onComplete , foreach и в Await.result используя Await.result . Извлечение значения из одного Future — это хорошо, но часто мы порождаем более одной асинхронной операции и ожидаем, пока несколько Futures достигнут окончательного результата. Еще лучше, иногда результат одного Future будет подан в другое или цепочку фьючерсов.

Future — это Монада . (Мне жаль, что я сбросил М-бомбу здесь, и я попытаюсь объяснить свое понимание того, что такое Monoid , Functor , Monad и Applicative позже) . Но пока давайте жить с этим грубым объяснением:

  1. Future — это контейнер значений (значений) некоторого типа (то есть он принимает тип в качестве аргумента и не может существовать без него). У вас может быть Future[Int] или Future[String] или Future[AwesomeClass] — вы не можете просто иметь простое Future . Причудливый термин для этого — конструктор типов . Для сравнения, List — это конструктор типов (а также Monad). Список — это контейнер значений типа Int , String или любого другого типа. Список / Будущее без отдельного типа не существует.
  2. Future есть функции flatMap и unit (и, соответственно, функция map ).

Причина, по которой я это поднял, заключается в том, что вместо использования обратного вызова onComplete или foreach , мы могли бы просто map или flatMap результат Future точно так же, как мы сделали бы это для Option или List .

Теперь давайте посмотрим на map и комбинаторы flatMap .

Отображение фьючерсов, которые выполняются последовательно

Давайте рассмотрим эту простую задачу, которая добавляет три числа, которые вычисляются асинхронно через некоторый интервал.

Предупреждение: следующий код является грязным и выполняет Futures последовательно

Код

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
class FutureCombinators {
 
  def sumOfThreeNumbersSequentialMap(): Future[Int] = {
    Future {
      Thread.sleep(1000)
      1
    }.flatMap { oneValue =>
      Future {
        Thread.sleep(2000)
        2
      }.flatMap { twoValue =>
        Future {
          Thread.sleep(3000)
          3
        }.map { thirdValue =>
          oneValue + twoValue + thirdValue
        }
      }
    }
  }
...
...

Первое Будущее возвращает 1 через 1 секунду, второе Будущее возвращает 2 через 2 секунды, а третье Будущее возвращает 3 через 3 секунды. Вложенный блок, наконец, вычисляет сумму трех значений и возвращает одно Future[Int] .

Прецедент

Для вычисления времени, необходимого для вычисления значений, у нас есть небольшая служебная функция (внутри черты ConcurrentUtils ), называемая timed, которая вычисляет и печатает время, затраченное блоком.

Мы Await.result что в результате блокировки futureCombinators.sumOfThreeNumbersSequentialMap результатов будет futureCombinators.sumOfThreeNumbersSequentialMap результат futureCombinators.sumOfThreeNumbersSequentialMap . Мы также рассчитываем время выполнения и печатаем его.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
class FutureCombinatorsTest extends FunSpec with Matchers with ConcurrentUtils {
 
  describe("Futures") {
    it("could be composed using map") {
      val futureCombinators = new FutureCombinators
      val result = timed(Await.result(futureCombinators.sumOfThreeNumbersSequentialMap(), 7 seconds))
      result shouldBe 6
    }
  }
...
...
}
 
trait ConcurrentUtils { 
  def timed[T](block: => T): T = {
    val start = System.currentTimeMillis()
    val result = block
    val duration = System.currentTimeMillis() - start
    println(s"Time taken : $duration")
    result
  }
}

Выход

1
Time taken : 6049

Выполнение функции заняло чуть более 6 секунд, что указывает на то, что фьючерсы выполняются последовательно.

Использование для понимания синтаксического сахара вместо Map

Scala предоставляет отличный способ работы с классами, которые имеют map и flatMap ( flatMap ) — для понимания. Для понимания это просто синтаксический сахар, который flatMap в flatMap и map .

Следующий код означает то же самое, что и выше, за исключением того, что расширение выполняется компилятором Scala.

Код

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
def sumOfThreeNumbersSequentialForComprehension(): Future[Int] = {
    for {
      localOne <- Future {
        Thread.sleep(1000)
        1
      }
      localTwo <- Future {
        Thread.sleep(2000)
        2
      }
      localThree <- Future {
        Thread.sleep(3000)
        3
      }
    } yield localOne + localTwo + localThree
  }

Прецедент

Это так же, как и выше.

1
2
3
4
5
it("could be composed using for comprehensions") {
      val futureCombinators = new FutureCombinators
      val result = timed(Await.result(futureCombinators.sumOfThreeNumbersSequentialForComprehension(), 7 seconds))
      result shouldBe 6
    }

Выход

1
Time taken : 6012

Выполнение фьючерсов параллельно

Как мы видели, предыдущий блок кода запускает три Futures последовательно и, следовательно, для завершения вычисления требуется всего 6 секунд. Это не хорошо Наши фьючерсы должны работать параллельно. Чтобы добиться этого, все, что нам нужно сделать, это извлечь блок Future и объявить их отдельно.

Код

01
02
03
04
05
06
07
08
09
10
11
12
13
14
val oneFuture: Future[Int] = Future {
    Thread.sleep(1000)
    1
  }
 
  val twoFuture: Future[Int] = Future {
    Thread.sleep(2000)
    2
  }
 
  val threeFuture: Future[Int] = Future {
    Thread.sleep(3000)
    3
  }

Теперь давайте воспользуемся формой для понимания значения.

1
2
3
4
5
def sumOfThreeNumbersParallelMapForComprehension(): Future[Int] = for
    oneValue <- oneFuture
    twoValue <- twoFuture
    threeValue <- threeFuture
} yield oneValue + twoValue + threeValue

Прецедент

Давайте рассчитаем время вычисления и подтвердим правильное значение, используя следующий тестовый пример.

1
2
3
4
5
6
7
describe("Futures that are executed in parallel") {
    it("could be composed using for comprehensions") {
      val futureCombinators = new FutureCombinators
      val result = timed(Await.result(futureCombinators.sumOfThreeNumbersParallel(), 4 seconds))
      result shouldBe 6
    }
  }

Выход

1
Time taken : 3005

Как мы видим, эта sumOfThreeNumbersParallel занимает почти sumOfThreeNumbersParallel же времени, сколько самое длинное будущее ( threeFuture ), что составляет 3 секунды.

Просто для сравнения, приведенный выше код может быть написан без использования для понимания как:

1
2
3
4
5
6
7
def sumOfThreeNumbersParallelMap(): Future[Int] = oneFuture.flatMap { oneValue => 
    twoFuture.flatMap { twoValue =>
      threeFuture.map { threeValue =>
        oneValue + twoValue + threeValue
      }
    }
}

Охранники в зачете

Точно так же, как мы добавили защищенное условие if в for-comp понять для List и других коллекций (также называемых другими монадическими типами), мы могли бы также добавить охрану против генераторов Future . Следующее, if guard проверяет, является ли значение, возвращаемое twoFuture , больше 1, что и есть.

1
2
3
4
5
def sumOfThreeNumbersParallelWithGuard(): Future[Int] = for {
    oneValue <- oneFuture
    twoValue <- twoFuture if twoValue > 1
    threeValue <- threeFuture
  } yield oneValue + twoValue + threeValue

Эти охранники withFilter сахара, как и в случае с withFilter (я на 90% уверен, что никто не хочет писать так):

1
2
3
4
5
6
7
def sumOfThreeNumbersMapAndFlatMapWithFilter(): Future[Int] = oneFuture.flatMap { oneValue => 
    twoFuture.withFilter(_ > 1).flatMap { twoValue =>
      threeFuture.map { threeValue =>
        oneValue + twoValue + threeValue
      }
    }
  }

Охранники в тюрьмах — дело об отказе

Если охранник оценивает как ложное, таким образом, генератор, NoSuchElementException ошибку, NoSuchElementException . Давайте изменим условие охраны, чтобы оценить как ложное.

1
2
3
4
5
def sumOfThreeNumbersParallelWithGuardAndFailure(): Future[Int] = for {
    oneValue <- oneFuture
    twoValue <- twoFuture if twoValue > 2
    threeValue <- threeFuture
  } yield oneValue + twoValue + threeValue

Выход

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
Future.filter predicate is not satisfied 
java.util.NoSuchElementException: Future.filter predicate is not satisfied 
    at scala.concurrent.Future$$anonfun$filter$1.apply(Future.scala:280)
    at scala.util.Success$$anonfun$map$1.apply(Try.scala:237)
    at scala.util.Try$.apply(Try.scala:192)
    at scala.util.Success.map(Try.scala:237)
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Обработка исключений

Как и NoSuchElementException создаваемое защитником, код, выполняемый асинхронно внутри Future может NoSuchElementException различные исключения. Хотя можно утверждать, что исключения не очень похожи на FP, есть вероятность, что с распределенным приложением или использованием библиотек Java внутри вашего Future исключения случаются.

Код

Обе две функции ниже NoSuchElementException исключения — первая выдает NoSuchElementException а вторая — LegacyException .

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
//NoSuchElementException
  def throwsNoSuchElementIfGuardFails(): Future[Int] = for {
    oneValue <- oneFuture
    twoValue <- twoFuture if twoValue > 2
    threeValue <- threeFuture
  } yield oneValue + twoValue + threeValue
 
  //LegacyException
 
  val futureCallingLegacyCode: Future[Int] = Future {
    Thread.sleep(1000)
    throw new LegacyException("Danger! Danger!")
  }
 
  def throwsExceptionFromComputation(): Future[Int] = for {
    oneValue <- oneFuture
    futureThrowingException <- futureCallingLegacyCode
  } yield oneValue + futureThrowingException
 
case class LegacyException(msg: String) extends Exception(msg)

Testcases

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
describe("Futures that throw exception") {
    it("could blow up on the caller code when guard fails") {
      val futureCombinators = new FutureCombinators
      intercept[NoSuchElementException] {
        val result = timed(Await.result(futureCombinators.throwsNoSuchElementIfGuardFails(), 4 seconds))
      }
    }
 
    it("could blow up on the caller code when exception comes from a computation executed inside the Future") {
      val futureCombinators = new FutureCombinators
      intercept[LegacyException] {
        val result = timed(Await.result(futureCombinators.throwsExceptionFromComputation(), 4 seconds))
      }
    }
...
...

Обратите внимание, что даже если одно из Futures приведет к исключению, весь результат составного вычисления приведет к распространению исключения.

Восстановление из исключения:

Использование recover

Если Future scala.util.control.NonFatal и мы хотим иметь запасное значение по умолчанию вместо того, чтобы scala.util.control.NonFatal ошибку вызывающей стороне, мы могли бы использовать функцию recover . recover очень похоже на ловушку.

Давайте throwsExceptionFromComputation вышеупомянутую функцию throwsExceptionFromComputation которая выбрасывает LegacyException . Функция recover принимает функцию PartialFunction которая сопоставляется с Throwable типу, который переносит Future .

Код

В приведенном ниже коде, если futureCallingLegacyCode выдает Exception (что он и делает), значение, которое является результатом этого вычисления, устанавливается равным 200 . Если бы он не выдал исключение, результирующее значение будет результатом самого вычисления.

1
2
3
4
5
6
7
8
val futureCallingLegacyCodeWithRecover: Future[Int] = futureCallingLegacyCode.recover {
    case LegacyException(msg) => 200
  }
 
  def recoversFromExceptionUsingRecover(): Future[Int] = for {
    oneValue <- oneFuture
    futureThrowingException <- futureCallingLegacyCodeWithRecover
  } yield oneValue + futureThrowingException

Повторим, если исходное Future дает успешное значение, блок recover никогда не выполняется. Кроме того, если PartialFunction внутри функции recover не обрабатывает исходное исключение, исключение PartialFunction вызывающей стороне.

Прецедент

oneFuture утверждает, что результатом вычисления является сумма значений, возвращаемых oneFuture (который равен 1) и futureCallingLegacyCodeWithRecover (который равен 200).

1
2
3
4
5
it("could be recovered with a recovery value") {
      val futureCombinators = new FutureCombinators
      val result = timed(Await.result(futureCombinators.recoversFromExceptionUsingRecover(), 2 seconds))
      result shouldBe 201
    }

Выход

1
Time taken : 1004

Использование recoverWith

Вместо восстановления со значением, когда Future приводит к Exception , мы можем захотеть восстановить с помощью некоторого другого Future в некоторых обстоятельствах. Скажем, недоступность HTTP-вызова к Server1 из-за сбоя в сети может быть восстановлена ​​с помощью HTTP-вызова к другой службе, работающей на Server2.

Подобно recover , recoverWith принимает функцию PartialFunction . Однако PartialFunction отображает Throwable в Future того же типа, что и исходное Future .

Как и в случае recover , если основное Future для которого recoverWith не выполняется, вызывается то Future , которое отображается в PartialFunction . Если второе будущее приводит к успешному значению, то возвращается новый результат.

Код

01
02
03
04
05
06
07
08
09
10
11
val futureCallingLegacyCodeWithRecoverWith: Future[Int] = futureCallingLegacyCode.recoverWith {
    case LegacyException(msg) =>
      println("Exception occurred. Recovering with a Future that wraps 1000")
      Thread.sleep(2000)
      Future(1000)
  }
 
  def recoversFromExceptionUsingRecoverWith(): Future[Int] = for {
    oneValue <- oneFuture
    futureThrowingException <- futureCallingLegacyCodeWithRecoverWith
  } yield oneValue + futureThrowingException

Прецедент

oneFuture занимает 1 секунду, а восстановление Future занимает 2 секунды. Итак, мы установили тайм-аут Await.result на 4 секунды. Конечный результат 1001 является суммой результата oneFuture и futureCallingLegacyCodeWithRecoverWith .

1
2
3
4
5
it("could be recovered with a recovery Future") {
      val futureCombinators = new FutureCombinators
      val result = timed(Await.result(futureCombinators.recoversFromExceptionUsingRecoverWith(), 4 seconds))
      result shouldBe 1001
    }

Выход

1
Time taken : 3006

Обратите внимание, что так же, как и recover , если второе будущее также не удается, то ошибка, выданная вторым будущим, передается вызывающей стороне.

Код

В следующем коде мы создаем еще одно Future которое выдает Exception с сообщением Dieded !! и мы восстанавливаем первое будущее с помощью этого будущего. Тестовый сценарий показал бы, что исключение из второго будущего (восстановление одного) возвращается вызывающей стороне.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
val anotherErrorThrowingFuture: Future[Int] = Future {
    Thread.sleep(1000)
    throw new LegacyException("Dieded!!")
  }
 
  val futureRecoveringWithAnotherErrorThrowingFuture: Future[Int] = futureCallingLegacyCode.recoverWith {
    case LegacyException(msg) =>
      anotherErrorThrowingFuture
  }
 
  def recoversFromExceptionUsingRecoverWithThatFails(): Future[Int] = for {
    oneValue <- oneFuture
    futureThrowingException <- futureRecoveringWithAnotherErrorThrowingFuture
  } yield oneValue + futureThrowingException

Прецедент

1
2
3
4
5
6
7
it("when recovered with another Future that throws Exception would throw the error from the second Future") {
      val futureCombinators = new FutureCombinators
      val exception = intercept[LegacyException] {
        timed(Await.result(futureCombinators.recoversFromExceptionUsingRecoverWithThatFails(), 4 seconds))
      }
      exception.msg shouldBe "Dieded!!"
    }

Использование fallbackTo:

fallbackTo работает так же, как recoverWith когда дело доходит до успешного значения. Он использует значение первого Future, если он успешен или возвращается к значению второго Future. Однако, если и первое, и второе Future терпят неудачу, то ошибка, которая передается вызывающей стороне, является ошибкой первого Future, а не второго Future.

Код

Давайте использовать те же Futures, которые мы использовали в recoverWith .

1
2
3
4
5
6
val futureFallingBackToAnotherErrorThrowingFuture: Future[Int] = futureCallingLegacyCode.fallbackTo (anotherErrorThrowingFuture)
 
  def recoversFromExceptionUsingFallbackTo(): Future[Int] = for {
    oneValue <- oneFuture
    futureThrowingException <- futureFallingBackToAnotherErrorThrowingFuture
  } yield oneValue + futureThrowingException

Обратите внимание, что функция fallbackTo просто принимает другое Future а не функцию PartialFunction как recoverWith .

Прецедент

1
2
3
4
5
6
7
it("when fallen back to another Future that throws Exception would throw the error from the first Future") {
      val futureCombinators = new FutureCombinators
      val exception = intercept[LegacyException] {
        timed(Await.result(futureCombinators.recoversFromExceptionUsingFallbackTo(), 4 seconds))
      }
      exception.msg shouldBe "Danger! Danger!"
    }

Другие интересные и полезные комбинаторы

Ниже приведен супер-краткий список других комбинаторов Future, которые я считаю очень полезными.

застежка-молния

zip работает так же, как List.zip . Он просто объединяет два фьючерса и дает Future Tuple .

1
def zipTwoFutures:Future[(Int,Int)]=oneFuture zip twoFuture

firstCompletedOf

Ах! firstCompletedOf очень удобен, когда у вас есть два эквивалентных сервиса, и вы хотите продолжить, как только самый быстрый сервис вернет значение.

1
2
val listOfFutures=List(oneFuture,twoFuture,threeFuture)
  def getFirstResult():Future[Int]=Future.firstCompletedOf(listOfFutures)

В приведенном выше случае oneFuture возвращает самый быстрый.

последовательность

sequence чистая магия. Скажем, у вас есть List[Future[Int]] точно так же, как List(oneFuture,twoFuture,threeFuture) и вы требуете, чтобы все значения были возвращены вам в виде List[Int] вместо каждого Int заключенного в Future . sequence берет ваш List[Future[Int]] и превращается в Future[List[Int]]

1
def getResultsAsList():Future[List[Int]]=Future.sequence(listOfFutures)

В последний раз я использовал для пакетной обработки, где я выполнял логику параллельно с кусками данных и комбинировал их вместе с sequence .

Scala-асинхронная библиотека

Библиотека Scala Async является внешним проектом и может быть добавлена ​​в проект путем добавления зависимости в наш build.sbt.

1
"org.scala-lang.modules" %% "scala-async" % "0.9.6-RC2"

Библиотека Async имеет всего две мощные функции в своем классе scala.async.Asyncasync и await .

асинхронной

async функция очень похожа на функцию Future.apply . На самом деле их подписи очень похожи, и мы могли бы с комфортом заменить Future.apply async где бы он ни был доступен.

Future.apply

1
def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T]

асинхронный

1
def async[T](body: => T)(implicit execContext: ExecutionContext): Future[T]

Основным преимуществом использования async над Future.apply , помимо общей простоты читабельности, является то, что для каждого генератора Future (при использовании с для понимания) компилятор выдает отдельный анонимный класс, в то время как с async это всего один аноним класс.

Поэтому мы могли бы переписать наше oneFuture как

1
2
3
4
val oneFuture: Future[Int] = async { 
    Thread.sleep(1000)
    1
}

Ждите

Функция await принимает Future и возвращает результат. Но разве это не то же самое, что Await.result который принимает Future и также возвращает результат? Нет. Ключевое отличие заключается в том, что Await.result блокируется и настоятельно не рекомендуется использовать в Await.result коде, за исключением тестовых случаев. Функция await , с другой стороны, реализована с использованием макросов Scala, и реализация состоит в том, что она возвращает результат Future, используя обратный вызов onComplete .

Поскольку async функция возвращает Future , все другие механизмы обработки ошибок и восстановления остаются такими же, как и раньше.

Код

Давайте перепишем предыдущую сумму трех чисел с помощью async / await:

1
2
3
def sumOfThreeNumbersParallelWithAsyncAwait(): Future[Int] = async {
    await(oneFuture) + await(twoFuture) + await(threeFuture)
  }

Прецедент

1
2
3
4
5
it("could be composed using async/await") {
      val futureCombinators = new FutureCombinators
      val result = timed(Await.result(futureCombinators.sumOfThreeNumbersParallelWithAsyncAwait(), 4 seconds))
      result shouldBe 6
    }

Как видим, код, написанный таким образом, не только асинхронный, но и выглядит естественным (на самом деле он выглядит синхронно). Мы можем утверждать, что for- flatMap — огромный скачок от использования map и flatMap но async/await делает еще один большой шаг вперед.

Код

Код и соответствующий ему тестовый пример находятся на github.

Ссылка: Примечания Scala — Futures — 3 (Combinators and Async) от нашего партнера по JCG Аруна Маниваннана в блоге Rerun.me .