Как мы уже обнаружили, flatMap() не сохраняет порядок исходного потока. Давайте проиллюстрируем это на примере API GeoNames из предыдущей статьи :
|
1
2
3
4
5
|
public interface GeoNames { Flowable<Long> populationOf(String city); } |
Запрашивая население нескольких городов с помощью flatMap() мы не гарантируем, что они будут доставлены в следующем порядке:
|
1
2
3
4
5
|
Flowable<String> cities = Flowable.just("Warsaw", "Paris", "London", "Madrid"); cities .flatMap(geoNames::populationOf) .subscribe(response -> log.info("Population: {}", response)); |
Вывод несколько удивителен:
|
01
02
03
04
05
06
07
08
09
10
11
12
|
17:09:49.838 | Rx-3 | --> GET .../searchJSON?q=London http/1.117:09:49.838 | Rx-1 | --> GET .../searchJSON?q=Warsaw http/1.117:09:49.838 | Rx-4 | --> GET .../searchJSON?q=Madrid http/1.117:09:49.838 | Rx-2 | --> GET .../searchJSON?q=Paris http/1.117:09:49.939 | Rx-4 | <-- 200 OK .../searchJSON?q=Madrid (98ms)17:09:49.939 | Rx-3 | <-- 200 OK .../searchJSON?q=London (98ms)17:09:49.956 | Rx-3 | Population: 755690017:09:49.958 | Rx-3 | Population: 325594417:09:51.099 | Rx-2 | <-- 200 OK .../searchJSON?q=Paris (1258ms)17:09:51.100 | Rx-1 | <-- 200 OK .../searchJSON?q=Warsaw (1259ms)17:09:51.100 | Rx-2 | Population: 213855117:09:51.100 | Rx-2 | Population: 1702139 |
Через некоторое время мы получаем ответ для Мадрида, затем Лондона, который позже получает подписчик. 7556900 (население Лондона) и 3255944 (Мадрид) идут первыми. Через некоторое время прибывают также Париж и Варшава. С одной стороны, хорошо, что мы можем приступить к каждой популяции сразу после ее прибытия. Это делает систему более отзывчивой. Но мы что-то потеряли. Входным потоком были "Warsaw" , "Paris" , "London" , "Madrid" тогда как результирующий поток содержит население "London" , "Madrid" , "Paris" , "Warsaw" . Как мы можем сказать, какое число представляет какой город?
Очевидно, что следующее решение совершенно неверно , но оно не является неслыханным в реальных базах кода:
|
1
2
3
4
|
Flowable<Long> populations = cities.flatMap(geoNames::populationOf);cities .zipWith(populations, Pair::of) .subscribe(response -> log.info("Population: {}", response)); |
Он компилируется, работает, он даже дает некоторые результаты. К сожалению, эти результаты совершенно неверны:
|
01
02
03
04
05
06
07
08
09
10
11
12
|
17:20:03.778 | Rx-2 | --> GET .../searchJSON?q=Paris http/1.117:20:03.778 | Rx-3 | --> GET .../searchJSON?q=London http/1.117:20:03.778 | Rx-4 | --> GET .../searchJSON?q=Madrid http/1.117:20:03.778 | Rx-1 | --> GET .../searchJSON?q=Warsaw http/1.117:20:03.953 | Rx-4 | <-- 200 OK .../searchJSON?q=Madrid (172ms)17:20:03.959 | Rx-2 | <-- 200 OK .../searchJSON?q=Paris (179ms)17:20:03.975 | Rx-2 | Population: (Warsaw,2138551)17:20:03.976 | Rx-2 | Population: (Paris,3255944)17:20:03.988 | Rx-3 | <-- 200 OK .../searchJSON?q=London (207ms)17:20:03.988 | Rx-3 | Population: (London,7556900)17:20:04.080 | Rx-1 | <-- 200 OK .../searchJSON?q=Warsaw (299ms)17:20:04.080 | Rx-1 | Population: (Madrid,1702139) |
Мы объединяем города с некоторой случайной перестановкой населения. Что еще хуже, мне удалось получить неправильные результаты после десятка попыток. По какой-то причине эта программа работала на моей машине большую часть времени. Худший вид ошибки, которую вы можете себе представить.
Проблема с flatMap() заключается в том, что он теряет исходный запрос. Представьте себе асинхронную систему, в которой вы получаете ответ в какой-то очереди, но не знаете, что это был за запрос. Очевидное решение состоит в том, чтобы каким-то образом прикрепить какой-либо идентификатор корреляции или даже весь запрос к ответу. К сожалению, populationOf(String city) не возвращает исходный запрос ( city ), только ответ ( population ). Было бы намного проще, если бы он возвращал что-то вроде CityWithPopulation значения CityWithPopulation или даже Pair<String, Long> . Итак, теперь представьте, что мы улучшаем оригинальный метод, прикрепляя запрос ( city ):
|
1
2
3
4
|
Flowable<Pair<String, Long>> populationOfCity(String city) { Flowable<Long> population = geoNames.populationOf(city); return population.map(p -> Pair.of(city, p));} |
Теперь мы можем воспользоваться этим методом для большого потока городов:
|
1
2
3
|
cities .flatMap(this::populationOfCity) .subscribe(response -> log.info("Population: {}", response)); |
… или чтобы избежать дополнительного вспомогательного метода:
|
1
2
3
4
5
6
|
cities .flatMap(city -> geoNames .populationOf(city) .map(p -> Pair.of(city, p)) ) .subscribe(response -> log.info("Population: {}", response)); |
На этот раз переменная result — Pair<String, Long> но вам рекомендуется использовать более выразительный объект значения.
|
01
02
03
04
05
06
07
08
09
10
11
12
|
17:20:03.778 | Rx-2 | --> GET .../searchJSON?q=Paris http/1.117:20:03.778 | Rx-3 | --> GET .../searchJSON?q=London http/1.117:20:03.778 | Rx-4 | --> GET .../searchJSON?q=Madrid http/1.117:20:03.778 | Rx-1 | --> GET .../searchJSON?q=Warsaw http/1.117:20:03.953 | Rx-4 | <-- 200 OK .../searchJSON?q=Madrid (172ms)17:20:03.959 | Rx-2 | <-- 200 OK .../searchJSON?q=Paris (179ms)17:20:03.975 | Rx-2 | Population: (Paris,2138551)17:20:03.976 | Rx-2 | Population: (Madrid,3255944)17:20:03.988 | Rx-3 | <-- 200 OK .../searchJSON?q=London (207ms)17:20:03.988 | Rx-3 | Population: (London,7556900)17:20:04.080 | Rx-1 | <-- 200 OK .../searchJSON?q=Warsaw (299ms)17:20:04.080 | Rx-1 | Population: (Warsaw,1702139) |
Я обнаружил, что flatMap() с вложенным map() добавляющим дополнительный контекст, является наиболее эффективным способом борьбы с flatMap() результатами. Конечно, это не самый читаемый кусок реактивного кода, поэтому убедитесь, что вы скрываете эту сложность за некоторым фасадом.
ОБНОВИТЬ
Как отметил Давид Карнок в своем комментарии к этому посту , оператор map() внутри flatMap() является настолько распространенной идиомой, что существует специальная перегрузка flatMap() . Помимо стандартной функции преобразования (в нашем случае String -> Flowable<Long> ) она также принимает би-функцию комбинатора (например, (String, Long) -> SomeType ). Цель этой функции — предоставить преобразование, которое объединяет входной элемент с каждым выходным элементом, сгенерированным преобразованием. Это именно то, что мы сделали с вложенной map() (обогащение населения названием города, которому она соответствует), но намного короче:
|
1
2
|
Flowable<Pair<String, Long>> populations = cities .flatMap(city -> geoNames.populationOf(city), (city, pop) -> Pair.of(city, pop)); |
Второе лямбда-выражение ( (city, pop) -> Pair.of(city, pop) ) выполняется для каждого последующего события, генерируемого (city, pop) -> Pair.of(city, pop) populationOf() . Если вы идете до крайности, вы можете использовать ссылки на методы:
|
1
2
|
Flowable<Pair<String, Long>> populations = cities .flatMap(geoNames::populationOf, Pair::of); |
Найдите минутку, чтобы изучить последний пример, на самом деле это очень просто:
- для каждого
cityнайти свое населениеpop - для каждого населения объедините его с
city, сформировавPair<String, Long>
PS: это был 200-й пост за 9 лет!
| Ссылка: | flatMap () и порядок событий — часто задаваемые вопросы по RxJava от нашего партнера по JCG Томаша Нуркевича в блоге, посвященном Java и соседству . |