Статьи

Оперативное программирование операторов в RxJava 2

Если ваше приложение для Android собирается собрать эти пятизвездочные обзоры в Google Play, то оно должно быть в состоянии выполнять несколько задач.

Как минимум, современные пользователи мобильных устройств ожидают, что смогут по-прежнему взаимодействовать с вашим приложением, пока оно работает в фоновом режиме. Это может показаться простым, но по умолчанию Android является однопоточным, поэтому, если вы собираетесь соответствовать ожиданиям своей аудитории, рано или поздно вам придется создавать дополнительные потоки.

В предыдущей статье этой серии мы познакомились с RxJava, реактивной библиотекой для JVM, которая может помочь вам создавать приложения для Android, которые реагируют на данные и события по мере их возникновения. Но вы также можете использовать эту библиотеку для одновременного реагирования на данные и события.

В этом посте я собираюсь показать вам, как вы можете использовать операторы RxJava, чтобы наконец сделать параллелизм на Android безболезненным. К концу этой статьи вы узнаете, как использовать операторы RxJava для создания дополнительных потоков, указать работу, которая должна выполняться в этих потоках, а затем опубликовать результаты обратно в важнейший основной поток пользовательского интерфейса Android — и все это с помощью всего лишь несколько строк кода.

И, поскольку ни одна технология не идеальна, я также расскажу вам о большой потенциальной опасности добавления библиотеки RxJava в ваши проекты — прежде чем показать, как использовать операторы, чтобы эта проблема никогда не возникала в ваших собственных проектах Android.

RxJava обладает огромной коллекцией операторов, которые в основном предназначены для того, чтобы помочь вам изменять, фильтровать, объединять и преобразовывать данные, передаваемые вашими Observable . Вы найдете полный список операторов RxJava в официальных документах, и хотя никто не ожидает, что вы запомните каждый отдельный оператор , стоит потратить некоторое время на чтение этого списка, просто чтобы вы имели приблизительное представление о различных данных. преобразования, которые вы можете выполнить.

Список операторов RxJava уже довольно исчерпывающий, но если вы не можете найти идеальный оператор для преобразования данных, которое вы имели в виду, то вы всегда можете объединить несколько операторов в цепочку. Применение оператора к Observable обычно возвращает другой Observable , так что вы можете просто продолжать применять операторы, пока не получите желаемые результаты.

Слишком много операторов RxJava, чтобы охватить их в одной статье, и официальные документы RxJava уже проделали хорошую работу по представлению всех операторов, которые вы можете использовать для преобразования данных, поэтому я сосредоточусь на двух операторах, которые имеют Наибольший потенциал для облегчения вашей жизни разработчика Android: subscribeOn() и observeOn() .

Если ваше приложение обеспечивает максимально возможное взаимодействие с пользователем, оно должно иметь возможность выполнять интенсивные или длительные задачи и выполнять несколько задач одновременно, не блокируя важнейший основной поток пользовательского интерфейса Android.

Например, представьте, что вашему приложению нужно получить некоторую информацию из двух разных баз данных. Если вы выполняете обе эти задачи одна за другой в главном потоке Android, то это не только займет значительное количество времени, но и пользовательский интерфейс не будет отвечать до тех пор, пока ваше приложение не завершит извлечение каждого отдельного фрагмента информации из обеих баз данных. , Не совсем хороший пользовательский опыт!

Гораздо лучшим решением является создание двух дополнительных потоков, в которых вы можете выполнять обе эти задачи одновременно, не блокируя основной поток пользовательского интерфейса. Такой подход означает, что работа будет выполнена в два раза быстрее, и пользователь сможет продолжать взаимодействовать с пользовательским интерфейсом вашего приложения на протяжении всего. Потенциально ваши пользователи могут даже не знать, что ваше приложение выполняет некоторую интенсивную и длительную работу в фоновом режиме — вся информация о базе данных просто появится в пользовательском интерфейсе вашего приложения, как по волшебству!

Из коробки Android предоставляет несколько инструментов, которые можно использовать для создания дополнительных потоков, в том числе Service и IntentService , но эти решения сложны в реализации и могут быстро привести к сложному, подробному коду. Кроме того, если вы не реализуете многопоточность правильно, вы можете оказаться в приложении, которое теряет память и выдает всевозможные ошибки.

Чтобы сделать многопоточность на Android еще более вызывающей головную боль, основной поток пользовательского интерфейса Android — это единственный поток, который может обновлять пользовательский интерфейс вашего приложения. Если вы хотите обновить пользовательский интерфейс своего приложения с результатом работы, выполненной в любом другом потоке , то обычно вам нужно создать Handler в основном потоке пользовательского интерфейса, а затем использовать этот Handler для передачи данных из фонового потока в Основная тема. Это означает больше кода, больше сложности и больше возможностей для ошибок проникать в ваш проект.

Но в RxJava есть два оператора, которые могут помочь вам избежать этой сложности и ошибок.

Обратите внимание, что вы используете эти операторы в сочетании с Schedulers , которые по сути являются компонентами, которые позволяют указывать потоки . Пока просто думайте о планировщике как о синониме слова thread .

  • subscribeOn(Scheduler) : по умолчанию Observable отправляет свои данные в поток, в котором была объявлена ​​подписка, т. е. где вы .subscribe метод .subscribe . В Android это, как правило, основной поток пользовательского интерфейса. Вы можете использовать оператор subscribeOn() чтобы определить другой Scheduler котором должен выполняться Observable и отправлять свои данные.
  • observeOn(Scheduler) : вы можете использовать этот оператор для перенаправления излучений вашего Observable в другой Scheduler , эффективно изменяя поток, в который отправляются уведомления Observable , и, соответственно, поток, в котором используются его данные.

RxJava поставляется с несколькими планировщиками, которые вы можете использовать для создания различных потоков, в том числе:

  • Schedulers.io() : предназначен для использования в задачах, связанных с вводом-выводом.
  • Schedulers.computation() : предназначен для использования в вычислительных задачах. По умолчанию количество потоков в планировщике вычислений ограничено количеством процессоров, доступных на вашем устройстве.
  • Schedulers.newThread() : создает новый поток.

Теперь у вас есть обзор всех движущихся частей, давайте посмотрим на некоторые примеры использования subscribeOn() и observeOn() и увидим некоторые планировщики в действии.

В Android вы, как правило, будете использовать subscribeOn() и сопровождающий Scheduler чтобы изменить поток, в котором выполняется некоторая длительная или интенсивная работа, поэтому нет риска заблокировать основной поток пользовательского интерфейса. Например, вы можете решить импортировать большой объем данных в планировщик io() или выполнить некоторые вычисления в планировщике computation() .

В следующем коде мы создаем новый поток, в котором Observable выполнит свои операции и выдаст значения 1 , 2 и 3 .

1
2
3
Observable.just(1, 2, 3)
         .subscribeOn(Schedulers.newThread())
         .subscribe(Observer);

Хотя это все, что вам нужно для создания потока и начала генерирования данных в этом потоке, вам может потребоваться подтверждение того, что эта наблюдаемая действительно работает в новом потоке. Один из способов — напечатать название потока, который ваше приложение использует в Android Studio. Logcat Monitor.

Для удобства в предыдущем посте « Начало работы с RxJava» мы создали приложение, которое отправляет сообщения в Logcat Monitor на разных этапах в течение жизненного цикла Observable, поэтому мы можем повторно использовать большую часть этого кода.

Откройте проект, который вы создали в этом посте, и настройте свой код так, чтобы он использовал вышеприведенную Observable качестве исходного Observable . Затем добавьте оператор subscribeOn() и укажите, что сообщения, отправляемые в Logcat, должны содержать имя текущего потока.

Ваш готовый проект должен выглядеть примерно так:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import android.support.v7.app.AppCompatActivity;
import android.os.Bundle;
import android.util.Log;
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
 
public class MainActivity extends AppCompatActivity {
  public static final String TAG = «MainActivity»;
 
  @Override
  protected void onCreate(Bundle savedInstanceState) {
      super.onCreate(savedInstanceState);
      setContentView(R.layout.activity_main);
       
      Observable.just(1, 2, 3)
              .subscribeOn(Schedulers.newThread())
              .subscribe(Observer);
  }
 
  Observer<Integer> Observer = new Observer<Integer>() {
 
      @Override
      public void onSubscribe(Disposable d) {
          Log.e(TAG, «onSubscribe» + Thread.currentThread().getName());
      }
 
      @Override
      public void onNext(Integer value) {
          Log.e(TAG, «onNext: » + value + Thread.currentThread().getName());
      }
 
 
      @Override
      public void onError(Throwable e) {
          Log.e(TAG, «onError: «);
      }
       
      @Override
      public void onComplete() {
          Log.e(TAG, «onComplete: All Done!» + Thread.currentThread().getName());
      }
 
  };
 
}

Убедитесь, что Logcat Monitor в Android Studio открыт (выбрав вкладку Android Monitor , а затем Logcat ), а затем запустите свой проект на физическом устройстве Android или AVD. Вы должны увидеть следующий вывод в Logcat Monitor:

Проверьте поток, где ваше приложение в настоящее время работает в Android Studios Logcat Monitor

Здесь вы можете видеть, что .subscribe вызывается в основном потоке пользовательского интерфейса, но наблюдаемый работает в совершенно другом потоке.

Оператор subscribeOn() будет иметь одинаковый эффект независимо от того, где вы помещаете его в наблюдаемую цепочку; однако вы не можете использовать несколько операторов subscribeOn() в одной цепочке. Если вы включаете более одного subscribeOn() , ваша цепочка будет использовать только метод subscribeOn() , ближайший к наблюдаемому источнику.

В отличие от subscribeOn() , где вы observeOn() в вашу цепочку observeOn() , как следует, так как этот оператор изменяет только поток, который используется наблюдаемыми, которые появляются вниз по течению .

Например, если вы вставили следующее в свою цепочку, то все наблюдаемые, которые появляются в цепочке с этого момента, будут использовать новую нить.

1
.observeOn(Schedulers.newThread())

Эта цепочка будет продолжать работать в новом потоке до тех пор, пока не встретит другой оператор observeOn() , после чего она переключится на поток, указанный этим оператором. Вы можете контролировать поток, в который конкретные наблюдаемые отправляют свои уведомления, вставляя в вашу цепочку несколько операторов observeOn() .

При разработке приложений для Android вы, как правило, будете использовать observeOn() для отправки результатов работы, выполненной в фоновых потоках, в основной поток пользовательского интерфейса Android. Самый простой способ перенаправить выбросы в основной поток пользовательского интерфейса Android — это использовать AndroidSchedulers.mainThread Scheduler , который входит в состав библиотеки RxAndroid, а не библиотеки RxJava.

Библиотека RxAndroid включает в себя специфичные для Android привязки для RxJava 2, что делает ее ценным дополнительным ресурсом для разработчиков Android (и кое-что мы рассмотрим более подробно в следующем посте этой серии).

Чтобы добавить RxAndroid в свой проект, откройте файл build.gradle уровня модуля и добавьте последнюю версию библиотеки в раздел зависимостей. На момент написания статьи последняя версия RxAndroid была 2.0.1, поэтому я добавляю следующее:

1
2
3
4
5
6
dependencies {
  …
  …
  …
  compile ‘io.reactivex.rxjava2:rxandroid:2.0.1’
}

После добавления этой библиотеки в ваш проект вы можете указать, что результаты наблюдаемого должны отправляться в основной поток пользовательского интерфейса вашего приложения, используя одну строку кода:

1
.observeOn(AndroidSchedulers.mainThread())

Учитывая, что взаимодействие с основным потоком пользовательского интерфейса вашего приложения занимает полную страницу официальных документов по Android, это огромное улучшение, которое потенциально может сэкономить вам много времени при создании многопоточных приложений Android.

Хотя у RxJava есть что предложить разработчикам Android, ни одна технология не идеальна, и у RxJava есть одна серьезная ошибка, которая может привести к сбою вашего приложения.

По умолчанию RxJava использует рабочий процесс на основе push: данные создаются восходящим потоком с помощью Observable , а затем передаются вниз по потоку назначенному Observer . Основная проблема, связанная с рабочим процессом на основе push, заключается в том, насколько просто для производителя (в данном случае, Observable ) слишком быстро испускать элементы для обработки потребителем ( Observer ).

Болтливый Observable и медленный Observer могут быстро привести к отставанию неиспользованных элементов, что может поглотить системные ресурсы и даже может привести к OutOfMemoryException . Эта проблема известна как противодавление .

Если вы подозреваете, что в вашем приложении возникает противодавление, есть несколько возможных решений, в том числе использование оператора для уменьшения количества производимых элементов.

Если Observable объект испускает большое количество элементов, то для назначенного Observer может быть необязательно получать каждый из этих элементов.

Если вы можете безопасно игнорировать некоторые выбросы в Observable , то есть несколько операторов, которые вы можете использовать для создания периодов выборки, а затем конкретные значения, выбранные в эти периоды:

  • Оператор sample() проверяет выходные данные Observable с указанными вами интервалами, а затем принимает самый последний элемент, который был передан в течение этого периода выборки. Например, если вы .sample(5, SECONDS) в свой проект, то наблюдатель получит последнее значение, которое было отправлено в течение каждого .sample(5, SECONDS) интервала.
  • Оператор throttleFirst() принимает первое значение, которое было передано в течение периода выборки. Например, если вы .throttlefirst(5, SECONDS) то наблюдатель получит первое значение, которое испускается в течение каждого .throttlefirst(5, SECONDS) интервала.
Образец оператора

Если вы не можете безопасно пропустить какие-либо выбросы, тогда вы все равно сможете снять давление с борющегося Observer , сгруппировав выбросы по партиям, а затем отправив их в массовом порядке . Обработка пакетных выбросов, как правило, более эффективна, чем обработка нескольких выбросов по отдельности, поэтому такой подход должен повысить скорость потребления.

Вы можете создавать пакетные выбросы с помощью оператора buffer() . Здесь мы используем buffer() для пакетирования всех элементов, выпущенных за три секунды:

1
2
3
Observable.range(0, 10)
.buffer(3, SECONDS)
   .subscribe(System.out::println);
Оператор буфера

Кроме того, вы можете использовать buffer() для создания партии, состоящей из определенного количества выбросов. Например, здесь мы говорим, что buffer() объединяет выбросы в группы по четыре:

1
2
3
Observable.range(0, 10)
  .buffer(4)
  .subscribe(System.out::println);

Альтернативный метод уменьшения количества выбросов заключается в замене Observable , вызывающего проблемы с Flowable .

В RxJava 2 команда RxJava решила разделить стандарт Observable на два типа: обычный вид, который мы рассматривали на протяжении всей этой серии, и Flowable s.

Flowable s функционирует почти так же, как Observable s, но с одним существенным отличием: Flowable s отправляет столько элементов, сколько запрашивает наблюдатель. Если у вас есть Observable который испускает больше элементов, чем может назначить назначенный наблюдатель, вы можете вместо этого рассмотреть возможность перехода на Flowable .

Прежде чем вы сможете начать использовать Flowable в своих проектах, вам нужно добавить следующую инструкцию импорта:

1
import io.reactivex.Flowable;

Затем вы можете создавать Flowable используя точно такие же методы, которые использовались для создания Observable . Например, каждый из следующих фрагментов кода создаст Flowable , способный выдавать данные:

1
2
3
Flowable<String> flowable = Flowable.fromArray(new String[] {«south», «north», «west», “east”});
flowable.subscribe()
1
2
3
Flowable<Integer> flowable = Flowable.range(0, 20);
flowable.subscribe()

В этот момент у вас может возникнуть вопрос: зачем мне использовать Observable s, когда я могу просто использовать Flowable s и не беспокоиться о противодавлении? Ответ заключается в том, что Flowable влечет за собой больше накладных расходов, чем обычный Observable , поэтому в интересах создания высокопроизводительного приложения вы должны придерживаться Observable s, если не подозреваете, что ваше приложение борется с противодавлением.

Flowable — не единственный вариант Observable который вы найдете в RxJava, поскольку библиотека также включает в себя класс Single .

Singles полезны, когда вам просто нужно указать одно значение. В этих сценариях создание Observable может показаться излишним, но Single разработан так, чтобы просто выдавать одно значение и затем завершаться, либо вызывая:

  • onSuccess() : Single генерирует свое единственное значение.
  • onError() : если Single не может выдать свой элемент, он передаст этот метод в результате Throwable .

Single вызовет только один из этих методов, а затем немедленно прекратит работу.

Давайте рассмотрим пример Single в действии — опять же, чтобы сэкономить время, мы повторно используем код:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;
import io.reactivex.Single;
import io.reactivex.SingleObserver;
import io.reactivex.disposables.Disposable;
 
public class MainActivity extends AppCompatActivity {
  public static final String TAG = «MainActivity»;
 
  @Override
  protected void onCreate(Bundle savedInstanceState) {
      super.onCreate(savedInstanceState);
      setContentView(R.layout.activity_main);
 
      Single.just(«Hello World»)
              .subscribe(getSingleObserver());
  }
 
  private SingleObserver<String> getSingleObserver() {
      return new SingleObserver<String>() {
          @Override
          public void onSubscribe(Disposable d) {
              Log.e(TAG, «onSubscribe»);
          }
 
          @Override
          public void onSuccess(String value) {
              Log.e(TAG, » onSuccess : » + value);
          }
 
          @Override
          public void onError(Throwable e) {
              Log.e(TAG, «onError: «);
          }
 
      };
 
  }
 
}

Запустите свой проект на AVD или физическом устройстве Android, и вы увидите следующий вывод в Logcat Monitor Android Studio:

Проверьте вывод синглов в Android Studios Logcat Monitor

Если вы передумаете и захотите конвертировать Single в Observable в любой момент, тогда снова RxJava имеет все необходимые вам операторы, включая:

  • mergeWith() : объединяет несколько Singles в одну Observable .
  • concatWith() : concatWith() элементы, испускаемые несколькими concatWith() , для формирования Observable излучения.
  • toObservable() : преобразует Single в Observable который испускает элемент, который был первоначально сгенерирован синглом, и затем завершается.

В этой статье мы рассмотрели некоторые операторы RxJava, которые можно использовать для создания и управления несколькими потоками, без сложности и вероятности ошибок, которые традиционно сопровождают многопоточность в Android. Мы также увидели, как вы можете использовать библиотеку RxAndroid для связи с важнейшим основным потоком пользовательского интерфейса Android, используя одну строку кода, и как гарантировать, что обратное давление не станет проблемой в вашем приложении.

В этой серии мы несколько раз касались библиотеки RxAndroid, но эта библиотека содержит специфичные для Android привязки RxJava, которые могут быть неоценимы при работе с RxJava на платформе Android, поэтому в последнем посте этой серии мы будем смотреть на библиотеку RxAndroid более подробно.

А пока ознакомьтесь с некоторыми другими нашими статьями о кодировании для Android!

  • Android SDK
    Что нового в Firebase? Обновления с саммита разработчиков Firebase
    Чике Мгбемена
  • Android SDK
    Как звонить и использовать SMS в приложениях для Android
    Чике Мгбемена
  • Android SDK
    6 Что нужно и чего не нужно для отличного пользовательского опыта на Android
  • Android
    Введение в Android вещи
    Пол Требилкокс-Руис