Как мы уже обнаружили, flatMap()
не сохраняет порядок исходного потока. Давайте проиллюстрируем это на примере API GeoNames из предыдущей статьи :
1
2
3
4
5
|
public interface GeoNames { Flowable<Long> populationOf(String city); } |
Запрашивая население нескольких городов с помощью flatMap()
мы не гарантируем, что они будут доставлены в следующем порядке:
1
2
3
4
5
|
Flowable<String> cities = Flowable.just( "Warsaw" , "Paris" , "London" , "Madrid" ); cities .flatMap(geoNames::populationOf) .subscribe(response -> log.info( "Population: {}" , response)); |
Вывод несколько удивителен:
01
02
03
04
05
06
07
08
09
10
11
12
|
17:09:49.838 | Rx-3 | --> GET ... /searchJSON ?q=London http /1 .1 17:09:49.838 | Rx-1 | --> GET ... /searchJSON ?q=Warsaw http /1 .1 17:09:49.838 | Rx-4 | --> GET ... /searchJSON ?q=Madrid http /1 .1 17:09:49.838 | Rx-2 | --> GET ... /searchJSON ?q=Paris http /1 .1 17:09:49.939 | Rx-4 | <-- 200 OK ... /searchJSON ?q=Madrid (98ms) 17:09:49.939 | Rx-3 | <-- 200 OK ... /searchJSON ?q=London (98ms) 17:09:49.956 | Rx-3 | Population: 7556900 17:09:49.958 | Rx-3 | Population: 3255944 17:09:51.099 | Rx-2 | <-- 200 OK ... /searchJSON ?q=Paris (1258ms) 17:09:51.100 | Rx-1 | <-- 200 OK ... /searchJSON ?q=Warsaw (1259ms) 17:09:51.100 | Rx-2 | Population: 2138551 17:09:51.100 | Rx-2 | Population: 1702139 |
Через некоторое время мы получаем ответ для Мадрида, затем Лондона, который позже получает подписчик. 7556900 (население Лондона) и 3255944 (Мадрид) идут первыми. Через некоторое время прибывают также Париж и Варшава. С одной стороны, хорошо, что мы можем приступить к каждой популяции сразу после ее прибытия. Это делает систему более отзывчивой. Но мы что-то потеряли. Входным потоком были "Warsaw"
, "Paris"
, "London"
, "Madrid"
тогда как результирующий поток содержит население "London"
, "Madrid"
, "Paris"
, "Warsaw"
. Как мы можем сказать, какое число представляет какой город?
Очевидно, что следующее решение совершенно неверно , но оно не является неслыханным в реальных базах кода:
1
2
3
4
|
Flowable<Long> populations = cities.flatMap(geoNames::populationOf); cities .zipWith(populations, Pair::of) .subscribe(response -> log.info( "Population: {}" , response)); |
Он компилируется, работает, он даже дает некоторые результаты. К сожалению, эти результаты совершенно неверны:
01
02
03
04
05
06
07
08
09
10
11
12
|
17:20:03.778 | Rx-2 | --> GET ... /searchJSON ?q=Paris http /1 .1 17:20:03.778 | Rx-3 | --> GET ... /searchJSON ?q=London http /1 .1 17:20:03.778 | Rx-4 | --> GET ... /searchJSON ?q=Madrid http /1 .1 17:20:03.778 | Rx-1 | --> GET ... /searchJSON ?q=Warsaw http /1 .1 17:20:03.953 | Rx-4 | <-- 200 OK ... /searchJSON ?q=Madrid (172ms) 17:20:03.959 | Rx-2 | <-- 200 OK ... /searchJSON ?q=Paris (179ms) 17:20:03.975 | Rx-2 | Population: (Warsaw,2138551) 17:20:03.976 | Rx-2 | Population: (Paris,3255944) 17:20:03.988 | Rx-3 | <-- 200 OK ... /searchJSON ?q=London (207ms) 17:20:03.988 | Rx-3 | Population: (London,7556900) 17:20:04.080 | Rx-1 | <-- 200 OK ... /searchJSON ?q=Warsaw (299ms) 17:20:04.080 | Rx-1 | Population: (Madrid,1702139) |
Мы объединяем города с некоторой случайной перестановкой населения. Что еще хуже, мне удалось получить неправильные результаты после десятка попыток. По какой-то причине эта программа работала на моей машине большую часть времени. Худший вид ошибки, которую вы можете себе представить.
Проблема с flatMap()
заключается в том, что он теряет исходный запрос. Представьте себе асинхронную систему, в которой вы получаете ответ в какой-то очереди, но не знаете, что это был за запрос. Очевидное решение состоит в том, чтобы каким-то образом прикрепить какой-либо идентификатор корреляции или даже весь запрос к ответу. К сожалению, populationOf(String city)
не возвращает исходный запрос ( city
), только ответ ( population
). Было бы намного проще, если бы он возвращал что-то вроде CityWithPopulation
значения CityWithPopulation
или даже Pair<String, Long>
. Итак, теперь представьте, что мы улучшаем оригинальный метод, прикрепляя запрос ( city
):
1
2
3
4
|
Flowable<Pair<String, Long>> populationOfCity(String city) { Flowable<Long> population = geoNames.populationOf(city); return population.map(p -> Pair.of(city, p)); } |
Теперь мы можем воспользоваться этим методом для большого потока городов:
1
2
3
|
cities .flatMap( this ::populationOfCity) .subscribe(response -> log.info( "Population: {}" , response)); |
… или чтобы избежать дополнительного вспомогательного метода:
1
2
3
4
5
6
|
cities .flatMap(city -> geoNames .populationOf(city) .map(p -> Pair.of(city, p)) ) .subscribe(response -> log.info( "Population: {}" , response)); |
На этот раз переменная result
— Pair<String, Long>
но вам рекомендуется использовать более выразительный объект значения.
01
02
03
04
05
06
07
08
09
10
11
12
|
17:20:03.778 | Rx-2 | --> GET ... /searchJSON ?q=Paris http /1 .1 17:20:03.778 | Rx-3 | --> GET ... /searchJSON ?q=London http /1 .1 17:20:03.778 | Rx-4 | --> GET ... /searchJSON ?q=Madrid http /1 .1 17:20:03.778 | Rx-1 | --> GET ... /searchJSON ?q=Warsaw http /1 .1 17:20:03.953 | Rx-4 | <-- 200 OK ... /searchJSON ?q=Madrid (172ms) 17:20:03.959 | Rx-2 | <-- 200 OK ... /searchJSON ?q=Paris (179ms) 17:20:03.975 | Rx-2 | Population: (Paris,2138551) 17:20:03.976 | Rx-2 | Population: (Madrid,3255944) 17:20:03.988 | Rx-3 | <-- 200 OK ... /searchJSON ?q=London (207ms) 17:20:03.988 | Rx-3 | Population: (London,7556900) 17:20:04.080 | Rx-1 | <-- 200 OK ... /searchJSON ?q=Warsaw (299ms) 17:20:04.080 | Rx-1 | Population: (Warsaw,1702139) |
Я обнаружил, что flatMap()
с вложенным map()
добавляющим дополнительный контекст, является наиболее эффективным способом борьбы с flatMap()
результатами. Конечно, это не самый читаемый кусок реактивного кода, поэтому убедитесь, что вы скрываете эту сложность за некоторым фасадом.
ОБНОВИТЬ
Как отметил Давид Карнок в своем комментарии к этому посту , оператор map()
внутри flatMap()
является настолько распространенной идиомой, что существует специальная перегрузка flatMap()
. Помимо стандартной функции преобразования (в нашем случае String -> Flowable<Long>
) она также принимает би-функцию комбинатора (например, (String, Long) -> SomeType
). Цель этой функции — предоставить преобразование, которое объединяет входной элемент с каждым выходным элементом, сгенерированным преобразованием. Это именно то, что мы сделали с вложенной map()
(обогащение населения названием города, которому она соответствует), но намного короче:
1
2
|
Flowable<Pair<String, Long>> populations = cities .flatMap(city -> geoNames.populationOf(city), (city, pop) -> Pair.of(city, pop)); |
Второе лямбда-выражение ( (city, pop) -> Pair.of(city, pop)
) выполняется для каждого последующего события, генерируемого (city, pop) -> Pair.of(city, pop)
populationOf()
. Если вы идете до крайности, вы можете использовать ссылки на методы:
1
2
|
Flowable<Pair<String, Long>> populations = cities .flatMap(geoNames::populationOf, Pair::of); |
Найдите минутку, чтобы изучить последний пример, на самом деле это очень просто:
- для каждого
city
найти свое населениеpop
- для каждого населения объедините его с
city
, сформировавPair<String, Long>
PS: это был 200-й пост за 9 лет!
Ссылка: | flatMap () и порядок событий — часто задаваемые вопросы по RxJava от нашего партнера по JCG Томаша Нуркевича в блоге, посвященном Java и соседству . |