Статьи

Введение в Apache Apollo Часть II: HawtDispatch

Apache Apollo — это высокопроизводительный мультипротокольный брокер обмена сообщениями следующего поколения, созданный с нуля до одного дня и являющийся заменой ActiveMQ 5.x. Я писал об этом в прошлом . Неблокирующая асинхронная архитектура Apollo позволяет ему быть очень быстрым и очень хорошо масштабироваться в многоядерных системах с минимальным количеством потоков. Поддерживаемые протоколы включают AMQP , STOMP , MQTT и собственный двоичный протокол ActiveMQ, Openwire . В этой статье мы подробнее рассмотрим модель потоков, на которой построен Apollo: HawtDispatch .

Как отметил Хирам в своем блоге пару лет назад, повышение производительности ActiveMQтребуется глубокий взгляд на реализацию потоков и известные узкие места / конфликтные точки потоков. ActiveMQ был спроектирован и реализован (2004?), Когда блокировка ввода-вывода и многопоточных систем с общим состоянием были хорошо известны и понятны, но, по мере того, как вы пытаетесь выжать из этого все большую и большую производительность, фундаментальные ограничения начинают проявляться. В последние годы это похоже на неблокирующие методы (которые существуют намного дольше, особенно в разработке операционных систем), и библиотеки, поддерживающие эти методы, начали появляться в экосистеме Java. После долгих исследований, создания прототипов, бенчмаркинга, анализа, полоскания и повторения скраба Хирам и оригинальные дизайнеры Apollo остановились на неблокирующем, основанном на событиях подходе, впервые предложенном инженерами Apple, известном как Grand Central Dispatch., Такой подход сводит к минимуму блокировку общего состояния, более эффективно использует ресурсы ОС (это огромно), обеспечивает лучшую масштабируемость на больших блоках (многоядерных процессорах) и очень хорошо подходит для серверов обмена сообщениями.

На помощь приходит HawtDispatch с его моделью GCD, верно? Ну, в то время (2008-2009-е?) Было не так много вариантов оптимизированных для системы сообщений неблокирующих, асинхронных механизмов обработки событий. Grand Central Dispatch был на самом деле родной реализацией Mac OS ( libdispatch ), и не было никакого эквивалента на уровне приложения. Таким образом, первоначальные разработчики Apollo намеревались построить эту модель, имея в виду тесную связь, которую она будет иметь с будущим продуктом обмена сообщениями.

И теперь у нас это есть . (Это было вокруг в течение пары лет)

И это с открытым исходным кодом .

И у него есть лицензия Apache !

Так что же это?

Неблокирующая, асинхронная система обработки событий / задач

HawtDispatch реализует модель потоков, основанную на цикле обработки событий (на самом деле, несколько циклов обработки событий… см. Раздел ниже о пулах). «Событие» или «задача» (которая является просто исполняемым на языке Java) отправляется диспетчеру и планируется запускать в любом из потоков HawtDispatch в пуле потоков. Те же самые концепции из Grand Central Dispatch используются для отправки задач и их выполнения либо параллельно, либо последовательно. Концепция «Глобальной очереди отправки» используется для представления структуры, в которую отправляются задачи и из каких задач они снимаются с очереди и выполняются в любом из доступных потоков обработки событий. Задачи не гарантированно выполняются в любом порядке, так как они будут распределены по любому доступному потоку в пуле и будут выполняться параллельно.«Последовательная очередь отправки» — это простая структура данных, которая будет выполнять свои задачи в порядке FIFO. Если задачи A, B и C добавляются в последовательную очередь, сначала запускаются A, затем B, затем C. Вы можете использовать последовательные очереди для защиты общего состояния и обеспечения порядка операций, и они настолько легки, что вы можете создать большой их количество по мере необходимости (например, по одному на соединение или по одному на назначение в брокере сообщений:) ) Очередь последовательной отправки больше не будет создавать потоки, она будет работать на доступных потоках HawtDispatch в пуле потоков.

Для получения дополнительной информации о Grand Central Dispatch см. Справочные документы GCD .

Для корректной работы обработки этого события / выполнения задачи ни одной из задач не разрешается блокировать (при вводе-выводе) или синхронизировать в каком-либо общем состоянии (используя блокировки). Но для этого вам понадобится неблокирующая библиотека ввода-вывода (NIO) для этого… HawtDispatch обеспечит вас!

Встроенная поддержка NIO

NIO благодаря поддержке Java NIO встроен прямо в библиотеку и, что более важно, прямо в каждый из потоков HawtDispatch. Это означает, что если поток не занят выполнением задач, он будет обслуживать селекторы готовности и выполнять связанные обработчики событий.

Добавить селекторы очень просто с API HawtDispatch:

DispatchSource createSource(SelectableChannel channel, int interestOps, DispatchQueue queue)

Этот метод создает новый объект DispatchSource, который будет прослушивать на указанном канале интересующие вас события. Перед тем, как DispatchSource начнет обрабатывать события, вам нужно предоставить ему обработчик событий и вызвать resume ():

source.setEventHandler(Runnable runnable)
source.resume()

Вот и все! Не нужно специально работать с селекторами или настраивать цикл для обработки готовности и т. Д. Просто прямой API, который очень хорошо вписывается в общую модель потоков HawtDispatch.

Пользовательская отправка источника событий

Точно так же, как вы можете использовать поддержку NIO с HawtDispatch, вы можете создавать собственные источники событий и связанные с ними обработчики.
Одним из ключей к этой бесперебойной работе является возможность запуска и объединения нескольких событий перед фактической доставкой в ​​обработчик событий.

Создать собственный источник отправки очень просто:

DispatchSource createSource(EventAggregator eventAggregator, DispatchQueue disptachQueue)

EventAggregator контролирует, как несколько событий могут быть объединены вместе перед доставкой в ​​обработчик событий.

Затем вы указываете обработчик события и вызываете resume (), как указано выше:

source.setEventHandler(Runnable runnable)
source.resume()

Из вашего обработчика событий вы можете получить доступ к объединенным событиям (представленным теперь как одно событие) с source.getData()

См. Документацию HawtDispatch по пользовательским источникам отправки для получения дополнительной информации.

На основе оптимизированного пула потоков

HawtDispatch решит, сколько потоков использовать для своего пула, исходя из количества ядер, доступных на вашем компьютере. Так что, если у вас 8 ядер, HawtDispatch создаст и использует 8 потоков для пула выполнения задач. Фактическое выполнение потоков на каждом ядре или закрепление потоков на ядре невозможно настроить с помощью HawtDispatch, что является отклонением от исходного libdispatch, но базовые принципы все еще сохраняются. Сохранение количества потоков, привязанных к числу ядер, позволяет каждому ядру оставаться достаточно горячим с меньшим количеством переключений контекста и меньшим количеством кешей уровня 1.

Scala и Java API

И Java, и Scala поддерживаются. Преимущество использования Scala с HawtDispatch заключается в значительном сокращении стандартного кода для отправки исполняемых файлов. Например, проверьте, сколько кода требуется для выполнения задачи с Java:

queue.execute(new Runnable(){
  public void run() {
    System.out.println("Hi!");
  }
});

Теперь проверьте код Scala:

queue {
  System.out.println("Hi!");
}

Написание замыканий и встроенных функций в Scala намного чище.

Встроенная реализация транспорта для TCP, SSL, UDP с абстракциями «заполните пустые» для обработки пользовательских протоколов

Это потрясающая функция, которая значительно упрощает работу с различными видами транспорта в ваших собственных приложениях. Давайте посмотрим на TcpTransport в качестве быстрого примера.

TcpTransport отвечает за чтение / запись из основного потока TCP. Под прикрытием используется интеграция HawtDispatches NIO, а также пользовательские источники отправки, позволяющие осуществлять обработку протокола. Каждый объект TcpTransport будет иметь связанный объект ProtocolCodec, который отвечает за кодирование / декодирование байтов, которые передаются по проводам. Здесь вам нужно подключить свою пользовательскую проводную реализацию (например, создать двоичный протокол MQTT / AMQP или OpenMire ActiveMQ). Абстрактный класс, AbstractProtocolCodecуже существует реализовать много тяжелого подъема кодирования / декодирования, и все , что вам нужно сделать , это реализовать encode()и initialDecodeAction()методы.

Использование абстракций транспорта HawtDispatch позволяет быстро реализовать протокол на уровне проводов неблокирующим способом.

Дать ему шанс!

HawtDispatch прост, легок и прост в использовании. Я настоятельно рекомендую взглянуть на это, особенно если вы хотите создавать масштабируемые приложения, используя неблокирующий подход.