Статьи

Пакетные (сворачивающиеся) запросы в Hystrix

Hystrix имеет расширенную функцию свертывания (или пакетирования) запросов. Если две или более команд выполняют одинаковый запрос одновременно, Hystrix может объединить их вместе, выполнить один пакетный запрос и отправить результаты разделения обратно всем командам. Давайте сначала посмотрим, как Hystrix работает без разрушения. Представьте, что у нас есть сервис, который ищет StockPrice данного StockPrice :

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import lombok.Value;
import java.math.BigDecimal;
import java.time.Instant;
 
@Value
class Ticker {
    String symbol;
}
 
@Value
class StockPrice {
    BigDecimal price;
    Instant effectiveTime;
}
 
interface StockPriceGateway {
 
    default StockPrice load(Ticker stock) {
        final Set<Ticker> oneTicker = Collections.singleton(stock);
        return loadAll(oneTicker).get(stock);
    }
 
    ImmutableMap<Ticker, StockPrice> loadAll(Set<Ticker> tickers);
}

Базовая реализация StockPriceGateway должна предоставлять пакетный метод loadAll() а метод load() реализован для нашего удобства. Таким образом, наш шлюз способен загружать несколько цен за один пакет (например, чтобы уменьшить задержку или издержки сетевого протокола), но в настоящее время мы не используем эту функцию, всегда загружая цену одной акции за раз:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
class StockPriceCommand extends HystrixCommand<StockPrice> {
 
    private final StockPriceGateway gateway;
    private final Ticker stock;
 
    StockPriceCommand(StockPriceGateway gateway, Ticker stock) {
        super(HystrixCommandGroupKey.Factory.asKey("Stock"));
        this.gateway = gateway;
        this.stock = stock;
    }
 
    @Override
    protected StockPrice run() throws Exception {
        return gateway.load(stock);
    }
}

Такая команда всегда будет вызывать StockPriceGateway.load() для каждого StockPriceGateway.load() , как показано в следующих тестах:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
class StockPriceCommandTest extends Specification {
 
    def gateway = Mock(StockPriceGateway)
 
    def 'should fetch price from external service'() {
        given:
            gateway.load(TickerExamples.any()) >> StockPriceExamples.any()
            def command = new StockPriceCommand(gateway, TickerExamples.any())
 
        when:
            def price = command.execute()
 
        then:
            price == StockPriceExamples.any()
    }
 
    def 'should call gateway exactly once when running Hystrix command'() {
        given:
            def command = new StockPriceCommand(gateway, TickerExamples.any())
 
        when:
            command.execute()
 
        then:
            1 * gateway.load(TickerExamples.any())
    }
 
    def 'should call gateway twice when command executed two times'() {
        given:
            def commandOne = new StockPriceCommand(gateway, TickerExamples.any())
            def commandTwo = new StockPriceCommand(gateway, TickerExamples.any())
 
        when:
            commandOne.execute()
            commandTwo.execute()
 
        then:
            2 * gateway.load(TickerExamples.any())
    }
 
    def 'should call gateway twice even when executed in parallel'() {
        given:
            def commandOne = new StockPriceCommand(gateway, TickerExamples.any())
            def commandTwo = new StockPriceCommand(gateway, TickerExamples.any())
 
        when:
            Future<StockPrice> futureOne = commandOne.queue()
            Future<StockPrice> futureTwo = commandTwo.queue()
 
        and:
            futureOne.get()
            futureTwo.get()
 
        then:
            2 * gateway.load(TickerExamples.any())
    }
 
}

Если вы не знаете Hystrix, добавив внешний вызов в команду, вы получите множество функций, таких как тайм-ауты, автоматические выключатели и т. Д. Но эта тема не рассматривается. Посмотрите на последние два теста: при запросе цены произвольного тикера дважды, последовательно или параллельно ( queue() ) наш внешний gateway также вызывается дважды. Последний тест особенно интересен — мы просим один и тот же тикер почти в одно и то же время, но Hystrix не может этого понять. Эти две команды полностью независимы, будут выполняться в разных потоках и ничего не знают друг о друге, даже если они выполняются практически одновременно.

Свертывание сводится к поиску подобных запросов и их объединению. Пакетирование (я буду использовать этот термин взаимозаменяемо со свертыванием ) не происходит автоматически и требует немного кодирования. Но сначала давайте посмотрим, как это ведет себя:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
def 'should collapse two commands executed concurrently for the same stock ticker'() {
    given:
        def anyTicker = TickerExamples.any()
        def tickers = [anyTicker] as Set
 
    and:
        def commandOne = new StockTickerPriceCollapsedCommand(gateway, anyTicker)
        def commandTwo = new StockTickerPriceCollapsedCommand(gateway, anyTicker)
 
    when:
        Future<StockPrice> futureOne = commandOne.queue()
        Future<StockPrice> futureTwo = commandTwo.queue()
 
    and:
        futureOne.get()
        futureTwo.get()
 
    then:
        0 * gateway.load(_)
        1 * gateway.loadAll(tickers) >> ImmutableMap.of(anyTicker, StockPriceExamples.any())
}
 
def 'should collapse two commands executed concurrently for the different stock tickers'() {
    given:
        def anyTicker = TickerExamples.any()
        def otherTicker = TickerExamples.other()
        def tickers = [anyTicker, otherTicker] as Set
 
    and:
        def commandOne = new StockTickerPriceCollapsedCommand(gateway, anyTicker)
        def commandTwo = new StockTickerPriceCollapsedCommand(gateway, otherTicker)
 
    when:
        Future<StockPrice> futureOne = commandOne.queue()
        Future<StockPrice> futureTwo = commandTwo.queue()
 
    and:
        futureOne.get()
        futureTwo.get()
 
    then:
        1 * gateway.loadAll(tickers) >> ImmutableMap.of(
                anyTicker, StockPriceExamples.any(),
                otherTicker, StockPriceExamples.other())
}
 
def 'should correctly map collapsed response into individual requests'() {
    given:
        def anyTicker = TickerExamples.any()
        def otherTicker = TickerExamples.other()
        def tickers = [anyTicker, otherTicker] as Set
        gateway.loadAll(tickers) >> ImmutableMap.of(
                anyTicker, StockPriceExamples.any(),
                otherTicker, StockPriceExamples.other())
 
    and:
        def commandOne = new StockTickerPriceCollapsedCommand(gateway, anyTicker)
        def commandTwo = new StockTickerPriceCollapsedCommand(gateway, otherTicker)
 
    when:
        Future<StockPrice> futureOne = commandOne.queue()
        Future<StockPrice> futureTwo = commandTwo.queue()
 
    and:
        def anyPrice = futureOne.get()
        def otherPrice = futureTwo.get()
 
    then:
        anyPrice == StockPriceExamples.any()
        otherPrice == StockPriceExamples.other()
}

Первый тест доказывает, что вместо вызова load() дважды мы едва вызывали loadAll() один раз. Также обратите внимание, что, поскольку мы запросили один и тот же Ticker (из двух разных потоков), loadAll() запрашивает только один тикер. Второй тест показывает, что два одновременных запроса двух разных тикеров объединяются в один пакетный вызов. Третий тест гарантирует, что мы по-прежнему получаем правильные ответы на каждый отдельный запрос. Вместо расширения HystrixCommand мы должны расширить более сложный HystrixCollapser . Теперь пришло время увидеть реализацию StockTickerPriceCollapsedCommand , которая легко заменила StockPriceCommand :

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class StockTickerPriceCollapsedCommand extends HystrixCollapser<ImmutableMap<Ticker, StockPrice>, StockPrice, Ticker> {
 
    private final StockPriceGateway gateway;
    private final Ticker stock;
 
    StockTickerPriceCollapsedCommand(StockPriceGateway gateway, Ticker stock) {
        super(HystrixCollapser.Setter.withCollapserKey(HystrixCollapserKey.Factory.asKey("Stock"))
                .andCollapserPropertiesDefaults(HystrixCollapserProperties.Setter().withTimerDelayInMilliseconds(100)));
        this.gateway = gateway;
        this.stock = stock;
    }
 
    @Override
    public Ticker getRequestArgument() {
        return stock;
    }
 
    @Override
    protected HystrixCommand<ImmutableMap<Ticker, StockPrice>> createCommand(Collection<CollapsedRequest<StockPrice, Ticker>> collapsedRequests) {
        final Set<Ticker> stocks = collapsedRequests.stream()
                .map(CollapsedRequest::getArgument)
                .collect(toSet());
        return new StockPricesBatchCommand(gateway, stocks);
    }
 
    @Override
    protected void mapResponseToRequests(ImmutableMap<Ticker, StockPrice> batchResponse, Collection<CollapsedRequest<StockPrice, Ticker>> collapsedRequests) {
        collapsedRequests.forEach(request -> {
            final Ticker ticker = request.getArgument();
            final StockPrice price = batchResponse.get(ticker);
            request.setResponse(price);
        });
    }
 
}

Здесь много чего происходит, поэтому давайте рассмотрим пошагово StockTickerPriceCollapsedCommand . Первые три универсальных типа:

  • BatchReturnType (в нашем примере BatchReturnType ImmutableMap<Ticker, StockPrice> ) — это тип пакетного ответа команды. Как вы увидите позже, collapser превращает несколько маленьких команд в пакетную команду. Это тип ответа этой пакетной команды. Обратите внимание, что это то же самое, что и StockPriceGateway.loadAll() ).
  • ResponseType ( StockPrice ) — это тип каждой отдельной сворачиваемой команды. В нашем случае мы сворачиваем HystrixCommand<StockPrice> . Позже мы BatchReturnType значение BatchReturnType на несколько StockPrice .
  • RequestArgumentType ( Ticker ) — это ввод каждой отдельной команды, которую мы собираемся свернуть (пакет). Когда несколько команд объединяются вместе, мы в конечном итоге заменяем их все одной пакетной командой. Эта команда должна получить все отдельные запросы для выполнения одного пакетного запроса.

withTimerDelayInMilliseconds(100) будет объяснено в ближайшее время. createCommand() создает пакетную команду. Эта команда должна заменить все отдельные команды и выполнить пакетную логику. В нашем случае вместо нескольких отдельных вызовов load() мы просто делаем один:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
class StockPricesBatchCommand extends HystrixCommand<ImmutableMap<Ticker, StockPrice>> {
 
    private final StockPriceGateway gateway;
    private final Set<Ticker> stocks;
 
    StockPricesBatchCommand(StockPriceGateway gateway, Set<Ticker> stocks) {
        super(HystrixCommandGroupKey.Factory.asKey("Stock"));
        this.gateway = gateway;
        this.stocks = stocks;
    }
 
    @Override
    protected ImmutableMap<Ticker, StockPrice> run() throws Exception {
        return gateway.loadAll(stocks);
    }
}

Единственное различие между этим классом и StockPriceCommand состоит в том, что он берет кучу StockPriceCommand и возвращает цены на все из них. Hystrix соберет несколько экземпляров StockTickerPriceCollapsedCommand и, как только этого будет достаточно (подробнее об этом позже), создаст одну StockPriceCommand . Надеюсь, что это понятно, потому что mapResponseToRequests() немного более вовлечен. Как только наша свернутая StockPricesBatchCommand , мы должны как-то разделить пакетный ответ и передать ответы обратно отдельным командам, не подозревая о свертывании. С этой точки зрения реализация mapResponseToRequests() довольно проста: мы получаем пакетный ответ и коллекцию упакованных CollapsedRequest<StockPrice, Ticker> . Теперь мы должны перебрать все ожидающие отдельные запросы и завершить их ( setResponse() ). Если мы не выполним некоторые запросы, они будут зависать бесконечно и в конечном итоге истечет время ожидания.

Как это работает

Это подходящий момент, чтобы описать, как реализовано свертывание. Я уже говорил, что свертывание происходит, когда два запроса происходят одновременно. Там нет такой вещи, как в то же время . В действительности, когда поступает первый разборный запрос, Hystrix запускает таймер. В наших примерах мы установили его на 100 миллисекунд. В течение этого периода наша команда приостановлена, ожидая присоединения других команд. По истечении этого настраиваемого периода Hystrix вызовет createCommand() , собрав все ключи запроса (путем вызова getRequestArgument() ) и запустив его. Когда пакетная команда завершится, она позволит нам отправлять результаты всем ожидающим отдельным командам. Также возможно ограничить количество свернутых запросов, если мы боимся создавать огромные партии — с другой стороны, сколько одновременных запросов может уместиться в этот короткий временной интервал?

Варианты использования и недостатки

Свертывание запросов следует использовать в системах с экстремальной нагрузкой — высокой частотой запросов. Если вы получаете только один запрос на сворачивающееся временное окно (100 миллисекунд в примерах), свертывание просто добавит накладные расходы. Это происходит потому, что каждый раз, когда вы вызываете свертываемую команду, она должна ждать на тот случай, если какая-то другая команда захочет присоединиться и сформировать пакет. Это имеет смысл только тогда, когда свернуты хотя бы пара команд. Время, потраченное на ожидание, компенсируется за счет экономии сетевых задержек и / или лучшего использования ресурсов нашим сотрудником (очень часто пакетные запросы выполняются намного быстрее по сравнению с отдельными вызовами). Но имейте в виду, что рушиться — это обоюдоострый меч, полезный в особых случаях.

Последнее, что нужно запомнить — для использования свертывания запросов вам нужны HystrixRequestContext.initializeContext() и shutdown() в блоке try-finally :

1
2
3
4
5
6
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
    //...
} finally {
    context.shutdown();
}

Свертывание против кеширования

Вы можете подумать, что коллапс можно заменить правильным кэшированием. Это неправда. Вы используете кеш, когда:

  1. ресурс, вероятно, будет доступен несколько раз
  2. мы можем безопасно использовать предыдущее значение, оно останется действительным в течение некоторого периода времени, или мы точно знаем, как его аннулировать
  3. мы можем позволить себе одновременные запросы на один и тот же ресурс, чтобы вычислить его несколько раз

С другой стороны, свертывание не обеспечивает локальность данных (1), оно всегда затрагивает реальный сервис и никогда не возвращает устаревшие данные (2). И, наконец, если мы запрашиваем один и тот же ресурс из нескольких потоков, мы будем вызывать службу поддержки только один раз (3). В случае кеширования, если ваш кеш не очень умный, два потока независимо обнаружат отсутствие данного ресурса в кеше и дважды запросят службу поддержки. Однако свертывание может работать вместе с кешированием — обращаясь к кешу перед выполнением команды свертывания.

Резюме

Свертывание запросов — полезный инструмент, но с очень ограниченными случаями использования. Это может значительно улучшить пропускную способность в нашей системе, а также ограничить нагрузку на внешнюю службу. Обрушение может волшебным образом сгладить пики в движении, а не распространять его повсюду. Просто убедитесь, что вы используете его для команд, работающих с предельной частотой.

Ссылка: Пакетные (сворачивающиеся) запросы в Hystrix от нашего партнера по JCG Томаша Нуркевича в блоге Java и соседних блогах.