Статьи

Разработка современных приложений с помощью Scala: параллелизм и параллелизм с Akka

Эта статья является частью нашего академического курса под названием « Разработка современных приложений с помощью Scala» .

В этом курсе мы предоставляем среду и набор инструментов, чтобы вы могли разрабатывать современные приложения Scala. Мы охватываем широкий спектр тем: от сборки SBT и реактивных приложений до тестирования и доступа к базе данных. С нашими простыми учебными пособиями вы сможете запустить и запустить собственные проекты за минимальное время. Проверьте это здесь !

Было бы справедливо сказать, что эпоха компьютеров с одним процессором (или только одним ядром) в наши дни в основном стала забытой историей. Теперь большинство устройств, независимо от их размера, были построены с использованием многоядерных архитектур ЦП, направленных на увеличение общей вычислительной мощности.

Такие достижения и инновации в области аппаратного обеспечения заставили нас переосмыслить подходы к разработке и запуску программных систем для эффективного использования всех доступных ресурсов.

1. Введение

В этом разделе руководства мы поговорим о двух основных концепциях, на которые опираются современные программные системы: параллелизм и параллелизм. Хотя они очень близки друг к другу, есть небольшое, но важное различие. Параллелизм описывает процесс, когда несколько задач могут прогрессировать с течением времени, в то время как параллелизм описывает процесс, когда несколько задач выполняются одновременно.

2. Темы

Традиционно большинство параллельных моделей программирования строились вокруг потоков, и Java не была исключением. Типичное приложение JVM (запускаемое как процесс) может порождать множество потоков, чтобы выполнять некоторую работу одновременно или, в идеале, параллельно.

JVM процесс

Рис.1 Типичный процесс JVM порождает пару потоков

Некоторое время эта модель работала довольно хорошо, но есть по крайней мере один фундаментальный недостаток моделей параллелизма на основе потоков: совместное использование состояний или ресурсов. В большинстве случаев потоки должны обмениваться общими данными или использовать другие общие ресурсы для выполнения своей работы. Без надлежащих механизмов неконтролируемый доступ к совместно используемым ресурсам или изменение общего состояния несколькими потоками (известными как условия гонки ) приводят к повреждению данных и часто вызывают сбой приложения.

синхронизация

Рис.2 Использование синхронизации для доступа к общему состоянию

Правильное использование примитивов синхронизации (блокировок, мьютексов, семафоров, мониторов и т. Д.) Решает проблему доступа к общему состоянию (или ресурсу), однако цена за него очень высока. Мало того, что это усложняет модели программирования, недетерминированный характер многопоточных потоков выполнения делает процесс устранения проблем очень трудоемким и сложным. Более того, возник целый новый класс проблем: конфликт блокировок, истощение потоков, взаимоблокировки, блокировки реального времени и многое другое.

Наряду с этим управление потоками и потоками потребляет довольно много ресурсов базовой операционной системы и, по сути, в какой-то момент создание большего количества потоков фактически отрицательно скажется на производительности и масштабируемости приложения.

3. Реакторы и петли событий

Недостатки моделей параллелизма на основе потоков обратили внимание отрасли на поиск альтернатив, более адекватно отвечающих требованиям современных программных систем. Паттерн реактора и обработка событий , основа современных парадигм асинхронного и неблокирующего программирования, являются одними из новых моделей программирования, которые можно набирать с параллелизмом в масштабе.

реактор

Рис. 3 Схема реактора (упрощенно)

Обратите внимание, что это очень упрощенная визуализация одной из возможных реализаций шаблона Reactor, однако она довольно хорошо иллюстрирует его ключевые элементы. В основе Reactor лежит однопоточный цикл обработки событий. В рамках обработки событий может использоваться один или несколько потоков (часто сгруппированных в пулы) для выполнения фактической работы, однако назначение и использование потоков в этой модели весьма различны и полностью защищены от приложений.

4. Актеры и сообщения

Передача сообщений , или, если быть более точным, асинхронная передача сообщений является еще одной очень интересной и мощной моделью параллелизма, которая в последнее время приобрела большую популярность. Модель Actor , созданная еще в 1973 году, является одним из лучших примеров асинхронного параллельного обмена сообщениями.

Актеры являются основными объектами модели акторов. Каждый субъект имеет собственную очередь сообщений (называемую почтовым ящиком), однопоточный обработчик сообщений и взаимодействует с другими субъектами только посредством асинхронных сообщений (не исключая того факта, что субъекты также могут создавать других участников).

актер

Рис. 4 Актер в Акке

Это типичная архитектура без участия: у актеров может быть собственное состояние, но они никогда не делятся ничем с другими актерами. Участники могут жить в одном и том же процессе JVM или нескольких процессах JVM на одном физическом узле, или даже распространяться по сети, это не имеет большого значения, поскольку они могут ссылаться друг на друга и обмениваться сообщениями.

5. Познакомьтесь с Аккой

Мы уже встречали инструментарий Akka, когда говорили о реактивных приложениях и узнали об Akka Streams . Однако, возвращаясь к истории, стоит упомянуть, что инструментарий Akka начался как реализация Actor Model на платформе JVM. С тех пор он видел много релизов (последний из которых был 2.4.10 ), получил много дополнительных функций и возможностей, но, тем не менее, актеры являются голыми костями Акки даже сегодня.

ActorSystem является точкой входа во вселенную актеров Akka . Это единственное место в приложении для создания и управления актерами.

1
implicit val system = ActorSystem("akka-actors")

С этим мы готовы создавать новых актеров! В Akka каждый актер должен создавать подкласс (или смешивать с ним) черту Actor и реализовывать функцию receive , например:

1
2
3
4
5
6
7
import akka.actor.Actor
 
class SampleActor extends Actor {
  def receive = {
    ...
  }
}

Но чаще, чем обычно, вы также включали бы черту ActorLogging в микс, чтобы иметь доступ к выделенному регистратору, используя ссылку на журнал, например:

1
2
3
4
5
6
7
8
import akka.actor.Actor
import akka.actor.ActorLogging
 
class SampleActor extends Actor with ActorLogging {
  def receive = {
    case _ => log.info("Received some message!")
  }
}

Хотя наш SampleActor на данный момент не делает ничего особенного, мы можем создать его экземпляр и отправить ему буквально любое сообщение. Как мы уже упоминали, акторы создаются только через экземпляр ActorSystem, а не с использованием оператора new , например:

1
2
val sampleActor = system.actorOf(Props[SampleActor], "sample-actor")
sampleActor ! "Message!"

Если вы думаете, что переменная sampleActor является экземпляром класса SampleActor , вы не совсем правы. На самом деле это ссылка на актер SampleActor , представленный в виде класса ActorRef . Это единственный механизм для обращения к конкретному субъекту в Akka , прямой доступ к базовым экземплярам класса субъекта недоступен.

Что произойдет, когда мы запустим приложение? Ничего особенного, кроме того, что мы должны увидеть что-то подобное в консоли:

1
[INFO] [akka-actors-akka.actor.default-dispatcher-2] [akka://akka-actors/user/sample-actor] Received some message!

6. Надзор

Интересное, но очень важное свойство актеров в Akka — это то, что они организованы в иерархии. Естественно, что актеры могут порождать дочерних акторов, чтобы разделить работу на более мелкие части и, таким образом, сформировать иерархию родителей / детей.

Это звучит как незначительная деталь, но это не потому, что в Akka родительские актеры могут наблюдать за своими детьми, процесс, известный как наблюдение . В этом случае родительские субъекты по существу становятся супервайзерами и могут применять различные стратегии в случае, когда дочерние субъекты сталкиваются с ошибками (или, если быть более точным, завершаются с исключением).

Официальная документация подробно обсуждает значения по умолчанию и различные стратегии надзора, но давайте взглянем на быстрый пример. Наш ChildActor определен таким образом, что всегда выдает исключение при получении любого Message .

1
2
3
4
5
6
class ChildActor extends Actor with ActorLogging {
  def receive = {
    case Message(m) =>
      throw new IllegalStateException("Something unexpected happened")
  }
}

Следовательно, наш актер ParentActor создает экземпляр ChildActor и перенаправляет любое полученное сообщение в экземпляр ChildActor .

01
02
03
04
05
06
07
08
09
10
11
12
class ParentActor extends Actor with ActorLogging {
  val child = context.actorOf(Props[ChildActor])
   
  override val supervisorStrategy = OneForOneStrategy() {
    case _: IllegalStateException => Resume
    case _: Exception => Escalate
  }
 
  def receive = {
    case _ => child ! Message("Message from parent")
  }
}

Согласно стратегии надзора по умолчанию, субъект, который вызывает Exception , будет перезапущен (что, вероятно, является желаемым поведением в большинстве случаев). Однако в нашем примере мы переписали эту политику (используя свойство supervisorStrategy объекта ParentActor ), чтобы возобновить обычную обработку сообщений надзорных акторов ( ChildActor ) только в случае IllegalStateException .

7. Шаблоны

В своей основной форме актеры в Akka общаются посредством асинхронных односторонних сообщений. Это, безусловно, работает, однако для многих реальных сценариев требуется более сложное взаимодействие, например, с использованием обмена запросами / ответами, прерывателями цепи, передачей сообщений между участниками и другими. К счастью, пакет akka.pattern предоставляет набор часто используемых шаблонов Akka , готовых к применению. Например, давайте немного SampleActor реализацию SampleActor для обработки сообщений типа Message и после получения ответим с помощью MessageReply .

1
2
3
4
5
6
7
8
case class Message(message: String)
case class MessageReply(reply: String)
 
class SampleActor extends Actor with ActorLogging {
  def receive = {
    case Message(m) => sender ! MessageReply(s"Reply: $m")
  }
}

Теперь мы можем использовать шаблон запроса для отправки сообщения актеру и ожидания ответа, например:

1
2
3
4
5
import akka.pattern.ask
import akka.util.Timeout
 
implicit val timeout: Timeout = 1 second
val reply = (sampleActor ? Message("Please reply!")).mapTo[MessageReply]

В этом случае отправитель отправляет сообщение субъекту и ожидает ответа (с 1-секундным таймаутом). Пожалуйста, обратите внимание, что типичные актеры Akka не поддерживают никакой семантики безопасности типов в отношении сообщений: все может быть отправлено, а также получено в качестве ответа. Отправитель несет ответственность за правильное приведение типов (например, с mapTo метода mapTo ). Точно так же, если отправитель отправляет сообщение субъекту, который не знает, как с ним работать, сообщение заканчивается пустыми буквами .

8. Типизированные актеры

Как было упомянуто ранее, актеры в Akka не предлагают никакой безопасности в отношении сообщений, которые они принимают или отвечают. Но уже довольно давно Akka включает в себя экспериментальную поддержку Typed Actors , где контакты являются явными и будут выполняться компилятором.

Определение Typed Actors сильно отличается от обычных актеров Akka и во многом напоминает способ, которым мы использовали для построения систем в стиле RPC . Прежде всего, мы должны начать с определения интерфейса и его реализации, например:

1
2
3
4
5
6
7
trait Typed {
  def send(message: String): Future[String]
}
 
class SampleTypedActor extends Typed {
  def send(message: String): Future[String] = Future.successful("Reply: " + message)
}

В свою очередь, способ создания Typed Actors требует немного больше кода, хотя все еще использует ActorSystem под капотом.

1
2
3
4
5
6
7
8
implicit val system = ActorSystem("typed-akka-actors")
   
val sampleTypedActor: Typed =
  TypedActor(system).typedActorOf(TypedProps[SampleTypedActor]())
   
val reply = sampleTypedActor
  .send("Hello Typed Actor!")
  .andThen { case Success(r) => println(r) }

В этот момент вам может прийти в голову логичный вопрос: не следует ли использовать Typed Actors везде? Хороший вопрос, но краткий ответ: нет, вероятно, нет. Если вам любопытно, пожалуйста, уделите немного времени обсуждению этой замечательной дискуссии о плюсах и минусах использования типизированных актеров по сравнению с обычными нетипизированными.

9. Планировщик

Помимо обеспечения превосходной реализации Actor Model , Akka также предлагает довольно много полезных утилит. Одним из них является поддержка планирования, которая предоставляет возможность отправлять сообщение определенному субъекту периодически или в определенный момент времени.

1
2
3
4
5
implicit val system = ActorSystem("akka-utilities")
import system.dispatcher
   
val sampleActor = system.actorOf(Props[SampleActor], "sample-actor"
system.scheduler.schedule(0 seconds, 100 milliseconds, sampleActor, "Wake up!")

Само собой разумеется, что возможность планировать выполнение некоторых задач является требованием для большинства реальных приложений, поэтому очень удобно иметь эту функцию доступной из коробки.

10. Event Bus

Еще одна очень полезная утилита, в которой Akka содержит общую концепцию шины событий и ее конкретную реализацию, предоставляемую ActorSystem в виде потока событий .

Если взаимодействие субъект-субъект предполагает, что отправитель сообщения каким-то образом знает, кто является получателем, с помощью потока событий субъекты имеют возможность передавать любые события (сообщения какого-либо типа) любому другому субъекту, без какого-либо предварительного знания, кто будет получить это. В этом случае заинтересованные стороны должны выразить свою заинтересованность, подписавшись на такие мероприятия по их типу.

Например, предположим, у нас есть важное сообщение, которое мы просто называем Event .

1
case class Event(id: Int)

Наш SampleEventActor предназначен для обработки такого рода сообщений и выводит на консоль id сообщения каждый раз, когда он его получает.

1
2
3
4
5
class SampleEventActor extends Actor with ActorLogging {
  def receive = {
    case Event(id) => log.info(s"Event with '$id' received")
  }
}

Выглядит просто, но на данный момент ничего особенного. Теперь давайте посмотрим на поток событий и опубликуем / подпишем шаблон связи в действии.

1
2
3
4
5
6
7
8
implicit val system = ActorSystem("akka-utilities")
   
val sampleEventActor = system.actorOf(Props[SampleEventActor]) 
system.eventStream.subscribe(sampleEventActor, classOf[Event])
   
system.eventStream.publish(Event(1))
system.eventStream.publish(Event(2))
system.eventStream.publish(Event(3))

Наш sampleEventActor выражает интерес к получению сообщений типа Event , вызывая system.eventStream.subscribe() . Теперь, каждый раз, когда Event будет публиковаться с помощью system.eventStream.publish() , sampleEventActor будет получать его независимо от того, кто был издателем. При включенном ведении журнала мы увидим нечто подобное в выводе консоли:

1
2
3
[INFO] [akka-utilities-akka.actor.default-dispatcher-2] [akka://akka-utilities/user/$a] Event with '1' received
[INFO] [akka-utilities-akka.actor.default-dispatcher-2] [akka://akka-utilities/user/$a] Event with '2' received
[INFO] [akka-utilities-akka.actor.default-dispatcher-2] [akka://akka-utilities/user/$a] Event with '3' received

11. Дистанционное

Все примеры, которые мы видели до сих пор, касались только одной ActorSystem , работающей в одной JVM в пределах одного узла. Но сетевые расширения Akka поддерживают развертывание с несколькими JVM / несколькими узлами, поэтому разные ActorSystem могут взаимодействовать друг с другом в действительно распределенной среде.

Чтобы включить удаленные возможности ActorSystem, нам нужно изменить поставщика ссылок на акторов по умолчанию и включить сетевой транспорт. Все это легко сделать с помощью файла конфигурации application.conf :

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
akka {
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
   
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty {
        tcp {
          hostname = "localhost"
          port = ${port}
        }
    }
  }
}

В качестве упражнения мы собираемся запустить два экземпляра ActorSystem , один с именем akka-remote-1 на порту 12000 , а другой — akka-remote-2 на порту 12001 . Мы также собираемся определить одного актера для взаимодействия с SampleRemoteActor .

1
2
3
4
5
class SampleRemoteActor extends Actor with ActorLogging {
  def receive = {
    case m: Any => log.info(s"Received: $m")
  }
}

В первой системе ActorSystem , akka-remote-1 , мы собираемся создать экземпляр SampleRemoteActor и отправить ему одно сообщение.

1
2
3
4
implicit val system = ActorSystem("akka-remote-1")
   
val sampleActor = system.actorOf(Props[SampleRemoteActor], "sample-actor")
sampleActor ! "Message from Actor System #1!"

Однако на втором, akka-remote-2 , мы собираемся отправить сообщение экземпляру SampleRemoteActor используя его удаленную ссылку, которая среди прочего включает имя akka-remote-1 ( akka-remote-1 ), хост ( localhost ), порт ( 12000 ) и имя актера (в нашем случае это sample-actor ):

1
2
3
4
5
implicit val system = ActorSystem("akka-remote-2")
   
val sampleActor = system.actorSelection(
  "akka.tcp://akka-remote-1@localhost:12000/user/sample-actor")
sampleActor ! "Message from Actor System #2!"

Довольно просто, не правда ли? Одновременный запуск обоих экземпляров ActorSystem приведет к следующему выводу в консоли процесса akka-remote-1 :

1
2
3
4
5
[INFO] [main] [akka.remote.Remoting] Starting remoting
[INFO] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://akka-remote-1@localhost:12000]
[INFO] [main] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://akka-remote-1@localhost:12000]
[INFO] [akka-remote-1-akka.actor.default-dispatcher-2] [akka.tcp://akka-remote-1@localhost:12000/user/sample-actor] Received: Message from Actor System #1!
[INFO] [akka-remote-1-akka.actor.default-dispatcher-4] [akka.tcp://akka-remote-1@localhost:12000/user/sample-actor] Received: Message from Actor System #2!

Помимо того, что мы видели до сих пор, не только одна система акторов может ссылаться на акторов из других систем акторов, но и удаленно создавать новые экземпляры актеров.

12. Тестирование

Akka включает превосходную поддержку тестирования, чтобы гарантировать, что каждый аспект поведения и взаимодействий актеров может быть покрыт. Фактически, Akka TestKit предоставляет соответствующие леса для написания традиционных модульных тестов, а также интеграционных тестов.

Методы модульного тестирования вращаются вокруг TestActorRef, что упрощает обычную реализацию ActorRef без параллелизма и доступа к внутренним объектам состояний акторов . Давайте начнем с этого и SampleActor простой модульный тест для нашего SampleActor с использованием уже знакомой нам инфраструктуры specs2 .

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
class SampleActorTest extends Specification with AfterAll {
  implicit val timeout: Timeout = 1 second
  implicit lazy val system = ActorSystem("test")
   
  "Sample actor" >> {
    "should reply on message" >> { implicit ee: ExecutionEnv =>
      val actorRef = TestActorRef(new SampleActor)
      actorRef ? Message("Hello") must be_==(MessageReply("Reply: Hello")).await
    }
  }
   
  def afterAll() = {
    system.terminate()
  }
}

Модульное тестирование, безусловно, является очень хорошим началом, но в то же время оно часто весьма ограничено, поскольку опирается на упрощенное представление о системе. Однако, опять же, благодаря Akka TestKit , в нашем распоряжении есть более мощные методы тестирования.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class SampleActorIntegrationTest extends TestKit(ActorSystem("test"))
    with ImplicitSender with WordSpecLike with BeforeAndAfterAll {
   
  "Sample actor" should {
    "should reply on message" in {
      val actorRef = system.actorOf(Props[SampleActor])
      actorRef ! Message("Hello")
      expectMsg(MessageReply("Reply: Hello"))
    }
     
    "should log an event" in {
      val actorRef = system.actorOf(Props[SampleActor])     
      EventFilter.info(
        message = "Event with '100' received", occurrences = 1) intercept {
        actorRef ! Event(100)
      }
    }
  }
   
  override def afterAll() = {
    shutdown()
  }
}

На этот раз мы использовали перспективу фреймворка ScalaTest и применили немного другой подход, опираясь на класс TestKit, который предлагает богатый набор утверждений относительно ожиданий сообщений. Мы не только имеем возможность шпионить за сообщениями, мы также можем делать утверждения относительно ожидаемых записей журнала, используя класс EventFilter , поддерживаемый TestEventListener в файле application.conf .

1
2
3
4
5
akka {
  loggers = [
    akka.testkit.TestEventListener
  ]
}

Действительно, тестовые примеры выглядят простыми, удобочитаемыми и удобными в обслуживании. Однако возможности тестирования Akka на этом не заканчиваются и продолжают развиваться очень быстро. Например, стоит упомянуть о наличии экспериментальной поддержки многоузлового тестирования .

13. Выводы

Akka — потрясающий инструментарий и служит прочной основой для многих других библиотек и фреймворков. В качестве реализации Actor Model он предлагает еще один подход к параллелизму и параллелизму, использующий асинхронную передачу сообщений и способствующий неизменности.

Стоит отметить, что в наши дни Акка активно развивается и выходит далеко за рамки Актерской модели . С каждым новым выпуском в него входит все больше и больше инструментов (стабильных или экспериментальных) для построения высококонкурентных и распределенных систем. Многие из его расширенных функций, таких как кластеризация , постоянство , конечные автоматы , маршрутизация ,… мы вообще не затрагивали, но официальная документация Akka — лучшее место для ознакомления со всеми из них.

14. Что дальше

В следующем разделе урока мы поговорим об Play! Framework : мощная, высокопроизводительная и многофункциональная платформа для создания масштабируемых, полноценных веб-приложений в Scala .