Недавно на InfoQ, Алексей Папу опубликовал статью о своих экспериментах с высокопроизводительным обменом сообщениями между потоками. В статье было несколько примеров, но я собираюсь сосредоточиться на случае с несколькими производителями. Одна из оптимизаций, которые продемонстрировала статья, заключалась в том, что, если вы знали количество производителей, которые у вас есть во время инициализации, вы можете построить структуру, которая значительно уменьшает конкуренцию. Существующий MultiProducerSequencer не имеет этого ограничения, что важно для большого количества вариантов использования. Однако я хотел посмотреть, чего мы сможем достичь, применив этот подход к Disruptor.
Конструкция, которую я собираюсь использовать, состоит в том, чтобы вместо одного Disruptor с MultiProducerSequencer я собирался иметь Disruptor для каждого производителя с SingleProducerSequencer для каждого. Чтобы все каналы событий были объединены в один EventHandler, мне нужно реализовать собственный EventProcessor, который может опрашивать несколько SequenceBarrier / DataProvider . Этот пользовательский MultiBufferBatchEventProcesor можно увидеть в пакете com.lmax.disruptor.support внутри тестов производительности. Ключевым компонентом этого класса является основной цикл, который можно увидеть здесь:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
while ( true ) { try { for ( int i = 0 ; i < barrierLength; i++) { long available = barriers[i].waitFor(- 1 ); Sequence sequence = sequences[i]; long previous = sequence.get(); for ( long l = previous + 1 ; l <= available; l++) { handler.onEvent(providers[i].get(l), l, previous == available); } sequence.set(available); count += (available - previous); } Thread.yield(); } // Lines omitted... } |
Центральным в этом подходе является то, что обработчик событий имеет 2 массива, переданных в его конструктор:
- DataProviders []; массив кольцевых буферов для чтения данных.
- SequenceBarrier []; барьеры, подаваемые каждым из кольцевых буферов.
Исходя из этого, обработчик событий создаст массив последовательностей, которые будут использоваться для отслеживания обработанных событий из каждого кольцевого буфера. Цикл событий будет перебирать барьеры последовательности, чтобы определить, есть ли у любого из кольцевых буферов данные, доступные для чтения. Любые доступные данные будут переданы в предоставленный пользователем обработчик событий.
Чтобы проверить это, я создал ThreeToThreeSequencedThroughputTest, который запускает трех производителей и одного потребителя. Один из аспектов кода, представленного в статье InfoQ, заключается в том, что каждый поезд может вместить три лонга, а один «оп» измеряется как движение одного лонга. Чтобы сделать тест более сопоставимым, я использовал массив из трех длин в качестве записи в кольцевом буфере и умножил общее количество событий, перемещенных между потоками, на 3, чтобы вычислить общее количество операций.
Результаты тестов (процессор Intel® Core ™ TM i7-3770 с тактовой частотой 3,40 ГГц)
Disruptor:
1
2
3
4
5
6
7
|
Run 0, Disruptor=390,738,060 ops /sec Run 1, Disruptor=387,931,034 ops /sec Run 2, Disruptor=397,058,823 ops /sec Run 3, Disruptor=394,160,583 ops /sec Run 4, Disruptor=396,767,083 ops /sec Run 5, Disruptor=394,736,842 ops /sec Run 6, Disruptor=396,767,083 ops /sec |
Железнодорожный / Поезд:
1
2
3
4
5
6
7
|
ops /sec = 243,141,801 ops /sec = 302,695,445 ops /sec = 283,096,862 ops /sec = 273,670,298 ops /sec = 268,340,387 ops /sec = 264,802,500 ops /sec = 262,258,028 |
Очевидно, что этот подход имеет некоторые достоинства. Я рассматриваю возможность добавления этого в основной дистрибутив Disruptor. Тем не менее, есть еще пара вопросов дизайна, которые мне нужно решить в первую очередь.
Значение последовательности, передаваемое обработчику, является значением из исходного кольцевого буфера и может увеличиваться и уменьшаться по мере прохождения событий, и я не уверен, будет ли это проблемой для пользователей. Это похоже на довольно незначительную вещь, так что, надеюсь, мне не нужно будет беспокоиться об этом. Обработчик событий поддерживает только уступки в данный момент. Мне нужно выяснить, как правильно включить стратегии ожидания, так как в стратегии ожидания блокировки и пробуждения потребителя возникнут некоторые проблемы.
Это идея, которую мы упомянули в списке рассылки некоторое время назад, и я изначально не был особенно заинтересован, но, учитывая значительную выгоду от этого подхода, я думаю, что я буду относиться к идее более серьезно.