В приложении Akka наступает момент, когда актер может дольше выдерживать растущую нагрузку. Поскольку каждый субъект может обрабатывать только одно сообщение за раз, и он хранит в очереди ожидающие сообщения в очереди, называемой почтовым ящиком , существует риск перегрузки одного субъекта, если слишком много сообщений отправлено одному субъекту одновременно или субъект не может обрабатывать сообщения достаточно быстро — очередь будет расти и расти. Это отрицательно повлияет на быстродействие системы и может даже привести к сбою приложения.
На самом деле очень легко смоделировать такую нагрузку, просто посылая непрерывный поток сообщений актеру как можно быстрее:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
|
case object Ping class PingActor extends Actor { def receive = { case Ping => //don't do this at home! Thread sleep 1 } } object Main extends App { val system = ActorSystem( "Heavy" ) val client = system.actorOf(Props[PingActor], "Ping" ) while ( true ) { client ! Ping } } |
Конечно, вы не должны спать в актере, это просто для того, чтобы подчеркнуть почтовый ящик. Если вам (не) повезло и вы немного поиграли с количеством сна, ваше приложение вскоре проведет большую часть времени, выполняя (бесполезный) сбор данных, и вы можете увидеть ужасные ошибки OOM:
1
2
|
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "Heavy-akka.actor.default-dispatcher-6" Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "Heavy-akka.actor.default-dispatcher-10" |
… и, наконец, умереть
1
2
3
|
Uncaught error from thread [Heavy-akka.actor.default-dispatcher-7] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[Heavy] java.lang.OutOfMemoryError: GC overhead limit exceeded |
Сегодня мы узнаем, как справляться с такими перегруженными участниками, чтобы внезапный всплеск трафика не приводил к краху всего приложения.
Маршрутизация и балансировка нагрузки
Одним из простых решений для уменьшения нагрузки на одного актера является распределение работы между несколькими копиями такого актера. Акка предоставляет встроенный
актер маршрутизации и балансировки нагрузки, который стоит впереди и управляет несколькими экземплярами нашего актера. Маршрутизатор выбирает (используя настраиваемую стратегию) один из базовых экземпляров, поэтому распределяет нагрузку:
1
2
3
|
val props = Props[PingActor]. withRouter(RoundRobinRouter(nrOfInstances = 10 )) val client = system.actorOf(props, "Ping" ) |
Мы попросили Акку поставить маршрутизатор Round Robin перед 10 независимыми экземплярами PingActor
(вместо одного). Теоретически это может сократить задержку на порядок. Итак, если маршрутизация настолько эффективна, почему бы не использовать ее по умолчанию и прозрачно, как пул Enterprise Java Beans?
Чтобы ответить на этот вопрос, нам нужен более сложный пример. PingActor
имеет состояния, поэтому его можно безопасно реплицировать за маршрутизатором. Но как насчет следующего актера?
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
|
class StoreActor extends Actor { private var lastUsedId = 0 def receive = { case Store(s) => val id = nextUniqueId() slowStore(s, id) sender ! Done(id) } private def nextUniqueId() = { lastUsedId += 1 lastUsedId } private def slowStore(s: String, id: Int) { //... } } |
Очевидно, что StoreActor
предполагает, что существует только один экземпляр lastUsedId
и поскольку receive
никогда не вызывается одновременно, уникальность идентификаторов гарантируется. Мы генерируем уникальный идентификатор, сохраняем некоторое сообщение и отправляем сгенерированный идентификатор обратно клиенту.
К сожалению, в тот момент, когда мы помещаем любой маршрутизатор перед StoreActor
, каждая копия этого субъекта имеет собственную переменную lastUsedId
и дублирование неизбежно. Давайте переосмыслим наш дизайн. Чтобы генерировать уникальные идентификаторы, мы должны иметь только одну копию счетчика и ограничить доступ к нему. Но хранение, скорее всего, поточно-ориентированное, поэтому может быть распараллелено. Самое простое решение — использовать сопутствующий объект AtomicInteger
и AtomicInteger
:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
|
//DIRTY! Close your eyes! object StoreActor { val lastUsedId = new AtomicInteger } class StoreActor extends Actor with ActorLogging { private def nextUniqueId() = StoreActor.lastUsedId.incrementAndGet() //... } |
Ну … Честно говоря, общее изменяемое состояние — вряд ли то, что мы ищем. Мы должны лучше изучить модель актора и выделить логику генерации идентификатора для отделения актора, продвигая принцип единой ответственности в качестве бонуса:
01
02
03
04
05
06
07
08
09
10
11
12
13
|
case object GiveMeUniqueId class UniqueIdActor extends Actor { private var lastUsedId = 0 def receive = { case GiveMeUniqueId => lastUsedId += 1 sender ! lastUsedId } } |
Очевидно, что все экземпляры StoreActor
за маршрутизатором должны совместно использовать ссылку на один единственный экземпляр UniqueIdActor
:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
|
class StoreActor(uniqueIdActor: ActorRef) extends Actor { private implicit val timeout = Timeout( 10 minutes) import context.dispatcher def receive = { case Store(s) => uniqueIdActor ? GiveMeUniqueId map { case id: Int => slowStore(s, id) Done(id) } pipeTo sender } private def slowStore(s: String, id: Int) { //... } } |
Как вы можете видеть, uniqueIdActor
передается конструктору актера. Очевидно, что мы не должны создавать новый UniqueIdActor
в каждом StoreActor
как это приведет к созданию 10 независимых дочерних копий, а не одного централизованного актера. Вот код клея:
1
2
3
4
|
val uniqueIdActor = system.actorOf(Props[UniqueIdActor], "UniqueId" ) val props = Props(classOf[StoreActor], uniqueIdActor). withRouter(RoundRobinRouter(nrOfInstances = 10 )) val client = system.actorOf(props, "Heavy" ) |
Программная транзакционная память
У вас может возникнуть ощущение, что отдельный актер, чтобы просто обернуть один Int
является излишним. С другой стороны, общедоступный изменяемый AtomicInteger
далек от AtomicInteger
Акки. Мы можем поэкспериментировать с программной транзакционной памятью в Akka, построенной на основе ScalaSTM . Мы обернем изменчивый Int
транзакцией Ref
и поделимся этой ссылкой со всеми StoreActor
:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
|
class StoreActor(counter: Ref[Int]) extends Actor { def receive = { case Store(s) => val id = nextUniqueId() slowStore(s, id) sender ! Done(id) } private def nextUniqueId() = atomic { implicit tx => counter += 1 counter() } //... } |
На этот раз все экземпляры StoreActor
совместно используют транзакцию Ref[Int]
. Вызов nextUniqueId()
увеличивает counter
внутри транзакции, поэтому код является поточно-ориентированным. Гораздо более простая архитектура и синхронный nextUniqueId()
легче читать и поддерживать. Однако общая структура данных любого рода проблематична, особенно когда мы пытаемся масштабироваться. Но в качестве упражнения попробуйте заменить STM
агенты . Вот начальный код клея для STM:
1
2
3
4
5
6
|
import scala.concurrent.stm.Ref val globalUniqueId = Ref( 0 ) val props = Props(classOf[StoreActor], globalUniqueId). withRouter(RoundRobinRouter(nrOfInstances = 10 )) val client = system.actorOf(props, "Heavy" ) |
В идеальном мире может распределяться работа между несколькими актерами. Но что, если нам действительно нужен только один экземпляр, и он не поспевает за входящими сообщениями? В этом случае мы должны по крайней мере быстро потерпеть неудачу с
Ограниченный почтовый ящик
По умолчанию почтовые ящики ограничены только объемом имеющейся у нас памяти. Это означает, что один мошенник может влиять на всю систему, поскольку у каждого участника есть отдельный почтовый ящик, но все они используют одну и ту же кучу. Простое решение — ограничить размер почтового ящика и просто отбросить все, что выше указанного порога. К счастью, Akka поддерживает ограниченные почтовые ящики вне почтового ящика. В общем, если мы не справляемся с возрастающей нагрузкой, мы должны хотя бы быстро потерпеть неудачу, а не зависать вечно.
1
2
3
4
5
6
7
|
class StoreActor extends Actor with RequiresMessageQueue[BoundedMessageQueueSemantics] { private var lastUniqueId = 0 //... } |
Кроме того, вы должны настроить емкость очереди, либо в коде, либо в application.conf
:
1
2
3
4
5
|
bounded-mailbox { mailbox-type = "akka.dispatch.BoundedMailbox" mailbox-capacity = 1000 mailbox-push-timeout-time = 100ms } |
В этой конфигурации есть только один экземпляр StoreActor
который может StoreActor
очередь до 1000 сообщений. Если отправлено больше сообщений, они отбрасываются и перенаправляются в очередь StoreActor
, если только почтовый ящик StoreActor
не сжимается в течение 100 миллисекунд.
Резюме
Укорочение почтовых ящиков и быстрота действий актеров — это ключевой фактор, влияющий на скорость отклика и стабильность приложения Akka. Контролируя вашу систему, вы должны обнаруживать узкие места и либо увеличивать / уменьшать, либо быстро выходить из строя. В противном случае ваша JVM быстро начнет задыхаться и терять импульс.