Статьи

Управление перегруженными актерами в Акке

Hovedøya

Hovedøya

В приложении Akka наступает момент, когда актер может дольше выдерживать растущую нагрузку. Поскольку каждый субъект может обрабатывать только одно сообщение за раз, и он хранит в очереди ожидающие сообщения в очереди, называемой почтовым ящиком , существует риск перегрузки одного субъекта, если слишком много сообщений отправлено одному субъекту одновременно или субъект не может обрабатывать сообщения достаточно быстро — очередь будет расти и расти. Это отрицательно повлияет на быстродействие системы и может даже привести к сбою приложения.

На самом деле очень легко смоделировать такую ​​нагрузку, просто посылая непрерывный поток сообщений актеру как можно быстрее:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
case object Ping
  
class PingActor extends Actor {
  
    def receive = {
        case Ping =>
            //don't do this at home!
            Thread sleep 1
    }
}
  
object Main extends App {
    val system = ActorSystem("Heavy")
    val client = system.actorOf(Props[PingActor], "Ping")
    while(true) {
        client ! Ping
    }
}

Конечно, вы не должны спать в актере, это просто для того, чтобы подчеркнуть почтовый ящик. Если вам (не) повезло и вы немного поиграли с количеством сна, ваше приложение вскоре проведет большую часть времени, выполняя (бесполезный) сбор данных, и вы можете увидеть ужасные ошибки OOM:

1
2
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "Heavy-akka.actor.default-dispatcher-6"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "Heavy-akka.actor.default-dispatcher-10"

… и, наконец, умереть

1
2
3
Uncaught error from thread [Heavy-akka.actor.default-dispatcher-7] shutting down JVM
since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[Heavy]
java.lang.OutOfMemoryError: GC overhead limit exceeded

Сегодня мы узнаем, как справляться с такими перегруженными участниками, чтобы внезапный всплеск трафика не приводил к краху всего приложения.

Маршрутизация и балансировка нагрузки

Одним из простых решений для уменьшения нагрузки на одного актера является распределение работы между несколькими копиями такого актера. Акка предоставляет встроенный
актер маршрутизации и балансировки нагрузки, который стоит впереди и управляет несколькими экземплярами нашего актера. Маршрутизатор выбирает (используя настраиваемую стратегию) один из базовых экземпляров, поэтому распределяет нагрузку:

1
2
3
val props = Props[PingActor].
       withRouter(RoundRobinRouter(nrOfInstances = 10))
val client = system.actorOf(props, "Ping")

Мы попросили Акку поставить маршрутизатор Round Robin перед 10 независимыми экземплярами PingActor (вместо одного). Теоретически это может сократить задержку на порядок. Итак, если маршрутизация настолько эффективна, почему бы не использовать ее по умолчанию и прозрачно, как пул Enterprise Java Beans?

Чтобы ответить на этот вопрос, нам нужен более сложный пример. PingActor имеет состояния, поэтому его можно безопасно реплицировать за маршрутизатором. Но как насчет следующего актера?

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
class StoreActor extends Actor {
  
    private var lastUsedId = 0
  
    def receive = {
        case Store(s) =>
            val id = nextUniqueId()
            slowStore(s, id)
            sender ! Done(id)
    }
  
    private def nextUniqueId() = {
        lastUsedId += 1
        lastUsedId
    }
  
    private def slowStore(s: String, id: Int) {
        //...
    }
}

Очевидно, что StoreActor предполагает, что существует только один экземпляр lastUsedId и поскольку receive никогда не вызывается одновременно, уникальность идентификаторов гарантируется. Мы генерируем уникальный идентификатор, сохраняем некоторое сообщение и отправляем сгенерированный идентификатор обратно клиенту.

К сожалению, в тот момент, когда мы помещаем любой маршрутизатор перед StoreActor , каждая копия этого субъекта имеет собственную переменную lastUsedId и дублирование неизбежно. Давайте переосмыслим наш дизайн. Чтобы генерировать уникальные идентификаторы, мы должны иметь только одну копию счетчика и ограничить доступ к нему. Но хранение, скорее всего, поточно-ориентированное, поэтому может быть распараллелено. Самое простое решение — использовать сопутствующий объект AtomicInteger и AtomicInteger :

01
02
03
04
05
06
07
08
09
10
11
12
13
14
//DIRTY! Close your eyes!
object StoreActor {
  
    val lastUsedId = new AtomicInteger
  
}
  
class StoreActor extends Actor with ActorLogging {
  
    private def nextUniqueId() = StoreActor.lastUsedId.incrementAndGet()
  
    //...
  
}

Ну … Честно говоря, общее изменяемое состояние — вряд ли то, что мы ищем. Мы должны лучше изучить модель актора и выделить логику генерации идентификатора для отделения актора, продвигая принцип единой ответственности в качестве бонуса:

01
02
03
04
05
06
07
08
09
10
11
12
13
case object GiveMeUniqueId
  
class UniqueIdActor extends Actor {
  
    private var lastUsedId = 0
  
    def receive = {
        case GiveMeUniqueId =>
            lastUsedId += 1
            sender ! lastUsedId
    }
  
}

Очевидно, что все экземпляры StoreActor за маршрутизатором должны совместно использовать ссылку на один единственный экземпляр UniqueIdActor :

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
class StoreActor(uniqueIdActor: ActorRef) extends Actor {
  
    private implicit val timeout = Timeout(10 minutes)
  
    import context.dispatcher
  
    def receive = {
        case Store(s) =>
            uniqueIdActor ? GiveMeUniqueId map {
                case id: Int =>
                    slowStore(s, id)
                    Done(id)
            } pipeTo sender
    }
  
    private def slowStore(s: String, id: Int) {
        //...
    }
}

Как вы можете видеть, uniqueIdActor передается конструктору актера. Очевидно, что мы не должны создавать новый UniqueIdActor в каждом StoreActor как это приведет к созданию 10 независимых дочерних копий, а не одного централизованного актера. Вот код клея:

1
2
3
4
val uniqueIdActor = system.actorOf(Props[UniqueIdActor], "UniqueId")
val props = Props(classOf[StoreActor], uniqueIdActor).
       withRouter(RoundRobinRouter(nrOfInstances = 10))
val client = system.actorOf(props, "Heavy")

Программная транзакционная память

У вас может возникнуть ощущение, что отдельный актер, чтобы просто обернуть один Int является излишним. С другой стороны, общедоступный изменяемый AtomicInteger далек от AtomicInteger Акки. Мы можем поэкспериментировать с программной транзакционной памятью в Akka, построенной на основе ScalaSTM . Мы обернем изменчивый Int транзакцией Ref и поделимся этой ссылкой со всеми StoreActor :

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
class StoreActor(counter: Ref[Int]) extends Actor {
  
    def receive = {
        case Store(s) =>
            val id = nextUniqueId()
            slowStore(s, id)
            sender ! Done(id)
    }
  
    private def nextUniqueId() = atomic {
        implicit tx =>
            counter += 1
            counter()
    }
  
    //...
  
}

На этот раз все экземпляры StoreActor совместно используют транзакцию Ref[Int] . Вызов nextUniqueId() увеличивает counter внутри транзакции, поэтому код является поточно-ориентированным. Гораздо более простая архитектура и синхронный nextUniqueId() легче читать и поддерживать. Однако общая структура данных любого рода проблематична, особенно когда мы пытаемся масштабироваться. Но в качестве упражнения попробуйте заменить STM
агенты . Вот начальный код клея для STM:

1
2
3
4
5
6
import scala.concurrent.stm.Ref
  
val globalUniqueId = Ref(0)
val props = Props(classOf[StoreActor], globalUniqueId).
       withRouter(RoundRobinRouter(nrOfInstances = 10))
val client = system.actorOf(props, "Heavy")

В идеальном мире может распределяться работа между несколькими актерами. Но что, если нам действительно нужен только один экземпляр, и он не поспевает за входящими сообщениями? В этом случае мы должны по крайней мере быстро потерпеть неудачу с

Ограниченный почтовый ящик

По умолчанию почтовые ящики ограничены только объемом имеющейся у нас памяти. Это означает, что один мошенник может влиять на всю систему, поскольку у каждого участника есть отдельный почтовый ящик, но все они используют одну и ту же кучу. Простое решение — ограничить размер почтового ящика и просто отбросить все, что выше указанного порога. К счастью, Akka поддерживает ограниченные почтовые ящики вне почтового ящика. В общем, если мы не справляемся с возрастающей нагрузкой, мы должны хотя бы быстро потерпеть неудачу, а не зависать вечно.

1
2
3
4
5
6
7
class StoreActor extends Actor with RequiresMessageQueue[BoundedMessageQueueSemantics] {
  
    private var lastUniqueId = 0
  
    //...
  
}

Кроме того, вы должны настроить емкость очереди, либо в коде, либо в application.conf :

1
2
3
4
5
bounded-mailbox {
    mailbox-type = "akka.dispatch.BoundedMailbox"
    mailbox-capacity = 1000
    mailbox-push-timeout-time = 100ms
}

В этой конфигурации есть только один экземпляр StoreActor который может StoreActor очередь до 1000 сообщений. Если отправлено больше сообщений, они отбрасываются и перенаправляются в очередь StoreActor , если только почтовый ящик StoreActor не сжимается в течение 100 миллисекунд.

Резюме

Укорочение почтовых ящиков и быстрота действий актеров — это ключевой фактор, влияющий на скорость отклика и стабильность приложения Akka. Контролируя вашу систему, вы должны обнаруживать узкие места и либо увеличивать / уменьшать, либо быстро выходить из строя. В противном случае ваша JVM быстро начнет задыхаться и терять импульс.

Ссылка: Управление перегруженными актерами в Akka от нашего партнера JCG Томаша Нуркевича в блоге Java и соседстве .