Статьи

Scala Notes — Фьючерсы — 1

Почти все современные языки программирования имеют идиому Future-Promise для параллельного программирования. Я не собираюсь утомлять вас, почему нам нужен более высокий уровень абстракций параллелизма. Вместо этого в этом посте мы перейдем к преследованию и обсудим только подход Scala к Futures.

scala.concurrent.Future — это представление значения, которое еще предстоит реализовать. Это значение обычно является результатом более длинных и / или параллельных вычислений.

Грубо говоря, блок кода, который выполняется синхронно, когда обернут в Future работает асинхронно. Все реализации Future дают нам дескриптор, с помощью которого мы можем получить возвращаемое значение вычисления блока (или какой в ​​этом смысл!).

В этом посте мы рассмотрим основы того, как построить Future и извлечь из него значение путем блокировки ожидания и обратных вызовов. В следующей части мы поговорим о создании Futures и других сложных конструкций, таких как recover и recover для обработки исключений.

СОЗДАНИЕ АСИНХРОННОГО ВЫЧИСЛЕНИЯ

Создать вычисление, которое выполняется асинхронно с использованием Future , очень просто. Нам просто нужно добавить нашу логику в функцию применения Future

1
2
3
val aFuture: Future[Int] = Future { 
    //Some massively huge super important computation
}

В качестве примера, давайте создадим oneFuture который возвращает 1 после задержки в одну секунду.

1
2
3
4
val oneFuture: Future[Int] = Future { 
    Thread.sleep(1000)
    1
}

Давайте сделаем паузу и рассмотрим функцию apply

1
def apply[T](body: =>T)(implicit executor: ExecutionContext): scala.concurrent.Future[T]

Ага. Тут ты меня подловил. Метод также принимает нечто, называемое ExecutionContext в качестве неявного аргумента. Когда для построения Future используется блок кода, вычисления выполняются по ExecutionContext . ExecutionContext является синонимом ThreadPool и когда Future запускается, он запускается в отдельном потоке.

Scala предоставляет готовый статический глобальный ExecutionContext в scala.concurrent.ExecutionContext.global который мы планируем использовать сейчас. (Подробнее об этом чуть позже)

Итак, весь код на самом деле выглядит так

01
02
03
04
05
06
07
08
09
10
class PromisingFutures{
 
  import scala.concurrent.ExecutionContext.Implicits.global
 
  val oneFuture: Future[Int] = Future {
    Thread.sleep(1000)
    1
  }
  ...
  ...
ПО ИСПОЛЬЗОВАНИЮ SCALA.CONCURRENT.EXECUTIONCONTEXT.GLOBAL

global ExecutionContext очень удобен в использовании. Тем не менее, базовый ThreadPool является ForkJoinPool . В этом нет ничего плохого, и ForkJoinPool удивителен для кратковременных вычислений, но настоятельно не рекомендуется для блокирования ввода-вывода, такого как вызовы базы данных или веб-службы (или для этого даже для длительных вычислений или просто разговоров вне JVM).

Хорошей новостью является то, что мы все еще можем использовать Future, но обходной путь — просто использовать отдельный пул потоков, который не является ForkJoinPool — например, исправленный ThreadPool является хорошим вариантом.

1
2
3
4
implicit lazy val fixedThreadPoolExecutionContext: ExecutionContext =
    val fixedThreadPool: ExecutorService = Executors.newFixedThreadPool(Runtime.getRuntime.availableProcessors * 2) //or some fixed number
    ExecutionContext.fromExecutor(fixedThreadPool)
}

БУДУЩИЕ ГОСУДАРСТВА И ЦЕННОСТИ

Прежде чем мы перейдем к извлечению значений из будущего, давайте посмотрим, каковы различные состояния будущего и когда и какое значение действительно доступно во время этих состояний.

У Будущего только два состояния — Не Завершено и Завершено.

Рассмотрим следующее изображение

FutureStatesAndValues

Когда дано вычисление, полученное значение, которое мы могли бы получить из Future, является Option[Try[T]] .

  1. В состоянии Not Complete результат вычисления еще не реализован. Следовательно, значение будет None .
  2. После завершения , результатом будет Some(Try[T]) что означает, что это может быть одним из следующих:
    1. Положительный результат расчета или
    2. Исключение

Давайте посмотрим на случай Success на примере

01
02
03
04
05
06
07
08
09
10
def checkState(): Unit =
    println("Before the job finishes")
    Thread.sleep(500)
    println(s"Completed : ${oneFuture.isCompleted}, Value : ${oneFuture.value}")
 
    println("After the job finishes")
    Thread.sleep(1100)
    println(s"Completed : ${oneFuture.isCompleted}, Value : ${oneFuture.value}")
 
}

Время, необходимое для завершения delayedStringFuture Future, составляет 1 секунду. Итак, сначала мы проверяем через 500 миллисекунд, завершено ли Future и печатаем его текущее значение. Функция value Future возвращает Option[Try[T]] . Не удивительно, что мы получаем false для проверки isCompleted и None как само значение.

Выход за 500 мс:

1
2
Before the job finishes 
Completed : false, Value : None

Давайте проверим еще раз через 1100 миллисекунд, предоставив некоторую свободу после этого 1000 миллисекундного сна. Выходные данные теперь являются результатом самих вычислений, и теперь статус завершения равен true.

Выход за 1100 мс:

1
2
After the job finishes 
Completed : true, Value : Some(Success(1))

Теперь, когда мы получили это из нашего пути, давайте посмотрим, как извлечь ценность из будущего

ДОБЫВАЮЩАЯ ЦЕННОСТЬ ИЗ БУДУЩЕГО

Помимо составления фьючерсов, которые мы увидим на следующей неделе, есть два способа извлечь только значение из будущего — блокирование ожидания и обратного вызова

1. БЛОКИРОВКА ПОДОЖДИТЕ, ИСПОЛЬЗУЯ AWAIT.

Функция результата scala.concurrent.Await имеет следующий синтаксис:

1
2
3
@throws(classOf[Exception])
    def result[T](awaitable: Awaitable[T], atMost: Duration): T =
      blocking(awaitable.result(atMost)(AwaitPermission))

Он принимает реализацию Awaitable черты, которой является Future . Он также принимает второй параметр atMost который указывает максимальную продолжительность времени, которое поток вызывающей стороны должен блокировать для результата. По истечении срока действия atMost , если Future все еще не завершится, будет atMost java.util.concurrent.TimeoutException .

Если будущее завершено, Await.result извлечет нам фактическое значение. Если будущее завершено и если результатом будущего является Throwable, то исключение передается вызывающей стороне.

Использование Await.result в производственном коде крайне Await.result но эта конструкция пригодится для запуска Await.result против Future.

Давайте рассмотрим два Futures, оригинальный oneFuture и oneDangerousFuture который создает исключение.

КОД

Вот как они выглядят:

01
02
03
04
05
06
07
08
09
10
11
val oneFuture: Future[Int] = Future {
    Thread.sleep(1000)
    1
  }
 
  val oneDangerousFuture=Future{
    Thread.sleep(2000)
    throw new SomeComputationException("Welcome to the Dark side !")
  }
 
  case class SomeComputationException(msg: String) extends Exception(msg)

TESTCASES

У нас есть три теста:

  1. Первый сценарий — это наш счастливый день. Вычисление занимает 1 секунду, мы ждем результата в течение максимум 2 секунд. Мы, очевидно, будем иметь наш результат в руках. Наше утверждение, что значение должно быть возвращено, становится верным.
  2. Во втором SomeComputationException мы SomeComputationException Future, который SomeComputationException исключение SomeComputationException и мы утверждаем, что исключение SomeComputationException вызывающей стороне, когда мы ожидаем результата.
  3. В последнем тестовом примере мы ждем только 500 миллисекунд, пока само вычисление занимает 1 секунду. Как мы видели из реализации Await.result , этот вызов вызывает TimeoutException .
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class PromisingFutureTest extends FunSpec with Matchers {
 
  describe("A PromisingFuture") {
 
    it("should hold a Int value if the Await.result is called after the Future completes") {
      val promisingFuture = new PromisingFutures()
      val oneFuture = promisingFuture.oneFuture //Takes 1 second to compute
      val intValue = Await.result(oneFuture, 2 seconds)
      intValue should be(1)
    }
 
    it("should propagate the Exception to the callee if the computation threw an exception") {
      val promisingFuture = new PromisingFutures()
      val oneDangerousFuture = promisingFuture.oneDangerousFuture //throws exception
      intercept[SomeComputationException] {
        val intValue = Await.result(oneDangerousFuture, 2 seconds)
      }
    }
 
    it("should throw a TimeOutException exception when an Await.result's atMost parameter is lesser than the time taken for the Future to complete") {
      val promisingFuture = new PromisingFutures()
      val oneDelayedFuture = promisingFuture.oneFuture //Takes 1 second to compute
      intercept[TimeoutException] {
        Await.result(oneDelayedFuture, 500 millis)
      }
    }
  }
 
}

2. ЗВОНОК:

Альтернативный и чистый способ извлечь значение из Future (кроме составления) — это обратные вызовы. В будущем доступны три различных обратных вызова — onSuccess , onFailure и комбинированный onComplete .

  1. onSuccess вызов onSuccess только после успешного завершения Future с результатом.
  2. onFailure вызов onFailure только при наличии исключения.
  3. onComplete представляет собой комбинацию onSuccess и onFailure . Он принимает функцию, которая работает над Try[T] после того, как Option с Option будущего будет развернут.
1
def onComplete[U](f: Try[T] => U)(implicit executor: ExecutionContext): Unit

Обратите внимание, что все обратные вызовы возвращают Unit что означает, что они не могут быть составлены и имеют побочные эффекты.

Теперь давайте посмотрим, как мы можем использовать обратный вызов onComplete . У меня есть этот маленький метод с именем printFuture который просто записывает на консоль содержимое Future после его завершения. Давайте попробуем передать в него и oneFuture и oneDangerousFuture .

1
2
3
4
5
6
7
8
class PromisingFutures { 
...
...
def printFuture[T](future: Future[T]): Unit = future.onComplete { 
    case Success(result) => println(s"Success $result")
    case Failure(throwable) => println(s"Failure $throwable")
}
...
01
02
03
04
05
06
07
08
09
10
object PromisingFutures{
 
 def main(args: Array[String]) {
    val promisingFutures=new PromisingFutures
    promisingFutures.printFuture(promisingFutures.oneFuture)
    promisingFutures.printFuture(promisingFutures.oneDangerousFuture)
 
    synchronized(wait(3000))
  }
}

Выход :

1
2
Success 1 
Failure SomeComputationException: Welcome to the Dark side !

Как и ожидалось, oneFuture переходит к случаю Success и oneDangerousFuture 1, а oneDangerousFuture переходит к случаю Failure и печатает исключение:

TIMEUNIT

Scala предоставляет очень удобный DSL-подобный синтаксис для представления TimeUnit, например. 2 секунды, 5 минут и т. Д. Три неявных пакета scala.concurrent.duration классов — DurationInt , DurationLong и DurationDouble и trait DurationConversions делают эту магию. Все, что нам нужно сделать, это импортировать scala.concurrent.duration._ .

Кроме того, «500 миллисекунд» в нашем примере может быть представлено как «500 миллисекунд», «500 миллисекунд» или «500 миллисекунд». Полный список псевдонимов для различных единиц времени можно найти в исходном коде или Scaladoc черты DurationConversions .

КОД

Полный код, поддерживающий этот блог, доступен в github.

Ссылка: Scala Notes — Futures — 1 от нашего партнера JCG Аруна Маниваннана в блоге Rerun.me .