RxJava — Обзор
RxJava — это расширение ReactiveX на основе Java. Он обеспечивает реализацию или проект ReactiveX на Java. Ниже приведены ключевые характеристики RxJava.
-
Расширяет шаблон наблюдателя.
-
Поддержка последовательности данных / событий.
-
Предоставляет операторы для декларативного составления последовательностей.
-
Внутренне обрабатывает потоки, синхронизацию, безопасность потоков и параллельные структуры данных.
Расширяет шаблон наблюдателя.
Поддержка последовательности данных / событий.
Предоставляет операторы для декларативного составления последовательностей.
Внутренне обрабатывает потоки, синхронизацию, безопасность потоков и параллельные структуры данных.
Что такое ReactiveX?
ReactiveX — это проект, целью которого является предоставление концепции реактивного программирования для различных языков программирования. Реактивное программирование относится к сценарию, в котором программа реагирует как и когда появляются данные. Это концепция программирования, основанная на событиях, и события могут распространяться на регистры наблюдателей.
В соответствии с Reactive , они объединили лучшее из шаблона Observer, шаблона Iterator и функционального шаблона.
Шаблон Observer сделан правильно. ReactiveX — это сочетание лучших идей из шаблона Observer, шаблона Iterator и функционального программирования.
Функциональное программирование
Функциональное программирование вращается вокруг создания программного обеспечения с использованием чистых функций. Чистая функция не зависит от предыдущего состояния и всегда возвращает один и тот же результат для тех же переданных параметров. Чистые функции помогают избежать проблем, связанных с общими объектами, изменяемыми данными и побочными эффектами, часто встречающимися в многопоточных средах.
Реактивное программирование
Реактивное программирование относится к программированию на основе событий, когда потоки данных поступают асинхронно и обрабатываются по прибытии.
Функциональное реактивное программирование
RxJava реализует обе концепции вместе, где данные потоков изменяются со временем, и функция потребителя реагирует соответственно.
Реактивный Манифест
Reactive Manifesto — это онлайновый документ, подтверждающий высокий стандарт систем прикладного программного обеспечения. Согласно манифесту, ниже приведены ключевые атрибуты реактивного программного обеспечения —
-
Отзывчивый — всегда должен отвечать своевременно.
-
Управляемый сообщениями — следует использовать асинхронную передачу сообщений между компонентами, чтобы они имели слабую связь.
-
Эластичный — должен оставаться отзывчивым даже при высокой нагрузке.
-
Эластичный — должен оставаться отзывчивым, даже если какой-либо компонент (ы) выходит из строя.
Отзывчивый — всегда должен отвечать своевременно.
Управляемый сообщениями — следует использовать асинхронную передачу сообщений между компонентами, чтобы они имели слабую связь.
Эластичный — должен оставаться отзывчивым даже при высокой нагрузке.
Эластичный — должен оставаться отзывчивым, даже если какой-либо компонент (ы) выходит из строя.
Ключевые компоненты RxJava
RxJava имеет два ключевых компонента: Observables и Observer.
-
Observable — представляет объект, похожий на Stream, который может выдавать ноль или более данных, может отправлять сообщение об ошибке, скорость которого может контролироваться во время передачи набора данных, может отправлять как конечные, так и бесконечные данные.
-
Наблюдатель — подписывается на данные последовательности наблюдаемой и реагирует на единицу наблюдаемой. Наблюдатели уведомляются всякий раз, когда Observable отправляет данные. Обозреватель обрабатывает данные один за другим.
Observable — представляет объект, похожий на Stream, который может выдавать ноль или более данных, может отправлять сообщение об ошибке, скорость которого может контролироваться во время передачи набора данных, может отправлять как конечные, так и бесконечные данные.
Наблюдатель — подписывается на данные последовательности наблюдаемой и реагирует на единицу наблюдаемой. Наблюдатели уведомляются всякий раз, когда Observable отправляет данные. Обозреватель обрабатывает данные один за другим.
Наблюдатель никогда не уведомляется, если элементы отсутствуют или обратный вызов для предыдущего элемента не возвращается.
RxJava — Настройка среды
Настройка локальной среды
RxJava — это библиотека для Java, поэтому самое первое требование — установить JDK на ваш компьютер.
Системные требования
JDK | 1,5 или выше. |
---|---|
объем памяти | Нет минимальных требований. |
Дисковое пространство | Нет минимальных требований. |
Операционная система | Нет минимальных требований. |
Шаг 1 — Проверьте установку Java на вашем компьютере
Прежде всего, откройте консоль и выполните команду Java в зависимости от операционной системы, с которой вы работаете.
Операционные системы | задача | команда |
---|---|---|
Windows | Открытая командная консоль | c: \> Java-версия |
Linux | Открытый командный терминал | $ java-версия |
макинтош | Открытый терминал | машина: <joseph $ java -version |
Давайте проверим вывод для всех операционных систем —
Операционные системы | Выход |
---|---|
Windows |
Java-версия «1.8.0_101» Java (TM) SE Runtime Environment (сборка 1.8.0_101) |
Linux |
Java-версия «1.8.0_101» Java (TM) SE Runtime Environment (сборка 1.8.0_101) |
макинтош |
Java-версия «1.8.0_101» Java (TM) SE Runtime Environment (сборка 1.8.0_101) |
Java-версия «1.8.0_101»
Java (TM) SE Runtime Environment (сборка 1.8.0_101)
Java-версия «1.8.0_101»
Java (TM) SE Runtime Environment (сборка 1.8.0_101)
Java-версия «1.8.0_101»
Java (TM) SE Runtime Environment (сборка 1.8.0_101)
Если у вас не установлена Java в вашей системе, загрузите Java Software Development Kit (SDK) по следующей ссылке https://www.oracle.com . Мы принимаем Java 1.8.0_101 в качестве установленной версии для этого урока.
Шаг 2 — Установите JAVA Environment
Установите переменную среды JAVA_HOME, чтобы она указывала на местоположение базовой директории, где установлена Java на вашем компьютере. Например.
Операционные системы | Выход |
---|---|
Windows | Установите переменную среды JAVA_HOME в C: \ Program Files \ Java \ jdk1.8.0_101 |
Linux | экспорт JAVA_HOME = / usr / local / java-current |
макинтош | export JAVA_HOME = / Библиотека / Java / Главная |
Добавьте местоположение компилятора Java в системный путь.
Операционные системы | Выход |
---|---|
Windows | Добавьте строку C: \ Program Files \ Java \ jdk1.8.0_101 \ bin в конце системной переменной Path . |
Linux | экспорт PATH = $ PATH: $ JAVA_HOME / bin / |
макинтош | не требуется |
Проверьте установку Java с помощью команды java -version, как описано выше.
Шаг 3 — Скачать архив RxJava2
Загрузите последнюю версию файла jar RxJava из RxJava @ MVNRepository и его зависимости Reactive Streams @ MVNRepository . На момент написания этого руководства мы загрузили rxjava-2.2.4.jar, реактивный-streams-1.0.2.jar и скопировали его в папку C: \> RxJava.
Операционные системы | Название архива |
---|---|
Windows | rxjava-2.2.4.jar, реактивные потоки-1.0.2.jar |
Linux | rxjava-2.2.4.jar, реактивные потоки-1.0.2.jar |
макинтош | rxjava-2.2.4.jar, реактивные потоки-1.0.2.jar |
Шаг 4 — Установите среду RxJava
Установите переменную окружения RX_JAVA, чтобы она указывала на местоположение базовой директории, где RxJava jar хранится на вашем компьютере. Предположим, что мы сохранили rxjava-2.2.4.jar и реактивный поток-1.0.2.jar в папке RxJava.
Sr.No | ОС и описание |
---|---|
1 |
Windows Установите переменную окружения RX_JAVA в C: \ RxJava |
2 |
Linux export RX_JAVA = / usr / local / RxJava |
3 |
макинтош export RX_JAVA = / Библиотека / RxJava |
Windows
Установите переменную окружения RX_JAVA в C: \ RxJava
Linux
export RX_JAVA = / usr / local / RxJava
макинтош
export RX_JAVA = / Библиотека / RxJava
Шаг 5 — Установите переменную CLASSPATH
Установите переменную среды CLASSPATH, чтобы она указывала на местоположение jar RxJava.
Sr.No | ОС и описание |
---|---|
1 |
Windows Задайте для переменной среды CLASSPATH значение% CLASSPATH%;% RX_JAVA% \ rxjava-2.2.4.jar;% RX_JAVA% \ реактивный-потоки-1.0.2.jar;.; |
2 |
Linux export CLASSPATH = $ CLASSPATH: $ RX_JAVA / rxjava-2.2.4.jar: реактивный-потоки-1.0.2.jar :. |
3 |
макинтош export CLASSPATH = $ CLASSPATH: $ RX_JAVA / rxjava-2.2.4.jar: реактивный-потоки-1.0.2.jar :. |
Windows
Задайте для переменной среды CLASSPATH значение% CLASSPATH%;% RX_JAVA% \ rxjava-2.2.4.jar;% RX_JAVA% \ реактивный-потоки-1.0.2.jar;.;
Linux
export CLASSPATH = $ CLASSPATH: $ RX_JAVA / rxjava-2.2.4.jar: реактивный-потоки-1.0.2.jar :.
макинтош
export CLASSPATH = $ CLASSPATH: $ RX_JAVA / rxjava-2.2.4.jar: реактивный-потоки-1.0.2.jar :.
Шаг 6 — Проверка установки RxJava
Создайте класс TestRx.java, как показано ниже —
import io.reactivex.Flowable; public class TestRx { public static void main(String[] args) { Flowable.just("Hello World!") .subscribe(System.out::println); } }
Шаг 7 — Проверьте результат
Скомпилируйте классы, используя компилятор javac следующим образом:
C:\RxJava>javac Tester.java
Проверьте вывод.
Hello World!
RxJava — Как работает Observable
Observables представляет источники данных, где их слушают наблюдатели (подписчики) . Короче говоря, Observable испускает предметы, а подписчик затем потребляет эти предметы.
наблюдаемый
-
Observable предоставляет данные, как только подписчик начинает слушать.
-
Наблюдаемый может испускать любое количество предметов.
-
Наблюдаемый может испускать только сигнал завершения, а также без элемента.
-
Наблюдаемый может успешно завершиться.
-
Наблюдаемое может никогда не прекратиться. Например, кнопка может быть нажата любое количество раз.
-
Наблюдаемый может выдать ошибку в любой момент времени.
Observable предоставляет данные, как только подписчик начинает слушать.
Наблюдаемый может испускать любое количество предметов.
Наблюдаемый может испускать только сигнал завершения, а также без элемента.
Наблюдаемый может успешно завершиться.
Наблюдаемое может никогда не прекратиться. Например, кнопка может быть нажата любое количество раз.
Наблюдаемый может выдать ошибку в любой момент времени.
подписчик
-
Наблюдаемый может иметь несколько подписчиков.
-
Когда Observable испускает элемент, вызывается каждый метод подписчика onNext ().
-
Когда Observable заканчивает излучать элементы, вызывается каждый метод onComplete () подписчика.
-
Если Observable выдает ошибку, вызывается каждый метод onError () подписчика.
Наблюдаемый может иметь несколько подписчиков.
Когда Observable испускает элемент, вызывается каждый метод подписчика onNext ().
Когда Observable заканчивает излучать элементы, вызывается каждый метод onComplete () подписчика.
Если Observable выдает ошибку, вызывается каждый метод onError () подписчика.
RxJava — Создание наблюдаемых
Ниже приведены базовые классы для создания наблюдаемых.
-
Текучий — 0..N потоков, испускает 0 или n элементов. Поддерживает Reactive-Streams и обратное давление.
-
Наблюдаемый — 0..N течет, но нет обратного давления.
-
Одиночный — 1 предмет или ошибка. Может рассматриваться как реактивная версия вызова метода.
-
Completable — пункт не выброшен Используется как сигнал для завершения или ошибки. Может рассматриваться как реактивная версия Runnable.
-
MayBe — Либо нет предмета, либо 1 предмет выброшен. Может рассматриваться как реактивная версия Optional.
Текучий — 0..N потоков, испускает 0 или n элементов. Поддерживает Reactive-Streams и обратное давление.
Наблюдаемый — 0..N течет, но нет обратного давления.
Одиночный — 1 предмет или ошибка. Может рассматриваться как реактивная версия вызова метода.
Completable — пункт не выброшен Используется как сигнал для завершения или ошибки. Может рассматриваться как реактивная версия Runnable.
MayBe — Либо нет предмета, либо 1 предмет выброшен. Может рассматриваться как реактивная версия Optional.
Ниже приведены удобные методы для создания наблюдаемых в классе Observable.
-
just (T item) — Возвращает Observable, который сигнализирует о данном (постоянная ссылка) элементе и затем завершается.
-
fromIterable (Iterable source) — преобразует последовательность Iterable в ObservableSource, который испускает элементы в последовательности.
-
fromArray (T … items) — преобразует массив в ObservableSource, который испускает элементы в массиве.
-
fromCallable (Callable supplier) — возвращает Observable, который, когда наблюдатель подписывается на него, вызывает указанную вами функцию и затем генерирует значение, возвращаемое этой функцией.
-
fromFuture (Будущее будущее) — преобразует будущее в ObservableSource.
-
интервал (long initialDelay, long period, TimeUnit unit) — Возвращает наблюдаемое, которое испускает 0L после initialDelay и постоянно увеличивающиеся числа после каждого периода времени после этого.
just (T item) — Возвращает Observable, который сигнализирует о данном (постоянная ссылка) элементе и затем завершается.
fromIterable (Iterable source) — преобразует последовательность Iterable в ObservableSource, который испускает элементы в последовательности.
fromArray (T … items) — преобразует массив в ObservableSource, который испускает элементы в массиве.
fromCallable (Callable supplier) — возвращает Observable, который, когда наблюдатель подписывается на него, вызывает указанную вами функцию и затем генерирует значение, возвращаемое этой функцией.
fromFuture (Будущее будущее) — преобразует будущее в ObservableSource.
интервал (long initialDelay, long period, TimeUnit unit) — Возвращает наблюдаемое, которое испускает 0L после initialDelay и постоянно увеличивающиеся числа после каждого периода времени после этого.
RxJava — Единственная наблюдаемая
Класс Single представляет ответ с одним значением. Одна наблюдаемая может выдавать только одно успешное значение или ошибку. Он не генерирует событие onComplete.
Декларация класса
Ниже приводится объявление для класса io.reactivex.Single <T> —
public abstract class Single<T> extends Object implements SingleSource<T>
протокол
Ниже приведен последовательный протокол, в котором работает Single Observable:
onSubscribe (onSuccess | onError)?
Один пример
Создайте следующую Java-программу, используя любой редактор по вашему выбору, например, в C: \> RxJava.
ObservableTester.java
import java.util.concurrent.TimeUnit; import io.reactivex.Single; import io.reactivex.disposables.Disposable; import io.reactivex.observers.DisposableSingleObserver; import io.reactivex.schedulers.Schedulers; public class ObservableTester { public static void main(String[] args) throws InterruptedException { //Create the observable Single<String> testSingle = Single.just("Hello World"); //Create an observer Disposable disposable = testSingle .delay(2, TimeUnit.SECONDS, Schedulers.io()) .subscribeWith( new DisposableSingleObserver<String>() { @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onSuccess(String value) { System.out.println(value); } }); Thread.sleep(3000); //start observing disposable.dispose(); } }
Проверьте результат
Скомпилируйте класс с помощью компилятора javac следующим образом:
C:\RxJava>javac ObservableTester.java
Теперь запустите ObservableTester следующим образом:
C:\RxJava>java ObservableTester
Он должен произвести следующий вывод —
Hello World
RxJava — MayBe Observable
Класс MayBe представляет отложенный ответ. Наблюдаемый MayBe может испускать либо одно успешное значение, либо нет значения.
Декларация класса
Ниже приводится объявление для класса io.reactivex.Single <T> —
public abstract class Maybe<T> extends Object implements MaybeSource<T>
протокол
Ниже приведен последовательный протокол, с которым работает MayBe Observable.
onSubscribe (onSuccess | onError | OnComplete)?
Пример MayBe
Создайте следующую Java-программу, используя любой редактор по вашему выбору, например, в C: \> RxJava.
ObservableTester.java
import java.util.concurrent.TimeUnit; import io.reactivex.Maybe; import io.reactivex.disposables.Disposable; import io.reactivex.observers.DisposableMaybeObserver; import io.reactivex.schedulers.Schedulers; public class ObservableTester { public static void main(String[] args) throws InterruptedException { //Create an observer Disposable disposable = Maybe.just("Hello World") .delay(2, TimeUnit.SECONDS, Schedulers.io()) .subscribeWith(new DisposableMaybeObserver<String>() { @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onSuccess(String value) { System.out.println(value); } @Override public void onComplete() { System.out.println("Done!"); } }); Thread.sleep(3000); //start observing disposable.dispose(); } }
Проверьте результат
Скомпилируйте класс с помощью компилятора javac следующим образом:
C:\RxJava>javac ObservableTester.java
Теперь запустите ObservableTester следующим образом:
C:\RxJava>java ObservableTester
Он должен произвести следующий вывод —
Hello World
RxJava — Completable Observable
Класс Completable представляет отложенный ответ. Наблюдаемое завершение может указывать либо на успешное завершение, либо на ошибку.
Декларация класса
Ниже приводится объявление для класса io.reactivex.Completable —
public abstract class Completable extends Object implements CompletableSource
протокол
Ниже приведен последовательный протокол, который работает Completable Observable:
onSubscribe (onError | onComplete)?
Завершаемый пример
Создайте следующую Java-программу, используя любой редактор по вашему выбору, например, в C: \> RxJava.
ObservableTester.java
import java.util.concurrent.TimeUnit; import io.reactivex.Completable; import io.reactivex.disposables.Disposable; import io.reactivex.observers.DisposableCompletableObserver; import io.reactivex.schedulers.Schedulers; public class ObservableTester { public static void main(String[] args) throws InterruptedException { //Create an observer Disposable disposable = Completable.complete() .delay(2, TimeUnit.SECONDS, Schedulers.io()) .subscribeWith(new DisposableCompletableObserver() { @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onStart() { System.out.println("Started!"); } @Override public void onComplete() { System.out.println("Done!"); } }); Thread.sleep(3000); //start observing disposable.dispose(); } }
Проверьте результат
Скомпилируйте класс с помощью компилятора javac следующим образом:
C:\RxJava>javac ObservableTester.java
Теперь запустите ObservableTester следующим образом:
C:\RxJava>java ObservableTester
Он должен произвести следующий вывод —
Started! Done!
RxJava — Использование CompositeDisposable
Класс CompositeDisposable представляет контейнер, который может содержать несколько одноразовых элементов и предлагает O (1) сложность добавления и удаления одноразовых изделий.
Декларация класса
Ниже приводится объявление для класса io.reactivex.disposables.CompositeDisposable —
public final class CompositeDisposable extends Object implements Disposable, io.reactivex.internal.disposables.DisposableContainer
Пример CompositeDisposable
Создайте следующую Java-программу, используя любой редактор по вашему выбору, например, в C: \> RxJava.
ObservableTester.java
import io.reactivex.Maybe; import io.reactivex.Single; import io.reactivex.disposables.CompositeDisposable; import io.reactivex.disposables.Disposable; import io.reactivex.observers.DisposableMaybeObserver; import io.reactivex.observers.DisposableSingleObserver; import io.reactivex.schedulers.Schedulers; import java.util.concurrent.TimeUnit; public class ObservableTester { public static void main(String[] args) throws InterruptedException { CompositeDisposable compositeDisposable = new CompositeDisposable(); //Create an Single observer Disposable disposableSingle = Single.just("Hello World") .delay(2, TimeUnit.SECONDS, Schedulers.io()) .subscribeWith( new DisposableSingleObserver<String>() { @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onSuccess(String value) { System.out.println(value); } }); //Create an observer Disposable disposableMayBe = Maybe.just("Hi") .delay(2, TimeUnit.SECONDS, Schedulers.io()) .subscribeWith(new DisposableMaybeObserver<String>() { @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onSuccess(String value) { System.out.println(value); } @Override public void onComplete() { System.out.println("Done!"); } }); Thread.sleep(3000); compositeDisposable.add(disposableSingle); compositeDisposable.add(disposableMayBe); //start observing compositeDisposable.dispose(); } }
Проверьте результат
Скомпилируйте класс с помощью компилятора javac следующим образом:
C:\RxJava>javac ObservableTester.java
Теперь запустите ObservableTester следующим образом:
C:\RxJava>java ObservableTester
Он должен произвести следующий вывод —
Hello Hi
RxJava — Создание операторов
Ниже приведены операторы, которые используются для создания Observable.
Sr.No. | Оператор и описание |
---|---|
1 |
Создайте Создает Observable с нуля и позволяет методу наблюдателя вызываться программно. |
2 |
Перенести Не создавайте Observable, пока наблюдатель не подпишется. Создает свежую наблюдаемую для каждого наблюдателя. |
3 |
Пустой / Никогда / Throw Создает Observable с ограниченным поведением. |
4 |
От Преобразует объект / структуру данных в Observable. |
5 |
интервал Создает наблюдаемые исходящие целые числа в последовательности с промежутком заданного временного интервала. |
6 |
Просто Преобразует структуру объекта / данных в Observable для излучения объектов того же или того же типа. |
7 |
Спектр Создает наблюдаемые исходящие целые числа в последовательности заданного диапазона. |
8 |
Повторение Создает наблюдаемые исходящие целые числа последовательно. |
9 |
Начните Создает Observable для выдачи возвращаемого значения функции. |
10 |
таймер Создает Observable для испускания одного предмета после заданной задержки. |
Создайте
Создает Observable с нуля и позволяет методу наблюдателя вызываться программно.
Перенести
Не создавайте Observable, пока наблюдатель не подпишется. Создает свежую наблюдаемую для каждого наблюдателя.
Пустой / Никогда / Throw
Создает Observable с ограниченным поведением.
От
Преобразует объект / структуру данных в Observable.
интервал
Создает наблюдаемые исходящие целые числа в последовательности с промежутком заданного временного интервала.
Просто
Преобразует структуру объекта / данных в Observable для излучения объектов того же или того же типа.
Спектр
Создает наблюдаемые исходящие целые числа в последовательности заданного диапазона.
Повторение
Создает наблюдаемые исходящие целые числа последовательно.
Начните
Создает Observable для выдачи возвращаемого значения функции.
таймер
Создает Observable для испускания одного предмета после заданной задержки.
Пример создания оператора
Создайте следующую Java-программу, используя любой редактор по вашему выбору, например, в C: \> RxJava.
ObservableTester.java
import io.reactivex.Observable; //Using fromArray operator to create an Observable public class ObservableTester { public static void main(String[] args) { String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; final StringBuilder result = new StringBuilder(); Observable<String> observable = Observable.fromArray(letters); observable .map(String::toUpperCase) .subscribe( letter -> result.append(letter)); System.out.println(result); } }
Проверьте результат
Скомпилируйте класс с помощью компилятора javac следующим образом:
C:\RxJava>javac ObservableTester.java
Теперь запустите ObservableTester следующим образом:
C:\RxJava>java ObservableTester
Он должен произвести следующий вывод —
ABCDEFG
RxJava — Операторы преобразования
Ниже приведены операторы, которые используются для преобразования элемента, испускаемого из наблюдаемой.
Sr.No. | Оператор и описание |
---|---|
1 |
буфер Периодически собирает предметы из Observable в связки, а затем испускает связки, а не предметы. |
2 |
FlatMap Используется во вложенных наблюдаемых объектах. Преобразует предметы в Observables. Затем сгладьте предметы в единый Observable. |
3 |
Группа по Разделите Observable на набор Observables, организованных по ключу, чтобы испустить другую группу предметов. |
4 |
карта Примените функцию к каждому излучаемому элементу, чтобы преобразовать его. |
5 |
сканирование Примените функцию к каждому излучаемому элементу последовательно, а затем введите последующее значение. |
6 |
Окно Периодически собирает предметы из Observable в окна Observable, а затем испускает окна, а не предметы. |
буфер
Периодически собирает предметы из Observable в связки, а затем испускает связки, а не предметы.
FlatMap
Используется во вложенных наблюдаемых объектах. Преобразует предметы в Observables. Затем сгладьте предметы в единый Observable.
Группа по
Разделите Observable на набор Observables, организованных по ключу, чтобы испустить другую группу предметов.
карта
Примените функцию к каждому излучаемому элементу, чтобы преобразовать его.
сканирование
Примените функцию к каждому излучаемому элементу последовательно, а затем введите последующее значение.
Окно
Периодически собирает предметы из Observable в окна Observable, а затем испускает окна, а не предметы.
Пример оператора преобразования
Создайте следующую Java-программу, используя любой редактор по вашему выбору, например, в C: \> RxJava.
ObservableTester.java
import io.reactivex.Observable; //Using map operator to transform an Observable public class ObservableTester { public static void main(String[] args) { String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; final StringBuilder result = new StringBuilder(); Observable<String> observable = Observable.fromArray(letters); observable .map(String::toUpperCase) .subscribe( letter -> result.append(letter)); System.out.println(result); } }
Проверьте результат
Скомпилируйте класс с помощью компилятора javac следующим образом:
C:\RxJava>javac ObservableTester.java
Теперь запустите ObservableTester следующим образом:
C:\RxJava>java ObservableTester
Он должен произвести следующий вывод —
ABCDEFG
RxJava — операторы фильтрации
Ниже приведены операторы, которые используются для выборочного выброса элемента (ов) из наблюдаемой.
Sr.No. | Оператор и описание |
---|---|
1 |
Debounce Испускает предметы только тогда, когда происходит тайм-аут, не испуская другой предмет. |
2 |
отчетливый Издает только уникальные предметы. |
3 |
ElementAt испускать только элемент с индексом n, испускаемым Observable. |
4 |
Фильтр Издает только те элементы, которые передают данную функцию предиката. |
5 |
Первый Издает первый элемент или первый элемент, который соответствует заданным критериям. |
6 |
IgnoreElements Не испускает никаких предметов из Observable, но отмечает завершение. |
7 |
Прошлой Испускает последний элемент из Observable. |
8 |
Образец Издает самый последний элемент с заданным интервалом времени. |
9 |
Пропускать Пропускает первые n элементов из наблюдаемой. |
10 |
SkipLast Пропускает последние n элементов из наблюдаемой. |
11 |
принимать берет первые n предметов из наблюдаемой. |
12 |
TakeLast берет последние n предметов из наблюдаемой. |
Debounce
Испускает предметы только тогда, когда происходит тайм-аут, не испуская другой предмет.
отчетливый
Издает только уникальные предметы.
ElementAt
испускать только элемент с индексом n, испускаемым Observable.
Фильтр
Издает только те элементы, которые передают данную функцию предиката.
Первый
Издает первый элемент или первый элемент, который соответствует заданным критериям.
IgnoreElements
Не испускает никаких предметов из Observable, но отмечает завершение.
Прошлой
Испускает последний элемент из Observable.
Образец
Издает самый последний элемент с заданным интервалом времени.
Пропускать
Пропускает первые n элементов из наблюдаемой.
SkipLast
Пропускает последние n элементов из наблюдаемой.
принимать
берет первые n предметов из наблюдаемой.
TakeLast
берет последние n предметов из наблюдаемой.
Пример оператора фильтрации
Создайте следующую Java-программу, используя любой редактор по вашему выбору, например, в C: \> RxJava.
ObservableTester.java
import io.reactivex.Observable; //Using take operator to filter an Observable public class ObservableTester { public static void main(String[] args) { String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; final StringBuilder result = new StringBuilder(); Observable<String> observable = Observable.fromArray(letters); observable .take(2) .subscribe( letter -> result.append(letter)); System.out.println(result); } }
Проверьте результат
Скомпилируйте класс с помощью компилятора javac следующим образом:
C:\RxJava>javac ObservableTester.java
Теперь запустите ObservableTester следующим образом:
C:\RxJava>java ObservableTester
Он должен произвести следующий вывод —
ab
RxJava — Операторы объединения
Ниже приведены операторы, которые используются для создания одной наблюдаемой из нескольких наблюдаемых.
Sr.No. | Оператор и описание |
---|---|
1 | И / И / Когда
Объедините наборы предметов, используя посредников Pattern и Plan. |
2 | CombineLatest
Объедините последний элемент, испускаемый каждым Наблюдаемым через указанную функцию, и выпустите полученный элемент. |
3 | Присоединиться
Объедините элементы, испускаемые двумя Наблюдаемыми, если они испускаются в течение периода времени второго испускаемого Наблюдаемого элемента. |
4 | сливаться
Объединяет предметы, испускаемые Observables. |
5 | StartWith
Испускать указанную последовательность предметов перед тем, как начать испускать предметы из источника Observable |
6 | переключатель
Издает самые последние предметы, испускаемые Observables. |
7 | застежка — молния
Объединяет элементы Observables на основе функции и испускает полученные элементы. |
Объедините наборы предметов, используя посредников Pattern и Plan.
Объедините последний элемент, испускаемый каждым Наблюдаемым через указанную функцию, и выпустите полученный элемент.
Объедините элементы, испускаемые двумя Наблюдаемыми, если они испускаются в течение периода времени второго испускаемого Наблюдаемого элемента.
Объединяет предметы, испускаемые Observables.
Испускать указанную последовательность предметов перед тем, как начать испускать предметы из источника Observable
Издает самые последние предметы, испускаемые Observables.
Объединяет элементы Observables на основе функции и испускает полученные элементы.
Пример оператора объединения
Создайте следующую Java-программу, используя любой редактор по вашему выбору, например, в C: \> RxJava.
ObservableTester.java
import io.reactivex.Observable; //Using combineLatest operator to combine Observables public class ObservableTester { public static void main(String[] args) { Integer[] numbers = { 1, 2, 3, 4, 5, 6}; String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; final StringBuilder result = new StringBuilder(); Observable<String> observable1 = Observable.fromArray(letters); Observable<Integer> observable2 = Observable.fromArray(numbers); Observable.combineLatest(observable1, observable2, (a,b) -> a + b) .subscribe( letter -> result.append(letter)); System.out.println(result); } }
Проверьте результат
Скомпилируйте класс с помощью компилятора javac следующим образом:
C:\RxJava>javac ObservableTester.java
Теперь запустите ObservableTester следующим образом:
C:\RxJava>java ObservableTester
Он должен произвести следующий вывод —
g1g2g3g4g5g6
RxJava — Утилиты
Ниже приведены операторы, которые часто полезны с Observables.
Sr.No. | Оператор и описание |
---|---|
1 |
задержка Зарегистрируйте действие для обработки наблюдаемых событий жизненного цикла. |
2 |
Материализовать / дематериализоваться Представляет отправленный элемент и отправленное уведомление. |
3 |
ObserveOn Укажите планировщик для наблюдения. |
4 |
Сериализация Force Observable совершать сериализованные вызовы. |
5 |
Подписывайся Работать на выбросы предметов и уведомлений, как полный из наблюдаемой |
6 |
SubscribeOn Укажите планировщик, который будет использоваться Observable, когда он подписан. |
7 |
Интервал времени Преобразование Обсерватории в излучение с указанием количества времени, прошедшего между выбросами. |
8 |
Тайм — аут Выдает уведомление об ошибке, если указанное время происходит без отправки какого-либо элемента. |
9 |
Отметка Прикрепите метку времени к каждому излучаемому элементу. |
9 |
С помощью Создает одноразовый ресурс или тот же срок жизни, что и у Observable. |
задержка
Зарегистрируйте действие для обработки наблюдаемых событий жизненного цикла.
Материализовать / дематериализоваться
Представляет отправленный элемент и отправленное уведомление.
ObserveOn
Укажите планировщик для наблюдения.
Сериализация
Force Observable совершать сериализованные вызовы.
Подписывайся
Работать на выбросы предметов и уведомлений, как полный из наблюдаемой
SubscribeOn
Укажите планировщик, который будет использоваться Observable, когда он подписан.
Интервал времени
Преобразование Обсерватории в излучение с указанием количества времени, прошедшего между выбросами.
Тайм — аут
Выдает уведомление об ошибке, если указанное время происходит без отправки какого-либо элемента.
Отметка
Прикрепите метку времени к каждому излучаемому элементу.
С помощью
Создает одноразовый ресурс или тот же срок жизни, что и у Observable.
Пример утилиты оператора
Создайте следующую Java-программу, используя любой редактор по вашему выбору, например, в C: \> RxJava.
ObservableTester.java
import io.reactivex.Observable; //Using subscribe operator to subscribe to an Observable public class ObservableTester { public static void main(String[] args) { String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; final StringBuilder result = new StringBuilder(); Observable<String> observable = Observable.fromArray(letters); observable.subscribe( letter -> result.append(letter)); System.out.println(result); } }
Проверьте результат
Скомпилируйте класс с помощью компилятора javac следующим образом:
C:\RxJava>javac ObservableTester.java
Теперь запустите ObservableTester следующим образом:
C:\RxJava>java ObservableTester
Он должен произвести следующий вывод —
abcdefg
RxJava — условные операторы
Ниже приводятся операторы, которые оценивают один или несколько наблюдаемых или выбрасываемых элементов.
Sr.No. | Оператор и описание |
---|---|
1 |
Все Оценивает все предметы, выбрасываемые в соответствии с заданными критериями. |
2 |
Amb Испускает все предметы из первой наблюдаемой только с учетом нескольких наблюдаемых. |
3 |
Содержит Проверяет, издает ли Observable конкретный предмет или нет. |
4 |
DefaultIfEmpty Издает элемент по умолчанию, если Observable ничего не излучает. |
5 |
SequenceEqual Проверяет, испускают ли две наблюдаемые одну и ту же последовательность предметов. |
6 |
SkipUntil Сбрасывает предметы, испускаемые первой Наблюдаемой, пока вторая Наблюдаемая не испустит предмет. |
7 |
SkipWhile Откажитесь от предметов, испускаемых Observable, пока данное условие не станет ложным. |
8 |
TakeUntil Сбрасывает предметы, испускаемые Наблюдаемой после того, как вторая Наблюдаемая испускает предмет или завершает свою работу. |
9 |
TakeWhile Сбросить предметы, испускаемые Наблюдаемым после того, как указанное условие становится ложным. |
Все
Оценивает все предметы, выбрасываемые в соответствии с заданными критериями.
Amb
Испускает все предметы из первой наблюдаемой только с учетом нескольких наблюдаемых.
Содержит
Проверяет, издает ли Observable конкретный предмет или нет.
DefaultIfEmpty
Издает элемент по умолчанию, если Observable ничего не излучает.
SequenceEqual
Проверяет, испускают ли две наблюдаемые одну и ту же последовательность предметов.
SkipUntil
Сбрасывает предметы, испускаемые первой Наблюдаемой, пока вторая Наблюдаемая не испустит предмет.
SkipWhile
Откажитесь от предметов, испускаемых Observable, пока данное условие не станет ложным.
TakeUntil
Сбрасывает предметы, испускаемые Наблюдаемой после того, как вторая Наблюдаемая испускает предмет или завершает свою работу.
TakeWhile
Сбросить предметы, испускаемые Наблюдаемым после того, как указанное условие становится ложным.
Пример условного оператора
Создайте следующую Java-программу, используя любой редактор по вашему выбору, например, в C: \> RxJava.
ObservableTester.java
import io.reactivex.Observable; //Using defaultIfEmpty operator to operate on an Observable public class ObservableTester { public static void main(String[] args) { final StringBuilder result = new StringBuilder(); Observable.empty() .defaultIfEmpty("No Data") .subscribe(s -> result.append(s)); System.out.println(result); String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; final StringBuilder result1 = new StringBuilder(); Observable.fromArray(letters) .firstElement() .defaultIfEmpty("No data") .subscribe(s -> result1.append(s)); System.out.println(result1); } }
Проверьте результат
Скомпилируйте класс с помощью компилятора javac следующим образом:
C:\RxJava>javac ObservableTester.java
Теперь запустите ObservableTester следующим образом:
C:\RxJava>java ObservableTester
Он должен произвести следующий вывод —
No Data a
RxJava — математические операторы
Ниже приводятся операторы, которые оперируют целыми объектами, испускаемыми Observable.
Sr.No. | Оператор и описание |
---|---|
1 |
Средний Оценивает средние значения всех элементов и выдает результат. |
2 |
Concat Выдает все предметы из нескольких наблюдаемых без чередования. |
3 |
подсчитывать Подсчитывает все предметы и испускает результат. |
4 |
Максимум Оценивает максимально ценный предмет из всех предметов и выдает результат. |
5 |
Min Оценивает минимальный предмет из всех предметов и выдает результат. |
6 |
уменьшить Примените функцию к каждому элементу и верните результат. |
7 |
сумма Оценивает сумму всех предметов и выдает результат. |
Средний
Оценивает средние значения всех элементов и выдает результат.
Concat
Выдает все предметы из нескольких наблюдаемых без чередования.
подсчитывать
Подсчитывает все предметы и испускает результат.
Максимум
Оценивает максимально ценный предмет из всех предметов и выдает результат.
Min
Оценивает минимальный предмет из всех предметов и выдает результат.
уменьшить
Примените функцию к каждому элементу и верните результат.
сумма
Оценивает сумму всех предметов и выдает результат.
Пример математического оператора
Создайте следующую Java-программу, используя любой редактор по вашему выбору, например, в C: \> RxJava.
ObservableTester.java
import io.reactivex.Observable; //Using concat operator to operate on multiple Observables public class ObservableTester { public static void main(String[] args) throws InterruptedException { Integer[] numbers = { 1, 2, 3, 4, 5, 6}; String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; final StringBuilder result = new StringBuilder(); Observable<String> observable1 = Observable.fromArray(letters); Observable<Integer> observable2 = Observable.fromArray(numbers); Observable.concat(observable1, observable2) .subscribe( letter -> result.append(letter)); System.out.println(result); } }
Проверьте результат
Скомпилируйте класс с помощью компилятора javac следующим образом:
C:\RxJava>javac ObservableTester.java
Теперь запустите ObservableTester следующим образом:
C:\RxJava>java ObservableTester
Он должен произвести следующий вывод —
abcdefg123456
RxJava — Операторы подключения
Ниже приведены операторы, которые более точно контролируют подписку.
Sr.No. | Оператор и описание |
---|---|
1 |
соединять Поручить подключаемому Observable для передачи предметов своим подписчикам. |
2 |
Публиковать Преобразует наблюдаемое в подключаемое наблюдаемое. |
3 |
RefCount Преобразует подключаемый объект Observable в обычный объект Observable. |
4 |
переигровка Убедитесь, что каждый подписчик просматривает одинаковую последовательность отправленных элементов, даже после того, как Observable начал излучать элементы и подписчики подписываются позже. |
соединять
Поручить подключаемому Observable для передачи предметов своим подписчикам.
Публиковать
Преобразует наблюдаемое в подключаемое наблюдаемое.
RefCount
Преобразует подключаемый объект Observable в обычный объект Observable.
переигровка
Убедитесь, что каждый подписчик просматривает одинаковую последовательность отправленных элементов, даже после того, как Observable начал излучать элементы и подписчики подписываются позже.
Пример подключения оператора
Создайте следующую Java-программу, используя любой редактор по вашему выбору, например, в C: \> RxJava.
ObservableTester.java
import io.reactivex.Observable; import io.reactivex.observables.ConnectableObservable; //Using connect operator on a ConnectableObservable public class ObservableTester { public static void main(String[] args) { String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; final StringBuilder result = new StringBuilder(); ConnectableObservable<String> connectable = Observable.fromArray(letters).publish(); connectable.subscribe(letter -> result.append(letter)); System.out.println(result.length()); connectable.connect(); System.out.println(result.length()); System.out.println(result); } }
Проверьте результат
Скомпилируйте класс с помощью компилятора javac следующим образом:
C:\RxJava>javac ObservableTester.java
Теперь запустите ObservableTester следующим образом:
C:\RxJava>java ObservableTester
Он должен произвести следующий вывод —
0 7 abcdefg
RxJava — Предметы
Согласно Реактиву , Субъект может действовать как Наблюдаемый, так и Наблюдатель.
Субъект — это своего рода мост или прокси-сервер, доступный в некоторых реализациях ReactiveX, который действует как наблюдатель и как наблюдаемый. Поскольку он является наблюдателем, он может подписаться на одну или несколько наблюдаемых, а поскольку он является наблюдаемым, он может проходить через элементы, которые он наблюдает, переиздавая их, а также может испускать новые элементы.
Есть четыре типа предметов —
Sr.No. | Тема и описание |
---|---|
1 |
Опубликовать тему Издает только те элементы, которые выбрасываются после подписки. |
2 | Повтор темы
Издает все элементы, испускаемые источником Observable, независимо от того, когда он подписался на Observable. |
3 |
Поведение Тема При подписке испускает самый последний элемент, а затем продолжает испускать элемент, испускаемый источником Observable. |
4 |
Асинхронная тема Издает последний элемент, испускаемый источником Observable после его завершения. |
Опубликовать тему
Издает только те элементы, которые выбрасываются после подписки.
Издает все элементы, испускаемые источником Observable, независимо от того, когда он подписался на Observable.
Поведение Тема
При подписке испускает самый последний элемент, а затем продолжает испускать элемент, испускаемый источником Observable.
Асинхронная тема
Издает последний элемент, испускаемый источником Observable после его завершения.
RxJava — PublishSubject
PublishSubject испускает элементы для подписчиков, которые в настоящее время подписаны, и события терминала для текущих или поздних наблюдателей.
Декларация класса
Ниже приводится объявление для класса io.reactivex.subjects.PublishSubject <T> —
public final class PublishSubject<T> extends Subject<T>
Пример PublishSubject
Создайте следующую Java-программу, используя любой редактор по вашему выбору, например, в C: \> RxJava.
ObservableTester.java
import io.reactivex.subjects.PublishSubject; public class ObservableTester { public static void main(String[] args) { final StringBuilder result1 = new StringBuilder(); final StringBuilder result2 = new StringBuilder(); PublishSubject<String> subject = PublishSubject.create(); subject.subscribe(value -> result1.append(value) ); subject.onNext("a"); subject.onNext("b"); subject.onNext("c"); subject.subscribe(value -> result2.append(value)); subject.onNext("d"); subject.onComplete(); //Output will be abcd System.out.println(result1); //Output will be d only //as subscribed after c item emitted. System.out.println(result2); } }
Проверьте результат
Скомпилируйте класс с помощью компилятора javac следующим образом:
C:\RxJava>javac ObservableTester.java
Теперь запустите ObservableTester следующим образом:
C:\RxJava>java ObservableTester
Он должен произвести следующий вывод —
abcd d
RxJava — BehaviorSubject
BehaviorSubject испускает самый последний наблюдаемый элемент, а затем все последующие наблюдаемые элементы каждому подписчику Observer.
Декларация класса
Ниже приводится объявление для класса io.reactivex.subjects.BehaviorSubject <T> —
public final class BehaviorSubject<T> extends Subject<T>
Пример BehaviorSubject
Создайте следующую Java-программу, используя любой редактор по вашему выбору, например, в C: \> RxJava.
ObservableTester.java
import io.reactivex.subjects.BehaviorSubject; public class ObservableTester { public static void main(String[] args) { final StringBuilder result1 = new StringBuilder(); final StringBuilder result2 = new StringBuilder(); BehaviorSubject<String> subject = BehaviorSubject.create(); subject.subscribe(value -> result1.append(value) ); subject.onNext("a"); subject.onNext("b"); subject.onNext("c"); subject.subscribe(value -> result2.append(value)); subject.onNext("d"); subject.onComplete(); //Output will be abcd System.out.println(result1); //Output will be cd being BehaviorSubject //(c is last item emitted before subscribe) System.out.println(result2); } }
Проверьте результат
Скомпилируйте класс с помощью компилятора javac следующим образом:
C:\RxJava>javac ObservableTester.java
Теперь запустите ObservableTester следующим образом:
C:\RxJava>java ObservableTester
Он должен произвести следующий вывод —
abcd cd
RxJava — ReplaySubject
ReplaySubject воспроизводит события / элементы для текущих и поздних наблюдателей.
Декларация класса
Ниже приводится объявление для класса io.reactivex.subjects.ReplaySubject <T> —
public final class ReplaySubject<T> extends Subject<T>
Пример ReplaySubject
Создайте следующую Java-программу, используя любой редактор по вашему выбору, например, в C: \> RxJava.
ObservableTester.java
import io.reactivex.subjects.ReplaySubject; public class ObservableTester { public static void main(String[] args) { final StringBuilder result1 = new StringBuilder(); final StringBuilder result2 = new StringBuilder(); ReplaySubject<String> subject = ReplaySubject.create(); subject.subscribe(value -> result1.append(value) ); subject.onNext("a"); subject.onNext("b"); subject.onNext("c"); subject.subscribe(value -> result2.append(value)); subject.onNext("d"); subject.onComplete(); //Output will be abcd System.out.println(result1); //Output will be abcd being ReplaySubject //as ReplaySubject emits all the items System.out.println(result2); } }
Проверьте результат
Скомпилируйте класс с помощью компилятора javac следующим образом:
C:\RxJava>javac ObservableTester.java
Теперь запустите ObservableTester следующим образом:
C:\RxJava>java ObservableTester
Он должен произвести следующий вывод —
abcd abcd
RxJava — AsyncSubject
AsyncSubject выдает единственное последнее значение, за которым следует событие завершения или полученная ошибка для наблюдателей.
Декларация класса
Ниже приводится объявление для класса io.reactivex.subjects.AsyncSubject <T> —
public final class AsyncSubject<T> extends Subject<T>
Пример AsyncSubject
Создайте следующую Java-программу, используя любой редактор по вашему выбору, например, в C: \> RxJava.
ObservableTester.java
import io.reactivex.subjects. AsyncSubject; public class ObservableTester { public static void main(String[] args) { final StringBuilder result1 = new StringBuilder(); final StringBuilder result2 = new StringBuilder(); AsyncSubject<String> subject = AsyncSubject.create(); subject.subscribe(value -> result1.append(value) ); subject.onNext("a"); subject.onNext("b"); subject.onNext("c"); subject.subscribe(value -> result2.append(value)); subject.onNext("d"); subject.onComplete(); //Output will be d being the last item emitted System.out.println(result1); //Output will be d being the last item emitted System.out.println(result2); } }
Проверьте результат
Скомпилируйте класс с помощью компилятора javac следующим образом:
C:\RxJava>javac ObservableTester.java
Теперь запустите ObservableTester следующим образом:
C:\RxJava>java ObservableTester
Он должен произвести следующий вывод —
d d
RxJava — Планировщики
Планировщики используются в многопоточной среде для работы с наблюдаемыми операторами.
Согласно Реактиву , Планировщик используется, чтобы запланировать, как цепочка операторов будет применяться к различным потокам.
По умолчанию Observable и цепочка операторов, к которой вы обращаетесь, будут выполнять свою работу и уведомлять своих наблюдателей в том же потоке, для которого вызывается метод Subscribe. Оператор SubscribeOn изменяет это поведение, указывая другой планировщик, с которым должен работать Observable. Оператор ObserveOn указывает другой планировщик, который Observable будет использовать для отправки уведомлений своим наблюдателям.
В RxJava доступны следующие типы планировщиков:
Sr.No. | Планировщик и описание |
---|---|
1 |
Schedulers.computation () Создает и возвращает планировщик, предназначенный для вычислительной работы. Количество потоков, которые будут запланированы, зависит от процессоров, присутствующих в системе. На каждый процессор допускается один поток. Лучше всего подходит для циклов обработки событий или операций обратного вызова. |
2 |
Schedulers.io () Создает и возвращает планировщик, предназначенный для работы с IO. Пул потоков может расширяться по мере необходимости. |
3 |
Schedulers.newThread () Создает и возвращает планировщик, который создает новый поток для каждой единицы работы. |
4 |
Schedulers.trampoline () Создает и возвращает планировщик, который ставит в очередь работу в текущем потоке, который будет выполнен после завершения текущей работы. |
4 |
Schedulers.from (java.util.concurrent.Executor executor) Преобразует исполнителя в новый экземпляр планировщика. |
Schedulers.computation ()
Создает и возвращает планировщик, предназначенный для вычислительной работы. Количество потоков, которые будут запланированы, зависит от процессоров, присутствующих в системе. На каждый процессор допускается один поток. Лучше всего подходит для циклов обработки событий или операций обратного вызова.
Schedulers.io ()
Создает и возвращает планировщик, предназначенный для работы с IO. Пул потоков может расширяться по мере необходимости.
Schedulers.newThread ()
Создает и возвращает планировщик, который создает новый поток для каждой единицы работы.
Schedulers.trampoline ()
Создает и возвращает планировщик, который ставит в очередь работу в текущем потоке, который будет выполнен после завершения текущей работы.
Schedulers.from (java.util.concurrent.Executor executor)
Преобразует исполнителя в новый экземпляр планировщика.
RxJava — Батутный планировщик
Метод Schedulers.trampoline () создает и возвращает планировщик, который ставит в очередь работу в текущем потоке, который будет выполнен после завершения текущей работы.
Пример Schedulers.trampoline ()
Создайте следующую Java-программу, используя любой редактор по вашему выбору, например, в C: \> RxJava.
ObservableTester.java
import java.util.Random; import io.reactivex.Observable; import io.reactivex.schedulers.Schedulers; public class ObservableTester { public static void main(String[] args) throws InterruptedException { Observable.just("A", "AB", "ABC") .flatMap(v -> getLengthWithDelay(v) .doOnNext(s -> System.out.println("Processing Thread " + Thread.currentThread().getName())) .subscribeOn(Schedulers.trampoline())) .subscribe(length -> System.out.println("Receiver Thread " + Thread.currentThread().getName() + ", Item length " + length)); Thread.sleep(10000); } protected static Observable<Integer> getLengthWithDelay(String v) { Random random = new Random(); try { Thread.sleep(random.nextInt(3) * 1000); return Observable.just(v.length()); } catch (InterruptedException e) { e.printStackTrace(); } return null; } }
Проверьте результат
Скомпилируйте класс с помощью компилятора javac следующим образом:
C:\RxJava>javac ObservableTester.java
Теперь запустите ObservableTester следующим образом:
C:\RxJava>java ObservableTester
Он должен произвести следующий вывод —
Processing Thread main Receiver Thread main, Item length 1 Processing Thread main Receiver Thread main, Item length 2 Processing Thread main Receiver Thread main, Item length 3
RxJava — Планировщик NewThread
Метод Schedulers.newThread () создает и возвращает планировщик, который создает новый поток для каждой единицы работы.
Пример Schedulers.newThread ()
Создайте следующую Java-программу, используя любой редактор по вашему выбору, например, в C: \> RxJava.
ObservableTester.java
import java.util.Random; import io.reactivex.Observable; import io.reactivex.schedulers.Schedulers; public class ObservableTester { public static void main(String[] args) throws InterruptedException { Observable.just("A", "AB", "ABC") .flatMap(v -> getLengthWithDelay(v) .doOnNext(s -> System.out.println("Processing Thread " + Thread.currentThread().getName())) .subscribeOn(Schedulers.newThread())) .subscribe(length -> System.out.println("Receiver Thread " + Thread.currentThread().getName() + ", Item length " + length)); Thread.sleep(10000); } protected static Observable<Integer> getLengthWithDelay(String v) { Random random = new Random(); try { Thread.sleep(random.nextInt(3) * 1000); return Observable.just(v.length()); } catch (InterruptedException e) { e.printStackTrace(); } return null; } }
Проверьте результат
Скомпилируйте класс с помощью компилятора javac следующим образом:
C:\RxJava>javac ObservableTester.java
Теперь запустите ObservableTester следующим образом:
C:\RxJava>java ObservableTester
Он должен произвести следующий вывод —
Processing Thread RxNewThreadScheduler-1 Receiver Thread RxNewThreadScheduler-1, Item length 1 Processing Thread RxNewThreadScheduler-2 Receiver Thread RxNewThreadScheduler-2, Item length 2 Processing Thread RxNewThreadScheduler-3 Receiver Thread RxNewThreadScheduler-3, Item length 3
RxJava — Планировщик вычислений
Метод Schedulers.computation () создает и возвращает планировщик, предназначенный для вычислительной работы. Количество потоков, которые будут запланированы, зависит от процессоров, присутствующих в системе. На каждый процессор допускается один поток. Лучше всего подходит для циклов обработки событий или операций обратного вызова.
Пример Schedulers.computation ()
Создайте следующую Java-программу, используя любой редактор по вашему выбору, например, в C: \> RxJava.
ObservableTester.java
import java.util.Random; import io.reactivex.Observable; import io.reactivex.schedulers.Schedulers; public class ObservableTester { public static void main(String[] args) throws InterruptedException { Observable.just("A", "AB", "ABC") .flatMap(v -> getLengthWithDelay(v) .doOnNext(s -> System.out.println("Processing Thread " + Thread.currentThread().getName())) .subscribeOn(Schedulers.computation())) .subscribe(length -> System.out.println("Receiver Thread " + Thread.currentThread().getName() + ", Item length " + length)); Thread.sleep(10000); } protected static Observable<Integer> getLengthWithDelay(String v) { Random random = new Random(); try { Thread.sleep(random.nextInt(3) * 1000); return Observable.just(v.length()); } catch (InterruptedException e) { e.printStackTrace(); } return null; } }
Проверьте результат
Скомпилируйте класс с помощью компилятора javac следующим образом:
C:\RxJava>javac ObservableTester.java
Теперь запустите ObservableTester следующим образом:
C:\RxJava>java ObservableTester
Он должен произвести следующий вывод —
Processing Thread RxComputationThreadPool-1 Receiver Thread RxComputationThreadPool-1, Item length 1 Processing Thread RxComputationThreadPool-2 Receiver Thread RxComputationThreadPool-2, Item length 2 Processing Thread RxComputationThreadPool-3 Receiver Thread RxComputationThreadPool-3, Item length 3
RxJava — IO Scheduler
Метод Schedulers.io () создает и возвращает планировщик, предназначенный для работы, связанной с вводом-выводом. Пул потоков может расширяться по мере необходимости. Подходит для интенсивных операций ввода-вывода.
Пример Schedulers.io ()
Создайте следующую Java-программу, используя любой редактор по вашему выбору, например, в C: \> RxJava.
ObservableTester.java
import java.util.Random; import io.reactivex.Observable; import io.reactivex.schedulers.Schedulers; public class ObservableTester { public static void main(String[] args) throws InterruptedException { Observable.just("A", "AB", "ABC") .flatMap(v -> getLengthWithDelay(v) .doOnNext(s -> System.out.println("Processing Thread " + Thread.currentThread().getName())) .subscribeOn(Schedulers.io())) .subscribe(length -> System.out.println("Receiver Thread " + Thread.currentThread().getName() + ", Item length " + length)); Thread.sleep(10000); } protected static Observable<Integer> getLengthWithDelay(String v) { Random random = new Random(); try { Thread.sleep(random.nextInt(3) * 1000); return Observable.just(v.length()); } catch (InterruptedException e) { e.printStackTrace(); } return null; } }
Проверьте результат
Скомпилируйте класс с помощью компилятора javac следующим образом:
C:\RxJava>javac ObservableTester.java
Теперь запустите ObservableTester следующим образом:
C:\RxJava>java ObservableTester
Он должен произвести следующий вывод —
Processing Thread RxCachedThreadScheduler-1 Receiver Thread RxCachedThreadScheduler-1, Item length 1 Processing Thread RxCachedThreadScheduler-1 Receiver Thread RxCachedThreadScheduler-1, Item length 2 Processing Thread RxCachedThreadScheduler-1 Receiver Thread RxCachedThreadScheduler-1, Item length 3
RxJava — от планировщика
Метод Schedulers.from (Executor) преобразует Executor в новый экземпляр Scheduler.
Пример Schedulers.from (Исполнитель)
Создайте следующую Java-программу, используя любой редактор по вашему выбору, например, в C: \> RxJava.
ObservableTester.java
import java.util.Random; import java.util.concurrent.Executors; import io.reactivex.Observable; import io.reactivex.schedulers.Schedulers; public class ObservableTester { public static void main(String[] args) throws InterruptedException { Observable.just("A", "AB", "ABC") .flatMap(v -> getLengthWithDelay(v) .doOnNext(s -> System.out.println("Processing Thread " + Thread.currentThread().getName())) .subscribeOn(Schedulers.from(Executors.newFixedThreadPool(3)))) .subscribe(length -> System.out.println("Receiver Thread " + Thread.currentThread().getName() + ", Item length " + length)); Thread.sleep(10000); } protected static Observable<Integer> getLengthWithDelay(String v) { Random random = new Random(); try { Thread.sleep(random.nextInt(3) * 1000); return Observable.just(v.length()); } catch (InterruptedException e) { e.printStackTrace(); } return null; } }
Проверьте результат
Скомпилируйте класс с помощью компилятора javac следующим образом:
C:\RxJava>javac ObservableTester.java
Теперь запустите ObservableTester следующим образом:
C:\RxJava>java ObservableTester
Он должен произвести следующий вывод —
Processing Thread pool-1-thread-1 Processing Thread pool-3-thread-1 Receiver Thread pool-1-thread-1, Item length 1 Processing Thread pool-4-thread-1 Receiver Thread pool-4-thread-1, Item length 3 Receiver Thread pool-3-thread-1, Item length 2
RxJava — Буферизация
Оператор буферизации позволяет собирать элементы, испускаемые Observable, в список или пакеты и генерировать эти пакеты вместо элементов. В приведенном ниже примере мы создали Observable для испускания 9 элементов, и используя буферизацию, 3 элемента будут отправлены вместе.
Пример буферизации
Создайте следующую Java-программу, используя любой редактор по вашему выбору, например, в C: \> RxJava.
ObservableTester.java
import io.reactivex.Observable; import io.reactivex.Observer; import io.reactivex.disposables.Disposable; import io.reactivex.schedulers.Schedulers; import java.util.List; import java.util.concurrent.TimeUnit; public class ObservableTester { public static void main(String[] args) throws InterruptedException { Observable<Integer> observable = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9); observable.subscribeOn(Schedulers.io()) .delay(2, TimeUnit.SECONDS, Schedulers.io()) .buffer(3) .subscribe(new Observer<List<Integer>>() { @Override public void onSubscribe(Disposable d) { System.out.println("Subscribed"); } @Override public void onNext(List<Integer> integers) { System.out.println("onNext: "); for (Integer value : integers) { System.out.println(value); } } @Override public void onError(Throwable e) { System.out.println("Error"); } @Override public void onComplete() { System.out.println("Done! "); } }); Thread.sleep(3000); } }
Проверьте результат
Скомпилируйте класс с помощью компилятора javac следующим образом:
C:\RxJava>javac ObservableTester.java
Теперь запустите ObservableTester следующим образом:
C:\RxJava>java ObservableTester
Он должен произвести следующий вывод —
Subscribed onNext: 1 2 3 onNext: 4 5 6 onNext: 7 8 9 Done!
RxJava — Windowing
Оператор управления окнами работает аналогично оператору буфера, но он позволяет собирать элементы, испускаемые Наблюдаемой, в другую наблюдаемую, а не в коллекцию, и излучать эти Наблюдаемые вместо коллекций. В приведенном ниже примере мы создали Observable для испускания 9 элементов, и с помощью оконного оператора 3 Observable будут испускаться вместе.
Пример работы с окнами
Создайте следующую Java-программу, используя любой редактор по вашему выбору, например, в C: \> RxJava.
ObservableTester.java
import io.reactivex.Observable; import io.reactivex.Observer; import io.reactivex.disposables.Disposable; import io.reactivex.schedulers.Schedulers; import java.util.concurrent.TimeUnit; public class ObservableTester { public static void main(String[] args) throws InterruptedException { Observable<Integer> observable = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9); observable.subscribeOn(Schedulers.io()) .delay(2, TimeUnit.SECONDS, Schedulers.io()) .window(3) .subscribe(new Observer<Observable<Integer>>() { @Override public void onSubscribe(Disposable d) { System.out.println("Subscribed"); } @Override public void onNext(Observable<Integer> integers) { System.out.println("onNext: "); integers.subscribe(value -> System.out.println(value)); } @Override public void onError(Throwable e) { System.out.println("Error"); } @Override public void onComplete() { System.out.println("Done! "); } }); Thread.sleep(3000); } }
Проверьте результат
Скомпилируйте класс с помощью компилятора javac следующим образом:
C:\RxJava>javac ObservableTester.java
Теперь запустите ObservableTester следующим образом:
C:\RxJava>java ObservableTester
Он должен произвести следующий вывод —