Scala — это функциональный и объектно-ориентированный язык, который работает на JVM. Для параллельного и / или параллельного программирования это подходящий выбор вместе с платформой Akka , которая предоставляет богатый набор инструментов для всех видов параллельных задач. В этом посте я хочу показать небольшой пример того, как запланировать задание поиска по журналу на нескольких файлах / серверах с Futures и Actors .
Настроить
Я создал свою настройку с помощью шаблона Hello-Akka активатора Typesafe. В результате получается файл build.sbt со следующим содержимым:
| 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 | name :="""hello-akka"""version :="1.0"scalaVersion :="2.10.2"libraryDependencies ++=Seq(  "com.typesafe.akka"%%"akka-actor"%"2.2.0",  "com.typesafe.akka"%%"akka-testkit"%"2.2.0",  "com.google.guava"%"guava"%"14.0.1",  "org.scalatest"%"scalatest_2.10"%"1.9.1"%"test",  "junit"%"junit"%"4.11"%"test",  "com.novocode"%"junit-interface"%"0.7"%"test->default")testOptions +=Tests.Argument(TestFrameworks.JUnit, "-v") | 
Scala встроенный фьючерс
В Scala уже есть встроенная поддержка Futures. Реализация основана на java.util.concurrent . Давайте реализуем будущее, которое запускает наш поиск по журналу.
| 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 | importscala.concurrent._importscala.concurrent.duration._importscala.concurrent.ExecutionContext.Implicits._objectLogSearch extendsApp {println("Starting log search")valsearchFuture =future {  Thread sleep 1000  "Found something"}println("Blocking for results")  valresult =Await result (searchFuture, 5seconds)  println(s"Found $result")} | 
Это все, что нам нужно для запуска нашей задачи в другом потоке. Неявный импорт из ExecutionContext предоставляет по умолчанию ExecutionContext, который обрабатывает потоки, в которых будет выполняться будущее. После создания будущего мы ждем с блокирующим вызовом Await result для наших результатов. Пока ничего особенного.
Будущая композиция
Есть много примеров, когда синтаксис for-yield используется для составления будущих результатов. В нашем случае у нас есть динамический список фьючерсов: результаты поиска в журнале с каждого сервера.
Для тестирования будущих возможностей мы создадим список фьючерсов из списка целых, представляющих время выполнения задачи. Типы только для пояснения.
| 1 2 3 4 5 6 7 | valtasks =List(3000, 1200, 1800, 600, 250, 1000, 1100, 8000, 550)valtaskFutures:List[Future[String]] =tasks map { ms =>  future {    Thread sleep ms    s"Task with $ms ms"  }} | 
В конце мы хотим получить List [String] в результате. Это делается с помощью сопутствующего объекта Futures.
| 1 | valsearchFuture:Future[List[String]] =Future sequence taskFutures | 
И, наконец, мы можем дождаться наших результатов с
| 1 | valresult =Await result (searchFuture, 2seconds) | 
Однако это вызовет исключение TimeoutException , так как некоторые из наших задач выполняются более 2 секунд. Конечно, мы могли бы увеличить тайм-аут, но там всегда может произойти ошибка, когда сервер не работает. Другой подход — обработать исключение и вернуть ошибку. Однако все остальные результаты будут потеряны.
Будущее — тайм-аут
Нет проблем, мы создаем запасной вариант, который вернет значение по умолчанию, если операция займет слишком много времени. Очень наивная реализация для нашего запасного варианта может выглядеть так
| 1 2 3 4 | deffallback[A](default:A, timeout:Duration):Future[A] =future {  Thread sleep timeout.toMillis  default} | 
Аварийное будущее вернется после того, как исполняющий поток спит в течение времени ожидания. Код вызова теперь выглядит следующим образом.
| 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 | valtimeout =2secondsvaltasks =List(3000, 1200, 1800, 600, 250, 1000, 1100, 8000, 550)valtaskFutures:List[Future[String]] =tasks map { ms =>valsearch =future {  Thread sleep ms  s"Task with $ms ms"}Future firstCompletedOf Seq(search,  fallback(s"timeout $ms", timeout))}valsearchFuture:Future[List[String]] =Future sequence taskFuturesprintln("Blocking for results")valresult =Await result (searchFuture, timeout * tasks.length)println(s"Found $result") | 
Важным вызовом здесь является Future firstCompletedOf Seq (..), который создает будущее, возвращающее результат первого законченного будущего.
Эта реализация очень плохая, как обсуждалось здесь . Короче говоря: мы тратим процессорное время, усыпляя потоки. Кроме того, время ожидания блокировки является более или менее предположительным. С однопоточным планировщиком это может занять больше времени.
Фьючерсы и Акка
Теперь давайте сделаем это более производительным и надежным. Наша главная цель — избавиться от плохой резервной реализации, которая блокировала весь поток. Идея состоит в том, чтобы запланировать функцию отката по истечении заданного времени. Таким образом, у вас есть все потоки, работающие на реальном, в то время как запасное время будущего выполнения почти равно нулю. Java имеет ScheduledExecutorService самостоятельно или вы можете использовать другую реализацию, HashedWheelTimer , от Netty. Акка раньше использовал HashWheelTimer, но теперь имеет собственную реализацию .
Итак, начнем с актера.
| 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | importakka.actor._importakka.pattern.{ after, ask, pipe }importakka.util.TimeoutclassLogSearchActor extendsActor {  defreceive ={    caseSearch(worktimes, timeout) =>      // Doing all the work in one actor using futures      valsearchFutures =worktimes map { worktime =>      valsearchFuture =search(worktime)      valfallback =after(timeout, context.system.scheduler) {          Future successful s"$worktime ms > $timeout"        }        Future firstCompletedOf Seq(searchFuture, fallback)      }      // Pipe future results to sender      (Future sequence searchFutures) pipeTo sender    }  defsearch(worktime:Int):Future[String] =future {      Thread sleep worktime      s"found something in $worktime ms"  }}caseclassSearch(worktime:List[Int], timeout:FiniteDuration) | 
Важной частью является вызов метода after . Вы даете ему продолжительность, после которой будущее должно быть выполнено, и в качестве второго параметра — планировщик, который по умолчанию является системой акторов в нашем случае. Третий параметр — это будущее, которое должно исполниться. Я использую метод сопутствующего успеха в будущем, чтобы вернуть одну строку.
Остальная часть кода практически идентична. PipeTo — это шаблон akka, который возвращает результаты будущего отправителю. Ничего особенного здесь.
Теперь как все это назвать. Сначала код
| 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 | objectLogSearch extendsApp {println("Starting actor system")valsystem =ActorSystem("futures")println("Starting log search")try{  // timeout for each search task  valfallbackTimeout =2seconds  // timeout use with akka.patterns.ask  implicitvaltimeout =newTimeout(5seconds)  require(fallbackTimeout < timeout.duration)   // Create SearchActor   valsearch =system.actorOf(Props[LogSearchActor])   // Test worktimes for search   valworktimes =List(1000, 1500, 1200, 800, 2000, 600, 3500, 8000, 250)   // Asking for results   valfutureResults =(search ? Search(worktimes, fallbackTimeout))     // Cast to correct type     .mapTo[List[String]]     // In case something went wrong     .recover {        casee:TimeoutException => List("timeout")       casee:Exception => List(e getMessage)  }  // Callback (non-blocking)  .onComplete {      caseSuccess(results) =>         println(":: Results ::")         results foreach (r => println(s" $r"))         system shutdown ()      caseFailure(t) =>         t printStackTrace ()      system shutdown ()  }} catch{  caset:Throwable =>  t printStackTrace ()  system shutdown ()}  // Await end of programm  system awaitTermination (20seconds)} | 
Комментарии должны объяснить большинство частей. Этот пример полностью асинхронный и работает с обратными вызовами. Конечно, вы можете использовать результат вызова Await, как и раньше.
связи
- https://gist.github.com/muuki88/6099946
- http://doc.akka.io/docs/akka/2.1.0/scala/futures.html
- http://stackoverflow.com/questions/17672786/scala-future-sequence-and-timeout-handling
- http://stackoverflow.com/questions/16304471/scala-futures-built-in-timeout