Мое собственное понимание Hot and Cold Observable довольно шатко, но вот что я понял до сих пор!
Наблюдаемая холод
Рассмотрим API, который возвращает rx-java Observable :
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
|
import obs.Util;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import rx.Observable;import rx.schedulers.Schedulers;public class Service1 { private static final Logger logger = LoggerFactory.getLogger(Service1.class); public Observable<String> operation() { return Observable.<String>create(s -> { logger.info("Start: Executing slow task in Service 1"); Util.delay(1000); s.onNext("data 1"); logger.info("End: Executing slow task in Service 1"); s.onCompleted(); }).subscribeOn(Schedulers.computation()); }} |
Теперь первое, что следует отметить, это то, что типичный Observable ничего не делает, пока не подписан на:
По сути, если бы я сделал это:
|
1
|
Observable<String> op1 = service1.operation(); |
Ничто не будет распечатано или возвращено, если на Наблюдаемом нет подписки следующим образом:
|
1
2
3
4
5
6
7
8
9
|
Observable<String> op1 = service1.operation();CountDownLatch latch = new CountDownLatch(1);op1.subscribe(s -> logger.info("From Subscriber 1: {}", s), e -> logger.error(e.getMessage(), e), () -> latch.countDown());latch.await(); |
Итак, что произойдет, если есть несколько подписок на этот Observable:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
|
Observable<String> op1 = service1.operation();CountDownLatch latch = new CountDownLatch(3);op1.subscribe(s -> logger.info("From Subscriber 1: {}", s), e -> logger.error(e.getMessage(), e), () -> latch.countDown());op1.subscribe(s -> logger.info("From Subscriber 2: {}", s), e -> logger.error(e.getMessage(), e), () -> latch.countDown());op1.subscribe(s -> logger.info("From Subscriber 3: {}", s), e -> logger.error(e.getMessage(), e), () -> latch.countDown());latch.await(); |
При наличии наблюдаемой простуды код будет вызываться еще раз, и элементы будут выбрасываться снова, я получаю это на своей машине:
|
1
2
3
4
5
6
7
8
|
06:04:07.206 [RxComputationThreadPool-2] INFO o.b.Service1 - Start: Executing slow task in Service 106:04:07.208 [RxComputationThreadPool-3] INFO o.b.Service1 - Start: Executing slow task in Service 106:04:08.211 [RxComputationThreadPool-2] INFO o.b.BasicObservablesTest - From Subscriber 2: data 106:04:08.211 [RxComputationThreadPool-1] INFO o.b.BasicObservablesTest - From Subscriber 1: data 106:04:08.211 [RxComputationThreadPool-3] INFO o.b.BasicObservablesTest - From Subscriber 3: data 106:04:08.213 [RxComputationThreadPool-2] INFO o.b.Service1 - End: Executing slow task in Service 106:04:08.214 [RxComputationThreadPool-1] INFO o.b.Service1 - End: Executing slow task in Service 106:04:08.214 [RxComputationThreadPool-3] INFO o.b.Service1 - End: Executing slow task in Service 1 |
Hot Observable — используя ConnectableObservable
С другой стороны, Hot Observable не требует подписки для начала выдачи предметов. Одним из способов реализации Hot Observable является использование ConnectableObservable , который является Observable, который не излучает элементы, пока не будет вызван его метод connect, однако, как только он начинает излучать элементы, любой подписчик на него получает элементы только с точки подписки. Итак, еще раз вернемся к предыдущему примеру, но вместо этого с ConnectableObservable:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
|
Observable<String> op1 = service1.operation();ConnectableObservable<String> connectableObservable = op1.publish();CountDownLatch latch = new CountDownLatch(3);connectableObservable.subscribe(s -> logger.info("From Subscriber 1: {}", s), e -> logger.error(e.getMessage(), e), () -> latch.countDown());connectableObservable.subscribe(s -> logger.info("From Subscriber 2: {}", s), e -> logger.error(e.getMessage(), e), () -> latch.countDown());connectableObservable.subscribe(s -> logger.info("From Subscriber 3: {}", s), e -> logger.error(e.getMessage(), e), () -> latch.countDown());connectableObservable.connect();latch.await(); |
и следующее напечатано:
|
1
2
3
4
5
|
06:07:23.852 [RxComputationThreadPool-3] INFO o.b.Service1 - Start: Executing slow task in Service 106:07:24.860 [RxComputationThreadPool-3] INFO o.b.ConnectableObservablesTest - From Subscriber 1: data 106:07:24.862 [RxComputationThreadPool-3] INFO o.b.ConnectableObservablesTest - From Subscriber 2: data 106:07:24.862 [RxComputationThreadPool-3] INFO o.b.ConnectableObservablesTest - From Subscriber 3: data 106:07:24.862 [RxComputationThreadPool-3] INFO o.b.Service1 - End: Executing slow task in Service 1 |
Hot Observable — используя тему
Другой способ преобразовать холодную наблюдаемую в горячую — использовать тему . Субъекты ведут себя как как Наблюдаемые, так и Наблюдатели, существуют разные типы Субъектов с различным поведением. Здесь я использую Предмет, называемый PublishSubject, который имеет поведение Pub / Sub — элементы передаются всем подписчикам, слушающим его. Итак, с введенным PublishSubject код выглядит так:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
|
Observable<String> op1 = service1.operation();PublishSubject<String> publishSubject = PublishSubject.create();op1.subscribe(publishSubject);CountDownLatch latch = new CountDownLatch(3);publishSubject.subscribe(s -> logger.info("From Subscriber 1: {}", s), e -> logger.error(e.getMessage(), e), () -> latch.countDown());publishSubject.subscribe(s -> logger.info("From Subscriber 2: {}", s), e -> logger.error(e.getMessage(), e), () -> latch.countDown());publishSubject.subscribe(s -> logger.info("From Subscriber 3: {}", s), e -> logger.error(e.getMessage(), e), () -> latch.countDown());latch.await(); |
Посмотрите, как PublishSubject представлен как подписчик на Observable, а другие подписчики вместо этого подписываются на PublishSubject. Вывод будет аналогичен выходу из ConnectableObservable.
Это по сути дела, степень моего понимания Hot Observable. Итак, чтобы заключить, разница между холодной и горячей наблюдаемой заключается в том, когда подписчики получают испущенные элементы и когда элементы испускаются — с холодной наблюдаемой они испускаются, когда они подписаны и обычно получают все испускаемые элементы, с Горячее наблюдаемое, что элементы испускаются без подписчика, и подписчики обычно получают элементы после пункта подписки.
Ссылка
- http://www.introtorx.com/content/v1.0.10621.0/14_HotAndColdObservables.html
- Отличный javadoc на rx-java — http://reactivex.io/RxJava/javadoc/index.html
| Ссылка: | Горячая и холодная rx-java Наблюдаемая нашим партнером JCG Биджу Кунджумменом в блоге « Все и вся» . |