В последнем посте мы увидели, как извлекать значения из Future
при onComplete
и их аналогах — onSuccess
и onFailure
. Мы также увидели, как использовать Await.result
в Await.result
для блокировки и получения значения из Future. В этом посте мы кратко обсудим отношения между Promise
и Future
.
обещание
Концепции Promise
и Future
идут рука об руку. scala.concurrent.Promise
— это тот, который устанавливает значение для Future
. Другими словами, Promise
является мозгом, стоящим за асинхронным выполнением вычислений, а Future
— всего лишь ручка для чтения результата, когда он станет доступным. Грубо говоря, Promise
является сеттером, а Future
— добытчиком.
Чаще всего нам не нужно явно создавать Обещание. Однако мы должны понимать, что такое Promise
, чтобы действительно понять, как работает Future
.
Давайте использовать следующие примеры, чтобы понять, как создать Promise
.
1. Выполнение обещания
В следующем фрагменте кода мы увидим, как значение устанавливается в обещании и как оно читается с другой стороны.
Мы
1. создать Promise
2. complete
Promise
, установив успешное значение
3. затем верните прочитанную сторону Обещания — Future
обратно вызывающей стороне, используя обещание. Future
За кулисами не происходит много времени. Значение устанавливается на Обещание немедленно, и, следовательно, значение немедленно доступно через Future
.
Код
1
2
3
4
5
6
7
8
9
|
class PromiseInternals { ... def aCompletedPromiseUsingSuccess(num:Int): Future[Int] = { val promise=Promise[Int]() promise.success(num) promise.future } ... ... |
Прецедент
Когда мы запускаем код onComplete
обратный вызов onComplete
вызывается сразу после вызова promise.success(100)
.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
|
class PromiseInternalsTest extends FunSpec with Matchers { describe( "A Future" ) { it( "gives out the correct value when a Promise is completed" ) { val promiseInternals = new PromiseInternals val aCompletedPromise=promiseInternals.aCompletedPromiseUsingSuccess( 100 ) assertValue(aCompletedPromise, 100 ) } ... ... def assertValueUsingOnComplete(future: Future[Int], expectedValue: Int): Unit = { future.onComplete { case Success(result) => { println (s "Result is $result and expectedValue is $expectedValue" ) result shouldBe expectedValue } case Failure (msg) => fail(msg) } } |
promise.success
— это просто ярлык для использования promise.complete
который принимает Try[T]
в качестве аргумента. Таким образом, мы могли бы написать вышеупомянутую функцию как:
1
2
3
4
5
|
def aCompletedPromiseUsingComplete(num:Int): Future[Int] = { val promise=Promise[Int]() promise.complete(Success(num)) promise.future } |
В качестве альтернативы, если мы хотели бы указать на сбой в вычислениях, мы могли бы использовать promise.complete(Failure(throwable))
или
1
2
3
4
5
|
def aCompletedPromiseUsingFailure(num:Int): Future[Int] = { val promise=Promise[Int]() promise.failure( new RuntimeException( "Evil Exception" )) promise.future } |
Подведем итог вышесказанному на рисунке:
2. Выполнение блока асинхронно
Теперь, когда мы увидели, как выполнить Promise, установив успешное значение или исключение, мы увидим, как выполнить блок кода асинхронно.
В следующем someExternalDelayedCalculation
мы передаем блок кода в someExternalDelayedCalculation
для выполнения асинхронно.
Прецедент
Давайте сначала посмотрим на тестовый пример.
- Мы передаем блок в качестве аргумента. Блок кода просто спит в течение 2 секунд, а затем возвращает 100.
- Подтвердите значение через 3 секунды.
Достаточно просто.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
|
it( "gives out the correct value when an asynchronous block is submitted and is completed through a Promise" ) { val promiseInternals = new PromiseInternals val longCalculationFuture = promiseInternals.someExternalDelayedCalculation{()=> Thread.sleep( 2000 ) 100 } println (s "We have submitted a block to be executed asynchronously ${longCalculationFuture.isCompleted}" ) //false at this point assertValue(longCalculationFuture, 100 ) } def assertValue(future: Future[Int], expectedValue: Int): Unit = { val resultVal=Await.result(future, 3000 seconds) resultVal shouldBe expectedValue } |
Код
Реализация someExternalDelayedCalculation
интересна:
Мы
1. создать FixedThreadPool
для выполнения нашего асинхронного кода.
2. создать обещание
3. создать Runnable
и обернуть блок для асинхронного run
методе run
4. закройте обещание и завершите обещание, используя результат run
5. запустите Runnable
в somePool
потоков somePool.
6. вернуть promise.future
Из которого звонящий может прочитать значение.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
|
val somePool=Executors.newFixedThreadPool( 2 ) def someExternalDelayedCalculation(f:()=>Int): Future[Int] = { val promise=Promise[Int]() val thisIsWhereWeCallSomeExternalComputation = new Runnable { override def run(): Unit ={ promise.complete{ try (Success(f())) catch { case NonFatal (msg)=> Failure(msg) } } } } somePool.execute(thisIsWhereWeCallSomeExternalComputation) promise.future } |
Это оно !!
3. Как на Future.apply()
деле реализован Future.apply()
?
Ну, я обманул. Код в пуле 2 фактически украден из фактической реализации самого Future.apply
.
Помните, в предыдущем посте мы видели, что когда блок кода передается в функцию применения Future
, он выполняется асинхронно.
Теперь сравните приведенный выше код в someExternalDelayedCalculation
с фактической реализацией Future.apply
и Runnable
который он переносит.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
|
def apply[T](body: =>T)(implicit executor: ExecutionContext): scala.concurrent.Future[T] = { val runnable = new PromiseCompletingRunnable(body) executor.prepare.execute(runnable) runnable.promise.future } class PromiseCompletingRunnable[T](body: => T) extends Runnable { val promise = new Promise.DefaultPromise[T]() override def run() = { promise complete { try Success(body) catch { case NonFatal(e) => Failure(e) } } } } |
Чтобы повторить те же шаги, что и выше, apply
функцию
- содержит
ThreadPool
который мы предоставляем как неявныйExecutionContext
- создает
Promise
, создаваяPromiseCompletingRunnable
который являетсяRunnable
- оборачивает блок для асинхронного
run
методеrun
- закрывает обещание и завершает обещание, используя результат
run
- выполняет
Runnable
используяExecutionContext
- возвращает
promise.future
из которого вызывающая сторона может прочитать значение.
4. После того, как написано, дважды ошибка
Как только обещание будет выполнено с Success
или Failure
, все, что мы можем сделать после этого, это извлечь значение из его Future
. Также onComplete
обратный вызов onComplete
Future. Значение, заключенное в Будущее Обещания, задано камнем и не может быть изменено.
Если мы пытаемся установить новое значение, выполнив уже выполненное обещание, возникает IllegalStateException
.
Код
Давайте посмотрим на это, используя фрагмент. В следующем коде мы создаем Promise и заполняем его значением 100. Затем мы пытаемся завершить его с ошибкой.
1
2
3
4
5
6
|
def alreadyCompletedPromise(): Future[Int] = { val promise = Promise[Int]() promise.success( 100 ) //completed promise.failure( new RuntimeException( "Will never be set because an IllegalStateException will be thrown beforehand" )) promise.future } |
Прецедент
IllegalStateException
только подтверждает, что IllegalStateException
при попытке выполнить Обещание с ошибкой.
1
2
3
4
5
6
|
it( "should throw an error if a Promise is attempted to be completed more than once" ) { val promiseInternals = new PromiseInternals intercept[IllegalStateException] { promiseInternals.alreadyCompletedPromise() } } |
Код
Полный код, поддерживающий этот блог, доступен в github.
Ссылка: | Примечания Scala — Futures — 2 (Обещания) от нашего партнера JCG Аруна Маниваннана в блоге Rerun.me . |