Статьи

Spring, Reactor и ElasticSearch: бечмаркинг с поддельными данными испытаний

В предыдущей статье мы создали простой адаптер от API ElasticSearch к Reactor’s Mono , который выглядит следующим образом:

1
2
3
4
5
import reactor.core.publisher.Mono;
  
private Mono indexDoc(Doc doc) {
    //...
}

Теперь мы хотели бы запустить этот метод на уровне управляемого параллелизма, миллионы раз. По сути, мы хотим посмотреть, как наш индексный код ведет себя под нагрузкой, сравнить его.

Поддельные данные с jFairy

Во-первых, нам нужны хорошие тестовые данные. Для этой цели мы будем использовать удобную библиотеку jFairy . Документ, который мы будем индексировать, является простым POJO:

1
2
3
4
5
@Value
class Doc {
    private final String username;
    private final String json;
}

Логика генерации заключена в класс Java:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import io.codearte.jfairy.Fairy;
import io.codearte.jfairy.producer.person.Address;
import io.codearte.jfairy.producer.person.Person;
import org.apache.commons.lang3.RandomUtils;
  
  
@Component
class PersonGenerator {
  
    private final ObjectMapper objectMapper;
    private final Fairy fairy;
  
    private Doc generate() {
        Person person = fairy.person();
        final String username = person.getUsername() + RandomUtils.nextInt(1_000_000, 9_000_000);
        final ImmutableMap<String, Object> map = ImmutableMap.<String, Object>builder()
                .put("address", toMap(person.getAddress()))
                .put("firstName", person.getFirstName())
                .put("middleName", person.getMiddleName())
                .put("lastName", person.getLastName())
                .put("email", person.getEmail())
                .put("companyEmail", person.getCompanyEmail())
                .put("username", username)
                .put("password", person.getPassword())
                .put("sex", person.getSex())
                .put("telephoneNumber", person.getTelephoneNumber())
                .put("dateOfBirth", person.getDateOfBirth())
                .put("company", person.getCompany())
                .put("nationalIdentityCardNumber", person.getNationalIdentityCardNumber())
                .put("nationalIdentificationNumber", person.getNationalIdentificationNumber())
                .put("passportNumber", person.getPassportNumber())
                .build();
        final String json = objectMapper.writeValueAsString(map);
        return new Doc(username, json);
    }
  
    private ImmutableMap<String, Object> toMap(Address address) {
        return ImmutableMap.<String, Object>builder()
                .put("street", address.getStreet())
                .put("streetNumber", address.getStreetNumber())
                .put("apartmentNumber", address.getApartmentNumber())
                .put("postalCode", address.getPostalCode())
                .put("city", address.getCity())
                .put("lines", Arrays.asList(address.getAddressLine1(), address.getAddressLine2()))
                .build();
    }
  
}

Довольно скучный код, который на самом деле делает что-то классное. Каждый раз, когда мы запускаем его, он генерирует случайный, но разумный JSON, например:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
{
  "address": {
    "street": "Ford Street",
    "streetNumber": "32",
    "apartmentNumber": "",
    "postalCode": "63913",
    "city": "San Francisco",
    "lines": [
      "32 Ford Street",
      "San Francisco 63913"
    ]
  },
  "firstName": "Evelyn",
  "middleName": "",
  "lastName": "Pittman",
  "email": "[email protected]",
  "companyEmail": "[email protected]",
  "username": "epittman5795354",
  "password": "VpEfFmzG",
  "sex": "FEMALE",
  "telephoneNumber": "368-005-109",
  "dateOfBirth": "1917-05-14T16:47:06.273Z",
  "company": {
    "name": "Woods LLC",
    "domain": "woodsllc.eu",
    "email": "[email protected]",
    "vatIdentificationNumber": "30-0005081",
    "url": "http://www.woodsllc.eu"
  },
  "nationalIdentityCardNumber": "713-79-5185",
  "nationalIdentificationNumber": "",
  "passportNumber": "jVeyZLSt3"
}

Ухоженная! К сожалению, не задокументировано, является ли jFairy поточно-ориентированным, поэтому на всякий случай в реальном коде я использую ThreadLocal . Итак, у нас есть один документ, но нам нужны миллионы! Использование for -loop очень старомодно. Что вы скажете о бесконечном потоке случайных людей?

01
02
03
04
05
06
07
08
09
10
11
12
13
14
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
  
private final Scheduler scheduler = Schedulers.newParallel(PersonGenerator.class.getSimpleName());
  
Mono<Doc> generateOne() {
    return Mono
            .fromCallable(this::generate)
            .subscribeOn(scheduler);
}
  
Flux<Doc> infinite() {
    return generateOne().repeat();
}

generateOne() оборачивает блокирующий метод generate() в Mono<Doc> . Кроме того, generate() запускается на parallel Scheduler . Почему? Оказалось, что jFairy не был достаточно быстр на одном ядре (много случайных чисел, поиск таблиц и т. Д.), Поэтому мне пришлось распараллеливать генерацию данных. Обычно не должно быть проблемой. Но когда генерация поддельных данных происходит медленнее, чем ваше реагирующее приложение, которое касается внешнего сервера, оно говорит вам о производительности Net-based Spring web-flux (!)

Вызов ElasticSearch одновременно

Хорошо, имея бесконечный поток хороших тестовых данных, мы теперь хотим проиндексировать их в ElasticSearch.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
@PostConstruct
void startIndexing() {
    index(1_000_000, 1_000);
}
  
private void index(int count, int maxConcurrency) {
    personGenerator
            .infinite()
            .take(count)
            .flatMap(this::indexDocSwallowErrors, maxConcurrency)
            .window(Duration.ofSeconds(1))
            .flatMap(Flux::count)
            .subscribe(winSize -> log.debug("Got {} responses in last second", winSize));
}
  
private Mono<IndexResponse> indexDocSwallowErrors(Doc doc) {
    return indexDoc(doc)
            .doOnError(e -> log.error("Unable to index {}", doc, e))
            .onErrorResume(e -> Mono.empty());
}

При запуске приложения начинается индексация 1 миллиона документов. Обратите внимание, как легко сообщить Reactor (то же самое для RxJava), что он должен вызывать до тысячи одновременных запросов к ElasticSearch. Раз в секунду мы подсчитываем, сколько ответов мы получили:

1
2
3
4
5
Got 2925 responses in last second
Got 2415 responses in last second
Got 3336 responses in last second
Got 2199 responses in last second
Got 1861 responses in last second

Неплохо! Особенно, если учесть, что существует до тысячи одновременных HTTP-запросов, и наше приложение запустило всего лишь 30 потоков (хорошо). Хорошо, это localhost <-> localhost , виновен! Но как мы на самом деле знаем все это? Ведение журнала это хорошо, но это XXI век, мы можем сделать лучше! Мониторинг станет предметом следующего взноса.

Исходный код доступен по адресу github.com/nurkiewicz/elastic-flux в разделе « reactive-elastic-search .

Опубликовано на Java Code Geeks с разрешения Томаша Нуркевича, партнера нашей программы JCG. См. Оригинальную статью здесь: Spring, Reactor и ElasticSearch: бечмаркинг с поддельными данными испытаний

Мнения, высказанные участниками Java Code Geeks, являются их собственными.