Учебники

RxJava — Краткое руководство

RxJava — Обзор

RxJava — это расширение ReactiveX на основе Java. Он обеспечивает реализацию или проект ReactiveX на Java. Ниже приведены ключевые характеристики RxJava.

  • Расширяет шаблон наблюдателя.

  • Поддержка последовательности данных / событий.

  • Предоставляет операторы для декларативного составления последовательностей.

  • Внутренне обрабатывает потоки, синхронизацию, безопасность потоков и параллельные структуры данных.

Расширяет шаблон наблюдателя.

Поддержка последовательности данных / событий.

Предоставляет операторы для декларативного составления последовательностей.

Внутренне обрабатывает потоки, синхронизацию, безопасность потоков и параллельные структуры данных.

Что такое ReactiveX?

ReactiveX — это проект, целью которого является предоставление концепции реактивного программирования для различных языков программирования. Реактивное программирование относится к сценарию, в котором программа реагирует как и когда появляются данные. Это концепция программирования, основанная на событиях, и события могут распространяться на регистры наблюдателей.

В соответствии с Reactive , они объединили лучшее из шаблона Observer, шаблона Iterator и функционального шаблона.

Шаблон Observer сделан правильно. ReactiveX — это сочетание лучших идей из шаблона Observer, шаблона Iterator и функционального программирования.

Функциональное программирование

Функциональное программирование вращается вокруг создания программного обеспечения с использованием чистых функций. Чистая функция не зависит от предыдущего состояния и всегда возвращает один и тот же результат для тех же переданных параметров. Чистые функции помогают избежать проблем, связанных с общими объектами, изменяемыми данными и побочными эффектами, часто встречающимися в многопоточных средах.

Реактивное программирование

Реактивное программирование относится к программированию на основе событий, когда потоки данных поступают асинхронно и обрабатываются по прибытии.

Функциональное реактивное программирование

RxJava реализует обе концепции вместе, где данные потоков изменяются со временем, и функция потребителя реагирует соответственно.

Реактивный Манифест

Reactive Manifesto — это онлайновый документ, подтверждающий высокий стандарт систем прикладного программного обеспечения. Согласно манифесту, ниже приведены ключевые атрибуты реактивного программного обеспечения —

  • Отзывчивый — всегда должен отвечать своевременно.

  • Управляемый сообщениями — следует использовать асинхронную передачу сообщений между компонентами, чтобы они имели слабую связь.

  • Эластичный — должен оставаться отзывчивым даже при высокой нагрузке.

  • Эластичный — должен оставаться отзывчивым, даже если какой-либо компонент (ы) выходит из строя.

Отзывчивый — всегда должен отвечать своевременно.

Управляемый сообщениями — следует использовать асинхронную передачу сообщений между компонентами, чтобы они имели слабую связь.

Эластичный — должен оставаться отзывчивым даже при высокой нагрузке.

Эластичный — должен оставаться отзывчивым, даже если какой-либо компонент (ы) выходит из строя.

Ключевые компоненты RxJava

RxJava имеет два ключевых компонента: Observables и Observer.

  • Observable — представляет объект, похожий на Stream, который может выдавать ноль или более данных, может отправлять сообщение об ошибке, скорость которого может контролироваться во время передачи набора данных, может отправлять как конечные, так и бесконечные данные.

  • Наблюдатель — подписывается на данные последовательности наблюдаемой и реагирует на единицу наблюдаемой. Наблюдатели уведомляются всякий раз, когда Observable отправляет данные. Обозреватель обрабатывает данные один за другим.

Observable — представляет объект, похожий на Stream, который может выдавать ноль или более данных, может отправлять сообщение об ошибке, скорость которого может контролироваться во время передачи набора данных, может отправлять как конечные, так и бесконечные данные.

Наблюдатель — подписывается на данные последовательности наблюдаемой и реагирует на единицу наблюдаемой. Наблюдатели уведомляются всякий раз, когда Observable отправляет данные. Обозреватель обрабатывает данные один за другим.

Наблюдатель никогда не уведомляется, если элементы отсутствуют или обратный вызов для предыдущего элемента не возвращается.

RxJava — Настройка среды

Настройка локальной среды

RxJava — это библиотека для Java, поэтому самое первое требование — установить JDK на ваш компьютер.

Системные требования

JDK 1,5 или выше.
объем памяти Нет минимальных требований.
Дисковое пространство Нет минимальных требований.
Операционная система Нет минимальных требований.

Шаг 1 — Проверьте установку Java на вашем компьютере

Прежде всего, откройте консоль и выполните команду Java в зависимости от операционной системы, с которой вы работаете.

Операционные системы задача команда
Windows Открытая командная консоль c: \> Java-версия
Linux Открытый командный терминал $ java-версия
макинтош Открытый терминал машина: <joseph $ java -version

Давайте проверим вывод для всех операционных систем —

Операционные системы Выход
Windows

Java-версия «1.8.0_101»

Java (TM) SE Runtime Environment (сборка 1.8.0_101)

Linux

Java-версия «1.8.0_101»

Java (TM) SE Runtime Environment (сборка 1.8.0_101)

макинтош

Java-версия «1.8.0_101»

Java (TM) SE Runtime Environment (сборка 1.8.0_101)

Java-версия «1.8.0_101»

Java (TM) SE Runtime Environment (сборка 1.8.0_101)

Java-версия «1.8.0_101»

Java (TM) SE Runtime Environment (сборка 1.8.0_101)

Java-версия «1.8.0_101»

Java (TM) SE Runtime Environment (сборка 1.8.0_101)

Если у вас не установлена ​​Java в вашей системе, загрузите Java Software Development Kit (SDK) по следующей ссылке https://www.oracle.com . Мы принимаем Java 1.8.0_101 в качестве установленной версии для этого урока.

Шаг 2 — Установите JAVA Environment

Установите переменную среды JAVA_HOME, чтобы она указывала на местоположение базовой директории, где установлена ​​Java на вашем компьютере. Например.

Операционные системы Выход
Windows Установите переменную среды JAVA_HOME в C: \ Program Files \ Java \ jdk1.8.0_101
Linux экспорт JAVA_HOME = / usr / local / java-current
макинтош export JAVA_HOME = / Библиотека / Java / Главная

Добавьте местоположение компилятора Java в системный путь.

Операционные системы Выход
Windows Добавьте строку C: \ Program Files \ Java \ jdk1.8.0_101 \ bin в конце системной переменной Path .
Linux экспорт PATH = $ PATH: $ JAVA_HOME / bin /
макинтош не требуется

Проверьте установку Java с помощью команды java -version, как описано выше.

Шаг 3 — Скачать архив RxJava2

Загрузите последнюю версию файла jar RxJava из RxJava @ MVNRepository и его зависимости Reactive Streams @ MVNRepository . На момент написания этого руководства мы загрузили rxjava-2.2.4.jar, реактивный-streams-1.0.2.jar и скопировали его в папку C: \> RxJava.

Операционные системы Название архива
Windows rxjava-2.2.4.jar, реактивные потоки-1.0.2.jar
Linux rxjava-2.2.4.jar, реактивные потоки-1.0.2.jar
макинтош rxjava-2.2.4.jar, реактивные потоки-1.0.2.jar

Шаг 4 — Установите среду RxJava

Установите переменную окружения RX_JAVA, чтобы она указывала на местоположение базовой директории, где RxJava jar хранится на вашем компьютере. Предположим, что мы сохранили rxjava-2.2.4.jar и реактивный поток-1.0.2.jar в папке RxJava.

Sr.No ОС и описание
1

Windows

Установите переменную окружения RX_JAVA в C: \ RxJava

2

Linux

export RX_JAVA = / usr / local / RxJava

3

макинтош

export RX_JAVA = / Библиотека / RxJava

Windows

Установите переменную окружения RX_JAVA в C: \ RxJava

Linux

export RX_JAVA = / usr / local / RxJava

макинтош

export RX_JAVA = / Библиотека / RxJava

Шаг 5 — Установите переменную CLASSPATH

Установите переменную среды CLASSPATH, чтобы она указывала на местоположение jar RxJava.

Sr.No ОС и описание
1

Windows

Задайте для переменной среды CLASSPATH значение% CLASSPATH%;% RX_JAVA% \ rxjava-2.2.4.jar;% RX_JAVA% \ реактивный-потоки-1.0.2.jar;.;

2

Linux

export CLASSPATH = $ CLASSPATH: $ RX_JAVA / rxjava-2.2.4.jar: реактивный-потоки-1.0.2.jar :.

3

макинтош

export CLASSPATH = $ CLASSPATH: $ RX_JAVA / rxjava-2.2.4.jar: реактивный-потоки-1.0.2.jar :.

Windows

Задайте для переменной среды CLASSPATH значение% CLASSPATH%;% RX_JAVA% \ rxjava-2.2.4.jar;% RX_JAVA% \ реактивный-потоки-1.0.2.jar;.;

Linux

export CLASSPATH = $ CLASSPATH: $ RX_JAVA / rxjava-2.2.4.jar: реактивный-потоки-1.0.2.jar :.

макинтош

export CLASSPATH = $ CLASSPATH: $ RX_JAVA / rxjava-2.2.4.jar: реактивный-потоки-1.0.2.jar :.

Шаг 6 — Проверка установки RxJava

Создайте класс TestRx.java, как показано ниже —

import io.reactivex.Flowable;
public class TestRx {
   public static void main(String[] args) {
      Flowable.just("Hello World!")
         .subscribe(System.out::println);
   }
}

Шаг 7 — Проверьте результат

Скомпилируйте классы, используя компилятор javac следующим образом:

C:\RxJava>javac Tester.java

Проверьте вывод.

Hello World!

RxJava — Как работает Observable

Observables представляет источники данных, где их слушают наблюдатели (подписчики) . Короче говоря, Observable испускает предметы, а подписчик затем потребляет эти предметы.

наблюдаемый

  • Observable предоставляет данные, как только подписчик начинает слушать.

  • Наблюдаемый может испускать любое количество предметов.

  • Наблюдаемый может испускать только сигнал завершения, а также без элемента.

  • Наблюдаемый может успешно завершиться.

  • Наблюдаемое может никогда не прекратиться. Например, кнопка может быть нажата любое количество раз.

  • Наблюдаемый может выдать ошибку в любой момент времени.

Observable предоставляет данные, как только подписчик начинает слушать.

Наблюдаемый может испускать любое количество предметов.

Наблюдаемый может испускать только сигнал завершения, а также без элемента.

Наблюдаемый может успешно завершиться.

Наблюдаемое может никогда не прекратиться. Например, кнопка может быть нажата любое количество раз.

Наблюдаемый может выдать ошибку в любой момент времени.

подписчик

  • Наблюдаемый может иметь несколько подписчиков.

  • Когда Observable испускает элемент, вызывается каждый метод подписчика onNext ().

  • Когда Observable заканчивает излучать элементы, вызывается каждый метод onComplete () подписчика.

  • Если Observable выдает ошибку, вызывается каждый метод onError () подписчика.

Наблюдаемый может иметь несколько подписчиков.

Когда Observable испускает элемент, вызывается каждый метод подписчика onNext ().

Когда Observable заканчивает излучать элементы, вызывается каждый метод onComplete () подписчика.

Если Observable выдает ошибку, вызывается каждый метод onError () подписчика.

RxJava — Создание наблюдаемых

Ниже приведены базовые классы для создания наблюдаемых.

  • Текучий — 0..N потоков, испускает 0 или n элементов. Поддерживает Reactive-Streams и обратное давление.

  • Наблюдаемый — 0..N течет, но нет обратного давления.

  • Одиночный — 1 предмет или ошибка. Может рассматриваться как реактивная версия вызова метода.

  • Completable — пункт не выброшен Используется как сигнал для завершения или ошибки. Может рассматриваться как реактивная версия Runnable.

  • MayBe — Либо нет предмета, либо 1 предмет выброшен. Может рассматриваться как реактивная версия Optional.

Текучий — 0..N потоков, испускает 0 или n элементов. Поддерживает Reactive-Streams и обратное давление.

Наблюдаемый — 0..N течет, но нет обратного давления.

Одиночный — 1 предмет или ошибка. Может рассматриваться как реактивная версия вызова метода.

Completable — пункт не выброшен Используется как сигнал для завершения или ошибки. Может рассматриваться как реактивная версия Runnable.

MayBe — Либо нет предмета, либо 1 предмет выброшен. Может рассматриваться как реактивная версия Optional.

Ниже приведены удобные методы для создания наблюдаемых в классе Observable.

  • just (T item) — Возвращает Observable, который сигнализирует о данном (постоянная ссылка) элементе и затем завершается.

  • fromIterable (Iterable source) — преобразует последовательность Iterable в ObservableSource, который испускает элементы в последовательности.

  • fromArray (T … items) — преобразует массив в ObservableSource, который испускает элементы в массиве.

  • fromCallable (Callable supplier) — возвращает Observable, который, когда наблюдатель подписывается на него, вызывает указанную вами функцию и затем генерирует значение, возвращаемое этой функцией.

  • fromFuture (Будущее будущее) — преобразует будущее в ObservableSource.

  • интервал (long initialDelay, long period, TimeUnit unit) — Возвращает наблюдаемое, которое испускает 0L после initialDelay и постоянно увеличивающиеся числа после каждого периода времени после этого.

just (T item) — Возвращает Observable, который сигнализирует о данном (постоянная ссылка) элементе и затем завершается.

fromIterable (Iterable source) — преобразует последовательность Iterable в ObservableSource, который испускает элементы в последовательности.

fromArray (T … items) — преобразует массив в ObservableSource, который испускает элементы в массиве.

fromCallable (Callable supplier) — возвращает Observable, который, когда наблюдатель подписывается на него, вызывает указанную вами функцию и затем генерирует значение, возвращаемое этой функцией.

fromFuture (Будущее будущее) — преобразует будущее в ObservableSource.

интервал (long initialDelay, long period, TimeUnit unit) — Возвращает наблюдаемое, которое испускает 0L после initialDelay и постоянно увеличивающиеся числа после каждого периода времени после этого.

RxJava — Единственная наблюдаемая

Класс Single представляет ответ с одним значением. Одна наблюдаемая может выдавать только одно успешное значение или ошибку. Он не генерирует событие onComplete.

Декларация класса

Ниже приводится объявление для класса io.reactivex.Single <T>

public abstract class Single<T>
   extends Object
      implements SingleSource<T>

протокол

Ниже приведен последовательный протокол, в котором работает Single Observable:

onSubscribe (onSuccess | onError)?

Один пример

Создайте следующую Java-программу, используя любой редактор по вашему выбору, например, в C: \> RxJava.

ObservableTester.java

import java.util.concurrent.TimeUnit;

import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableSingleObserver;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args)  throws InterruptedException {
      //Create the observable
      Single<String> testSingle = Single.just("Hello World");

      //Create an observer
      Disposable disposable = testSingle
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .subscribeWith(
         new DisposableSingleObserver<String>() {

         @Override
         public void onError(Throwable e) { 
            e.printStackTrace();
         }

         @Override
         public void onSuccess(String value) {
            System.out.println(value);
         }
      }); 
      Thread.sleep(3000);
      //start observing
      disposable.dispose();
   }
}

Проверьте результат

Скомпилируйте класс с помощью компилятора javac следующим образом:

C:\RxJava>javac ObservableTester.java

Теперь запустите ObservableTester следующим образом:

C:\RxJava>java ObservableTester

Он должен произвести следующий вывод —

Hello World

RxJava — MayBe Observable

Класс MayBe представляет отложенный ответ. Наблюдаемый MayBe может испускать либо одно успешное значение, либо нет значения.

Декларация класса

Ниже приводится объявление для класса io.reactivex.Single <T>

public abstract class Maybe<T>
   extends Object
      implements MaybeSource<T>

протокол

Ниже приведен последовательный протокол, с которым работает MayBe Observable.

onSubscribe (onSuccess | onError | OnComplete)?

Пример MayBe

Создайте следующую Java-программу, используя любой редактор по вашему выбору, например, в C: \> RxJava.

ObservableTester.java

import java.util.concurrent.TimeUnit;

import io.reactivex.Maybe;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableMaybeObserver;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester {
   public static void main(String[] args) throws InterruptedException {
      //Create an observer
      Disposable disposable = Maybe.just("Hello World")
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .subscribeWith(new DisposableMaybeObserver<String>() {
         @Override
         public void onError(Throwable e) { 
            e.printStackTrace();
         }

         @Override
         public void onSuccess(String value) {
            System.out.println(value);
         }

         @Override
         public void onComplete() {
            System.out.println("Done!");
         }
      }); 
      Thread.sleep(3000);
      //start observing
      disposable.dispose();
   }
}

Проверьте результат

Скомпилируйте класс с помощью компилятора javac следующим образом:

C:\RxJava>javac ObservableTester.java

Теперь запустите ObservableTester следующим образом:

C:\RxJava>java ObservableTester

Он должен произвести следующий вывод —

Hello World

RxJava — Completable Observable

Класс Completable представляет отложенный ответ. Наблюдаемое завершение может указывать либо на успешное завершение, либо на ошибку.

Декларация класса

Ниже приводится объявление для класса io.reactivex.Completable

public abstract class Completable
extends Object
implements CompletableSource

протокол

Ниже приведен последовательный протокол, который работает Completable Observable:

onSubscribe (onError | onComplete)?

Завершаемый пример

Создайте следующую Java-программу, используя любой редактор по вашему выбору, например, в C: \> RxJava.

ObservableTester.java

import java.util.concurrent.TimeUnit;

import io.reactivex.Completable;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableCompletableObserver;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester {
   public static void main(String[] args) throws InterruptedException {

      //Create an observer
      Disposable disposable = Completable.complete()
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .subscribeWith(new DisposableCompletableObserver() {
         @Override
         public void onError(Throwable e) { 
            e.printStackTrace();
         }
         @Override
         public void onStart() {
            System.out.println("Started!");
         }
         @Override
         public void onComplete() {
            System.out.println("Done!");
         }
      }); 
      Thread.sleep(3000);
      //start observing
      disposable.dispose();
   }
}

Проверьте результат

Скомпилируйте класс с помощью компилятора javac следующим образом:

C:\RxJava>javac ObservableTester.java

Теперь запустите ObservableTester следующим образом:

C:\RxJava>java ObservableTester

Он должен произвести следующий вывод —

Started!
Done!

RxJava — Использование CompositeDisposable

Класс CompositeDisposable представляет контейнер, который может содержать несколько одноразовых элементов и предлагает O (1) сложность добавления и удаления одноразовых изделий.

Декларация класса

Ниже приводится объявление для класса io.reactivex.disposables.CompositeDisposable

public final class CompositeDisposable
extends Object
implements Disposable, io.reactivex.internal.disposables.DisposableContainer

Пример CompositeDisposable

Создайте следующую Java-программу, используя любой редактор по вашему выбору, например, в C: \> RxJava.

ObservableTester.java

import io.reactivex.Maybe;
import io.reactivex.Single;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableMaybeObserver;
import io.reactivex.observers.DisposableSingleObserver;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.TimeUnit;

public class ObservableTester  {
   public static void main(String[] args)  throws InterruptedException {    
      CompositeDisposable compositeDisposable = new CompositeDisposable();

      //Create an Single observer 
      Disposable disposableSingle = Single.just("Hello World")
      .delay(2, TimeUnit.SECONDS, Schedulers.io())
      .subscribeWith(
      new DisposableSingleObserver<String>() {
         @Override
         public void onError(Throwable e) {
            e.printStackTrace();
         }

         @Override
         public void onSuccess(String value) {
            System.out.println(value);
         }
      }); 

      //Create an observer
      Disposable disposableMayBe = Maybe.just("Hi")
      .delay(2, TimeUnit.SECONDS, Schedulers.io())
      .subscribeWith(new DisposableMaybeObserver<String>() {
         @Override
         public void onError(Throwable e) { 
            e.printStackTrace();
         }

         @Override
         public void onSuccess(String value) {
            System.out.println(value);
         }

         @Override
         public void onComplete() {
            System.out.println("Done!");
         }
      }); 

      Thread.sleep(3000);

      compositeDisposable.add(disposableSingle);
      compositeDisposable.add(disposableMayBe);

      //start observing
      compositeDisposable.dispose();
   }
}

Проверьте результат

Скомпилируйте класс с помощью компилятора javac следующим образом:

C:\RxJava>javac ObservableTester.java

Теперь запустите ObservableTester следующим образом:

C:\RxJava>java ObservableTester

Он должен произвести следующий вывод —

Hello
Hi

RxJava — Создание операторов

Ниже приведены операторы, которые используются для создания Observable.

Sr.No. Оператор и описание
1

Создайте

Создает Observable с нуля и позволяет методу наблюдателя вызываться программно.

2

Перенести

Не создавайте Observable, пока наблюдатель не подпишется. Создает свежую наблюдаемую для каждого наблюдателя.

3

Пустой / Никогда / Throw

Создает Observable с ограниченным поведением.

4

От

Преобразует объект / структуру данных в Observable.

5

интервал

Создает наблюдаемые исходящие целые числа в последовательности с промежутком заданного временного интервала.

6

Просто

Преобразует структуру объекта / данных в Observable для излучения объектов того же или того же типа.

7

Спектр

Создает наблюдаемые исходящие целые числа в последовательности заданного диапазона.

8

Повторение

Создает наблюдаемые исходящие целые числа последовательно.

9

Начните

Создает Observable для выдачи возвращаемого значения функции.

10

таймер

Создает Observable для испускания одного предмета после заданной задержки.

Создайте

Создает Observable с нуля и позволяет методу наблюдателя вызываться программно.

Перенести

Не создавайте Observable, пока наблюдатель не подпишется. Создает свежую наблюдаемую для каждого наблюдателя.

Пустой / Никогда / Throw

Создает Observable с ограниченным поведением.

От

Преобразует объект / структуру данных в Observable.

интервал

Создает наблюдаемые исходящие целые числа в последовательности с промежутком заданного временного интервала.

Просто

Преобразует структуру объекта / данных в Observable для излучения объектов того же или того же типа.

Спектр

Создает наблюдаемые исходящие целые числа в последовательности заданного диапазона.

Повторение

Создает наблюдаемые исходящие целые числа последовательно.

Начните

Создает Observable для выдачи возвращаемого значения функции.

таймер

Создает Observable для испускания одного предмета после заданной задержки.

Пример создания оператора

Создайте следующую Java-программу, используя любой редактор по вашему выбору, например, в C: \> RxJava.

ObservableTester.java

import io.reactivex.Observable;
//Using fromArray operator to create an Observable
public class ObservableTester  {
   public static void main(String[] args) { 
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable = Observable.fromArray(letters);
      observable
         .map(String::toUpperCase)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

Проверьте результат

Скомпилируйте класс с помощью компилятора javac следующим образом:

C:\RxJava>javac ObservableTester.java

Теперь запустите ObservableTester следующим образом:

C:\RxJava>java ObservableTester

Он должен произвести следующий вывод —

ABCDEFG

RxJava — Операторы преобразования

Ниже приведены операторы, которые используются для преобразования элемента, испускаемого из наблюдаемой.

Sr.No. Оператор и описание
1

буфер

Периодически собирает предметы из Observable в связки, а затем испускает связки, а не предметы.

2

FlatMap

Используется во вложенных наблюдаемых объектах. Преобразует предметы в Observables. Затем сгладьте предметы в единый Observable.

3

Группа по

Разделите Observable на набор Observables, организованных по ключу, чтобы испустить другую группу предметов.

4

карта

Примените функцию к каждому излучаемому элементу, чтобы преобразовать его.

5

сканирование

Примените функцию к каждому излучаемому элементу последовательно, а затем введите последующее значение.

6

Окно

Периодически собирает предметы из Observable в окна Observable, а затем испускает окна, а не предметы.

буфер

Периодически собирает предметы из Observable в связки, а затем испускает связки, а не предметы.

FlatMap

Используется во вложенных наблюдаемых объектах. Преобразует предметы в Observables. Затем сгладьте предметы в единый Observable.

Группа по

Разделите Observable на набор Observables, организованных по ключу, чтобы испустить другую группу предметов.

карта

Примените функцию к каждому излучаемому элементу, чтобы преобразовать его.

сканирование

Примените функцию к каждому излучаемому элементу последовательно, а затем введите последующее значение.

Окно

Периодически собирает предметы из Observable в окна Observable, а затем испускает окна, а не предметы.

Пример оператора преобразования

Создайте следующую Java-программу, используя любой редактор по вашему выбору, например, в C: \> RxJava.

ObservableTester.java

import io.reactivex.Observable;
//Using map operator to transform an Observable
public class ObservableTester  { 
   public static void main(String[] args) {    
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable = Observable.fromArray(letters);
      observable
         .map(String::toUpperCase)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

Проверьте результат

Скомпилируйте класс с помощью компилятора javac следующим образом:

C:\RxJava>javac ObservableTester.java

Теперь запустите ObservableTester следующим образом:

C:\RxJava>java ObservableTester

Он должен произвести следующий вывод —

ABCDEFG

RxJava — операторы фильтрации

Ниже приведены операторы, которые используются для выборочного выброса элемента (ов) из наблюдаемой.

Sr.No. Оператор и описание
1

Debounce

Испускает предметы только тогда, когда происходит тайм-аут, не испуская другой предмет.

2

отчетливый

Издает только уникальные предметы.

3

ElementAt

испускать только элемент с индексом n, испускаемым Observable.

4

Фильтр

Издает только те элементы, которые передают данную функцию предиката.

5

Первый

Издает первый элемент или первый элемент, который соответствует заданным критериям.

6

IgnoreElements

Не испускает никаких предметов из Observable, но отмечает завершение.

7

Прошлой

Испускает последний элемент из Observable.

8

Образец

Издает самый последний элемент с заданным интервалом времени.

9

Пропускать

Пропускает первые n элементов из наблюдаемой.

10

SkipLast

Пропускает последние n элементов из наблюдаемой.

11

принимать

берет первые n предметов из наблюдаемой.

12

TakeLast

берет последние n предметов из наблюдаемой.

Debounce

Испускает предметы только тогда, когда происходит тайм-аут, не испуская другой предмет.

отчетливый

Издает только уникальные предметы.

ElementAt

испускать только элемент с индексом n, испускаемым Observable.

Фильтр

Издает только те элементы, которые передают данную функцию предиката.

Первый

Издает первый элемент или первый элемент, который соответствует заданным критериям.

IgnoreElements

Не испускает никаких предметов из Observable, но отмечает завершение.

Прошлой

Испускает последний элемент из Observable.

Образец

Издает самый последний элемент с заданным интервалом времени.

Пропускать

Пропускает первые n элементов из наблюдаемой.

SkipLast

Пропускает последние n элементов из наблюдаемой.

принимать

берет первые n предметов из наблюдаемой.

TakeLast

берет последние n предметов из наблюдаемой.

Пример оператора фильтрации

Создайте следующую Java-программу, используя любой редактор по вашему выбору, например, в C: \> RxJava.

ObservableTester.java

import io.reactivex.Observable;
//Using take operator to filter an Observable
public class ObservableTester  {
   public static void main(String[] args) {    
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable = Observable.fromArray(letters);
      observable
         .take(2)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

Проверьте результат

Скомпилируйте класс с помощью компилятора javac следующим образом:

C:\RxJava>javac ObservableTester.java

Теперь запустите ObservableTester следующим образом:

C:\RxJava>java ObservableTester

Он должен произвести следующий вывод —

ab

RxJava — Операторы объединения

Ниже приведены операторы, которые используются для создания одной наблюдаемой из нескольких наблюдаемых.

Sr.No. Оператор и описание
1 И / И / Когда

Объедините наборы предметов, используя посредников Pattern и Plan.

2 CombineLatest

Объедините последний элемент, испускаемый каждым Наблюдаемым через указанную функцию, и выпустите полученный элемент.

3 Присоединиться

Объедините элементы, испускаемые двумя Наблюдаемыми, если они испускаются в течение периода времени второго испускаемого Наблюдаемого элемента.

4 сливаться

Объединяет предметы, испускаемые Observables.

5 StartWith

Испускать указанную последовательность предметов перед тем, как начать испускать предметы из источника Observable

6 переключатель

Издает самые последние предметы, испускаемые Observables.

7 застежкамолния

Объединяет элементы Observables на основе функции и испускает полученные элементы.

Объедините наборы предметов, используя посредников Pattern и Plan.

Объедините последний элемент, испускаемый каждым Наблюдаемым через указанную функцию, и выпустите полученный элемент.

Объедините элементы, испускаемые двумя Наблюдаемыми, если они испускаются в течение периода времени второго испускаемого Наблюдаемого элемента.

Объединяет предметы, испускаемые Observables.

Испускать указанную последовательность предметов перед тем, как начать испускать предметы из источника Observable

Издает самые последние предметы, испускаемые Observables.

Объединяет элементы Observables на основе функции и испускает полученные элементы.

Пример оператора объединения

Создайте следующую Java-программу, используя любой редактор по вашему выбору, например, в C: \> RxJava.

ObservableTester.java

import io.reactivex.Observable;
//Using combineLatest operator to combine Observables
public class ObservableTester {
   public static void main(String[] args) {    
      Integer[] numbers = { 1, 2, 3, 4, 5, 6};
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable1 = Observable.fromArray(letters);
      Observable<Integer> observable2 = Observable.fromArray(numbers);
      Observable.combineLatest(observable1, observable2, (a,b) -> a + b)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

Проверьте результат

Скомпилируйте класс с помощью компилятора javac следующим образом:

C:\RxJava>javac ObservableTester.java

Теперь запустите ObservableTester следующим образом:

C:\RxJava>java ObservableTester

Он должен произвести следующий вывод —

g1g2g3g4g5g6

RxJava — Утилиты

Ниже приведены операторы, которые часто полезны с Observables.

Sr.No. Оператор и описание
1

задержка

Зарегистрируйте действие для обработки наблюдаемых событий жизненного цикла.

2

Материализовать / дематериализоваться

Представляет отправленный элемент и отправленное уведомление.

3

ObserveOn

Укажите планировщик для наблюдения.

4

Сериализация

Force Observable совершать сериализованные вызовы.

5

Подписывайся

Работать на выбросы предметов и уведомлений, как полный из наблюдаемой

6

SubscribeOn

Укажите планировщик, который будет использоваться Observable, когда он подписан.

7

Интервал времени

Преобразование Обсерватории в излучение с указанием количества времени, прошедшего между выбросами.

8

Таймаут

Выдает уведомление об ошибке, если указанное время происходит без отправки какого-либо элемента.

9

Отметка

Прикрепите метку времени к каждому излучаемому элементу.

9

С помощью

Создает одноразовый ресурс или тот же срок жизни, что и у Observable.

задержка

Зарегистрируйте действие для обработки наблюдаемых событий жизненного цикла.

Материализовать / дематериализоваться

Представляет отправленный элемент и отправленное уведомление.

ObserveOn

Укажите планировщик для наблюдения.

Сериализация

Force Observable совершать сериализованные вызовы.

Подписывайся

Работать на выбросы предметов и уведомлений, как полный из наблюдаемой

SubscribeOn

Укажите планировщик, который будет использоваться Observable, когда он подписан.

Интервал времени

Преобразование Обсерватории в излучение с указанием количества времени, прошедшего между выбросами.

Таймаут

Выдает уведомление об ошибке, если указанное время происходит без отправки какого-либо элемента.

Отметка

Прикрепите метку времени к каждому излучаемому элементу.

С помощью

Создает одноразовый ресурс или тот же срок жизни, что и у Observable.

Пример утилиты оператора

Создайте следующую Java-программу, используя любой редактор по вашему выбору, например, в C: \> RxJava.

ObservableTester.java

import io.reactivex.Observable;
//Using subscribe operator to subscribe to an Observable
public class ObservableTester  {
   public static void main(String[] args) {    
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable = Observable.fromArray(letters);
      observable.subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

Проверьте результат

Скомпилируйте класс с помощью компилятора javac следующим образом:

C:\RxJava>javac ObservableTester.java

Теперь запустите ObservableTester следующим образом:

C:\RxJava>java ObservableTester

Он должен произвести следующий вывод —

abcdefg

RxJava — условные операторы

Ниже приводятся операторы, которые оценивают один или несколько наблюдаемых или выбрасываемых элементов.

Sr.No. Оператор и описание
1

Все

Оценивает все предметы, выбрасываемые в соответствии с заданными критериями.

2

Amb

Испускает все предметы из первой наблюдаемой только с учетом нескольких наблюдаемых.

3

Содержит

Проверяет, издает ли Observable конкретный предмет или нет.

4

DefaultIfEmpty

Издает элемент по умолчанию, если Observable ничего не излучает.

5

SequenceEqual

Проверяет, испускают ли две наблюдаемые одну и ту же последовательность предметов.

6

SkipUntil

Сбрасывает предметы, испускаемые первой Наблюдаемой, пока вторая Наблюдаемая не испустит предмет.

7

SkipWhile

Откажитесь от предметов, испускаемых Observable, пока данное условие не станет ложным.

8

TakeUntil

Сбрасывает предметы, испускаемые Наблюдаемой после того, как вторая Наблюдаемая испускает предмет или завершает свою работу.

9

TakeWhile

Сбросить предметы, испускаемые Наблюдаемым после того, как указанное условие становится ложным.

Все

Оценивает все предметы, выбрасываемые в соответствии с заданными критериями.

Amb

Испускает все предметы из первой наблюдаемой только с учетом нескольких наблюдаемых.

Содержит

Проверяет, издает ли Observable конкретный предмет или нет.

DefaultIfEmpty

Издает элемент по умолчанию, если Observable ничего не излучает.

SequenceEqual

Проверяет, испускают ли две наблюдаемые одну и ту же последовательность предметов.

SkipUntil

Сбрасывает предметы, испускаемые первой Наблюдаемой, пока вторая Наблюдаемая не испустит предмет.

SkipWhile

Откажитесь от предметов, испускаемых Observable, пока данное условие не станет ложным.

TakeUntil

Сбрасывает предметы, испускаемые Наблюдаемой после того, как вторая Наблюдаемая испускает предмет или завершает свою работу.

TakeWhile

Сбросить предметы, испускаемые Наблюдаемым после того, как указанное условие становится ложным.

Пример условного оператора

Создайте следующую Java-программу, используя любой редактор по вашему выбору, например, в C: \> RxJava.

ObservableTester.java

import io.reactivex.Observable;
//Using defaultIfEmpty operator to operate on an Observable
public class ObservableTester  {
   public static void main(String[] args) {    
      final StringBuilder result = new StringBuilder();
      Observable.empty()
      .defaultIfEmpty("No Data")
      .subscribe(s -> result.append(s));
      System.out.println(result);
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result1 = new StringBuilder();
      Observable.fromArray(letters)
      .firstElement()
      .defaultIfEmpty("No data")   
      .subscribe(s -> result1.append(s));
      System.out.println(result1);
   }
}

Проверьте результат

Скомпилируйте класс с помощью компилятора javac следующим образом:

C:\RxJava>javac ObservableTester.java

Теперь запустите ObservableTester следующим образом:

C:\RxJava>java ObservableTester

Он должен произвести следующий вывод —

No Data
a

RxJava — математические операторы

Ниже приводятся операторы, которые оперируют целыми объектами, испускаемыми Observable.

Sr.No. Оператор и описание
1

Средний

Оценивает средние значения всех элементов и выдает результат.

2

Concat

Выдает все предметы из нескольких наблюдаемых без чередования.

3

подсчитывать

Подсчитывает все предметы и испускает результат.

4

Максимум

Оценивает максимально ценный предмет из всех предметов и выдает результат.

5

Min

Оценивает минимальный предмет из всех предметов и выдает результат.

6

уменьшить

Примените функцию к каждому элементу и верните результат.

7

сумма

Оценивает сумму всех предметов и выдает результат.

Средний

Оценивает средние значения всех элементов и выдает результат.

Concat

Выдает все предметы из нескольких наблюдаемых без чередования.

подсчитывать

Подсчитывает все предметы и испускает результат.

Максимум

Оценивает максимально ценный предмет из всех предметов и выдает результат.

Min

Оценивает минимальный предмет из всех предметов и выдает результат.

уменьшить

Примените функцию к каждому элементу и верните результат.

сумма

Оценивает сумму всех предметов и выдает результат.

Пример математического оператора

Создайте следующую Java-программу, используя любой редактор по вашему выбору, например, в C: \> RxJava.

ObservableTester.java

import io.reactivex.Observable;
//Using concat operator to operate on multiple Observables
public class ObservableTester  {
   public static void main(String[] args)  throws InterruptedException {    
      Integer[] numbers = { 1, 2, 3, 4, 5, 6};
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable1 = Observable.fromArray(letters);
      Observable<Integer> observable2 = Observable.fromArray(numbers);
      Observable.concat(observable1, observable2)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

Проверьте результат

Скомпилируйте класс с помощью компилятора javac следующим образом:

C:\RxJava>javac ObservableTester.java

Теперь запустите ObservableTester следующим образом:

C:\RxJava>java ObservableTester

Он должен произвести следующий вывод —

abcdefg123456

RxJava — Операторы подключения

Ниже приведены операторы, которые более точно контролируют подписку.

Sr.No. Оператор и описание
1

соединять

Поручить подключаемому Observable для передачи предметов своим подписчикам.

2

Публиковать

Преобразует наблюдаемое в подключаемое наблюдаемое.

3

RefCount

Преобразует подключаемый объект Observable в обычный объект Observable.

4

переигровка

Убедитесь, что каждый подписчик просматривает одинаковую последовательность отправленных элементов, даже после того, как Observable начал излучать элементы и подписчики подписываются позже.

соединять

Поручить подключаемому Observable для передачи предметов своим подписчикам.

Публиковать

Преобразует наблюдаемое в подключаемое наблюдаемое.

RefCount

Преобразует подключаемый объект Observable в обычный объект Observable.

переигровка

Убедитесь, что каждый подписчик просматривает одинаковую последовательность отправленных элементов, даже после того, как Observable начал излучать элементы и подписчики подписываются позже.

Пример подключения оператора

Создайте следующую Java-программу, используя любой редактор по вашему выбору, например, в C: \> RxJava.

ObservableTester.java

import io.reactivex.Observable;
import io.reactivex.observables.ConnectableObservable;
//Using connect operator on a ConnectableObservable
public class ObservableTester {
   public static void main(String[] args) {
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      ConnectableObservable<String> connectable = Observable.fromArray(letters).publish();      
      connectable.subscribe(letter -> result.append(letter));
      System.out.println(result.length());
      connectable.connect();
      System.out.println(result.length());
      System.out.println(result);
   }
}

Проверьте результат

Скомпилируйте класс с помощью компилятора javac следующим образом:

C:\RxJava>javac ObservableTester.java

Теперь запустите ObservableTester следующим образом:

C:\RxJava>java ObservableTester

Он должен произвести следующий вывод —

0
7
abcdefg

RxJava — Предметы

Согласно Реактиву , Субъект может действовать как Наблюдаемый, так и Наблюдатель.

Субъект — это своего рода мост или прокси-сервер, доступный в некоторых реализациях ReactiveX, который действует как наблюдатель и как наблюдаемый. Поскольку он является наблюдателем, он может подписаться на одну или несколько наблюдаемых, а поскольку он является наблюдаемым, он может проходить через элементы, которые он наблюдает, переиздавая их, а также может испускать новые элементы.

Есть четыре типа предметов —

Sr.No. Тема и описание
1

Опубликовать тему

Издает только те элементы, которые выбрасываются после подписки.

2 Повтор темы

Издает все элементы, испускаемые источником Observable, независимо от того, когда он подписался на Observable.

3

Поведение Тема

При подписке испускает самый последний элемент, а затем продолжает испускать элемент, испускаемый источником Observable.

4

Асинхронная тема

Издает последний элемент, испускаемый источником Observable после его завершения.

Опубликовать тему

Издает только те элементы, которые выбрасываются после подписки.

Издает все элементы, испускаемые источником Observable, независимо от того, когда он подписался на Observable.

Поведение Тема

При подписке испускает самый последний элемент, а затем продолжает испускать элемент, испускаемый источником Observable.

Асинхронная тема

Издает последний элемент, испускаемый источником Observable после его завершения.

RxJava — PublishSubject

PublishSubject испускает элементы для подписчиков, которые в настоящее время подписаны, и события терминала для текущих или поздних наблюдателей.

Декларация класса

Ниже приводится объявление для класса io.reactivex.subjects.PublishSubject <T>

public final class PublishSubject<T>
extends Subject<T>

Пример PublishSubject

Создайте следующую Java-программу, используя любой редактор по вашему выбору, например, в C: \> RxJava.

ObservableTester.java

import io.reactivex.subjects.PublishSubject;
public class ObservableTester  {
   public static void main(String[] args) {   
      final StringBuilder result1 = new StringBuilder();
      final StringBuilder result2 = new StringBuilder();         

      PublishSubject<String> subject = PublishSubject.create(); 
      subject.subscribe(value -> result1.append(value) ); 
      subject.onNext("a"); 
      subject.onNext("b"); 
      subject.onNext("c"); 
      subject.subscribe(value -> result2.append(value)); 
      subject.onNext("d"); 
      subject.onComplete();

      //Output will be abcd 
      System.out.println(result1);
      //Output will be d only
      //as subscribed after c item emitted.
      System.out.println(result2);
   }
}

Проверьте результат

Скомпилируйте класс с помощью компилятора javac следующим образом:

C:\RxJava>javac ObservableTester.java

Теперь запустите ObservableTester следующим образом:

C:\RxJava>java ObservableTester

Он должен произвести следующий вывод —

abcd
d

RxJava — BehaviorSubject

BehaviorSubject испускает самый последний наблюдаемый элемент, а затем все последующие наблюдаемые элементы каждому подписчику Observer.

Декларация класса

Ниже приводится объявление для класса io.reactivex.subjects.BehaviorSubject <T>

public final class BehaviorSubject<T>
extends Subject<T>

Пример BehaviorSubject

Создайте следующую Java-программу, используя любой редактор по вашему выбору, например, в C: \> RxJava.

ObservableTester.java

import io.reactivex.subjects.BehaviorSubject;
public class ObservableTester  {
   public static void main(String[] args) {   
      final StringBuilder result1 = new StringBuilder();
      final StringBuilder result2 = new StringBuilder();         
      BehaviorSubject<String> subject =  BehaviorSubject.create(); 
      subject.subscribe(value -> result1.append(value) ); 
      subject.onNext("a"); 
      subject.onNext("b"); 
      subject.onNext("c"); 
      subject.subscribe(value -> result2.append(value)); 
      subject.onNext("d"); 
      subject.onComplete();
      //Output will be abcd
      System.out.println(result1);
      //Output will be cd being BehaviorSubject 
      //(c is last item emitted before subscribe)
      System.out.println(result2);
   }
}

Проверьте результат

Скомпилируйте класс с помощью компилятора javac следующим образом:

C:\RxJava>javac ObservableTester.java

Теперь запустите ObservableTester следующим образом:

C:\RxJava>java ObservableTester

Он должен произвести следующий вывод —

abcd
cd

RxJava — ReplaySubject

ReplaySubject воспроизводит события / элементы для текущих и поздних наблюдателей.

Декларация класса

Ниже приводится объявление для класса io.reactivex.subjects.ReplaySubject <T>

public final class ReplaySubject<T>
extends Subject<T>

Пример ReplaySubject

Создайте следующую Java-программу, используя любой редактор по вашему выбору, например, в C: \> RxJava.

ObservableTester.java

import io.reactivex.subjects.ReplaySubject;
public class ObservableTester  {
   public static void main(String[] args) {   
      final StringBuilder result1 = new StringBuilder();
      final StringBuilder result2 = new StringBuilder();         

      ReplaySubject<String> subject = ReplaySubject.create(); 
      subject.subscribe(value -> result1.append(value) ); 
      subject.onNext("a"); 
      subject.onNext("b"); 
      subject.onNext("c"); 
      subject.subscribe(value -> result2.append(value)); 
      subject.onNext("d"); 
      subject.onComplete();

      //Output will be abcd
      System.out.println(result1);
      //Output will be abcd being ReplaySubject
      //as ReplaySubject emits all the items
      System.out.println(result2);
   }
}

Проверьте результат

Скомпилируйте класс с помощью компилятора javac следующим образом:

C:\RxJava>javac ObservableTester.java

Теперь запустите ObservableTester следующим образом:

C:\RxJava>java ObservableTester

Он должен произвести следующий вывод —

abcd
abcd

RxJava — AsyncSubject

AsyncSubject выдает единственное последнее значение, за которым следует событие завершения или полученная ошибка для наблюдателей.

Декларация класса

Ниже приводится объявление для класса io.reactivex.subjects.AsyncSubject <T>

public final class  AsyncSubject<T>
extends Subject<T>

Пример AsyncSubject

Создайте следующую Java-программу, используя любой редактор по вашему выбору, например, в C: \> RxJava.

ObservableTester.java

import io.reactivex.subjects. AsyncSubject;
public class ObservableTester  {
   public static void main(String[] args) {   
      final StringBuilder result1 = new StringBuilder();
      final StringBuilder result2 = new StringBuilder();         

      AsyncSubject<String> subject =  AsyncSubject.create(); 
      subject.subscribe(value -> result1.append(value) ); 
      subject.onNext("a"); 
      subject.onNext("b"); 
      subject.onNext("c"); 
      subject.subscribe(value -> result2.append(value)); 
      subject.onNext("d"); 
      subject.onComplete();

      //Output will be d being the last item emitted
      System.out.println(result1);
      //Output will be d being the last item emitted     
      System.out.println(result2);
   }
}

Проверьте результат

Скомпилируйте класс с помощью компилятора javac следующим образом:

C:\RxJava>javac ObservableTester.java

Теперь запустите ObservableTester следующим образом:

C:\RxJava>java ObservableTester

Он должен произвести следующий вывод —

d
d

RxJava — Планировщики

Планировщики используются в многопоточной среде для работы с наблюдаемыми операторами.

Согласно Реактиву , Планировщик используется, чтобы запланировать, как цепочка операторов будет применяться к различным потокам.

По умолчанию Observable и цепочка операторов, к которой вы обращаетесь, будут выполнять свою работу и уведомлять своих наблюдателей в том же потоке, для которого вызывается метод Subscribe. Оператор SubscribeOn изменяет это поведение, указывая другой планировщик, с которым должен работать Observable. Оператор ObserveOn указывает другой планировщик, который Observable будет использовать для отправки уведомлений своим наблюдателям.

В RxJava доступны следующие типы планировщиков:

Sr.No. Планировщик и описание
1

Schedulers.computation ()

Создает и возвращает планировщик, предназначенный для вычислительной работы. Количество потоков, которые будут запланированы, зависит от процессоров, присутствующих в системе. На каждый процессор допускается один поток. Лучше всего подходит для циклов обработки событий или операций обратного вызова.

2

Schedulers.io ()

Создает и возвращает планировщик, предназначенный для работы с IO. Пул потоков может расширяться по мере необходимости.

3

Schedulers.newThread ()

Создает и возвращает планировщик, который создает новый поток для каждой единицы работы.

4

Schedulers.trampoline ()

Создает и возвращает планировщик, который ставит в очередь работу в текущем потоке, который будет выполнен после завершения текущей работы.

4

Schedulers.from (java.util.concurrent.Executor executor)

Преобразует исполнителя в новый экземпляр планировщика.

Schedulers.computation ()

Создает и возвращает планировщик, предназначенный для вычислительной работы. Количество потоков, которые будут запланированы, зависит от процессоров, присутствующих в системе. На каждый процессор допускается один поток. Лучше всего подходит для циклов обработки событий или операций обратного вызова.

Schedulers.io ()

Создает и возвращает планировщик, предназначенный для работы с IO. Пул потоков может расширяться по мере необходимости.

Schedulers.newThread ()

Создает и возвращает планировщик, который создает новый поток для каждой единицы работы.

Schedulers.trampoline ()

Создает и возвращает планировщик, который ставит в очередь работу в текущем потоке, который будет выполнен после завершения текущей работы.

Schedulers.from (java.util.concurrent.Executor executor)

Преобразует исполнителя в новый экземпляр планировщика.

RxJava — Батутный планировщик

Метод Schedulers.trampoline () создает и возвращает планировщик, который ставит в очередь работу в текущем потоке, который будет выполнен после завершения текущей работы.

Пример Schedulers.trampoline ()

Создайте следующую Java-программу, используя любой редактор по вашему выбору, например, в C: \> RxJava.

ObservableTester.java

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread " 
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.trampoline()))
         .subscribe(length -> System.out.println("Receiver Thread " 
            + Thread.currentThread().getName() 
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

Проверьте результат

Скомпилируйте класс с помощью компилятора javac следующим образом:

C:\RxJava>javac ObservableTester.java

Теперь запустите ObservableTester следующим образом:

C:\RxJava>java ObservableTester

Он должен произвести следующий вывод —

Processing Thread main
Receiver Thread main, Item length 1
Processing Thread main
Receiver Thread main, Item length 2
Processing Thread main
Receiver Thread main, Item length 3

RxJava — Планировщик NewThread

Метод Schedulers.newThread () создает и возвращает планировщик, который создает новый поток для каждой единицы работы.

Пример Schedulers.newThread ()

Создайте следующую Java-программу, используя любой редактор по вашему выбору, например, в C: \> RxJava.

ObservableTester.java

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread " 
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.newThread()))
         .subscribe(length -> System.out.println("Receiver Thread " 
            + Thread.currentThread().getName() 
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

Проверьте результат

Скомпилируйте класс с помощью компилятора javac следующим образом:

C:\RxJava>javac ObservableTester.java

Теперь запустите ObservableTester следующим образом:

C:\RxJava>java ObservableTester

Он должен произвести следующий вывод —

Processing Thread RxNewThreadScheduler-1
Receiver Thread RxNewThreadScheduler-1, Item length 1
Processing Thread RxNewThreadScheduler-2
Receiver Thread RxNewThreadScheduler-2, Item length 2
Processing Thread RxNewThreadScheduler-3
Receiver Thread RxNewThreadScheduler-3, Item length 3

RxJava — Планировщик вычислений

Метод Schedulers.computation () создает и возвращает планировщик, предназначенный для вычислительной работы. Количество потоков, которые будут запланированы, зависит от процессоров, присутствующих в системе. На каждый процессор допускается один поток. Лучше всего подходит для циклов обработки событий или операций обратного вызова.

Пример Schedulers.computation ()

Создайте следующую Java-программу, используя любой редактор по вашему выбору, например, в C: \> RxJava.

ObservableTester.java

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread " 
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.computation()))
         .subscribe(length -> System.out.println("Receiver Thread " 
            + Thread.currentThread().getName() 
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

Проверьте результат

Скомпилируйте класс с помощью компилятора javac следующим образом:

C:\RxJava>javac ObservableTester.java

Теперь запустите ObservableTester следующим образом:

C:\RxJava>java ObservableTester

Он должен произвести следующий вывод —

Processing Thread RxComputationThreadPool-1
Receiver Thread RxComputationThreadPool-1, Item length 1
Processing Thread RxComputationThreadPool-2
Receiver Thread RxComputationThreadPool-2, Item length 2
Processing Thread RxComputationThreadPool-3
Receiver Thread RxComputationThreadPool-3, Item length 3

RxJava — IO Scheduler

Метод Schedulers.io () создает и возвращает планировщик, предназначенный для работы, связанной с вводом-выводом. Пул потоков может расширяться по мере необходимости. Подходит для интенсивных операций ввода-вывода.

Пример Schedulers.io ()

Создайте следующую Java-программу, используя любой редактор по вашему выбору, например, в C: \> RxJava.

ObservableTester.java

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread " 
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.io()))
         .subscribe(length -> System.out.println("Receiver Thread " 
            + Thread.currentThread().getName() 
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

Проверьте результат

Скомпилируйте класс с помощью компилятора javac следующим образом:

C:\RxJava>javac ObservableTester.java

Теперь запустите ObservableTester следующим образом:

C:\RxJava>java ObservableTester

Он должен произвести следующий вывод —

Processing Thread RxCachedThreadScheduler-1
Receiver Thread RxCachedThreadScheduler-1, Item length 1
Processing Thread RxCachedThreadScheduler-1
Receiver Thread RxCachedThreadScheduler-1, Item length 2
Processing Thread RxCachedThreadScheduler-1
Receiver Thread RxCachedThreadScheduler-1, Item length 3

RxJava — от планировщика

Метод Schedulers.from (Executor) преобразует Executor в новый экземпляр Scheduler.

Пример Schedulers.from (Исполнитель)

Создайте следующую Java-программу, используя любой редактор по вашему выбору, например, в C: \> RxJava.

ObservableTester.java

import java.util.Random;
import java.util.concurrent.Executors;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread " 
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.from(Executors.newFixedThreadPool(3))))
         .subscribe(length -> System.out.println("Receiver Thread " 
            + Thread.currentThread().getName() 
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

Проверьте результат

Скомпилируйте класс с помощью компилятора javac следующим образом:

C:\RxJava>javac ObservableTester.java

Теперь запустите ObservableTester следующим образом:

C:\RxJava>java ObservableTester

Он должен произвести следующий вывод —

Processing Thread pool-1-thread-1
Processing Thread pool-3-thread-1
Receiver Thread pool-1-thread-1, Item length 1
Processing Thread pool-4-thread-1
Receiver Thread pool-4-thread-1, Item length 3
Receiver Thread pool-3-thread-1, Item length 2

RxJava — Буферизация

Оператор буферизации позволяет собирать элементы, испускаемые Observable, в список или пакеты и генерировать эти пакеты вместо элементов. В приведенном ниже примере мы создали Observable для испускания 9 элементов, и используя буферизацию, 3 элемента будут отправлены вместе.

Пример буферизации

Создайте следующую Java-программу, используя любой редактор по вашему выбору, например, в C: \> RxJava.

ObservableTester.java

import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;

import java.util.List;
import java.util.concurrent.TimeUnit;

public class ObservableTester {
   public static void main(String[] args) throws InterruptedException {
      Observable<Integer> observable = Observable.just(1, 2, 3, 4,
         5, 6, 7, 8, 9);

      observable.subscribeOn(Schedulers.io())
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .buffer(3)
         .subscribe(new Observer<List<Integer>>() {
            @Override
            public void onSubscribe(Disposable d) {
               System.out.println("Subscribed");
            }
            @Override
            public void onNext(List<Integer> integers) {
               System.out.println("onNext: ");
               for (Integer value : integers) {
                  System.out.println(value);
               }
            }
            @Override
            public void onError(Throwable e) {
               System.out.println("Error");
            }

            @Override
            public void onComplete() {
               System.out.println("Done! ");
            }
         });
      Thread.sleep(3000);
   }
}

Проверьте результат

Скомпилируйте класс с помощью компилятора javac следующим образом:

C:\RxJava>javac ObservableTester.java

Теперь запустите ObservableTester следующим образом:

C:\RxJava>java ObservableTester

Он должен произвести следующий вывод —

Subscribed
onNext: 
1
2
3
onNext: 
4
5
6
onNext: 
7
8
9
Done! 

RxJava — Windowing

Оператор управления окнами работает аналогично оператору буфера, но он позволяет собирать элементы, испускаемые Наблюдаемой, в другую наблюдаемую, а не в коллекцию, и излучать эти Наблюдаемые вместо коллекций. В приведенном ниже примере мы создали Observable для испускания 9 элементов, и с помощью оконного оператора 3 Observable будут испускаться вместе.

Пример работы с окнами

Создайте следующую Java-программу, используя любой редактор по вашему выбору, например, в C: \> RxJava.

ObservableTester.java

import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.TimeUnit;

public class ObservableTester {
   public static void main(String[] args) throws InterruptedException {
      Observable<Integer> observable = Observable.just(1, 2, 3, 4,
         5, 6, 7, 8, 9);

      observable.subscribeOn(Schedulers.io())
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .window(3)
         .subscribe(new Observer<Observable<Integer>>() {
            @Override
            public void onSubscribe(Disposable d) {
               System.out.println("Subscribed");
            }
            @Override
            public void onNext(Observable<Integer> integers) {
               System.out.println("onNext: ");
               integers.subscribe(value -> System.out.println(value));
            }
            @Override
            public void onError(Throwable e) {
               System.out.println("Error");
            }

            @Override
            public void onComplete() {
               System.out.println("Done! ");
            }
         });
      Thread.sleep(3000);
   }
}

Проверьте результат

Скомпилируйте класс с помощью компилятора javac следующим образом:

C:\RxJava>javac ObservableTester.java

Теперь запустите ObservableTester следующим образом:

C:\RxJava>java ObservableTester

Он должен произвести следующий вывод —