Статьи

Горячая и холодная rx-java наблюдаемая

Мое собственное понимание Hot and Cold Observable довольно шатко, но вот что я понял до сих пор!

Наблюдаемая холод

Рассмотрим API, который возвращает rx-java Observable :

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
import obs.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.schedulers.Schedulers;
 
public class Service1 {
    private static final Logger logger = LoggerFactory.getLogger(Service1.class);
    public Observable<String> operation() {
        return Observable.<String>create(s -> {
            logger.info("Start: Executing slow task in Service 1");
            Util.delay(1000);
            s.onNext("data 1");
            logger.info("End: Executing slow task in Service 1");
            s.onCompleted();
        }).subscribeOn(Schedulers.computation());
    }
}

Теперь первое, что следует отметить, это то, что типичный Observable ничего не делает, пока не подписан на:

По сути, если бы я сделал это:

1
Observable<String> op1 = service1.operation();

Ничто не будет распечатано или возвращено, если на Наблюдаемом нет подписки следующим образом:

1
2
3
4
5
6
7
8
9
Observable<String> op1 = service1.operation();
 
CountDownLatch latch = new CountDownLatch(1);
 
op1.subscribe(s -> logger.info("From Subscriber 1: {}", s),
        e -> logger.error(e.getMessage(), e),
        () -> latch.countDown());
 
latch.await();

Итак, что произойдет, если есть несколько подписок на этот Observable:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
Observable<String> op1 = service1.operation();
 
CountDownLatch latch = new CountDownLatch(3);
 
op1.subscribe(s -> logger.info("From Subscriber 1: {}", s),
        e -> logger.error(e.getMessage(), e),
        () -> latch.countDown());
 
op1.subscribe(s -> logger.info("From Subscriber 2: {}", s),
        e -> logger.error(e.getMessage(), e),
        () -> latch.countDown());
 
op1.subscribe(s -> logger.info("From Subscriber 3: {}", s),
        e -> logger.error(e.getMessage(), e),
        () -> latch.countDown());
 
latch.await();

При наличии наблюдаемой простуды код будет вызываться еще раз, и элементы будут выбрасываться снова, я получаю это на своей машине:

1
2
3
4
5
6
7
8
06:04:07.206 [RxComputationThreadPool-2] INFO  o.b.Service1 - Start: Executing slow task in Service 1
06:04:07.208 [RxComputationThreadPool-3] INFO  o.b.Service1 - Start: Executing slow task in Service 1
06:04:08.211 [RxComputationThreadPool-2] INFO  o.b.BasicObservablesTest - From Subscriber 2: data 1
06:04:08.211 [RxComputationThreadPool-1] INFO  o.b.BasicObservablesTest - From Subscriber 1: data 1
06:04:08.211 [RxComputationThreadPool-3] INFO  o.b.BasicObservablesTest - From Subscriber 3: data 1
06:04:08.213 [RxComputationThreadPool-2] INFO  o.b.Service1 - End: Executing slow task in Service 1
06:04:08.214 [RxComputationThreadPool-1] INFO  o.b.Service1 - End: Executing slow task in Service 1
06:04:08.214 [RxComputationThreadPool-3] INFO  o.b.Service1 - End: Executing slow task in Service 1

Hot Observable — используя ConnectableObservable

С другой стороны, Hot Observable не требует подписки для начала выдачи предметов. Одним из способов реализации Hot Observable является использование ConnectableObservable , который является Observable, который не излучает элементы, пока не будет вызван его метод connect, однако, как только он начинает излучать элементы, любой подписчик на него получает элементы только с точки подписки. Итак, еще раз вернемся к предыдущему примеру, но вместо этого с ConnectableObservable:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
Observable<String> op1 = service1.operation();
 
ConnectableObservable<String> connectableObservable =  op1.publish();
 
CountDownLatch latch = new CountDownLatch(3);
 
connectableObservable.subscribe(s -> logger.info("From Subscriber 1: {}", s),
        e -> logger.error(e.getMessage(), e),
        () -> latch.countDown());
 
connectableObservable.subscribe(s -> logger.info("From Subscriber 2: {}", s),
        e -> logger.error(e.getMessage(), e),
        () -> latch.countDown());
 
connectableObservable.subscribe(s -> logger.info("From Subscriber 3: {}", s),
        e -> logger.error(e.getMessage(), e),
        () -> latch.countDown());
 
connectableObservable.connect();
 
latch.await();

и следующее напечатано:

1
2
3
4
5
06:07:23.852 [RxComputationThreadPool-3] INFO  o.b.Service1 - Start: Executing slow task in Service 1
06:07:24.860 [RxComputationThreadPool-3] INFO  o.b.ConnectableObservablesTest - From Subscriber 1: data 1
06:07:24.862 [RxComputationThreadPool-3] INFO  o.b.ConnectableObservablesTest - From Subscriber 2: data 1
06:07:24.862 [RxComputationThreadPool-3] INFO  o.b.ConnectableObservablesTest - From Subscriber 3: data 1
06:07:24.862 [RxComputationThreadPool-3] INFO  o.b.Service1 - End: Executing slow task in Service 1

Hot Observable — используя тему

Другой способ преобразовать холодную наблюдаемую в горячую — использовать тему . Субъекты ведут себя как как Наблюдаемые, так и Наблюдатели, существуют разные типы Субъектов с различным поведением. Здесь я использую Предмет, называемый PublishSubject, который имеет поведение Pub / Sub — элементы передаются всем подписчикам, слушающим его. Итак, с введенным PublishSubject код выглядит так:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
Observable<String> op1 = service1.operation();
 
PublishSubject<String> publishSubject = PublishSubject.create();
 
op1.subscribe(publishSubject);
 
CountDownLatch latch = new CountDownLatch(3);
 
publishSubject.subscribe(s -> logger.info("From Subscriber 1: {}", s),
        e -> logger.error(e.getMessage(), e),
        () -> latch.countDown());
 
publishSubject.subscribe(s -> logger.info("From Subscriber 2: {}", s),
        e -> logger.error(e.getMessage(), e),
        () -> latch.countDown());
 
publishSubject.subscribe(s -> logger.info("From Subscriber 3: {}", s),
        e -> logger.error(e.getMessage(), e),
        () -> latch.countDown());
 
 
latch.await();

Посмотрите, как PublishSubject представлен как подписчик на Observable, а другие подписчики вместо этого подписываются на PublishSubject. Вывод будет аналогичен выходу из ConnectableObservable.

Это по сути дела, степень моего понимания Hot Observable. Итак, чтобы заключить, разница между холодной и горячей наблюдаемой заключается в том, когда подписчики получают испущенные элементы и когда элементы испускаются — с холодной наблюдаемой они испускаются, когда они подписаны и обычно получают все испускаемые элементы, с Горячее наблюдаемое, что элементы испускаются без подписчика, и подписчики обычно получают элементы после пункта подписки.

Ссылка

  1. http://www.introtorx.com/content/v1.0.10621.0/14_HotAndColdObservables.html
  2. Отличный javadoc на rx-java — http://reactivex.io/RxJava/javadoc/index.html
Ссылка: Горячая и холодная rx-java Наблюдаемая нашим партнером JCG Биджу Кунджумменом в блоге « Все и вся» .