Мое собственное понимание Hot and Cold Observable довольно шатко, но вот что я понял до сих пор!
Наблюдаемая холод
Рассмотрим API, который возвращает rx-java Observable :
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
|
import obs.Util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import rx.Observable; import rx.schedulers.Schedulers; public class Service1 { private static final Logger logger = LoggerFactory.getLogger(Service1. class ); public Observable<String> operation() { return Observable.<String>create(s -> { logger.info( "Start: Executing slow task in Service 1" ); Util.delay( 1000 ); s.onNext( "data 1" ); logger.info( "End: Executing slow task in Service 1" ); s.onCompleted(); }).subscribeOn(Schedulers.computation()); } } |
Теперь первое, что следует отметить, это то, что типичный Observable ничего не делает, пока не подписан на:
По сути, если бы я сделал это:
1
|
Observable<String> op1 = service1.operation(); |
Ничто не будет распечатано или возвращено, если на Наблюдаемом нет подписки следующим образом:
1
2
3
4
5
6
7
8
9
|
Observable<String> op1 = service1.operation(); CountDownLatch latch = new CountDownLatch( 1 ); op1.subscribe(s -> logger.info( "From Subscriber 1: {}" , s), e -> logger.error(e.getMessage(), e), () -> latch.countDown()); latch.await(); |
Итак, что произойдет, если есть несколько подписок на этот Observable:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
|
Observable<String> op1 = service1.operation(); CountDownLatch latch = new CountDownLatch( 3 ); op1.subscribe(s -> logger.info( "From Subscriber 1: {}" , s), e -> logger.error(e.getMessage(), e), () -> latch.countDown()); op1.subscribe(s -> logger.info( "From Subscriber 2: {}" , s), e -> logger.error(e.getMessage(), e), () -> latch.countDown()); op1.subscribe(s -> logger.info( "From Subscriber 3: {}" , s), e -> logger.error(e.getMessage(), e), () -> latch.countDown()); latch.await(); |
При наличии наблюдаемой простуды код будет вызываться еще раз, и элементы будут выбрасываться снова, я получаю это на своей машине:
1
2
3
4
5
6
7
8
|
06 : 04 : 07.206 [RxComputationThreadPool- 2 ] INFO o.b.Service1 - Start: Executing slow task in Service 1 06 : 04 : 07.208 [RxComputationThreadPool- 3 ] INFO o.b.Service1 - Start: Executing slow task in Service 1 06 : 04 : 08.211 [RxComputationThreadPool- 2 ] INFO o.b.BasicObservablesTest - From Subscriber 2 : data 1 06 : 04 : 08.211 [RxComputationThreadPool- 1 ] INFO o.b.BasicObservablesTest - From Subscriber 1 : data 1 06 : 04 : 08.211 [RxComputationThreadPool- 3 ] INFO o.b.BasicObservablesTest - From Subscriber 3 : data 1 06 : 04 : 08.213 [RxComputationThreadPool- 2 ] INFO o.b.Service1 - End: Executing slow task in Service 1 06 : 04 : 08.214 [RxComputationThreadPool- 1 ] INFO o.b.Service1 - End: Executing slow task in Service 1 06 : 04 : 08.214 [RxComputationThreadPool- 3 ] INFO o.b.Service1 - End: Executing slow task in Service 1 |
Hot Observable — используя ConnectableObservable
С другой стороны, Hot Observable не требует подписки для начала выдачи предметов. Одним из способов реализации Hot Observable является использование ConnectableObservable , который является Observable, который не излучает элементы, пока не будет вызван его метод connect, однако, как только он начинает излучать элементы, любой подписчик на него получает элементы только с точки подписки. Итак, еще раз вернемся к предыдущему примеру, но вместо этого с ConnectableObservable:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
|
Observable<String> op1 = service1.operation(); ConnectableObservable<String> connectableObservable = op1.publish(); CountDownLatch latch = new CountDownLatch( 3 ); connectableObservable.subscribe(s -> logger.info( "From Subscriber 1: {}" , s), e -> logger.error(e.getMessage(), e), () -> latch.countDown()); connectableObservable.subscribe(s -> logger.info( "From Subscriber 2: {}" , s), e -> logger.error(e.getMessage(), e), () -> latch.countDown()); connectableObservable.subscribe(s -> logger.info( "From Subscriber 3: {}" , s), e -> logger.error(e.getMessage(), e), () -> latch.countDown()); connectableObservable.connect(); latch.await(); |
и следующее напечатано:
1
2
3
4
5
|
06 : 07 : 23.852 [RxComputationThreadPool- 3 ] INFO o.b.Service1 - Start: Executing slow task in Service 1 06 : 07 : 24.860 [RxComputationThreadPool- 3 ] INFO o.b.ConnectableObservablesTest - From Subscriber 1 : data 1 06 : 07 : 24.862 [RxComputationThreadPool- 3 ] INFO o.b.ConnectableObservablesTest - From Subscriber 2 : data 1 06 : 07 : 24.862 [RxComputationThreadPool- 3 ] INFO o.b.ConnectableObservablesTest - From Subscriber 3 : data 1 06 : 07 : 24.862 [RxComputationThreadPool- 3 ] INFO o.b.Service1 - End: Executing slow task in Service 1 |
Hot Observable — используя тему
Другой способ преобразовать холодную наблюдаемую в горячую — использовать тему . Субъекты ведут себя как как Наблюдаемые, так и Наблюдатели, существуют разные типы Субъектов с различным поведением. Здесь я использую Предмет, называемый PublishSubject, который имеет поведение Pub / Sub — элементы передаются всем подписчикам, слушающим его. Итак, с введенным PublishSubject код выглядит так:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
|
Observable<String> op1 = service1.operation(); PublishSubject<String> publishSubject = PublishSubject.create(); op1.subscribe(publishSubject); CountDownLatch latch = new CountDownLatch( 3 ); publishSubject.subscribe(s -> logger.info( "From Subscriber 1: {}" , s), e -> logger.error(e.getMessage(), e), () -> latch.countDown()); publishSubject.subscribe(s -> logger.info( "From Subscriber 2: {}" , s), e -> logger.error(e.getMessage(), e), () -> latch.countDown()); publishSubject.subscribe(s -> logger.info( "From Subscriber 3: {}" , s), e -> logger.error(e.getMessage(), e), () -> latch.countDown()); latch.await(); |
Посмотрите, как PublishSubject представлен как подписчик на Observable, а другие подписчики вместо этого подписываются на PublishSubject. Вывод будет аналогичен выходу из ConnectableObservable.
Это по сути дела, степень моего понимания Hot Observable. Итак, чтобы заключить, разница между холодной и горячей наблюдаемой заключается в том, когда подписчики получают испущенные элементы и когда элементы испускаются — с холодной наблюдаемой они испускаются, когда они подписаны и обычно получают все испускаемые элементы, с Горячее наблюдаемое, что элементы испускаются без подписчика, и подписчики обычно получают элементы после пункта подписки.
Ссылка
- http://www.introtorx.com/content/v1.0.10621.0/14_HotAndColdObservables.html
- Отличный javadoc на rx-java — http://reactivex.io/RxJava/javadoc/index.html
Ссылка: | Горячая и холодная rx-java Наблюдаемая нашим партнером JCG Биджу Кунджумменом в блоге « Все и вся» . |