Статьи

Обработка 10 миллионов сообщений с помощью Akka

Актеры Akka обещают параллелизм. Что может быть лучше, чтобы смоделировать это и посмотреть, сколько времени потребуется для обработки 10 миллионов сообщений с использованием стандартного аппаратного и программного обеспечения без каких-либо низкоуровневых настроек. Я написал все 10 миллионов обработок сообщений на Java, и общие результаты меня поразили.

Когда я запустил программу на своем компьютере iMac с процессором Intel i5 — 4, 4 ГБ ОЗУ и кучей JVM на 1024 МБ, программа обработала 10 миллионов компьютеров за 23 секунды. Я запускал программу несколько раз, и среднее время было в пределах 25 секунд. Таким образом, пропускная способность, которую я получил, была почти в диапазоне 400K сообщений в секунду, что феноменально.

На рисунке ниже показан процесс, используемый для моделирования сценария генерации нагрузки.

Предостережение: каждое сообщение отправляет ответ через 1 секунду, что не является правильным симуляцией для сценария реального мира. В этом сценарии обработка сообщений потребляет некоторые ресурсы для операций heap и gc, которые не учитываются.

Программа использует общее направление от актеров поста Akka: 10 миллионов обработанных сообщений (1 с / сообщение) за 75 секунд! хотя без каких-либо сообщений удушение.

База кодов для программы доступна по следующему адресу: https://github.com/write2munish/Akka-Essentials.

ApplicationManagerSystem создает участников и прокачивает трафик к WorkerActor

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private ActorSystem system;
private final ActorRef router;
private final static int no_of_msgs = 10 * 1000000;
 
public ApplicationManagerSystem() {
 
 final int no_of_workers = 10;
 
 system = ActorSystem.create('LoadGeneratorApp');
 
 final ActorRef appManager = system.actorOf(
   new Props(new UntypedActorFactory() {
    public UntypedActor create() {
     return new JobControllerActor(no_of_msgs);
    }
   }), 'jobController');
 
 router = system.actorOf(new Props(new UntypedActorFactory() {
  public UntypedActor create() {
   return new WorkerActor(appManager);
  }
 }).withRouter(new RoundRobinRouter(no_of_workers)));
}
 
private void generateLoad() {
 for (int i = no_of_msgs; i >= 0; i--) {
  router.tell('Job Id ' + i + '# send');
 }
 System.out.println('All jobs sent successfully');
}

Как только сообщения получены WorkerActor, ответы планируется отправить через 1000 миллисекунд.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
public class WorkerActor extends UntypedActor {
 
 private ActorRef jobController;
 
 @Override
 public void onReceive(Object message) throws Exception {
   using scheduler to send the reply after 1000 milliseconds
  getContext()
    .system()
    .scheduler()
    .scheduleOnce(Duration.create(1000, TimeUnit.MILLISECONDS),
      jobController, 'Done');
 }
 
 public WorkerActor(ActorRef inJobController) {
  jobController = inJobController;
 }
}

Ответные сообщения от WorkerActor отправляются в JobControllerActor, который собирает все ответы.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class JobControllerActor extends UntypedActor {
 
 int count = 0;
 long startedTime = System.currentTimeMillis();
 int no_of_msgs = 0;
 
 @Override
 public void onReceive(Object message) throws Exception {
 
  if (message instanceof String) {
   if (((String) message).compareTo('Done') == 0) {
    count++;
    if (count == no_of_msgs) {
     long now = System.currentTimeMillis();
     System.out.println('All messages processed in '
       + (now - startedTime)  1000 + ' seconds');
 
     System.out.println('Total Number of messages processed '
       + count);
     getContext().system().shutdown();
    }
   }
  }
 
 }
}

Ссылка: Учебное пособие: Hibernate, JPA и Spring MVC — часть 2 от нашего партнера по JCG Муниша К Гупты в блоге Akka Essentials .