Scala — это функциональный и объектно-ориентированный язык, который работает на JVM. Для параллельного и / или параллельного программирования это подходящий выбор вместе с платформой Akka , которая предоставляет богатый набор инструментов для всех видов параллельных задач. В этом посте я хочу показать небольшой пример того, как запланировать задание поиска по журналу на нескольких файлах / серверах с Futures и Actors .
Настроить
Я создал свою настройку с помощью шаблона Hello-Akka активатора Typesafe. В результате получается файл build.sbt со следующим содержимым:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
|
name := """hello-akka"""version := "1.0"scalaVersion := "2.10.2"libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-actor" % "2.2.0", "com.typesafe.akka" %% "akka-testkit" % "2.2.0", "com.google.guava" % "guava" % "14.0.1", "org.scalatest" % "scalatest_2.10" % "1.9.1" % "test", "junit" % "junit" % "4.11" % "test", "com.novocode" % "junit-interface" % "0.7" % "test->default")testOptions += Tests.Argument(TestFrameworks.JUnit, "-v") |
Scala встроенный фьючерс
В Scala уже есть встроенная поддержка Futures. Реализация основана на java.util.concurrent . Давайте реализуем будущее, которое запускает наш поиск по журналу.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
|
import scala.concurrent._import scala.concurrent.duration._import scala.concurrent.ExecutionContext.Implicits._object LogSearch extends App {println("Starting log search")val searchFuture = future { Thread sleep 1000 "Found something"}println("Blocking for results") val result = Await result (searchFuture, 5 seconds) println(s"Found $result")} |
Это все, что нам нужно для запуска нашей задачи в другом потоке. Неявный импорт из ExecutionContext предоставляет по умолчанию ExecutionContext, который обрабатывает потоки, в которых будет выполняться будущее. После создания будущего мы ждем с блокирующим вызовом Await result для наших результатов. Пока ничего особенного.
Будущая композиция
Есть много примеров, когда синтаксис for-yield используется для составления будущих результатов. В нашем случае у нас есть динамический список фьючерсов: результаты поиска в журнале с каждого сервера.
Для тестирования будущих возможностей мы создадим список фьючерсов из списка целых, представляющих время выполнения задачи. Типы только для пояснения.
|
1
2
3
4
5
6
7
|
val tasks = List(3000, 1200, 1800, 600, 250, 1000, 1100, 8000, 550)val taskFutures: List[Future[String]] = tasks map { ms => future { Thread sleep ms s"Task with $ms ms" }} |
В конце мы хотим получить List [String] в результате. Это делается с помощью сопутствующего объекта Futures.
|
1
|
val searchFuture: Future[List[String]] = Future sequence taskFutures |
И, наконец, мы можем дождаться наших результатов с
|
1
|
val result = Await result (searchFuture, 2 seconds) |
Однако это вызовет исключение TimeoutException , так как некоторые из наших задач выполняются более 2 секунд. Конечно, мы могли бы увеличить тайм-аут, но там всегда может произойти ошибка, когда сервер не работает. Другой подход — обработать исключение и вернуть ошибку. Однако все остальные результаты будут потеряны.
Будущее — тайм-аут
Нет проблем, мы создаем запасной вариант, который вернет значение по умолчанию, если операция займет слишком много времени. Очень наивная реализация для нашего запасного варианта может выглядеть так
|
1
2
3
4
|
def fallback[A](default: A, timeout: Duration): Future[A] = future { Thread sleep timeout.toMillis default} |
Аварийное будущее вернется после того, как исполняющий поток спит в течение времени ожидания. Код вызова теперь выглядит следующим образом.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
|
val timeout = 2 secondsval tasks = List(3000, 1200, 1800, 600, 250, 1000, 1100, 8000, 550)val taskFutures: List[Future[String]] = tasks map { ms =>val search = future { Thread sleep ms s"Task with $ms ms"}Future firstCompletedOf Seq(search, fallback(s"timeout $ms", timeout))}val searchFuture: Future[List[String]] = Future sequence taskFuturesprintln("Blocking for results")val result = Await result (searchFuture, timeout * tasks.length)println(s"Found $result") |
Важным вызовом здесь является Future firstCompletedOf Seq (..), который создает будущее, возвращающее результат первого законченного будущего.
Эта реализация очень плохая, как обсуждалось здесь . Короче говоря: мы тратим процессорное время, усыпляя потоки. Кроме того, время ожидания блокировки является более или менее предположительным. С однопоточным планировщиком это может занять больше времени.
Фьючерсы и Акка
Теперь давайте сделаем это более производительным и надежным. Наша главная цель — избавиться от плохой резервной реализации, которая блокировала весь поток. Идея состоит в том, чтобы запланировать функцию отката по истечении заданного времени. Таким образом, у вас есть все потоки, работающие на реальном, в то время как запасное время будущего выполнения почти равно нулю. Java имеет ScheduledExecutorService самостоятельно или вы можете использовать другую реализацию, HashedWheelTimer , от Netty. Акка раньше использовал HashWheelTimer, но теперь имеет собственную реализацию .
Итак, начнем с актера.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
import akka.actor._import akka.pattern.{ after, ask, pipe }import akka.util.Timeoutclass LogSearchActor extends Actor { def receive = { case Search(worktimes, timeout) => // Doing all the work in one actor using futures val searchFutures = worktimes map { worktime => val searchFuture = search(worktime) val fallback = after(timeout, context.system.scheduler) { Future successful s"$worktime ms > $timeout" } Future firstCompletedOf Seq(searchFuture, fallback) } // Pipe future results to sender (Future sequence searchFutures) pipeTo sender } def search(worktime: Int): Future[String] = future { Thread sleep worktime s"found something in $worktime ms" }}case class Search(worktime: List[Int], timeout: FiniteDuration) |
Важной частью является вызов метода after . Вы даете ему продолжительность, после которой будущее должно быть выполнено, и в качестве второго параметра — планировщик, который по умолчанию является системой акторов в нашем случае. Третий параметр — это будущее, которое должно исполниться. Я использую метод сопутствующего успеха в будущем, чтобы вернуть одну строку.
Остальная часть кода практически идентична. PipeTo — это шаблон akka, который возвращает результаты будущего отправителю. Ничего особенного здесь.
Теперь как все это назвать. Сначала код
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
object LogSearch extends App {println("Starting actor system")val system = ActorSystem("futures")println("Starting log search")try { // timeout for each search task val fallbackTimeout = 2 seconds // timeout use with akka.patterns.ask implicit val timeout = new Timeout(5 seconds) require(fallbackTimeout < timeout.duration) // Create SearchActor val search = system.actorOf(Props[LogSearchActor]) // Test worktimes for search val worktimes = List(1000, 1500, 1200, 800, 2000, 600, 3500, 8000, 250) // Asking for results val futureResults = (search ? Search(worktimes, fallbackTimeout)) // Cast to correct type .mapTo[List[String]] // In case something went wrong .recover { case e: TimeoutException => List("timeout") case e: Exception => List(e getMessage) } // Callback (non-blocking) .onComplete { case Success(results) => println(":: Results ::") results foreach (r => println(s" $r")) system shutdown () case Failure(t) => t printStackTrace () system shutdown () }} catch { case t: Throwable => t printStackTrace () system shutdown ()} // Await end of programm system awaitTermination (20 seconds)} |
Комментарии должны объяснить большинство частей. Этот пример полностью асинхронный и работает с обратными вызовами. Конечно, вы можете использовать результат вызова Await, как и раньше.
связи
- https://gist.github.com/muuki88/6099946
- http://doc.akka.io/docs/akka/2.1.0/scala/futures.html
- http://stackoverflow.com/questions/17672786/scala-future-sequence-and-timeout-handling
- http://stackoverflow.com/questions/16304471/scala-futures-built-in-timeout