Статьи

flatMap () и порядок событий — RxJava FAQ

Как мы уже обнаружили, flatMap() не сохраняет порядок исходного потока. Давайте проиллюстрируем это на примере API GeoNames из предыдущей статьи :

1
2
3
4
5
public interface GeoNames {
  
    Flowable<Long> populationOf(String city);
  
}

Запрашивая население нескольких городов с помощью flatMap() мы не гарантируем, что они будут доставлены в следующем порядке:

1
2
3
4
5
Flowable<String> cities = Flowable.just("Warsaw", "Paris", "London", "Madrid");
  
cities
        .flatMap(geoNames::populationOf)
        .subscribe(response -> log.info("Population: {}", response));

Вывод несколько удивителен:

01
02
03
04
05
06
07
08
09
10
11
12
17:09:49.838 | Rx-3 | --> GET .../searchJSON?q=London http/1.1
17:09:49.838 | Rx-1 | --> GET .../searchJSON?q=Warsaw http/1.1
17:09:49.838 | Rx-4 | --> GET .../searchJSON?q=Madrid http/1.1
17:09:49.838 | Rx-2 | --> GET .../searchJSON?q=Paris http/1.1
17:09:49.939 | Rx-4 | <-- 200 OK .../searchJSON?q=Madrid (98ms)
17:09:49.939 | Rx-3 | <-- 200 OK .../searchJSON?q=London (98ms)
17:09:49.956 | Rx-3 | Population: 7556900
17:09:49.958 | Rx-3 | Population: 3255944
17:09:51.099 | Rx-2 | <-- 200 OK .../searchJSON?q=Paris (1258ms)
17:09:51.100 | Rx-1 | <-- 200 OK .../searchJSON?q=Warsaw (1259ms)
17:09:51.100 | Rx-2 | Population: 2138551
17:09:51.100 | Rx-2 | Population: 1702139

Через некоторое время мы получаем ответ для Мадрида, затем Лондона, который позже получает подписчик. 7556900 (население Лондона) и 3255944 (Мадрид) идут первыми. Через некоторое время прибывают также Париж и Варшава. С одной стороны, хорошо, что мы можем приступить к каждой популяции сразу после ее прибытия. Это делает систему более отзывчивой. Но мы что-то потеряли. Входным потоком были "Warsaw" , "Paris" , "London" , "Madrid" тогда как результирующий поток содержит население "London" , "Madrid" , "Paris" , "Warsaw" . Как мы можем сказать, какое число представляет какой город?

Очевидно, что следующее решение совершенно неверно , но оно не является неслыханным в реальных базах кода:

1
2
3
4
Flowable<Long> populations = cities.flatMap(geoNames::populationOf);
cities
        .zipWith(populations, Pair::of)
        .subscribe(response -> log.info("Population: {}", response));

Он компилируется, работает, он даже дает некоторые результаты. К сожалению, эти результаты совершенно неверны:

01
02
03
04
05
06
07
08
09
10
11
12
17:20:03.778 | Rx-2 | --> GET .../searchJSON?q=Paris http/1.1
17:20:03.778 | Rx-3 | --> GET .../searchJSON?q=London http/1.1
17:20:03.778 | Rx-4 | --> GET .../searchJSON?q=Madrid http/1.1
17:20:03.778 | Rx-1 | --> GET .../searchJSON?q=Warsaw http/1.1
17:20:03.953 | Rx-4 | <-- 200 OK .../searchJSON?q=Madrid (172ms)
17:20:03.959 | Rx-2 | <-- 200 OK .../searchJSON?q=Paris (179ms)
17:20:03.975 | Rx-2 | Population: (Warsaw,2138551)
17:20:03.976 | Rx-2 | Population: (Paris,3255944)
17:20:03.988 | Rx-3 | <-- 200 OK .../searchJSON?q=London (207ms)
17:20:03.988 | Rx-3 | Population: (London,7556900)
17:20:04.080 | Rx-1 | <-- 200 OK .../searchJSON?q=Warsaw (299ms)
17:20:04.080 | Rx-1 | Population: (Madrid,1702139)

Мы объединяем города с некоторой случайной перестановкой населения. Что еще хуже, мне удалось получить неправильные результаты после десятка попыток. По какой-то причине эта программа работала на моей машине большую часть времени. Худший вид ошибки, которую вы можете себе представить.

Проблема с flatMap() заключается в том, что он теряет исходный запрос. Представьте себе асинхронную систему, в которой вы получаете ответ в какой-то очереди, но не знаете, что это был за запрос. Очевидное решение состоит в том, чтобы каким-то образом прикрепить какой-либо идентификатор корреляции или даже весь запрос к ответу. К сожалению, populationOf(String city) не возвращает исходный запрос ( city ), только ответ ( population ). Было бы намного проще, если бы он возвращал что-то вроде CityWithPopulation значения CityWithPopulation или даже Pair<String, Long> . Итак, теперь представьте, что мы улучшаем оригинальный метод, прикрепляя запрос ( city ):

1
2
3
4
Flowable<Pair<String, Long>> populationOfCity(String city) {
    Flowable<Long> population = geoNames.populationOf(city);
    return population.map(p -> Pair.of(city, p));
}

Теперь мы можем воспользоваться этим методом для большого потока городов:

1
2
3
cities
        .flatMap(this::populationOfCity)
        .subscribe(response -> log.info("Population: {}", response));

… или чтобы избежать дополнительного вспомогательного метода:

1
2
3
4
5
6
cities
        .flatMap(city -> geoNames
                .populationOf(city)
                .map(p -> Pair.of(city, p))
        )
        .subscribe(response -> log.info("Population: {}", response));

На этот раз переменная resultPair<String, Long> но вам рекомендуется использовать более выразительный объект значения.

01
02
03
04
05
06
07
08
09
10
11
12
17:20:03.778 | Rx-2 | --> GET .../searchJSON?q=Paris http/1.1
17:20:03.778 | Rx-3 | --> GET .../searchJSON?q=London http/1.1
17:20:03.778 | Rx-4 | --> GET .../searchJSON?q=Madrid http/1.1
17:20:03.778 | Rx-1 | --> GET .../searchJSON?q=Warsaw http/1.1
17:20:03.953 | Rx-4 | <-- 200 OK .../searchJSON?q=Madrid (172ms)
17:20:03.959 | Rx-2 | <-- 200 OK .../searchJSON?q=Paris (179ms)
17:20:03.975 | Rx-2 | Population: (Paris,2138551)
17:20:03.976 | Rx-2 | Population: (Madrid,3255944)
17:20:03.988 | Rx-3 | <-- 200 OK .../searchJSON?q=London (207ms)
17:20:03.988 | Rx-3 | Population: (London,7556900)
17:20:04.080 | Rx-1 | <-- 200 OK .../searchJSON?q=Warsaw (299ms)
17:20:04.080 | Rx-1 | Population: (Warsaw,1702139)

Я обнаружил, что flatMap() с вложенным map() добавляющим дополнительный контекст, является наиболее эффективным способом борьбы с flatMap() результатами. Конечно, это не самый читаемый кусок реактивного кода, поэтому убедитесь, что вы скрываете эту сложность за некоторым фасадом.

ОБНОВИТЬ

Как отметил Давид Карнок в своем комментарии к этому посту , оператор map() внутри flatMap() является настолько распространенной идиомой, что существует специальная перегрузка flatMap() . Помимо стандартной функции преобразования (в нашем случае String -> Flowable<Long> ) она также принимает би-функцию комбинатора (например, (String, Long) -> SomeType ). Цель этой функции — предоставить преобразование, которое объединяет входной элемент с каждым выходным элементом, сгенерированным преобразованием. Это именно то, что мы сделали с вложенной map() (обогащение населения названием города, которому она соответствует), но намного короче:

1
2
Flowable<Pair<String, Long>> populations = cities
        .flatMap(city -> geoNames.populationOf(city), (city, pop) -> Pair.of(city, pop));

Второе лямбда-выражение ( (city, pop) -> Pair.of(city, pop) ) выполняется для каждого последующего события, генерируемого (city, pop) -> Pair.of(city, pop) populationOf() . Если вы идете до крайности, вы можете использовать ссылки на методы:

1
2
Flowable<Pair<String, Long>> populations = cities
        .flatMap(geoNames::populationOf, Pair::of);

Найдите минутку, чтобы изучить последний пример, на самом деле это очень просто:

  • для каждого city найти свое население pop
  • для каждого населения объедините его с city , сформировав Pair<String, Long>

PS: это был 200-й пост за 9 лет!

Ссылка: flatMap () и порядок событий — часто задаваемые вопросы по RxJava от нашего партнера по JCG Томаша Нуркевича в блоге, посвященном Java и соседству .