Статьи

Реактивное и производительное решение Spray + Akka для «Игры с параллелизмом и производительностью в Java и Node.js»

В моем предыдущем посте я рассмотрел фиктивный торговый движок и сравнил решение для блокировки на основе Java с решением для блокировки на основе Node.js. В конце поста я написал, что:

Я подозреваю, что после недавнего успеха Node.js появится все больше и больше асинхронных библиотек Java.

Ну, такие библиотеки уже существуют, например: Akka , Spray и этот асинхронный драйвер Mysql .

Я поставил перед собой задачу создать неблокирующее решение на основе Java с использованием именно этих библиотек, чтобы я мог сравнить его производительность с решением Node.js, созданным для последней статьи. Первое, что вы могли заметить, это то, что все они основаны на Scala, но я написал это решение на Java, хотя оно немного менее синтаксически элегантно. В прошлой статье я представил решение, основанное на Akka, в соответствии с которым торговый механизм был обёрнут в актера. Здесь я выбрал Tomcat как HTTP-сервер и заменил его на Spray, который аккуратно интегрирует HTTP-сервер прямо в Akka. Теоретически это не должно иметь никакого значения для производительности, потому что Spray — это NIO, как и Tomcat 8, из коробки. Но что привлекло меня к этому решению, так это то, что в целом количество потоков значительно уменьшилось, поскольку Spray, Akka и библиотека async Mysql используют один и тот же контекст выполнения . Работая на моей машине для разработки Windows, Tomcat имеет более 30 потоков по сравнению с несколькими более 10 для решения, построенного здесь, или по сравнению с Websphere или JBoss, где есть сотни потоков. Контекст выполнения — это в основном пул потоков, которые выполняют задачи, которые ему даны. Поскольку все библиотеки, использованные в представленном здесь решении, были неблокирующими, число потоков можно поддерживать на низком уровне и приближаться к теоретическому оптимуму, так что при минимальном переключении контекста происходит эффективная работа процесса.

Код, написанный для этой статьи, находится на GitHub . Первая часть программы — это main метод, который запускает Spray и Akka:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
public static final ActorSystem system = ActorSystem.create("system");
 
public static void main(String[] args) {
    ...
    ActorRef listener = system.actorOf(Props.create(HttpActor.class), "httpActor");
     
    InetSocketAddress endpoint = new InetSocketAddress(3000);
    int backlog = 100;
    List<Inet.SocketOption> options = JavaConversions.asScalaBuffer(new ArrayList<Inet.SocketOption>()).toList();
    Option<ServerSettings> settings = scala.Option.empty();
    ServerSSLEngineProvider sslEngineProvider = null;
    Bind bind = new Http.Bind(listener, endpoint, backlog, options, settings, sslEngineProvider);
    IO.apply(spray.can.Http$.MODULE$, system).tell(bind, ActorRef.noSender());
     
    system.scheduler().schedule(new FiniteDuration(5, TimeUnit.SECONDS), new FiniteDuration(5, TimeUnit.SECONDS), ()->{
        System.out.println(new Date() + " - numSales=" + numSales.get());
    }, system.dispatcher());
}

Строка 1 создает общедоступную систему субъекта, так что я могу получить к ней доступ из любого места, поскольку она используется для доступа к единому контексту выполнения, который я хочу использовать в программе. (В коде, где ремонтопригодность является проблемой, я бы написал что-то, чтобы этот объект мог быть внедрен в соответствующие части программы.) Строка 5 затем использует систему для создания экземпляра субъекта, который используется для обработки всех HTTP-запросов на покупку и продажу. заказы. Строки 7-11 просто устанавливают данные конфигурации для сервера. В строках 12 и 13 мы затем берем конфигурацию и нашего актера и говорим, что Akka IO использует их и модуль HTTP для отправки всех HTTP-запросов в виде сообщений нашему актеру из строки 5. В строках 15-17 я эффективно настраиваю задачу таймера. который срабатывает каждые 5 секунд, чтобы вывести некоторую статистику. Здесь важно отметить, что я не использую таймер Java для планирования задачи, поскольку это просто добавляет больше ненужных потоков в мой процесс. Вместо этого я использую тот же контекст выполнения, что и Akka, поэтому создается как можно меньше потоков.

Далее идет актер для обработки HTTP-запросов:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private static class HttpActor extends AbstractActor {
 
    private static final HttpProtocol HTTP_1_1 = HttpProtocols.HTTP$div1$u002E1();
 
    public HttpActor() {
        final Router router = partitionAndCreateRouter();
         
        receive(ReceiveBuilder
            .match(HttpRequest.class, r -> {
                int id = Constants.ID.getAndIncrement();
                String path = String.valueOf(r.uri().path());
                if("/sell".equals(path)){
                    String productId = r.uri().query().get("productId").get();
                    ...
                    SalesOrder so = new SalesOrder(price, productId, quantity, id);
                    so.setSeller(new Seller(who));
                    router.route(so, self());
                    replyOK(id);
                }else if("/buy".equals(path)){
                    ...
                }else{
                    handleUnexpected(r);
                }
            }).match(Tcp.Connected.class, r ->{
                sender().tell(new Http.Register(self(), Http.EmptyFastPath$.MODULE$), self()); //tell that connection will be handled here!
            }).build());
    }

Строка 3 показывает пример того, как интеграция Scala в Java-программу может быть уродливой, но как иногда вы можете скрыть эти уродливые части, добавляя свои собственные абстракции. Актер HTTP, который отвечает на запросы HTTP, имеет 3 задания. Первая задача в строке 6 — это создание маршрутизатора, который я опишу ниже, и который он может использовать для делегирования работы. Вторая задача — обрабатывать все новые соединения в строках 24-25, что говорит Spray, что этот субъект также будет обрабатывать фактические запросы, а не только соединения. Третье задание, выполняемое этим актером, показано в строках 9-18, где актер получает HTTP-запрос и делегирует (направляет) некоторую работу другому субъекту в системе.

Этот актер знает модель HTTP, но абстракция HTTP не проникает в следующий уровень системы. Вместо этого субъект передает объекты предметной области (или объекты-значения, или классы дел или тому подобное) субъектам, которые инкапсулируют торговые механизмы. Построение таких доменных объектов можно увидеть в строках 15 и 16, используя данные, извлеченные из HTTP-запроса, например, в строке 13, или, скажем, из объекта JSON в теле запроса. Spray содержит полезные директивы, которые могут помочь вам извлечь данные из запроса и немного абстрагироваться от HTTP, если вы этого хотите. Какой доменный объект создать, зависит от REST-подобного интерфейса, который я построил и который обрабатывается в строках 9, 12 и 19. Если бы я использовал Scala, я мог бы написать более элегантный код, используя сопоставление с образцом HttpRequest . Объект домена передается в торговый механизм, заставляя маршрутизатор из строки 6 направлять объект домена подходящему действующему субъекту в строке 17. Последняя, ​​но не менее важная строка 18 — это то, где запрос заказа на продажу подтверждается в ответе HTTP. который передает объект JSON обратно потребителю вместе с уникальным идентификатором, назначенным заказу, чтобы позднее можно было запросить его статус (он сохраняется в объектах продаж).

В следующем фрагменте показано, как мы разбиваем рынок и создаем ряд действующих лиц для параллельной обработки запросов.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
private Router partitionAndCreateRouter() {
    Map<String, ActorRef> kids = new HashMap<>();
    java.util.List<Routee> routees = new ArrayList<Routee>();
    int chunk = Constants.PRODUCT_IDS.length / NUM_KIDS;
    for (int i = 0, j = Constants.PRODUCT_IDS.length; i < j; i += chunk) {
        String[] temparray = Arrays.copyOfRange(Constants.PRODUCT_IDS, i, i + chunk);
        LOGGER.info("created engine for products " + temparray);
        ActorRef actor = getContext().actorOf(Props.create(EngineActor.class));
        getContext().watch(actor);
        routees.add(new ActorRefRoutee(actor));
 
        for (int k = 0; k < temparray.length; k++) {
            LOGGER.debug("mapping productId '" + temparray[k] + "' to engine " + i);
            kids.put(temparray[k], actor);
        }
        LOGGER.info("---started trading");
        actor.tell(EngineActor.RUN, ActorRef.noSender());
    }          
    Router router = new Router(new PartitioningRoutingLogic(kids), routees);
    return router;
}

Этот код похож на то, что мы делали в прошлой статье. Чтобы одновременно масштабировать и использовать более одного ядра, рынок разделен по идентификатору продукта, и каждый торговый механизм работает одновременно для отдельного раздела рынка. В представленном здесь EngineActor создается для каждого раздела и Routee в Routee в строке 10. Карта действующих лиц, идентифицированных по идентификатору продукта, также заполняется в строке 14. Маршрутизатор создается с использованием маршрутов и карты в строке 19 и это то, что HttpActor использует в предыдущем фрагменте при делегировании работы. Обратите также внимание на строку 17, которая запускает торговый механизм, содержащийся в EngineActor , чтобы он работал и был готов торговать заказами на покупку и продажу, когда они передаются этим участникам.

Класс EngineActor здесь явно не показан, поскольку он практически идентичен актерам, использованным в предыдущей статье, и он просто инкапсулирует торговый механизм, который обрабатывает все продукты из определенного сегмента рынка. Строка 19 выше использует RoutingLogic для построения маршрутизатора, что показано ниже:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public static class PartitioningRoutingLogic implements RoutingLogic {
 
    private Map<String, ActorRef> kids;
 
    public PartitioningRoutingLogic(Map<String, ActorRef> kids) {
        this.kids = kids;
    }
 
    @Override
    public Routee select(Object message, IndexedSeq<Routee> routees) {
 
        //find which product ID is relevant here
        String productId = null;
        if(message instanceof PurchaseOrder){
            productId = ((PurchaseOrder) message).getProductId();
        }else if(message instanceof SalesOrder){
            productId = ((SalesOrder) message).getProductId();
        }
        ActorRef actorHandlingProduct = kids.get(productId);
 
        //no go find the routee for the relevant actor
        for(Routee r : JavaConversions.asJavaIterable(routees)){
            ActorRef a = ((ActorRefRoutee) r).ref(); //cast ok, since the are by definition in this program all routees to ActorRefs
            if(a.equals(actorHandlingProduct)){
                return r;
            }
        }
         
        return akka.routing.NoRoutee$.MODULE$; //none found, return NoRoutee
    }
}

Метод select(...) в строке 10 вызывается маршрутизатором, когда он получает объект, который он должен направить правильному субъекту. Используя карту, созданную в предыдущем листинге, и идентификатор продукта, полученный из запроса, легко найти актера, который содержит торговый движок, отвечающий за соответствующий сегмент рынка. Возвращая роут, который оборачивает этого актера, Akka передает объект ордера правильному EngineActor , который затем помещает данные в модель, когда это сообщение обрабатывается в то время, когда торговый движок находится между торговыми циклами и затем актер проверяет его входящие

ОК, так что это передний конец имеет дело с. Вторым важным изменением, которое требовалось для решения из предыдущей статьи, была разработка метода, который сохраняет продажи после совершения торговых операций. В решении на основе Java я синхронно перебирал каждую продажу и отправлял оператор insert в базу данных и обрабатывал следующую продажу только после того, как база данных ответила. С решением, представленным здесь, я решил обработать продажи параллельно, insert запрос на insert в базу данных и немедленно перейдя к следующей продаже, и сделав то же самое. Ответы обрабатывались асинхронно в контексте выполнения с использованием обратного вызова, который я предоставил. Я написал программу, чтобы дождаться подтверждения последней вставки, прежде чем продолжить торговлю вновь созданными заказами на покупку и продажу, поступившими после начала последней торговой сессии. Это показано в следующем листинге:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
private void persistSales(List<Sale> sales, final PersistenceComplete f) {
    if (!sales.isEmpty()) {
        LOGGER.info("preparing to persist sales");
 
        final AtomicInteger count = new AtomicInteger(sales.size());
        sales.forEach(sale -> {
            List values = Arrays.asList(sale.getBuyer().getName(),
                                        sale.getSeller().getName(),
                                        sale.getProductId(),
                                        sale.getPrice(),
                                        sale.getQuantity(),
                                        sale.getPurchaseOrder().getId(),
                                        sale.getSalesOrder().getId());
             
            Future<QueryResult> sendQuery = POOL.sendPreparedStatement(SQL, JavaConversions.asScalaBuffer(values));
            sendQuery.onComplete(new JFunction1<Try<QueryResult>, Void>() {
                @Override
                public Void apply(Try<QueryResult> t) {
                    if(t.isSuccess()){
                        QueryResult qr = t.get();
                        //the query result doesnt contain auto generated IDs! library seems immature...
                        //sale.setId(???);
                    }
                     
                    if(count.decrementAndGet() == 0){
                        if(t.isSuccess()){
                            f.apply(null);
                        }else{
                            f.apply(t.failed().get());
                        }
                         
                    }
                    return null; //coz of Void
                }
            }, Main.system.dispatcher());
        });
    }else{
        f.apply(null); //nothing to do, so continue immediately
    }
}

Метод persistSales(...) вызывается торговым механизмом после каждого торгового цикла, и ему передается список продаж, совершенных в течение этого торгового цикла, и функция обратного вызова, которая вызывается после завершения всей сохраняемости. Если ничего не было продано, то линия 38 немедленно вызывает обратный вызов. В противном случае в строке 5 создается счетчик, который инициализируется количеством продаж, которые будут сохранены. Каждая продажа сохраняется асинхронно в строках 7-15. Обратите внимание, как Future возвращается в строке 15 и как мы используем другой обратный вызов в строках 16-35 для обработки завершения будущего — здесь нет блокировки, ожидающей завершения будущего! Вышеупомянутый счетчик уменьшается в строке 25, как только продажа сохраняется, и как только все продажи сохраняются, persistSales(...) обратный вызов, переданный в метод persistSales(...) . Обратите внимание, что класс JFunction1 используемый в строке 16, представляет собой оболочку, позволяющую упростить интеграцию Scala — код находится на GitHub по ссылке, приведенной выше. Строки 21 и 22 показывают, что у меня возникла небольшая проблема с библиотекой async Mysql, которую я использовал. Это все еще бета-версия, и, похоже, нет способа получить сгенерированный (автоинкремент) первичный ключ продажи. Обратите также внимание на строку 35, где я передаю контекст выполнения, который использует Akka, так что Future который обрабатывает завершение оператора вставки, обрабатывается в одном из существующих потоков, а не в каком-то новом потоке — опять же, сохраняя общее количество темы как можно ниже.

Этот список также показывает интересную проблему, а именно то, что поток, который вызывает базу данных для вставки данных, не обязательно является тем же потоком, который может потребоваться для закрытия соединения [1]. В обычных Java EE и Spring часто используется локальное хранилище потоков (см. Также здесь ). Если вы вызвали bean-компонент из функции, обрабатывающей завершение будущего, ресурсы, которые в него внедрены, могут не работать, потому что контейнер не может определить контекст. Scala решает эту проблему, используя неявные параметры , которые передаются в методы под капотом.

В приведенном выше листинге используется обратный вызов PersistenceComplete , который показан ниже в строках 14-16. Он также использует пул соединений, который создается с использованием следующего кода. Еще раз, контекст выполнения, который использует Akka, передается в асинхронную библиотеку Mysql, в строке 10 ниже. В строке 10 ниже также показана конфигурация пула не по умолчанию, где я разрешаю максимальный размер очереди до тысячи. Во время нагрузочного тестирования я получал много ошибок, указывающих на насыщение пула, и увеличение этого значения решило проблему.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
private static final String SQL = "INSERT INTO SALES (BUYER_NAME, SELLER_NAME, PRODUCT_ID, PRICE, QUANTITY, PO_ID, SO_ID) VALUES (?, ?, ?, ?, ?, ?, ?)";
 
private static final ConnectionPool<MySQLConnection> POOL;
static {
    Duration connectTimeout = Duration.apply(5.0, TimeUnit.SECONDS);
    Duration testTimeout = Duration.apply(5.0, TimeUnit.SECONDS);
    Configuration configuration = new Configuration("root", Main.DB_HOST, 3306, Option.apply("password"), Option.apply("TRADER"), io.netty.util.CharsetUtil.UTF_8, 16777216, PooledByteBufAllocator.DEFAULT, connectTimeout, testTimeout);
     
    MySQLConnectionFactory factory = new MySQLConnectionFactory(configuration);
    POOL = new ConnectionPool<MySQLConnection>(factory, new PoolConfiguration(1000, 4, 1000, 4000), Main.system.dispatcher());
}
 
 
private static interface PersistenceComplete {
    void apply(Throwable failure);
}

Обратный вызов, переданный в persistSales(...) , показан в следующем листинге. Следующий код почти не отличается от оригинала, показанного в прошлой статье, за исключением того, что он теперь асинхронный по стилю. Он вызывается, как только все продажи сохраняются, и только тогда обратный вызов отправляет сообщение (через слушателя событий) субъекту в строке 14 ниже. Это сообщение, как правило, будет находиться в задней части папки «Входящие» после загрузки новых заказов на покупку и продажу. Каждое из этих сообщений будет обработано, что приведет к обновлению модели торгового механизма новыми ордерами до возобновления торговли.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
persistSales(sales, t -> {
    if(t != null){
        LOGGER.error("failed to persist sales: " + sales, t);
    }else{
        LOGGER.info("persisting completed, notifying involved parties...");
        sales.stream().forEach(sale -> {
            if (sale.getBuyer().listener != null)
                sale.getBuyer().listener.onEvent(EventType.PURCHASE, sale);
            if (sale.getSeller().listener != null)
                sale.getSeller().listener.onEvent(EventType.SALE, sale);
        });
        ...
    }
    listener.onEvent(EventType.STOPPED, null);
});

Окончательный листинг кода — это модификация решения Node.js, которая была сделана так, чтобы она также сохраняла продажи параллельно, а не один за другим, как это было в предыдущей статье.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
function persistSales(sales, callback){
    if(sales.length === 0 || process.env.skipPersistence) {
        callback(); //nothing to do, so continue immediately
    }else{
        resources.dbConnection(function(err, connection) {
            if(err) callback(err); else {
                logger.info('preparing to persist ' + sales.length + ' sales');
                var count = sales.length;
                _.each(sales, function(sale){ //save them in parallel
                    connection.query(
                            'INSERT INTO SALES (BUYER_NAME, SELLER_NAME, PRODUCT_ID, PRICE, QUANTITY, PO_ID, SO_ID) values (?, ?, ?, ?, ?, ?, ?)',
                            [sale.buyer.name, sale.seller.name, sale.productId, sale.price, sale.quantity, sale.po.id, sale.so.id],
                            function(err, rows, fields) {
                                if(err) callback(err); else {
                                    sale.id = rows.insertId;
                                    count--;
                                    if(count == 0){
                                        logger.info('persisted all sales');
                                        connection.release();
                                        callback();
                                    }
                                }
                            }
                    );
                });
            }
        });
    }
}

Строка 5 выбирает соединение из пула, и одно и то же соединение повторно используется для всех продаж, «параллельно», и только освобождается, то есть возвращается в пул после сохранения последней продажи, в строке 19.

Итак, еще раз, пришло время сравнить решения с помощью некоторых нагрузочных тестов. На этот раз я решил посмотреть, какой максимальной скорости продаж я смог бы достичь с помощью каждого из следующих трех решений:

  • Случай 1 — представленное здесь решение, а именно Spray + Akka + асинхронный драйвер Mysql,
  • Случай 2 — модифицированное решение Node.js, использующее постоянство параллельно,
  • Случай 3 — оригинальный неблокирующий разъем Tomcat, но с синхронным постоянством.

Случаи выполнялись с использованием оборудования из предыдущей статьи, причем торговые механизмы работали на быстром оборудовании, а база данных — на медленном, потому что это была лучшая установка, чтобы показать, как блокировка ввода-вывода вызывает проблемы с производительностью. Для каждого случая было три переменные, которые я мог настроить во время настройки. Это были:

  • Количество торговых движков (как действующих лиц, так и дочерних процессов),
  • Время ожидания клиента между звонками на сервер,
  • Количество одновременных клиентов.

Последние два в основном настраивали количество запросов в секунду, поскольку соединения не оставались открытыми в ожидании результатов торгов (см. Предыдущую статью). Результаты были следующими, с лучшими характеристиками, выделенными жирным шрифтом.

Случай 1 — Spray + Akka + асинхронный драйвер Mysql
# торговые движки время ожидания клиента между вызовами одновременные клиенты продажи в минуту ок. ЦП на торговом оборудовании
8 100мс 60 42810 25-35%
8 80ms 70 62392 25-35%
8 60мс 80 75600 30-40%
8 40мс 90 59217 30-50%
10 60мс 80 слишком много проблем с подключением к БД
5 60мс 60 67398 25-35%
6 60мс 80 79536 25-35%

Случай 2 — Node.js с постоянством параллельно
# торговые движки время ожидания клиента между вызовами одновременные клиенты продажи в минуту ок. ЦП на торговом оборудовании
8 200мс 30 6684 40-50%
8 100мс 60 начал отставать
8 100мс 40 17058 25-35%
8 100мс 50 начал отставать
12 100мс 50 20808 45-60%
16 100мс 60 24960 45-65%
20 100мс 80 32718 45-70%
25 60мс 80 51234 75-85%
30 50мс 80 22026 75-85%
25 10мс 70 17604 75-90%

Случай 3 — Tomcat 8 NIO с синхронной блокировкой
# торговые движки время ожидания клиента между вызовами одновременные клиенты продажи в минуту ок. ЦП на торговом оборудовании
4 200мс 30 9586 5%
4 150мс 30 10221 5%
8 200мс 30 9510 5%

Результаты показывают, что прикручивать разъем NIO к Tomcat и думать, что вы неблокируете и работоспособны, опасно, поскольку это решение уступает почти в 8 раз по сравнению с решением Akka. Результаты также показывают, что с помощью неблокирующих библиотек и написания неблокирующего решения на Java можно создать очень производительное решение по сравнению с Node.js. Решение Java не только обеспечивало пропускную способность около 50%, оно использовало менее половины процессорного времени.

Очень важно: обратите внимание, что это результат, характерный для используемых здесь алгоритмов и моей архитектуры, дизайна и реализации. Это также зависит от использования «нестандартных» библиотек Java, и действительно, в используемой мной библиотеке Mysql отсутствовала функциональность, например, чтение сгенерированных первичных ключей из результата insert . Пожалуйста, проведите свои собственные эксперименты для своих вариантов использования, прежде чем делать выводы об относительной производительности Java против Scala против Node.js!

Замечательный момент при сравнении вариации количества торговых движков: в Node.js он напрямую контролирует количество дочерних процессов, аналогично количеству потоков; в решении Akka это никак не повлияло на количество потоков в системе — это число оставалось постоянным! В решениях Akka изменение количества действующих лиц влияет на количество сообщений в их почтовых ящиках.

Дополнительную информацию, касающуюся использования Akka и Spray, можно найти в этом хорошем видео . Пожалуйста, найдите время, чтобы также быстро прочитать о реактивном манифесте . Представленное здесь решение Akka является реагирующим, потому что оно отзывчивое (самая высокая пропускная способность во всех трех случаях), устойчивое (Akka предоставляет простые способы устранения сбоев, хотя здесь это не требуется), эластичное (оно автоматически масштабируется, поскольку Akka управляет пулом потоков размер в контексте выполнения, и он масштабируется, потому что Akka обеспечивает прозрачное расположение актеров), и он управляется сообщениями (благодаря использованию модели актера).

[1] Используемая здесь библиотека Mysql не требует, чтобы соединение было закрыто и возвращено в пул, как, например, делает пул баз данных Apache . Это на самом деле вызывает проблемы! Оставив его открытым, проблем не возникает, что подтверждается проведенными мною нагрузочными тестами.