Статьи

Микросервисы в мире хроники — часть 2

В этой части мы рассмотрим превращение компонента в сервис.

В первой части мы рассмотрели, как легко создавать и тестировать компоненты, которые ожидают поступления асинхронных сообщений и выдают асинхронные сообщения. Однако, как мы превращаем это в услугу ?

Превращение наших компонентов в сервис.

Главное, чего не хватает в наших компонентах, — это транспорт . Отсутствие транспорта упрощает тестирование, профилирование и отладку, но нам нужно распространять наши компоненты, а для этого нам нужен транспорт.

Существует широкий спектр возможных видов транспорта:

  • Хроника Очередь
  • Необработанные TCP-сообщения или UDP-пакеты с такой библиотекой, как Netty
  • JMS Messaging
  • REST API [ 4 ]
  • Таблицы базы данных через JDBC
  • Файлы сбрасываются в каталог и используют каталог WatchService
  • потокобезопасная очередь, например BlockingQueue
  • Общая память
  • Apache Aries для сменных транспортов
  • вообще нет транспорта (вызовы методов подходят для данного варианта использования)

Это Очередь Хроник, которую мы рассмотрим в посте.

Использование очереди в модульном тесте

Chronicle Queue сохраняется, однако в модульных тестах вы обычно хотите начать все заново и впоследствии удалить очередь. Вы можете использовать следующую фразу:

Создание временной очереди

1
2
3
4
5
6
7
8
9
File queuePath = new File(OS.TARGET, "testName-" + System.nanoTime());
try {
    try (SingleChronicleQueue queue = SingleChronicleQueueBuilder.binary(queuePath).build()) {
        // use the queue
    }
 
} finally {
    IOTools.shallowDeleteDirWithFiles(queuePath);
}

Это создает очередь, которая хранится в одном файле. Файл катится ежедневно по умолчанию и включает в себя текущую дату в пути.

Написание событий

Если мы повторим наш тест, как и раньше, вместо использования имитирующего прослушивателя , мы можем использовать прослушиватель, который записывает каждый вызванный метод в очередь:

Запись методов, вызываемых в очередь для любого интерфейса

1
2
OrderIdeaListener orderManager = queue.createAppender()
                                      .methodWriter(OrderIdeaListener.class, MarketDataListener.class);

Наш комбинатор пишет в эту очередь, как и наш тест:

Комбайнер SidedPrice

1
SidedMarketDataCombiner combiner = new SidedMarketDataCombiner((MarketDataListener) orderManager);

Мы также можем повторить входящие события. Собрав все это вместе, мы получим:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
try (SingleChronicleQueue queue = SingleChronicleQueueBuilder.binary(queuePath).build()) {
    OrderIdeaListener orderManager = queue.createAppender().methodWriter(OrderIdeaListener.class, MarketDataListener.class);
    SidedMarketDataCombiner combiner = new SidedMarketDataCombiner((MarketDataListener) orderManager);
 
    // events in
    orderManager.onOrderIdea(new OrderIdea("EURUSD", Side.Buy, 1.1180, 2e6)); // not expected to trigger
 
    combiner.onSidedPrice(new SidedPrice("EURUSD", 123456789000L, Side.Sell, 1.1172, 2e6));
    combiner.onSidedPrice(new SidedPrice("EURUSD", 123456789100L, Side.Buy, 1.1160, 2e6));
 
    combiner.onSidedPrice(new SidedPrice("EURUSD", 123456789100L, Side.Buy, 1.1167, 2e6));
 
    orderManager.onOrderIdea(new OrderIdea("EURUSD", Side.Buy, 1.1165, 1e6)); // expected to trigger
}

Как только мы записали все события в нашу очередь, мы можем обработать очередь в нашем тесте. Более реалистичным примером является запуск этих двух компонентов в разных потоках, процессах или на разных машинах, однако это только усложняет тесты, и результат должен быть одинаковым при условии, что транспорт выполняет свою работу.

Чтение событий.

Когда мы читаем события, нам нужен компонент, который реализует методы, названные выше, и фиктивный слушатель, чтобы гарантировать, что он вызывает правильные события.

Прочитайте все события и проверьте правильный вывод

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
// what we expect to happen
OrderListener listener = createMock(OrderListener.class);
listener.onOrder(new Order("EURUSD", Side.Buy, 1.1167, 1_000_000));
replay(listener);
 
try (SingleChronicleQueue queue = SingleChronicleQueueBuilder.binary(queuePath).build()) {
    // build our scenario
    OrderManager orderManager = new OrderManager(listener); (1)
    MethodReader reader = queue.createTailer().methodReader(orderManager); (2)
    for (int i = 0; i < 5; i++)
        assertTrue(reader.readOne()); (3)
 
    assertFalse(reader.readOne()); (4)
    System.out.println(queue.dump()); (5)
}
 
verify(listener);
Наконечник
наш компонент для тестирования
наш читатель очереди, который будет вызывать методы
цикл для чтения / обработки по одному методу за раз.
у нас больше нет сообщений
выведите содержимое очереди, чтобы мы могли увидеть, что было введено.

Наконец, тест выдает необработанное содержимое очереди. Это читает данные, хранящиеся в файле, который использует очередь. Этот дамп полезен только для небольших очередей с несколькими МБ данных. Если у вас есть несколько ГБ, он не сможет быть сохранен в строке. Вы можете использовать DumpQueueMain

Вывод дампа ()

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
--- !!meta-data #binary
header: !SCQStore {
  wireType: !WireType BINARY,
  writePosition: 777,
  roll: !SCQSRoll {
    length: 86400000,
    format: yyyyMMdd,
    epoch: 0
    },
  indexing: !SCQSIndexing {
    indexCount: !int 8192,
    indexSpacing: 64,
    index2Index: 0,
    lastIndex: 0
    }
}
# position: 227
--- !!data #binary
onOrderIdea: {
  symbol: EURUSD,
  side: Buy,
  limitPrice: 1.118,
  quantity: 2000000.0
}
# position: 306
--- !!data #binary
onTopOfBookPrice: {
  symbol: EURUSD,
  timestamp: 123456789000,
  buyPrice: NaN,
  buyQuantity: 0,
  sellPrice: 1.1172,
  sellQuantity: 2000000.0
}
# position: 434
--- !!data #binary
onTopOfBookPrice: {
  symbol: EURUSD,
  timestamp: 123456789100,
  buyPrice: 1.116,
  buyQuantity: 2000000.0,
  sellPrice: 1.1172,
  sellQuantity: 2000000.0
}
# position: 566
--- !!data #binary
onTopOfBookPrice: {
  symbol: EURUSD,
  timestamp: 123456789100,
  buyPrice: 1.1167,
  buyQuantity: 2000000.0,
  sellPrice: 1.1172,
  sellQuantity: 2000000.0
}
# position: 698
--- !!data #binary
onOrderIdea: {
  symbol: EURUSD,
  side: Buy,
  limitPrice: 1.1165,
  quantity: 1000000.0
}
...
# 83885299 bytes remaining

Для запуска теста и выгрузки очереди в моей IDE потребовалось 233 мс.

Вывод

Мы можем тестировать компоненты в одиночку с очередью или в цепочке, используя больше очередей. Что еще более важно мы можем тестировать наши компоненты без инфраструктуры, которая усложняет процесс отладки. Когда наши компоненты работают без транспорта, мы можем показать, что они делают то же самое с транспортом.

В нашей следующей части

В третьей части мы рассмотрим бенчмаркинг и профилирование с помощью Queue . Хотя очередь разработана так, чтобы быть простой и прозрачной, она также разработана так, чтобы быть быстрее, чем другие постоянные транспорты, даже без настройки.

Ссылка: Микросервисы в мире хроники — часть 2 от нашего партнера по JCG Питера Лоури из блога Vanilla Java .