Это продолжение двух других постов — Мотивация, почему что-то вроде Hystrix требуется в распределенных системах, и базовое введение в Hystrix .
Это будет завершение моего путешествия по Hystrix с деталями различных свойств, которые можно изменить, чтобы изменить поведение Hystrix, и затронет несколько передовых концепций.
Тонкое поведение Hystrix
Конфигурация Hystrix объясняется в этой вики, здесь кратко две широкие группы управляют свойствами Hystrix,
1. Свойства команды
2. Свойства ThreadPool
Свойства следуют порядку приоритета, который объясняется в вики, здесь я сконцентрируюсь на свойствах, указанных в файле свойств.
Для примера команды определен следующий способ:
public class HelloWorldCommand extends HystrixCommand<String> { private static final Logger logger = LoggerFactory.getLogger(HelloWorldCommand.class); private final String name; public HelloWorldCommand(String name) { super(HystrixCommandGroupKey.Factory.asKey("default")); this.name = name; } protected String run() throws Exception { logger.info("HelloWorld Command Invoked"); return "Hello " + name; }}
Первое поведение, которое можно настроить, — выполнить ли команду в пуле потоков или в том же потоке выполнения, что и вызывающая сторона (тип стратегии SEMAPHORE). Если выполнение находится в пуле потоков, то может быть установлено время ожидания для запроса.
hystrix.command.HelloWorldCommand.execution.isolation.strategy=THREADhystrix.command.HelloWorldCommand.execution.isolation.thread.timeoutInMilliseconds=1000
Второе поведение — автоматический выключатель, который работает на основе информации, собранной за скользящее окно времени, настроенной таким образом, скажем, в течение 10 секунд:
hystrix.command.HelloWorldCommand.metrics.rollingStats.timeInMilliseconds=10000
В этом окне, если определенный процент отказов (скажем, 50%) происходит для порога запросов (скажем, 20 за 10 секунд), то цепь разрывается, с конфигурацией, которая выглядит следующим образом:
hystrix.command.HelloWorldCommand.circuitBreaker.requestVolumeThreshold=20hystrix.command.HelloWorldCommand.circuitBreaker.errorThresholdPercentage=50
Как только цепь разорвана, она остается такой в течение времени, установленного следующим образом, в данном случае 5 секунд:
hystrix.command.HelloWorldCommand.circuitBreaker.sleepWindowInMilliseconds=5000
Настройки пула потоков управляются с помощью указанного ключа группы, который в этом примере называется default. Определенный «ключ пула потоков» также мог быть указан как часть конструктора.
hystrix.threadpool.default.coreSize=10hystrix.threadpool.default.queueSizeRejectionThreshold=5
Здесь 10 команд могут потенциально выполняться параллельно, а еще 5 — в очереди, после которой запросы будут отклонены.
Запрос рушится
Томаз Нуркевич в своем блоге NoBlogDefFound проделал отличную работу по объяснению свертывания запросов . Мой пример немного упрощен, рассмотрим случай, когда делается много запросов на получение Person с данным идентификатором, следующим образом:
public class PersonService { public Person findPerson(Integer id) { return new Person(id, "name : " + id); } public List<Person> findPeople(List<Integer> ids) { return ids .stream() .map(i -> new Person(i, "name : " + i)) .collect(Collectors.toList()); }}
Служба отвечает постоянным ответом, но предполагает, что вызов был удаленным хранилищем данных. Также обратите внимание, что этот сервис реализует пакетный метод для получения списка людей, которым дан список идентификаторов.
Свертывание запросов — это функция, которая объединяет несколько пользовательских запросов, происходящих в течение определенного периода времени, в один такой удаленный вызов, а затем выводит ответ обратно пользователю.
Команда Hystrix, которая принимает набор идентификаторов и получает ответ от людей, может быть определена следующим образом:
public class PersonRequestCommand extends HystrixCommand<List<Person>>{ private final List<Integer> ids; private final PersonService personService = new PersonService(); private static final Logger logger = LoggerFactory.getLogger(PersonRequestCommand.class); public PersonRequestCommand(List<Integer> ids) { super(HystrixCommandGroupKey.Factory.asKey("default")); this.ids = ids; } protected List<Person> run() throws Exception { logger.info("Retrieving details for : " + this.ids); return personService.findPeople(this.ids); }}
Довольно простой до этого момента, сложная логика теперь в RequestCollapser, который выглядит следующим образом:
package aggregate.commands.collapsed;import com.netflix.hystrix.HystrixCollapser;import com.netflix.hystrix.HystrixCollapserKey;import com.netflix.hystrix.HystrixCollapserProperties;import com.netflix.hystrix.HystrixCommand;import java.util.Collection;import java.util.List;import java.util.Map;import java.util.function.Function;import java.util.stream.Collectors;public class PersonRequestCollapser extends HystrixCollapser<List<Person>, Person, Integer> { private final Integer id; public PersonRequestCollapser(Integer id) { super(Setter. withCollapserKey(HystrixCollapserKey.Factory.asKey("personRequestCollapser")) .andCollapserPropertiesDefaults(HystrixCollapserProperties.Setter().withTimerDelayInMilliseconds(2000))); this.id = id; } public Integer getRequestArgument() { return this.id; } protected HystrixCommand<List<Person>> createCommand(Collection<CollapsedRequest<Person, Integer>> collapsedRequests) { List<Integer> ids = collapsedRequests.stream().map(cr -> cr.getArgument()).collect(Collectors.toList()); return new PersonRequestCommand(ids); } protected void mapResponseToRequests(List<Person> batchResponse, Collection<CollapsedRequest<Person, Integer>> collapsedRequests) { Map<Integer, Person> personMap = batchResponse.stream().collect(Collectors.toMap(Person::getId, Function.identity())); for (CollapsedRequest<Person, Integer> cr: collapsedRequests) { cr.setResponse(personMap.get(cr.getArgument())); } }}
Здесь происходит несколько вещей: во-первых, типы в параметризованной сигнатуре типа указывают тип ответа (List <Person>), тип ответа, ожидаемый вызывающей стороной (Person), и тип запроса запроса (id человек). Затем есть два метода: один для создания пакетной команды, а второй — для сопоставления ответов с исходными запросами.
Теперь, учитывая это с точки зрения пользователей, ничего особенного не меняется, вызов делается так, как если бы он был одной командой, а запрос свертывания обрабатывает пакетирование, диспетчеризацию и сопоставление ответов. Вот как выглядит пример теста:
void testCollapse() throws Exception { HystrixRequestContext requestContext = HystrixRequestContext.initializeContext(); logger.info("About to execute Collapsed command"); List<Observable<Person>> result = new ArrayList<>(); CountDownLatch cl = new CountDownLatch(1); for (int i = 1; i <= 100; i++) { result.add(new PersonRequestCollapser(i).observe()); } Observable.merge(result).subscribe(p -> logger.info(p.toString()) , t -> logger.error(t.getMessage(), t) , () -> cl.countDown()); cl.await(); logger.info("Completed executing Collapsed Command"); requestContext.shutdown();}
Вывод
В Hystrix есть гораздо больше, чем я описал здесь. Это действительно потрясающая библиотека, необходимая для создания отказоустойчивой системы, и я пришел к выводу о том, сколько мыслительного процесса ушло на разработку этой превосходной библиотеки.
Ссылка
Вот мой репозиторий github со всеми образцами — https://github.com/bijukunjummen/hystrixdemo