Статьи

Асинхронные абстракции с использованием rx-java

Одним из больших преимуществ использования Rx-java для меня было то, что код выглядит одинаково независимо от того, являются ли базовые вызовы синхронными или асинхронными, и, следовательно, заголовок этой записи.

Рассмотрим очень простой вариант использования клиентского кода, который выполняет три медленно выполняющихся вызова и объединяет результаты в список:

1
2
3
4
String op1 = service1.operation();
String op2 = service2.operation();
String op3 = service3.operation();
Arrays.asList(op1, op2, op3)

Поскольку вызовы синхронны, время, необходимое для этого, будет аддитивным. Для имитации медленного вызова ниже приведен тип реализации в каждом из вызовов метода:

1
2
3
4
5
6
public String operation() {
    logger.info("Start: Executing slow task in Service 1");
    Util.delay(7000);
    logger.info("End: Executing slow task in Service 1");
    return "operation1"
}

Итак, первая попытка использования rx-java с этими реализациями состоит в том, чтобы просто заставить эти длительные операции возвращать универсальный тип Observable , плохая реализация будет выглядеть так:

1
2
3
4
5
6
public Observable<string> operation() {
    logger.info("Start: Executing slow task in Service 1");
    Util.delay(7000);
    logger.info("End: Executing slow task in Service 1");
    return Observable.just("operation 1");
}

Таким образом, реализация вызывающего абонента меняется на следующее:

1
2
3
4
5
Observable<String> op1 = service1.operation();
Observable<String> op2 = service2.operation();
Observable<String> op3 = service3.operation();
 
Observable<List<String>> lst = Observable.merge(op1, op2, op3).toList();

Посмотрите, как вызывающая сторона формирует результаты, используя метод слияния .

Однако на этом этапе вызовы на каждый из вызовов службы все еще являются синхронными, чтобы сделать вызов асинхронным, вызовы службы можно выполнить с использованием пула потоков следующим образом:

01
02
03
04
05
06
07
08
09
10
11
12
public class Service1 {
    private static final Logger logger = LoggerFactory.getLogger(Service1.class);
    public Observable<String> operation() {
        return Observable.<String>create(s -> {
            logger.info("Start: Executing slow task in Service 1");
            Util.delay(7000);
            s.onNext("operation 1");
            logger.info("End: Executing slow task in Service 1");
            s.onCompleted();
        }).subscribeOn(Schedulers.computation());
    }
}

subscribeOn использует указанный планировщик для выполнения фактической операции.

Прелесть подхода в том, что вызывающий код этой службы вообще не изменяется, реализация там остается точно такой же, как и раньше, тогда как вызовы службы теперь асинхронны. Если вы заинтересованы в дальнейшем изучении этого примера, вот репозиторий Github с рабочими примерами.

Ссылка: Выполните асинхронные абстракции, используя rx-java от нашего партнера по JCG Биджу Кунджуммен из блога all and sundry.