Статьи

Scala notes — Фьючерсы — 2 (Обещания)

В последнем посте мы увидели, как извлекать значения из Future при onComplete и их аналогах — onSuccess и onFailure . Мы также увидели, как использовать Await.result в Await.result для блокировки и получения значения из Future. В этом посте мы кратко обсудим отношения между Promise и Future .

обещание

Концепции Promise и Future идут рука об руку. scala.concurrent.Promise — это тот, который устанавливает значение для Future . Другими словами, Promise является мозгом, стоящим за асинхронным выполнением вычислений, а Future — всего лишь ручка для чтения результата, когда он станет доступным. Грубо говоря, Promise является сеттером, а Future — добытчиком.

Чаще всего нам не нужно явно создавать Обещание. Однако мы должны понимать, что такое Promise , чтобы действительно понять, как работает Future .

Давайте использовать следующие примеры, чтобы понять, как создать Promise .

1. Выполнение обещания

В следующем фрагменте кода мы увидим, как значение устанавливается в обещании и как оно читается с другой стороны.

Мы
1. создать Promise
2. complete Promise , установив успешное значение
3. затем верните прочитанную сторону Обещания — Future обратно вызывающей стороне, используя обещание. Future

За кулисами не происходит много времени. Значение устанавливается на Обещание немедленно, и, следовательно, значение немедленно доступно через Future .

Код

1
2
3
4
5
6
7
8
9
class PromiseInternals { 
...
 def aCompletedPromiseUsingSuccess(num:Int): Future[Int] = {
    val promise=Promise[Int]()
    promise.success(num)
    promise.future
  }
...
...

Прецедент

Когда мы запускаем код onComplete обратный вызов onComplete вызывается сразу после вызова promise.success(100) .

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
class PromiseInternalsTest extends FunSpec with Matchers { 
  describe("A Future") {
    it("gives out the correct value when a Promise is completed") {
      val promiseInternals = new PromiseInternals
      val aCompletedPromise=promiseInternals.aCompletedPromiseUsingSuccess(100)
      assertValue(aCompletedPromise, 100)
    }
...
...
 
  def assertValueUsingOnComplete(future: Future[Int], expectedValue: Int): Unit = {
    future.onComplete {
      case Success(result) => {
        println (s"Result is $result and expectedValue is $expectedValue")
        result shouldBe expectedValue
      }
      case Failure (msg) => fail(msg)
    }
  }

promise.success — это просто ярлык для использования promise.complete который принимает Try[T] в качестве аргумента. Таким образом, мы могли бы написать вышеупомянутую функцию как:

1
2
3
4
5
def aCompletedPromiseUsingComplete(num:Int): Future[Int] = {
    val promise=Promise[Int]()
    promise.complete(Success(num))
    promise.future
  }

В качестве альтернативы, если мы хотели бы указать на сбой в вычислениях, мы могли бы использовать promise.complete(Failure(throwable)) или

1
2
3
4
5
def aCompletedPromiseUsingFailure(num:Int): Future[Int] = {
    val promise=Promise[Int]()
    promise.failure(new RuntimeException("Evil Exception"))
    promise.future
  }

Подведем итог вышесказанному на рисунке:

PromiseAndFuture

2. Выполнение блока асинхронно

Теперь, когда мы увидели, как выполнить Promise, установив успешное значение или исключение, мы увидим, как выполнить блок кода асинхронно.

В следующем someExternalDelayedCalculation мы передаем блок кода в someExternalDelayedCalculation для выполнения асинхронно.

Прецедент

Давайте сначала посмотрим на тестовый пример.

  1. Мы передаем блок в качестве аргумента. Блок кода просто спит в течение 2 секунд, а затем возвращает 100.
  2. Подтвердите значение через 3 секунды.

Достаточно просто.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
it("gives out the correct value when an asynchronous block is submitted and is completed through a Promise") {
      val promiseInternals = new PromiseInternals
      val longCalculationFuture = promiseInternals.someExternalDelayedCalculation{()=>
        Thread.sleep(2000)
        100
      }
      println (s"We have submitted a block to be executed asynchronously ${longCalculationFuture.isCompleted}") //false at this point
      assertValue(longCalculationFuture, 100)
    }
 
  def assertValue(future: Future[Int], expectedValue: Int): Unit = {
    val resultVal=Await.result(future, 3000 seconds)
    resultVal shouldBe expectedValue
  }

Код

Реализация someExternalDelayedCalculation интересна:

Мы
1. создать FixedThreadPool для выполнения нашего асинхронного кода.
2. создать обещание
3. создать Runnable и обернуть блок для асинхронного run методе run
4. закройте обещание и завершите обещание, используя результат run
5. запустите Runnable в somePool потоков somePool.
6. вернуть promise.future Из которого звонящий может прочитать значение.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
val somePool=Executors.newFixedThreadPool(2)
 
  def someExternalDelayedCalculation(f:()=>Int): Future[Int] = {
    val promise=Promise[Int]()
    val thisIsWhereWeCallSomeExternalComputation = new Runnable {
      override def run(): Unit ={
        promise.complete{
          try(Success(f()))
          catch {
            case NonFatal (msg)=> Failure(msg)
          }
        }
      }
    }
 
    somePool.execute(thisIsWhereWeCallSomeExternalComputation)
    promise.future
  }

Это оно !!

3. Как на Future.apply() деле реализован Future.apply() ?

Ну, я обманул. Код в пуле 2 фактически украден из фактической реализации самого Future.apply .

Помните, в предыдущем посте мы видели, что когда блок кода передается в функцию применения Future , он выполняется асинхронно.

Теперь сравните приведенный выше код в someExternalDelayedCalculation с фактической реализацией Future.apply и Runnable который он переносит.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
def apply[T](body: =>T)(implicit executor: ExecutionContext): scala.concurrent.Future[T] = {
    val runnable = new PromiseCompletingRunnable(body)
    executor.prepare.execute(runnable)
    runnable.promise.future
  }
 
class PromiseCompletingRunnable[T](body: => T) extends Runnable { 
    val promise = new Promise.DefaultPromise[T]()
 
    override def run() = {
      promise complete {
        try Success(body) catch { case NonFatal(e) => Failure(e) }
      }
    }
  }

Чтобы повторить те же шаги, что и выше, apply функцию

  1. содержит ThreadPool который мы предоставляем как неявный ExecutionContext
  2. создает Promise , создавая PromiseCompletingRunnable который является Runnable
  3. оборачивает блок для асинхронного run методе run
  4. закрывает обещание и завершает обещание, используя результат run
  5. выполняет Runnable используя ExecutionContext
  6. возвращает promise.future из которого вызывающая сторона может прочитать значение.

4. После того, как написано, дважды ошибка

Как только обещание будет выполнено с Success или Failure , все, что мы можем сделать после этого, это извлечь значение из его Future . Также onComplete обратный вызов onComplete Future. Значение, заключенное в Будущее Обещания, задано камнем и не может быть изменено.

Если мы пытаемся установить новое значение, выполнив уже выполненное обещание, возникает IllegalStateException .

Код

Давайте посмотрим на это, используя фрагмент. В следующем коде мы создаем Promise и заполняем его значением 100. Затем мы пытаемся завершить его с ошибкой.

1
2
3
4
5
6
def alreadyCompletedPromise(): Future[Int] = {
    val promise = Promise[Int]()
    promise.success(100) //completed
    promise.failure(new RuntimeException("Will never be set because an IllegalStateException will be thrown beforehand"))
    promise.future
  }

Прецедент

IllegalStateException только подтверждает, что IllegalStateException при попытке выполнить Обещание с ошибкой.

1
2
3
4
5
6
it("should throw an error if a Promise is attempted to be completed more than once") {
      val promiseInternals = new PromiseInternals
      intercept[IllegalStateException] {
        promiseInternals.alreadyCompletedPromise()
      }
    }

Код

Полный код, поддерживающий этот блог, доступен в github.

Ссылка: Примечания Scala — Futures — 2 (Обещания) от нашего партнера JCG Аруна Маниваннана в блоге Rerun.me .