Hystrix имеет расширенную функцию свертывания (или пакетирования) запросов. Если две или более команд выполняют одинаковый запрос одновременно, Hystrix может объединить их вместе, выполнить один пакетный запрос и отправить результаты разделения обратно всем командам. Давайте сначала посмотрим, как Hystrix работает без разрушения. Представьте, что у нас есть сервис, который ищет StockPrice
данного StockPrice
:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
import lombok.Value; import java.math.BigDecimal; import java.time.Instant; @Value class Ticker { String symbol; } @Value class StockPrice { BigDecimal price; Instant effectiveTime; } interface StockPriceGateway { default StockPrice load(Ticker stock) { final Set<Ticker> oneTicker = Collections.singleton(stock); return loadAll(oneTicker). get (stock); } ImmutableMap<Ticker, StockPrice> loadAll(Set<Ticker> tickers); } |
Базовая реализация StockPriceGateway
должна предоставлять пакетный метод loadAll()
а метод load()
реализован для нашего удобства. Таким образом, наш шлюз способен загружать несколько цен за один пакет (например, чтобы уменьшить задержку или издержки сетевого протокола), но в настоящее время мы не используем эту функцию, всегда загружая цену одной акции за раз:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
|
class StockPriceCommand extends HystrixCommand<StockPrice> { private final StockPriceGateway gateway; private final Ticker stock; StockPriceCommand(StockPriceGateway gateway, Ticker stock) { super(HystrixCommandGroupKey.Factory.asKey( "Stock" )); this.gateway = gateway; this.stock = stock; } @Override protected StockPrice run() throws Exception { return gateway.load(stock); } } |
Такая команда всегда будет вызывать StockPriceGateway.load()
для каждого StockPriceGateway.load()
, как показано в следующих тестах:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
|
class StockPriceCommandTest extends Specification { def gateway = Mock(StockPriceGateway) def 'should fetch price from external service' () { given: gateway.load(TickerExamples.any()) >> StockPriceExamples.any() def command = new StockPriceCommand(gateway, TickerExamples.any()) when: def price = command.execute() then: price == StockPriceExamples.any() } def 'should call gateway exactly once when running Hystrix command' () { given: def command = new StockPriceCommand(gateway, TickerExamples.any()) when: command.execute() then: 1 * gateway.load(TickerExamples.any()) } def 'should call gateway twice when command executed two times' () { given: def commandOne = new StockPriceCommand(gateway, TickerExamples.any()) def commandTwo = new StockPriceCommand(gateway, TickerExamples.any()) when: commandOne.execute() commandTwo.execute() then: 2 * gateway.load(TickerExamples.any()) } def 'should call gateway twice even when executed in parallel' () { given: def commandOne = new StockPriceCommand(gateway, TickerExamples.any()) def commandTwo = new StockPriceCommand(gateway, TickerExamples.any()) when: Future<StockPrice> futureOne = commandOne.queue() Future<StockPrice> futureTwo = commandTwo.queue() and: futureOne. get () futureTwo. get () then: 2 * gateway.load(TickerExamples.any()) } } |
Если вы не знаете Hystrix, добавив внешний вызов в команду, вы получите множество функций, таких как тайм-ауты, автоматические выключатели и т. Д. Но эта тема не рассматривается. Посмотрите на последние два теста: при запросе цены произвольного тикера дважды, последовательно или параллельно ( queue()
) наш внешний gateway
также вызывается дважды. Последний тест особенно интересен — мы просим один и тот же тикер почти в одно и то же время, но Hystrix не может этого понять. Эти две команды полностью независимы, будут выполняться в разных потоках и ничего не знают друг о друге, даже если они выполняются практически одновременно.
Свертывание сводится к поиску подобных запросов и их объединению. Пакетирование (я буду использовать этот термин взаимозаменяемо со свертыванием ) не происходит автоматически и требует немного кодирования. Но сначала давайте посмотрим, как это ведет себя:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
|
def 'should collapse two commands executed concurrently for the same stock ticker' () { given: def anyTicker = TickerExamples.any() def tickers = [anyTicker] as Set and: def commandOne = new StockTickerPriceCollapsedCommand(gateway, anyTicker) def commandTwo = new StockTickerPriceCollapsedCommand(gateway, anyTicker) when: Future<StockPrice> futureOne = commandOne.queue() Future<StockPrice> futureTwo = commandTwo.queue() and: futureOne. get () futureTwo. get () then: 0 * gateway.load(_) 1 * gateway.loadAll(tickers) >> ImmutableMap.of(anyTicker, StockPriceExamples.any()) } def 'should collapse two commands executed concurrently for the different stock tickers' () { given: def anyTicker = TickerExamples.any() def otherTicker = TickerExamples.other() def tickers = [anyTicker, otherTicker] as Set and: def commandOne = new StockTickerPriceCollapsedCommand(gateway, anyTicker) def commandTwo = new StockTickerPriceCollapsedCommand(gateway, otherTicker) when: Future<StockPrice> futureOne = commandOne.queue() Future<StockPrice> futureTwo = commandTwo.queue() and: futureOne. get () futureTwo. get () then: 1 * gateway.loadAll(tickers) >> ImmutableMap.of( anyTicker, StockPriceExamples.any(), otherTicker, StockPriceExamples.other()) } def 'should correctly map collapsed response into individual requests' () { given: def anyTicker = TickerExamples.any() def otherTicker = TickerExamples.other() def tickers = [anyTicker, otherTicker] as Set gateway.loadAll(tickers) >> ImmutableMap.of( anyTicker, StockPriceExamples.any(), otherTicker, StockPriceExamples.other()) and: def commandOne = new StockTickerPriceCollapsedCommand(gateway, anyTicker) def commandTwo = new StockTickerPriceCollapsedCommand(gateway, otherTicker) when: Future<StockPrice> futureOne = commandOne.queue() Future<StockPrice> futureTwo = commandTwo.queue() and: def anyPrice = futureOne. get () def otherPrice = futureTwo. get () then: anyPrice == StockPriceExamples.any() otherPrice == StockPriceExamples.other() } |
Первый тест доказывает, что вместо вызова load()
дважды мы едва вызывали loadAll()
один раз. Также обратите внимание, что, поскольку мы запросили один и тот же Ticker
(из двух разных потоков), loadAll()
запрашивает только один тикер. Второй тест показывает, что два одновременных запроса двух разных тикеров объединяются в один пакетный вызов. Третий тест гарантирует, что мы по-прежнему получаем правильные ответы на каждый отдельный запрос. Вместо расширения HystrixCommand
мы должны расширить более сложный HystrixCollapser
. Теперь пришло время увидеть реализацию StockTickerPriceCollapsedCommand
, которая легко заменила StockPriceCommand
:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
class StockTickerPriceCollapsedCommand extends HystrixCollapser<ImmutableMap<Ticker, StockPrice>, StockPrice, Ticker> { private final StockPriceGateway gateway; private final Ticker stock; StockTickerPriceCollapsedCommand(StockPriceGateway gateway, Ticker stock) { super(HystrixCollapser.Setter.withCollapserKey(HystrixCollapserKey.Factory.asKey( "Stock" )) .andCollapserPropertiesDefaults(HystrixCollapserProperties.Setter().withTimerDelayInMilliseconds( 100 ))); this.gateway = gateway; this.stock = stock; } @Override public Ticker getRequestArgument() { return stock; } @Override protected HystrixCommand<ImmutableMap<Ticker, StockPrice>> createCommand(Collection<CollapsedRequest<StockPrice, Ticker>> collapsedRequests) { final Set<Ticker> stocks = collapsedRequests.stream() .map(CollapsedRequest::getArgument) . collect (toSet()); return new StockPricesBatchCommand(gateway, stocks); } @Override protected void mapResponseToRequests(ImmutableMap<Ticker, StockPrice> batchResponse, Collection<CollapsedRequest<StockPrice, Ticker>> collapsedRequests) { collapsedRequests.forEach(request -> { final Ticker ticker = request.getArgument(); final StockPrice price = batchResponse. get (ticker); request.setResponse(price); }); } } |
Здесь много чего происходит, поэтому давайте рассмотрим пошагово StockTickerPriceCollapsedCommand
. Первые три универсальных типа:
-
BatchReturnType
(в нашем примереBatchReturnType
ImmutableMap<Ticker, StockPrice>
) — это тип пакетного ответа команды. Как вы увидите позже, collapser превращает несколько маленьких команд в пакетную команду. Это тип ответа этой пакетной команды. Обратите внимание, что это то же самое, что иStockPriceGateway.loadAll()
). -
ResponseType
(StockPrice
) — это тип каждой отдельной сворачиваемой команды. В нашем случае мы сворачиваемHystrixCommand<StockPrice>
. Позже мыBatchReturnType
значениеBatchReturnType
на несколькоStockPrice
. -
RequestArgumentType
(Ticker
) — это ввод каждой отдельной команды, которую мы собираемся свернуть (пакет). Когда несколько команд объединяются вместе, мы в конечном итоге заменяем их все одной пакетной командой. Эта команда должна получить все отдельные запросы для выполнения одного пакетного запроса.
withTimerDelayInMilliseconds(100)
будет объяснено в ближайшее время. createCommand()
создает пакетную команду. Эта команда должна заменить все отдельные команды и выполнить пакетную логику. В нашем случае вместо нескольких отдельных вызовов load()
мы просто делаем один:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
|
class StockPricesBatchCommand extends HystrixCommand<ImmutableMap<Ticker, StockPrice>> { private final StockPriceGateway gateway; private final Set<Ticker> stocks; StockPricesBatchCommand(StockPriceGateway gateway, Set<Ticker> stocks) { super(HystrixCommandGroupKey.Factory.asKey( "Stock" )); this.gateway = gateway; this.stocks = stocks; } @Override protected ImmutableMap<Ticker, StockPrice> run() throws Exception { return gateway.loadAll(stocks); } } |
Единственное различие между этим классом и StockPriceCommand
состоит в том, что он берет кучу StockPriceCommand
и возвращает цены на все из них. Hystrix соберет несколько экземпляров StockTickerPriceCollapsedCommand
и, как только этого будет достаточно (подробнее об этом позже), создаст одну StockPriceCommand
. Надеюсь, что это понятно, потому что mapResponseToRequests()
немного более вовлечен. Как только наша свернутая StockPricesBatchCommand
, мы должны как-то разделить пакетный ответ и передать ответы обратно отдельным командам, не подозревая о свертывании. С этой точки зрения реализация mapResponseToRequests()
довольно проста: мы получаем пакетный ответ и коллекцию упакованных CollapsedRequest<StockPrice, Ticker>
. Теперь мы должны перебрать все ожидающие отдельные запросы и завершить их ( setResponse()
). Если мы не выполним некоторые запросы, они будут зависать бесконечно и в конечном итоге истечет время ожидания.
Как это работает
Это подходящий момент, чтобы описать, как реализовано свертывание. Я уже говорил, что свертывание происходит, когда два запроса происходят одновременно. Там нет такой вещи, как в то же время . В действительности, когда поступает первый разборный запрос, Hystrix запускает таймер. В наших примерах мы установили его на 100 миллисекунд. В течение этого периода наша команда приостановлена, ожидая присоединения других команд. По истечении этого настраиваемого периода Hystrix вызовет createCommand()
, собрав все ключи запроса (путем вызова getRequestArgument()
) и запустив его. Когда пакетная команда завершится, она позволит нам отправлять результаты всем ожидающим отдельным командам. Также возможно ограничить количество свернутых запросов, если мы боимся создавать огромные партии — с другой стороны, сколько одновременных запросов может уместиться в этот короткий временной интервал?
Варианты использования и недостатки
Свертывание запросов следует использовать в системах с экстремальной нагрузкой — высокой частотой запросов. Если вы получаете только один запрос на сворачивающееся временное окно (100 миллисекунд в примерах), свертывание просто добавит накладные расходы. Это происходит потому, что каждый раз, когда вы вызываете свертываемую команду, она должна ждать на тот случай, если какая-то другая команда захочет присоединиться и сформировать пакет. Это имеет смысл только тогда, когда свернуты хотя бы пара команд. Время, потраченное на ожидание, компенсируется за счет экономии сетевых задержек и / или лучшего использования ресурсов нашим сотрудником (очень часто пакетные запросы выполняются намного быстрее по сравнению с отдельными вызовами). Но имейте в виду, что рушиться — это обоюдоострый меч, полезный в особых случаях.
Последнее, что нужно запомнить — для использования свертывания запросов вам нужны HystrixRequestContext.initializeContext()
и shutdown()
в блоке try-finally
:
1
2
3
4
5
6
|
HystrixRequestContext context = HystrixRequestContext.initializeContext(); try { //... } finally { context.shutdown(); } |
Свертывание против кеширования
Вы можете подумать, что коллапс можно заменить правильным кэшированием. Это неправда. Вы используете кеш, когда:
- ресурс, вероятно, будет доступен несколько раз
- мы можем безопасно использовать предыдущее значение, оно останется действительным в течение некоторого периода времени, или мы точно знаем, как его аннулировать
- мы можем позволить себе одновременные запросы на один и тот же ресурс, чтобы вычислить его несколько раз
С другой стороны, свертывание не обеспечивает локальность данных (1), оно всегда затрагивает реальный сервис и никогда не возвращает устаревшие данные (2). И, наконец, если мы запрашиваем один и тот же ресурс из нескольких потоков, мы будем вызывать службу поддержки только один раз (3). В случае кеширования, если ваш кеш не очень умный, два потока независимо обнаружат отсутствие данного ресурса в кеше и дважды запросят службу поддержки. Однако свертывание может работать вместе с кешированием — обращаясь к кешу перед выполнением команды свертывания.
Резюме
Свертывание запросов — полезный инструмент, но с очень ограниченными случаями использования. Это может значительно улучшить пропускную способность в нашей системе, а также ограничить нагрузку на внешнюю службу. Обрушение может волшебным образом сгладить пики в движении, а не распространять его повсюду. Просто убедитесь, что вы используете его для команд, работающих с предельной частотой.
Ссылка: | Пакетные (сворачивающиеся) запросы в Hystrix от нашего партнера по JCG Томаша Нуркевича в блоге Java и соседних блогах. |