Akka — это актерская, управляемая событиями среда для создания высококонкурентных, надежных приложений. Не должно быть сюрпризом, что концепция будущего повсеместна в такой системе. Обычно вы никогда не блокируете ожидание ответа, вместо этого вы отправляете сообщение и ожидаете, что ответ придет в будущем. Похоже, отлично подходит для … будущего. Более того, фьючерсы в Akka особенные по двум причинам: синтаксис Scala вместе с выводом типов значительно улучшают читабельность и монадическую природу. Чтобы полностью оценить последнее преимущество, ознакомьтесь с scala.Option Cheat Sheet, если вы еще не освоили монады на практике в Scala.
Мы продолжим наш пример с веб-сканером , применяя еще один подход, на этот раз с Akka на вершине Scala. Сначала основной синтаксис:
1
2
3
4
|
val future = Future { Source.fromURL( ).mkString |
Это было быстро! future
имеет scala.concurrent.Future[String]
. При условии, что блок кода будет выполняться асинхронно позже, а future
(типа Future[String]
) представляет дескриптор значения этого блока. К настоящему времени вам должно быть интересно, как вы настраиваете потоки, выполняющие эту задачу? Хороший вопрос, этот код не будет компилироваться в том виде, в каком он есть, для работы нужен ExecutionContext
. ExecutionContext
аналогичен ExecutorService
но может быть задан неявно. У вас есть несколько вариантов:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
|
import ExecutionContext.Implicits.global //or implicit val ec = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool( 50 )) //or (inside actor) import context.dispatcher //or (explicitly) val future = Future { //... } (ec) |
Первый подход использует встроенный контекст выполнения, состоящий из такого количества потоков, сколько у вас есть процессоров / ядер. Используйте этот контекст только для небольших приложений, поскольку он плохо масштабируется и довольно негибок. Второй подход берет существующий ExecutorService
и оборачивает его. Вы полностью контролируете количество потоков и их поведение. Обратите внимание, что implicit val
выбирается автоматически. Если вы находитесь внутри актера, вы можете повторно использовать dispatcher
Akka для запуска вашей задачи, используя тот же пул потоков, что и актеры. Наконец, вы, конечно, можете явно передать ExecutionContext
. В следующих примерах я предполагаю, что некоторый неявный контекст доступен.
Имея экземпляр Future
мы бы хотели обработать результат. Я даже не упоминаю о блокировке и их синхронном ожидании (но изучите официальную документацию, если она вам действительно нужна). Более в духе ListenableFuture
из Гуавы, мы сначала зарегистрируем несколько обратных вызовов завершения:
1
2
3
4
5
6
|
Future { } onComplete { case Success(html) = > logger.info( "Result: " + html) case Failure(ex) = > logger.error( "Problem" , ex) } |
Это похоже на ListenableFuture
но с более чистым синтаксисом. Однако в нашей коробке есть еще более мощные инструменты. Помните, в прошлый раз у нас был один синхронный метод для анализа загруженного HTML и второй асинхронный метод для вычисления релевантности документа (что бы это ни значило):
1
2
3
4
5
6
7
|
def downloadPage(url : URL) = Future { Source.fromURL(url, StandardCharsets.UTF _ 8 .name()).mkString } def parse(html : String) : Document = //... def calculateRelevance(doc : Document) : Future[Double] = //... |
Конечно, мы можем зарегистрировать обратный вызов onComplete
но фьючерсы в Akka / Scala являются монадами, поэтому мы можем обрабатывать данные как последовательность связанных строго типизированных преобразований (явные типы сохранены для ясности):
1
2
3
4
5
|
val documentFuture : Future[Document] = htmlFuture map parse val relevanceFuture : Future[Double] = documentFuture flatMap calculateRelevance val bigRelevanceFuture : Future[Double] = relevanceFuture filter { _ > 0.5 } bigRelevanceFuture foreach println |
Я хочу быть ясным здесь. Вызов Future.map(someOperation)
не ожидает завершения этого будущего. Он просто оборачивает его и запускает someOperation
тот момент, когда она завершается, когда-то в будущем. То же самое относится к Future.filter
и Future.foreach
. Вы можете быть удивлены, увидев их в этом контексте, поскольку мы обычно связываем такие операторы с коллекциями. Но, как и в Option[T]
, Future[T]
, значительно упрощая, представляет собой коллекцию, которая может содержать или не содержать ровно один элемент. С этим сравнением должно быть очевидно, что делает код выше. Вызов Future.filter
может быть неясным, но в основном он указывает, что мы не заинтересованы в результате асинхронной операции, которая не соответствует определенным критериям. Если предикат выдает false
, операция foreach
никогда не будет выполнена.
Конечно, вы можете воспользоваться преимуществами вывода типов и цепочки, чтобы получить более краткий, но не обязательно более легкий для чтения код:
1
2
3
4
5
|
map(parse). flatMap(calculateRelevance). filter( _ > 0.5 ). foreach(println |
Но самая большая победа приходит из-за понимания. Вы можете не знать об этом, но поскольку Future
реализует map
, foreach
, filter
и тому подобное (упрощение), мы можем использовать его внутри для понимания (то же самое относится и к Option[T]
). Итак, еще один, возможно, наиболее читаемый подход будет:
1
2
3
4
5
6
7
|
for { relevance <- calculateRelevance(parse(html)) if (relevance > 0.5 ) } println(relevance) println( "Done" ) |
Это кажется очень обязательным и последовательным, но на самом деле каждый шаг этого для понимания выполняется асинхронно, и здесь нет блокировки. Сообщение "Done"
будет отображено немедленно, задолго до вычисленной релевантности. Эта конструкция приносит лучшее из обоих миров — выглядит последовательно, но на самом деле работает в фоновом режиме. Более того, он скрывает скрытую разницу между функциями, возвращающими значение в сравнении с Future
значения ( map
против flatMap
).
Скажем, мы запустили приведенный выше код для списка веб-сайтов, который дает нам List[Future[Double]]
и теперь мы хотим найти наибольшую релевантность в этом наборе. К настоящему времени вы должны отказаться от всех решений, связанных с блокировкой. В Scala есть два умных способа сделать это: либо превратить List[Future[Double]]
в Future[List[Double]]
либо свернуть список фьючерсов. Первые решения идентичны Futures.allAsList
в Futures.allAsList
:
1
2
3
4
5
|
val futures : Seq[Future[Double]] = //... val future : Future[Seq[Double]] = Future sequence futures future.onSuccess{ case x = > println(s "Max relevance: ${x.max}" ) } |
или даже более кратко (помните, что x
является Seq[Double]
в обоих случаях:
1
2
3
|
Future.sequence(futures).map {x = > println(s "Max relevance: ${x.max}" ) } |
Помните, что здесь нет блокировки. Future[Seq[Double]]
завершается, когда последний базовый Future[Double]
сообщает о завершении. Если вам нравится foldLeft()
точно так же, как я (но не обязательно здесь), рассмотрите следующую идиому:
1
2
3
|
Future.fold(futures)( 0.0 ) { _ max _ } map {maxRel = > println(s "Max relevance: $maxRel" ) } |
Он перебирает фьючерсы один за другим и вызывает предоставленную нами функцию {_ max _}
сгиба всякий раз, когда данное будущее будет успешным.
Резюме
Фьючерсы в Scala и Akka очень мощные: они позволяют неблокирующее, эффективное для процессора асинхронное программирование, но они чувствуют себя как императивное однопоточное программирование. Вы можете применить последовательность преобразований поверх одного будущего или набора из них так же, как если бы это будущее уже было разрешено. Код выглядит абсолютно обязательным, когда вы ожидаете один этап, выполняете некоторые преобразования и запускаете второй этап. Но в действительности все является асинхронным и управляемым событиями. Из-за монадической природы Future[V]
и лаконичного синтаксиса, фьючерсы в Scala являются прекрасным инструментом без излишней церемонии.
Ссылка: Фьючерсы в Akka со Scala от нашего партнера JCG Томаша Нуркевича в блоге NoBlogDefFound .