CompletableFuture<T>
из Java 8 — это расширенная абстракция над обещанием, что значение типа T
будет доступно в будущем . Observable<T>
довольно похож, но он обещает произвольное количество предметов в будущем, от 0 до бесконечности. Эти два представления асинхронных результатов очень похожи на точку, в которой можно использовать Observable
только с одним элементом вместо CompletableFuture
и наоборот. С другой стороны, CompletableFuture
более специализирован, и, поскольку теперь он является частью JDK, он должен стать распространенным довольно скоро. Давайте отпразднуем выпуск RxJava 1.0 с короткой статьей, показывающей, как конвертировать между ними, не теряя при этом их асинхронный и управляемый событиями характер.
От CompletableFuture<T>
к Observable<T>
CompletableFuture
представляет одно значение в будущем, поэтому преобразовать его в Observable
довольно просто. Когда Future
завершает работу с каким-то значением, Observable
также немедленно выдаст это значение и закроет поток:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
class FuturesTest extends Specification { public static final String MSG = "Don't panic" def 'should convert completed Future to completed Observable' () { given: CompletableFuture<String> future = CompletableFuture.completedFuture( "Abc" ) when: Observable<String> observable = Futures.toObservable(future) then: observable.toBlocking().toIterable().toList() == [ "Abc" ] } def 'should convert failed Future into Observable with failure' () { given: CompletableFuture<String> future = failedFuture( new IllegalStateException(MSG)) when: Observable<String> observable = Futures.toObservable(future) then: observable .onErrorReturn({ th -> th.message } as Func1) .toBlocking() .toIterable() .toList() == [MSG] } CompletableFuture failedFuture(Exception error) { CompletableFuture future = new CompletableFuture() future.completeExceptionally(error) return future } } |
Первый тест еще не реализованного Futures.toObservable()
преобразует Future
в Observable
и обеспечивает правильное распространение значения. Второй тест создал Future
, заменяет сбой сообщением об исключении и обеспечивает распространение исключения. Реализация намного короче:
01
02
03
04
05
06
07
08
09
10
11
|
public static <T> Observable<T> toObservable(CompletableFuture<T> future) { return Observable.create(subscriber -> future.whenComplete((result, error) -> { if (error != null ) { subscriber.onError(error); } else { subscriber.onNext(result); subscriber.onCompleted(); } })); } |
NB: Observable.fromFuture()
существует, однако мы хотим в полной мере ComplatableFuture
асинхронные операторы ComplatableFuture
.
От Observable<T>
до CompletableFuture<List<T>>
На самом деле существует два способа преобразования Observable
в Future
— создание CompletableFuture<List<T>>
или CompletableFuture<T>
(если мы предположим, что в Observable
есть только один элемент). Давайте начнем с первого случая, описанного со следующими тестами:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
def 'should convert Observable with many items to Future of list' () { given: Observable<Integer> observable = Observable>just( 1 , 2 , 3 ) when: CompletableFuture<List<Integer>> future = Futures>fromObservable(observable) then: future>get() == [ 1 , 2 , 3 ] } def 'should return failed Future when after few items exception was emitted' () { given: Observable<Integer> observable = Observable>just( 1 , 2 , 3 ) >concatWith(Observable>error( new IllegalStateException(MSG))) when: Futures>fromObservable(observable) then: def e = thrown(Exception) e>message == MSG } |
Очевидно, что Future
не завершится, пока источник Observable
сигнализирует о конце потока. Таким образом, Observable.never()
никогда не завершит перенос Future
, а не завершит его пустым списком. Реализация намного короче и слаще
1
2
3
4
5
6
7
8
|
public static <T> CompletableFuture<List<T>> fromObservable(Observable<T> observable) { final CompletableFuture<List<T>> future = new CompletableFuture<>(); observable .doOnError(future::completeExceptionally) .toList() .forEach(future::complete); return future; } |
Ключ Observable.toList()
который удобно конвертировать из Observable<T>
и Observable<List<T>>
. Последний генерирует один элемент типа List<T>
когда источник Observable<T>
заканчивается.
От Observable<T>
к CompletableFuture<T>
Особый случай предыдущего преобразования происходит, когда мы знаем, что CompletableFuture<T>
вернет ровно один элемент. В этом случае мы можем преобразовать его непосредственно в CompletableFuture<T>
, а не в CompletableFuture<List<T>>
с одним элементом. Сначала тесты:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
def 'should convert Observable with single item to Future' () { given: Observable<Integer> observable = Observable.just( 1 ) when: CompletableFuture<Integer> future = Futures.fromSingleObservable(observable) then: future.get() == 1 } def 'should create failed Future when Observable fails' () { given: Observable<String> observable = Observable.<String> error( new IllegalStateException(MSG)) when: Futures.fromSingleObservable(observable) then: def e = thrown(Exception) e.message == MSG } def 'should fail when single Observable produces too many items' () { given: Observable<Integer> observable = Observable.just( 1 , 2 ) when: Futures.fromSingleObservable(observable) then: def e = thrown(Exception) e.message.contains( "too many elements" ) } |
Опять же, реализация довольно проста и почти идентична:
1
2
3
4
5
6
7
8
|
public static <T> CompletableFuture<T> fromSingleObservable(Observable<T> observable) { final CompletableFuture<T> future = new CompletableFuture<>(); observable .doOnError(future::completeExceptionally) .single() .forEach(future::complete); return future; } |
Приведенные выше методы хелперов еще не полностью надежны, но если вам когда-нибудь понадобится выполнить преобразование между асинхронными вычислениями в стиле JDK 8 и RxJava, этой статьи должно быть достаточно для начала работы.
Ссылка: | Преобразование между Completablefuture и Observable от нашего партнера JCG Томаша Нуркевича из блога Java и соседей . |