CompletableFuture<T> из Java 8 — это расширенная абстракция над обещанием, что значение типа T будет доступно в будущем . Observable<T> довольно похож, но он обещает произвольное количество предметов в будущем, от 0 до бесконечности. Эти два представления асинхронных результатов очень похожи на точку, в которой можно использовать Observable только с одним элементом вместо CompletableFuture и наоборот. С другой стороны, CompletableFuture более специализирован, и, поскольку теперь он является частью JDK, он должен стать распространенным довольно скоро. Давайте отпразднуем выпуск RxJava 1.0 с короткой статьей, показывающей, как конвертировать между ними, не теряя при этом их асинхронный и управляемый событиями характер.
От CompletableFuture<T> к Observable<T>
CompletableFuture представляет одно значение в будущем, поэтому преобразовать его в Observable довольно просто. Когда Future завершает работу с каким-то значением, Observable также немедленно выдаст это значение и закроет поток:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
class FuturesTest extends Specification { public static final String MSG = "Don't panic" def 'should convert completed Future to completed Observable'() { given: CompletableFuture<String> future = CompletableFuture.completedFuture("Abc") when: Observable<String> observable = Futures.toObservable(future) then: observable.toBlocking().toIterable().toList() == ["Abc"] } def 'should convert failed Future into Observable with failure'() { given: CompletableFuture<String> future = failedFuture(new IllegalStateException(MSG)) when: Observable<String> observable = Futures.toObservable(future) then: observable .onErrorReturn({ th -> th.message } as Func1) .toBlocking() .toIterable() .toList() == [MSG] } CompletableFuture failedFuture(Exception error) { CompletableFuture future = new CompletableFuture() future.completeExceptionally(error) return future } } |
Первый тест еще не реализованного Futures.toObservable() преобразует Future в Observable и обеспечивает правильное распространение значения. Второй тест создал Future , заменяет сбой сообщением об исключении и обеспечивает распространение исключения. Реализация намного короче:
|
01
02
03
04
05
06
07
08
09
10
11
|
public static <T> Observable<T> toObservable(CompletableFuture<T> future) { return Observable.create(subscriber -> future.whenComplete((result, error) -> { if (error != null) { subscriber.onError(error); } else { subscriber.onNext(result); subscriber.onCompleted(); } }));} |
NB: Observable.fromFuture() существует, однако мы хотим в полной мере ComplatableFuture асинхронные операторы ComplatableFuture .
От Observable<T> до CompletableFuture<List<T>>
На самом деле существует два способа преобразования Observable в Future — создание CompletableFuture<List<T>> или CompletableFuture<T> (если мы предположим, что в Observable есть только один элемент). Давайте начнем с первого случая, описанного со следующими тестами:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
def 'should convert Observable with many items to Future of list'() { given: Observable<Integer> observable = Observable>just(1, 2, 3) when: CompletableFuture<List<Integer>> future = Futures>fromObservable(observable) then: future>get() == [1, 2, 3]} def 'should return failed Future when after few items exception was emitted'() { given: Observable<Integer> observable = Observable>just(1, 2, 3) >concatWith(Observable>error(new IllegalStateException(MSG))) when: Futures>fromObservable(observable) then: def e = thrown(Exception) e>message == MSG} |
Очевидно, что Future не завершится, пока источник Observable сигнализирует о конце потока. Таким образом, Observable.never() никогда не завершит перенос Future , а не завершит его пустым списком. Реализация намного короче и слаще
|
1
2
3
4
5
6
7
8
|
public static <T> CompletableFuture<List<T>> fromObservable(Observable<T> observable) { final CompletableFuture<List<T>> future = new CompletableFuture<>(); observable .doOnError(future::completeExceptionally) .toList() .forEach(future::complete); return future;} |
Ключ Observable.toList() который удобно конвертировать из Observable<T> и Observable<List<T>> . Последний генерирует один элемент типа List<T> когда источник Observable<T> заканчивается.
От Observable<T> к CompletableFuture<T>
Особый случай предыдущего преобразования происходит, когда мы знаем, что CompletableFuture<T> вернет ровно один элемент. В этом случае мы можем преобразовать его непосредственно в CompletableFuture<T> , а не в CompletableFuture<List<T>> с одним элементом. Сначала тесты:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
def 'should convert Observable with single item to Future'() { given: Observable<Integer> observable = Observable.just(1) when: CompletableFuture<Integer> future = Futures.fromSingleObservable(observable) then: future.get() == 1} def 'should create failed Future when Observable fails'() { given: Observable<String> observable = Observable.<String> error(new IllegalStateException(MSG)) when: Futures.fromSingleObservable(observable) then: def e = thrown(Exception) e.message == MSG} def 'should fail when single Observable produces too many items'() { given: Observable<Integer> observable = Observable.just(1, 2) when: Futures.fromSingleObservable(observable) then: def e = thrown(Exception) e.message.contains("too many elements")} |
Опять же, реализация довольно проста и почти идентична:
|
1
2
3
4
5
6
7
8
|
public static <T> CompletableFuture<T> fromSingleObservable(Observable<T> observable) { final CompletableFuture<T> future = new CompletableFuture<>(); observable .doOnError(future::completeExceptionally) .single() .forEach(future::complete); return future;} |
Приведенные выше методы хелперов еще не полностью надежны, но если вам когда-нибудь понадобится выполнить преобразование между асинхронными вычислениями в стиле JDK 8 и RxJava, этой статьи должно быть достаточно для начала работы.
| Ссылка: | Преобразование между Completablefuture и Observable от нашего партнера JCG Томаша Нуркевича из блога Java и соседей . |