В последнем посте мы увидели, как извлекать значения из
Future при onComplete и их аналогах — onSuccess и onFailure . Мы также увидели, как использовать Await.result в Await.result для блокировки и получения значения из Future. В этом посте мы кратко обсудим отношения между Promise и Future .
обещание
Концепции Promise и Future идут рука об руку. scala.concurrent.Promise — это тот, который устанавливает значение для Future . Другими словами, Promise является мозгом, стоящим за асинхронным выполнением вычислений, а Future — всего лишь ручка для чтения результата, когда он станет доступным. Грубо говоря, Promise является сеттером, а Future — добытчиком.
Чаще всего нам не нужно явно создавать Обещание. Однако мы должны понимать, что такое Promise , чтобы действительно понять, как работает Future .
Давайте использовать следующие примеры, чтобы понять, как создать Promise .
1. Выполнение обещания
В следующем фрагменте кода мы увидим, как значение устанавливается в обещании и как оно читается с другой стороны.
Мы
1. создать Promise
2. complete Promise , установив успешное значение
3. затем верните прочитанную сторону Обещания — Future обратно вызывающей стороне, используя обещание. Future
За кулисами не происходит много времени. Значение устанавливается на Обещание немедленно, и, следовательно, значение немедленно доступно через Future .
Код
|
1
2
3
4
5
6
7
8
9
|
class PromiseInternals { ... def aCompletedPromiseUsingSuccess(num:Int): Future[Int] = { val promise=Promise[Int]() promise.success(num) promise.future }...... |
Прецедент
Когда мы запускаем код onComplete обратный вызов onComplete вызывается сразу после вызова promise.success(100) .
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
|
class PromiseInternalsTest extends FunSpec with Matchers { describe("A Future") { it("gives out the correct value when a Promise is completed") { val promiseInternals = new PromiseInternals val aCompletedPromise=promiseInternals.aCompletedPromiseUsingSuccess(100) assertValue(aCompletedPromise, 100) }...... def assertValueUsingOnComplete(future: Future[Int], expectedValue: Int): Unit = { future.onComplete { case Success(result) => { println (s"Result is $result and expectedValue is $expectedValue") result shouldBe expectedValue } case Failure (msg) => fail(msg) } } |
promise.success — это просто ярлык для использования promise.complete который принимает Try[T] в качестве аргумента. Таким образом, мы могли бы написать вышеупомянутую функцию как:
|
1
2
3
4
5
|
def aCompletedPromiseUsingComplete(num:Int): Future[Int] = { val promise=Promise[Int]() promise.complete(Success(num)) promise.future } |
В качестве альтернативы, если мы хотели бы указать на сбой в вычислениях, мы могли бы использовать promise.complete(Failure(throwable)) или
|
1
2
3
4
5
|
def aCompletedPromiseUsingFailure(num:Int): Future[Int] = { val promise=Promise[Int]() promise.failure(new RuntimeException("Evil Exception")) promise.future } |
Подведем итог вышесказанному на рисунке:
2. Выполнение блока асинхронно
Теперь, когда мы увидели, как выполнить Promise, установив успешное значение или исключение, мы увидим, как выполнить блок кода асинхронно.
В следующем someExternalDelayedCalculation мы передаем блок кода в someExternalDelayedCalculation для выполнения асинхронно.
Прецедент
Давайте сначала посмотрим на тестовый пример.
- Мы передаем блок в качестве аргумента. Блок кода просто спит в течение 2 секунд, а затем возвращает 100.
- Подтвердите значение через 3 секунды.
Достаточно просто.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
|
it("gives out the correct value when an asynchronous block is submitted and is completed through a Promise") { val promiseInternals = new PromiseInternals val longCalculationFuture = promiseInternals.someExternalDelayedCalculation{()=> Thread.sleep(2000) 100 } println (s"We have submitted a block to be executed asynchronously ${longCalculationFuture.isCompleted}") //false at this point assertValue(longCalculationFuture, 100) } def assertValue(future: Future[Int], expectedValue: Int): Unit = { val resultVal=Await.result(future, 3000 seconds) resultVal shouldBe expectedValue } |
Код
Реализация someExternalDelayedCalculation интересна:
Мы
1. создать FixedThreadPool для выполнения нашего асинхронного кода.
2. создать обещание
3. создать Runnable и обернуть блок для асинхронного run методе run
4. закройте обещание и завершите обещание, используя результат run
5. запустите Runnable в somePool потоков somePool.
6. вернуть promise.future Из которого звонящий может прочитать значение.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
|
val somePool=Executors.newFixedThreadPool(2) def someExternalDelayedCalculation(f:()=>Int): Future[Int] = { val promise=Promise[Int]() val thisIsWhereWeCallSomeExternalComputation = new Runnable { override def run(): Unit ={ promise.complete{ try(Success(f())) catch { case NonFatal (msg)=> Failure(msg) } } } } somePool.execute(thisIsWhereWeCallSomeExternalComputation) promise.future } |
Это оно !!
3. Как на Future.apply() деле реализован Future.apply() ?
Ну, я обманул. Код в пуле 2 фактически украден из фактической реализации самого Future.apply .
Помните, в предыдущем посте мы видели, что когда блок кода передается в функцию применения Future , он выполняется асинхронно.
Теперь сравните приведенный выше код в someExternalDelayedCalculation с фактической реализацией Future.apply и Runnable который он переносит.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
|
def apply[T](body: =>T)(implicit executor: ExecutionContext): scala.concurrent.Future[T] = { val runnable = new PromiseCompletingRunnable(body) executor.prepare.execute(runnable) runnable.promise.future }class PromiseCompletingRunnable[T](body: => T) extends Runnable { val promise = new Promise.DefaultPromise[T]() override def run() = { promise complete { try Success(body) catch { case NonFatal(e) => Failure(e) } } } } |
Чтобы повторить те же шаги, что и выше, apply функцию
- содержит
ThreadPoolкоторый мы предоставляем как неявныйExecutionContext - создает
Promise, создаваяPromiseCompletingRunnableкоторый являетсяRunnable - оборачивает блок для асинхронного
runметодеrun - закрывает обещание и завершает обещание, используя результат
run - выполняет
RunnableиспользуяExecutionContext - возвращает
promise.futureиз которого вызывающая сторона может прочитать значение.
4. После того, как написано, дважды ошибка
Как только обещание будет выполнено с Success или Failure , все, что мы можем сделать после этого, это извлечь значение из его Future . Также onComplete обратный вызов onComplete Future. Значение, заключенное в Будущее Обещания, задано камнем и не может быть изменено.
Если мы пытаемся установить новое значение, выполнив уже выполненное обещание, возникает IllegalStateException .
Код
Давайте посмотрим на это, используя фрагмент. В следующем коде мы создаем Promise и заполняем его значением 100. Затем мы пытаемся завершить его с ошибкой.
|
1
2
3
4
5
6
|
def alreadyCompletedPromise(): Future[Int] = { val promise = Promise[Int]() promise.success(100) //completed promise.failure(new RuntimeException("Will never be set because an IllegalStateException will be thrown beforehand")) promise.future } |
Прецедент
IllegalStateException только подтверждает, что IllegalStateException при попытке выполнить Обещание с ошибкой.
|
1
2
3
4
5
6
|
it("should throw an error if a Promise is attempted to be completed more than once") { val promiseInternals = new PromiseInternals intercept[IllegalStateException] { promiseInternals.alreadyCompletedPromise() } } |
Код
Полный код, поддерживающий этот блог, доступен в github.
| Ссылка: | Примечания Scala — Futures — 2 (Обещания) от нашего партнера JCG Аруна Маниваннана в блоге Rerun.me . |
