Scala — это функциональный и объектно-ориентированный язык, который работает на JVM. Для параллельного и / или параллельного программирования это подходящий выбор вместе с платформой Akka , которая предоставляет богатый набор инструментов для всех видов параллельных задач. В этом посте я хочу показать небольшой пример того, как запланировать задание поиска по журналу на нескольких файлах / серверах с Futures и Actors .
Настроить
Я создал свою настройку с помощью шаблона Hello-Akka активатора Typesafe. В результате получается файл build.sbt со следующим содержимым:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
|
name := "" "hello-akka" "" version := "1.0" scalaVersion := "2.10.2" libraryDependencies ++ = Seq( "com.typesafe.akka" %% "akka-actor" % "2.2.0" , "com.typesafe.akka" %% "akka-testkit" % "2.2.0" , "com.google.guava" % "guava" % "14.0.1" , "org.scalatest" % "scalatest_2.10" % "1.9.1" % "test" , "junit" % "junit" % "4.11" % "test" , "com.novocode" % "junit-interface" % "0.7" % "test->default" ) testOptions + = Tests.Argument(TestFrameworks.JUnit, "-v" ) |
Scala встроенный фьючерс
В Scala уже есть встроенная поддержка Futures. Реализация основана на java.util.concurrent . Давайте реализуем будущее, которое запускает наш поиск по журналу.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
|
import scala.concurrent. _ import scala.concurrent.duration. _ import scala.concurrent.ExecutionContext.Implicits. _ object LogSearch extends App { println( "Starting log search" ) val searchFuture = future { Thread sleep 1000 "Found something" } println( "Blocking for results" ) val result = Await result (searchFuture, 5 seconds) println(s "Found $result" ) } |
Это все, что нам нужно для запуска нашей задачи в другом потоке. Неявный импорт из ExecutionContext предоставляет по умолчанию ExecutionContext, который обрабатывает потоки, в которых будет выполняться будущее. После создания будущего мы ждем с блокирующим вызовом Await result для наших результатов. Пока ничего особенного.
Будущая композиция
Есть много примеров, когда синтаксис for-yield используется для составления будущих результатов. В нашем случае у нас есть динамический список фьючерсов: результаты поиска в журнале с каждого сервера.
Для тестирования будущих возможностей мы создадим список фьючерсов из списка целых, представляющих время выполнения задачи. Типы только для пояснения.
1
2
3
4
5
6
7
|
val tasks = List( 3000 , 1200 , 1800 , 600 , 250 , 1000 , 1100 , 8000 , 550 ) val taskFutures : List[Future[String]] = tasks map { ms = > future { Thread sleep ms s "Task with $ms ms" } } |
В конце мы хотим получить List [String] в результате. Это делается с помощью сопутствующего объекта Futures.
1
|
val searchFuture : Future[List[String]] = Future sequence taskFutures |
И, наконец, мы можем дождаться наших результатов с
1
|
val result = Await result (searchFuture, 2 seconds) |
Однако это вызовет исключение TimeoutException , так как некоторые из наших задач выполняются более 2 секунд. Конечно, мы могли бы увеличить тайм-аут, но там всегда может произойти ошибка, когда сервер не работает. Другой подход — обработать исключение и вернуть ошибку. Однако все остальные результаты будут потеряны.
Будущее — тайм-аут
Нет проблем, мы создаем запасной вариант, который вернет значение по умолчанию, если операция займет слишком много времени. Очень наивная реализация для нашего запасного варианта может выглядеть так
1
2
3
4
|
def fallback[A](default : A, timeout : Duration) : Future[A] = future { Thread sleep timeout.toMillis default } |
Аварийное будущее вернется после того, как исполняющий поток спит в течение времени ожидания. Код вызова теперь выглядит следующим образом.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
|
val timeout = 2 seconds val tasks = List( 3000 , 1200 , 1800 , 600 , 250 , 1000 , 1100 , 8000 , 550 ) val taskFutures : List[Future[String]] = tasks map { ms = > val search = future { Thread sleep ms s "Task with $ms ms" } Future firstCompletedOf Seq(search, fallback(s "timeout $ms" , timeout)) } val searchFuture : Future[List[String]] = Future sequence taskFutures println( "Blocking for results" ) val result = Await result (searchFuture, timeout * tasks.length) println(s "Found $result" ) |
Важным вызовом здесь является Future firstCompletedOf Seq (..), который создает будущее, возвращающее результат первого законченного будущего.
Эта реализация очень плохая, как обсуждалось здесь . Короче говоря: мы тратим процессорное время, усыпляя потоки. Кроме того, время ожидания блокировки является более или менее предположительным. С однопоточным планировщиком это может занять больше времени.
Фьючерсы и Акка
Теперь давайте сделаем это более производительным и надежным. Наша главная цель — избавиться от плохой резервной реализации, которая блокировала весь поток. Идея состоит в том, чтобы запланировать функцию отката по истечении заданного времени. Таким образом, у вас есть все потоки, работающие на реальном, в то время как запасное время будущего выполнения почти равно нулю. Java имеет ScheduledExecutorService самостоятельно или вы можете использовать другую реализацию, HashedWheelTimer , от Netty. Акка раньше использовал HashWheelTimer, но теперь имеет собственную реализацию .
Итак, начнем с актера.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
import akka.actor. _ import akka.pattern.{ after, ask, pipe } import akka.util.Timeout class LogSearchActor extends Actor { def receive = { case Search(worktimes, timeout) = > // Doing all the work in one actor using futures val searchFutures = worktimes map { worktime = > val searchFuture = search(worktime) val fallback = after(timeout, context.system.scheduler) { Future successful s "$worktime ms > $timeout" } Future firstCompletedOf Seq(searchFuture, fallback) } // Pipe future results to sender (Future sequence searchFutures) pipeTo sender } def search(worktime : Int) : Future[String] = future { Thread sleep worktime s "found something in $worktime ms" } } case class Search(worktime : List[Int], timeout : FiniteDuration) |
Важной частью является вызов метода after . Вы даете ему продолжительность, после которой будущее должно быть выполнено, и в качестве второго параметра — планировщик, который по умолчанию является системой акторов в нашем случае. Третий параметр — это будущее, которое должно исполниться. Я использую метод сопутствующего успеха в будущем, чтобы вернуть одну строку.
Остальная часть кода практически идентична. PipeTo — это шаблон akka, который возвращает результаты будущего отправителю. Ничего особенного здесь.
Теперь как все это назвать. Сначала код
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
object LogSearch extends App { println( "Starting actor system" ) val system = ActorSystem( "futures" ) println( "Starting log search" ) try { // timeout for each search task val fallbackTimeout = 2 seconds // timeout use with akka.patterns.ask implicit val timeout = new Timeout( 5 seconds) require(fallbackTimeout < timeout.duration) // Create SearchActor val search = system.actorOf(Props[LogSearchActor]) // Test worktimes for search val worktimes = List( 1000 , 1500 , 1200 , 800 , 2000 , 600 , 3500 , 8000 , 250 ) // Asking for results val futureResults = (search ? Search(worktimes, fallbackTimeout)) // Cast to correct type .mapTo[List[String]] // In case something went wrong .recover { case e : TimeoutException = > List( "timeout" ) case e : Exception = > List(e getMessage) } // Callback (non-blocking) .onComplete { case Success(results) = > println( ":: Results ::" ) results foreach (r = > println(s " $r" )) system shutdown () case Failure(t) = > t printStackTrace () system shutdown () } } catch { case t : Throwable = > t printStackTrace () system shutdown () } // Await end of programm system awaitTermination ( 20 seconds) } |
Комментарии должны объяснить большинство частей. Этот пример полностью асинхронный и работает с обратными вызовами. Конечно, вы можете использовать результат вызова Await, как и раньше.
связи
- https://gist.github.com/muuki88/6099946
- http://doc.akka.io/docs/akka/2.1.0/scala/futures.html
- http://stackoverflow.com/questions/17672786/scala-future-sequence-and-timeout-handling
- http://stackoverflow.com/questions/16304471/scala-futures-built-in-timeout