Статьи

Преобразование между Завершаемым будущим и Наблюдаемым

CompletableFuture<T> из Java 8 — это расширенная абстракция над обещанием, что значение типа T будет доступно в будущем . Observable<T> довольно похож, но он обещает произвольное количество предметов в будущем, от 0 до бесконечности. Эти два представления асинхронных результатов очень похожи на точку, в которой можно использовать Observable только с одним элементом вместо CompletableFuture и наоборот. С другой стороны, CompletableFuture более специализирован, и, поскольку теперь он является частью JDK, он должен стать распространенным довольно скоро. Давайте отпразднуем выпуск RxJava 1.0 с короткой статьей, показывающей, как конвертировать между ними, не теряя при этом их асинхронный и управляемый событиями характер.

От CompletableFuture<T> к Observable<T>

CompletableFuture представляет одно значение в будущем, поэтому преобразовать его в Observable довольно просто. Когда Future завершает работу с каким-то значением, Observable также немедленно выдаст это значение и закроет поток:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class FuturesTest extends Specification {
  
    public static final String MSG = "Don't panic"
  
    def 'should convert completed Future to completed Observable'() {
        given:
            CompletableFuture<String> future = CompletableFuture.completedFuture("Abc")
  
        when:
            Observable<String> observable = Futures.toObservable(future)
  
        then:
            observable.toBlocking().toIterable().toList() == ["Abc"]
    }
  
    def 'should convert failed Future into Observable with failure'() {
        given:
            CompletableFuture<String> future = failedFuture(new IllegalStateException(MSG))
  
        when:
            Observable<String> observable = Futures.toObservable(future)
  
        then:
            observable
                    .onErrorReturn({ th -> th.message } as Func1)
                    .toBlocking()
                    .toIterable()
                    .toList() == [MSG]
    
  
    CompletableFuture failedFuture(Exception error) {
        CompletableFuture future = new CompletableFuture()
        future.completeExceptionally(error)
        return future
    }
  
}

Первый тест еще не реализованного Futures.toObservable() преобразует Future в Observable и обеспечивает правильное распространение значения. Второй тест создал Future , заменяет сбой сообщением об исключении и обеспечивает распространение исключения. Реализация намного короче:

01
02
03
04
05
06
07
08
09
10
11
public static <T> Observable<T> toObservable(CompletableFuture<T> future) {
    return Observable.create(subscriber ->
            future.whenComplete((result, error) -> {
                if (error != null) {
                    subscriber.onError(error);
                } else {
                    subscriber.onNext(result);
                    subscriber.onCompleted();
                }
            }));
}

NB: Observable.fromFuture() существует, однако мы хотим в полной мере ComplatableFuture асинхронные операторы ComplatableFuture .

От Observable<T> до CompletableFuture<List<T>>

На самом деле существует два способа преобразования Observable в Future — создание CompletableFuture<List<T>> или CompletableFuture<T> (если мы предположим, что в Observable есть только один элемент). Давайте начнем с первого случая, описанного со следующими тестами:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def 'should convert Observable with many items to Future of list'() {
    given:
        Observable<Integer> observable = Observable>just(1, 2, 3)
  
    when:
        CompletableFuture<List<Integer>> future = Futures>fromObservable(observable)
  
    then:
        future>get() == [1, 2, 3]
}
  
def 'should return failed Future when after few items exception was emitted'() {
    given:
        Observable<Integer> observable = Observable>just(1, 2, 3)
                >concatWith(Observable>error(new IllegalStateException(MSG)))
  
    when:
        Futures>fromObservable(observable)
  
    then:
        def e = thrown(Exception)
        e>message == MSG
}

Очевидно, что Future не завершится, пока источник Observable сигнализирует о конце потока. Таким образом, Observable.never() никогда не завершит перенос Future , а не завершит его пустым списком. Реализация намного короче и слаще

1
2
3
4
5
6
7
8
public static <T> CompletableFuture<List<T>> fromObservable(Observable<T> observable) {
    final CompletableFuture<List<T>> future = new CompletableFuture<>();
    observable
            .doOnError(future::completeExceptionally)
            .toList()
            .forEach(future::complete);
    return future;
}

Ключ Observable.toList() который удобно конвертировать из Observable<T> и Observable<List<T>> . Последний генерирует один элемент типа List<T> когда источник Observable<T> заканчивается.

От Observable<T> к CompletableFuture<T>

Особый случай предыдущего преобразования происходит, когда мы знаем, что CompletableFuture<T> вернет ровно один элемент. В этом случае мы можем преобразовать его непосредственно в CompletableFuture<T> , а не в CompletableFuture<List<T>> с одним элементом. Сначала тесты:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
def 'should convert Observable with single item to Future'() {
    given:
        Observable<Integer> observable = Observable.just(1)
  
    when:
        CompletableFuture<Integer> future = Futures.fromSingleObservable(observable)
  
    then:
        future.get() == 1
}
  
def 'should create failed Future when Observable fails'() {
    given:
        Observable<String> observable = Observable.<String> error(new IllegalStateException(MSG))
  
    when:
        Futures.fromSingleObservable(observable)
  
    then:
        def e = thrown(Exception)
        e.message == MSG
}
  
def 'should fail when single Observable produces too many items'() {
    given:
        Observable<Integer> observable = Observable.just(1, 2)
  
    when:
        Futures.fromSingleObservable(observable)
  
    then:
        def e = thrown(Exception)
        e.message.contains("too many elements")
}

Опять же, реализация довольно проста и почти идентична:

1
2
3
4
5
6
7
8
public static <T> CompletableFuture<T> fromSingleObservable(Observable<T> observable) {
    final CompletableFuture<T> future = new CompletableFuture<>();
    observable
        .doOnError(future::completeExceptionally)
        .single()
        .forEach(future::complete);
    return future;
}

Приведенные выше методы хелперов еще не полностью надежны, но если вам когда-нибудь понадобится выполнить преобразование между асинхронными вычислениями в стиле JDK 8 и RxJava, этой статьи должно быть достаточно для начала работы.

Ссылка: Преобразование между Completablefuture и Observable от нашего партнера JCG Томаша Нуркевича из блога Java и соседей .