Статьи

flatMap () против concatMap () против concatMapEager () — RxJava FAQ

В RxJava 2.x есть три одинаково схожих оператора: flatMap() , concatMap() и concatMapEager() . Все они принимают один и тот же аргумент — функцию от отдельного элемента исходного потока до (под) потока произвольного типа. Другими словами, если у вас есть Flowable<T> вы предоставляете функцию от T до Flowable<R> для произвольного типа R После применения любого из этих операторов вы получите Flowable<R> . Так чем они отличаются?

Пример проекта

Сначала давайте создадим пример приложения. Мы будем использовать клиентскую оболочку Retrofit2 HTTP со встроенными плагинами для RxJava2. Наша задача — использовать API GeoNames , чтобы найти население любого города в мире. Интерфейс выглядит следующим образом:

1
2
3
4
5
public interface GeoNames {
  
    Flowable<Long> populationOf(String city);
  
}

Реализация этого интерфейса создается автоматически Retrofit, прокрутите вниз, чтобы увидеть исходный код клея. Пока предположим, что у нас есть функция, которая принимает String с именем города и асинхронно возвращает поток из одного элемента с населением этого города. Также предположим, что у нас есть фиксированный поток городов, которые мы хотим посмотреть:

1
2
3
Flowable<String> cities = Flowable.just(
    "Warsaw", "Paris", "London", "Madrid"
);

Наша цель — собрать население каждого города.

concatMap() : последовательно обрабатывать

Пример приложения с concatMap() выглядит следующим образом:

1
2
3
cities
        .concatMap(geoNames::populationOf)
        .subscribe(response -> log.info("Population: {}", response));

Прежде чем мы увидим результат, давайте изучим, что concatMap() под ним. Для каждого вышестоящего события ( города ) он вызывает функцию, которая заменяет это событие (под) потоком. В нашем случае это одноэлементный поток Long ( Flowable<Long> ). Таким образом, со всеми операторами, которые мы сравниваем, мы получаем поток потоков Long ( Flowable<Flowable<Long>> ). Реальная разница возникает, когда мы анализируем, что делает оператор, чтобы сгладить такой вложенный поток.

concatMap() сначала подпишется на самый первый подпоток ( Flowable<Long> представляющий население Варшавы). Под подпиской мы фактически подразумеваем физический HTTP-вызов. Только когда первый подпоток завершается (в нашем случае выдает один Long и сигнализирует о завершении) concatMap() будет продолжаться. Продолжение означает подписку на второй подпоток и ожидание его завершения. Результирующий поток завершается, когда завершается самый последний подпоток. Это приводит к следующему потоку: 1702139, 2138551, 7556900 и 3255944. Это, соответственно, популяции Варшавы, Парижа, Лондона и Мадрида. Порядок вывода полностью предсказуем. Однако это также совершенно последовательно. Никакого параллелизма не происходит вообще, мы делаем второй HTTP-вызов только после завершения первого. Дополнительная сложность RxJava вообще не окупается:

01
02
03
04
05
06
07
08
09
10
11
12
23:33:33.531 | Rx-1 | --> GET .../searchJSON?q=Warsaw http/1.1
23:33:33.656 | Rx-1 | <-- 200 OK .../searchJSON?q=Warsaw (123ms)
23:33:33.674 | Rx-1 | Population: 1702139
23:33:33.676 | Rx-1 | --> GET .../searchJSON?q=Paris http/1.1
23:33:33.715 | Rx-1 | <-- 200 OK .../searchJSON?q=Paris (38ms)
23:33:33.715 | Rx-1 | Population: 2138551
23:33:33.716 | Rx-1 | --> GET .../searchJSON?q=London http/1.1
23:33:33.754 | Rx-1 | <-- 200 OK .../searchJSON?q=London (37ms)
23:33:33.754 | Rx-1 | Population: 7556900
23:33:33.755 | Rx-1 | --> GET .../searchJSON?q=Madrid http/1.1
23:33:33.795 | Rx-1 | <-- 200 OK .../searchJSON?q=Madrid (40ms)
23:33:33.796 | Rx-1 | Population: 3255944

Как вы можете видеть, многопоточность не происходит, запросы являются последовательными, ожидая друг друга. Технически не все из них должны происходить в одном потоке, но они никогда не пересекаются и не используют преимущества параллелизма. Большой плюс — это гарантированный порядок результирующих событий, который не так очевиден, как только мы перейдем к flatMap()

flatMap() : обработка результатов на лету, не в порядке

flatMap() практически такой же:

1
2
3
cities
        .flatMap(geoNames::populationOf)
        .subscribe(response -> log.info("Population: {}", response));

И так же, как прежде, чем мы начнем с потока потоков Long ( Flowable<Flowable<Long>> ). Однако вместо того, чтобы подписываться на каждый подпоток один за другим, оператор flatMap() охотно подписывается на все подпотоки одновременно. Это означает, что мы видим несколько HTTP-запросов, инициируемых одновременно в разных потоках:

01
02
03
04
05
06
07
08
09
10
11
12
00:10:04.919 | Rx-2 | --> GET .../searchJSON?q=Paris http/1.1
00:10:04.919 | Rx-1 | --> GET .../searchJSON?q=Warsaw http/1.1
00:10:04.919 | Rx-3 | --> GET .../searchJSON?q=London http/1.1
00:10:04.919 | Rx-4 | --> GET .../searchJSON?q=Madrid http/1.1
00:10:05.449 | Rx-3 | <-- 200 OK .../searchJSON (529ms)
00:10:05.462 | Rx-3 | Population: 7556900
00:10:05.477 | Rx-1 | <-- 200 OK .../searchJSON (557ms)
00:10:05.478 | Rx-1 | Population: 1702139
00:10:05.751 | Rx-4 | <-- 200 OK .../searchJSON (831ms)
00:10:05.752 | Rx-4 | Population: 3255944
00:10:05.841 | Rx-2 | <-- 200 OK .../searchJSON (922ms)
00:10:05.843 | Rx-2 | Population: 2138551

Когда какой-либо из нижележащих подпотоков испускает какое-либо значение, оно немедленно передается абоненту в нисходящем направлении. Это означает, что теперь мы можем обрабатывать события «на лету» по мере их возникновения. Обратите внимание, что результирующий поток вышел из строя. Первое событие, которое мы получили, это 7556900, которое, как оказалось, является населением Лондона, второе в первоначальном потоке. В отличие от concatMap() , flatMap() не может сохранить порядок, поэтому выдает значения в «случайном» порядке. Ну, не совсем случайно, мы просто получаем значения, как только они становятся доступны. В этом конкретном исполнении HTTP-ответ для Лондона появился первым, но на это нет абсолютно никаких гарантий. Это приводит к интересной проблеме. У нас есть поток различных ценностей населения и первоначальный поток городов. Однако выходной поток может быть произвольной перестановкой событий, и мы не знаем, какое население соответствует какому городу. Мы рассмотрим эту проблему в следующей статье.

concatMapEager() : одновременный, по порядку, но несколько дорогой

concatMapEager() кажется, приносит лучшее из обоих миров: параллелизм и гарантированный порядок выходных событий:

1
2
3
cities
        .concatMapEager(geoNames::populationOf)
        .subscribe(response -> log.info("Population: {}", response));

Изучив, что concatMap() и flatMap() , понять concatMapEager() довольно просто. Имея поток потоков concatMapEager() охотно ( duh! ) Подписывается на все подпотоки одновременно, одновременно. Однако этот оператор гарантирует, что результаты первого подпотока распространяются первыми, даже если он не завершен первым. Пример быстро покажет, что это значит:

01
02
03
04
05
06
07
08
09
10
11
12
00:34:18.371 | Rx-2 | --> GET .../searchJSON?q=Paris http/1.1
00:34:18.371 | Rx-3 | --> GET .../searchJSON?q=London http/1.1
00:34:18.371 | Rx-4 | --> GET .../searchJSON?q=Madrid http/1.1
00:34:18.371 | Rx-1 | --> GET .../searchJSON?q=Warsaw http/1.1
00:34:18.517 | Rx-3 | <-- 200 OK .../searchJSON?q=London (143ms)
00:34:18.563 | Rx-1 | <-- 200 OK .../searchJSON?q=Warsaw (189ms)
00:34:18.565 | Rx-1 | Population: 1702139
00:34:20.460 | Rx-2 | <-- 200 OK .../searchJSON?q=Paris (2086ms)
00:34:20.460 | Rx-4 | <-- 200 OK .../searchJSON?q=Madrid (2086ms)
00:34:20.461 | Rx-2 | Population: 2138551
00:34:20.462 | Rx-2 | Population: 7556900
00:34:20.462 | Rx-2 | Population: 3255944

Мы инициируем четыре HTTP-запроса мгновенно. Из выходных данных журнала мы ясно видим, что население Лондона было возвращено первым. Однако абонент не получил его, потому что население Варшавы еще не прибыло. По стечению обстоятельств Варшава заняла второе место, поэтому на данный момент население Варшавы может быть передано абоненту вниз по течению. К сожалению, население Лондона должно ждать еще больше, потому что сначала нам нужно население Парижа. Как только Париж (сразу за ним следует Мадрид) завершает работу, все оставшиеся результаты передаются по течению.

Обратите внимание, что население Лондона, хотя и доступно, должно бездействовать, пока не завершатся Варшава и Париж. Так является ли concatMapEager() лучшим из возможных операторов для параллелизма? Не совсем. Представьте, что у нас есть список из тысяч городов, и для каждого мы получаем одну картинку размером 1 МБ. С concatMap() мы загружаем картинки последовательно, т.е. медленно. С flatMap() изображения загружаются одновременно и обрабатываются по мере их поступления, как можно скорее. Теперь, что насчет concatMapEager() ? В худшем случае мы можем получить concatMapEager() буферизующий 999 изображений, потому что изображение из самого первого города оказывается самым медленным. Несмотря на то, что у нас уже есть 99,9% результатов, мы не можем их обработать, потому что мы применяем строгий порядок.

Какой оператор использовать?

flatMap() должен быть вашим первым оружием выбора. Это позволяет эффективный параллелизм с потоковым поведением. Но будьте готовы получить результаты не по порядку. concatMap() работает хорошо, только если преобразование выполняется так быстро, последовательная обработка не является проблемой. concatMapEager() очень удобен, но следите за потреблением памяти. Также в худшем случае вы можете сидеть без дела, ожидая очень мало ответов.

Приложение: настройка клиента Retrofit2

Интерфейс сервиса GeoNames который мы использовали в этой статье, на самом деле выглядит так:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
public interface GeoNames {
  
    @GET("/searchJSON")
    Single<SearchResult> search(
            @Query("q") String query,
            @Query("maxRows") int maxRows,
            @Query("style") String style,
            @Query("username") String username
    );
  
    default Flowable<Long> populationOf(String city) {
        return search(city, 1, "LONG", "s3cret")
                .map(SearchResult::getGeonames)
                .map(g -> g.get(0))
                .map(Geoname::getPopulation)
                .toFlowable();
    }
  
}

Реализация метода не по умолчанию автоматически генерируется Retrofit2. Обратите внимание, что populationOf() для простоты возвращает одноэлементный Flowable<Long> . Однако, чтобы полностью охватить природу этого API, в реальной жизни было бы более разумно использовать другие реализации. Прежде всего, класс SearchResult возвращает упорядоченный список результатов (методы getter / setters опущены):

01
02
03
04
05
06
07
08
09
10
11
12
class SearchResult {
    private List<Geoname> geonames = new ArrayList<>();
}
  
class Geoname {
    private double lat;
    private double lng;
    private Integer geonameId;
    private Long population;
    private String countryCode;
    private String name;
}

Ведь в мире много Варшав и Лондонов . Мы молча предполагаем, что список будет содержать хотя бы один элемент, и первый из них будет правильным . Более подходящая реализация должна либо возвращать все попадания, либо даже лучше тип Maybe<Long> чтобы не отображать совпадения:

1
2
3
4
5
6
default Maybe<Long> populationOf(String city) {
    return search(city, 1, "LONG", "nurkiewicz")
            .flattenAsFlowable(SearchResult::getGeonames)
            .map(Geoname::getPopulation)
            .firstElement();
}

Клеевой код выглядит следующим образом. Первая настройка Джексона для анализа ответа от API:

1
2
3
4
5
6
import com.fasterxml.jackson.databind.ObjectMapper;
  
private ObjectMapper objectMapper() {
    return new ObjectMapper()
            .configure(FAIL_ON_UNKNOWN_PROPERTIES, false);
}

FAIL_ON_UNKNOWN_PROPERTIES часто то, что вы хотите. В противном случае вы должны отобразить все поля из ответа JSON, и ваш код будет поврежден, когда производитель API вводит новые, иначе обратно совместимые поля. Затем мы OkHttpClient , используемый под Retrofit:

1
2
3
4
5
6
7
8
import okhttp3.OkHttpClient;
import okhttp3.logging.HttpLoggingInterceptor;
  
private OkHttpClient client() {
    HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor();
    interceptor.setLevel(HttpLoggingInterceptor.Level.BASIC);
    return new OkHttpClient.Builder().addInterceptor(interceptor).build();
}

Иногда вы можете пропустить настройку клиента OkHttp, но мы добавили перехватчик логирования. По умолчанию протоколы OkHttp используют java.util.logging поэтому, чтобы использовать подходящую среду ведения журналов, мы должны установить мост в самом начале:

1
2
3
4
5
6
import org.slf4j.bridge.SLF4JBridgeHandler;
  
static {
    SLF4JBridgeHandler.removeHandlersForRootLogger();
    SLF4JBridgeHandler.install();
}

И, наконец, дооснащение себя

01
02
03
04
05
06
07
08
09
10
11
12
13
14
import io.reactivex.schedulers.Schedulers;
import retrofit2.Retrofit;
import retrofit2.adapter.rxjava2.RxJava2CallAdapterFactory;
import retrofit2.converter.jackson.JacksonConverterFactory;
  
GeoNames createClient() {
    return new Retrofit.Builder()
            .client(client())
            .baseUrl("http://api.geonames.org")
            .addCallAdapterFactory(RxJava2CallAdapterFactory.createWithScheduler(Schedulers.io()))
            .addConverterFactory(JacksonConverterFactory.create(objectMapper()))
            .build()
            .create(GeoNames.class);
}

Вызов createClient() приведет к динамической реализации интерфейса GeoNames . Мы использовали следующие зависимости:

1
2
3
4
5
6
7
8
9
compile 'io.reactivex.rxjava2:rxjava:2.0.6'
  
compile 'com.squareup.retrofit2:adapter-rxjava2:2.3.0'
compile 'com.squareup.retrofit2:converter-jackson:2.0.1'
compile 'com.squareup.okhttp3:logging-interceptor:3.8.0'
  
compile 'ch.qos.logback:logback-classic:1.1.7'
compile 'org.slf4j:slf4j-api:1.7.21'
compile 'org.slf4j:jul-to-slf4j:1.7.21'