Если ваше приложение для Android собирается собрать эти пятизвездочные обзоры в Google Play, то оно должно быть в состоянии выполнять несколько задач.
Как минимум, современные пользователи мобильных устройств ожидают, что смогут по-прежнему взаимодействовать с вашим приложением, пока оно работает в фоновом режиме. Это может показаться простым, но по умолчанию Android является однопоточным, поэтому, если вы собираетесь соответствовать ожиданиям своей аудитории, рано или поздно вам придется создавать дополнительные потоки.
В предыдущей статье этой серии мы познакомились с RxJava, реактивной библиотекой для JVM, которая может помочь вам создавать приложения для Android, которые реагируют на данные и события по мере их возникновения. Но вы также можете использовать эту библиотеку для одновременного реагирования на данные и события.
В этом посте я собираюсь показать вам, как вы можете использовать операторы RxJava, чтобы наконец сделать параллелизм на Android безболезненным. К концу этой статьи вы узнаете, как использовать операторы RxJava для создания дополнительных потоков, указать работу, которая должна выполняться в этих потоках, а затем опубликовать результаты обратно в важнейший основной поток пользовательского интерфейса Android — и все это с помощью всего лишь несколько строк кода.
И, поскольку ни одна технология не идеальна, я также расскажу вам о большой потенциальной опасности добавления библиотеки RxJava в ваши проекты — прежде чем показать, как использовать операторы, чтобы эта проблема никогда не возникала в ваших собственных проектах Android.
Представляющие операторы
RxJava обладает огромной коллекцией операторов, которые в основном предназначены для того, чтобы помочь вам изменять, фильтровать, объединять и преобразовывать данные, передаваемые вашими Observable
. Вы найдете полный список операторов RxJava в официальных документах, и хотя никто не ожидает, что вы запомните каждый отдельный оператор , стоит потратить некоторое время на чтение этого списка, просто чтобы вы имели приблизительное представление о различных данных. преобразования, которые вы можете выполнить.
Список операторов RxJava уже довольно исчерпывающий, но если вы не можете найти идеальный оператор для преобразования данных, которое вы имели в виду, то вы всегда можете объединить несколько операторов в цепочку. Применение оператора к Observable
обычно возвращает другой Observable
, так что вы можете просто продолжать применять операторы, пока не получите желаемые результаты.
Слишком много операторов RxJava, чтобы охватить их в одной статье, и официальные документы RxJava уже проделали хорошую работу по представлению всех операторов, которые вы можете использовать для преобразования данных, поэтому я сосредоточусь на двух операторах, которые имеют Наибольший потенциал для облегчения вашей жизни разработчика Android: subscribeOn()
и observeOn()
.
Многопоточность с операторами RxJava
Если ваше приложение обеспечивает максимально возможное взаимодействие с пользователем, оно должно иметь возможность выполнять интенсивные или длительные задачи и выполнять несколько задач одновременно, не блокируя важнейший основной поток пользовательского интерфейса Android.
Например, представьте, что вашему приложению нужно получить некоторую информацию из двух разных баз данных. Если вы выполняете обе эти задачи одна за другой в главном потоке Android, то это не только займет значительное количество времени, но и пользовательский интерфейс не будет отвечать до тех пор, пока ваше приложение не завершит извлечение каждого отдельного фрагмента информации из обеих баз данных. , Не совсем хороший пользовательский опыт!
Гораздо лучшим решением является создание двух дополнительных потоков, в которых вы можете выполнять обе эти задачи одновременно, не блокируя основной поток пользовательского интерфейса. Такой подход означает, что работа будет выполнена в два раза быстрее, и пользователь сможет продолжать взаимодействовать с пользовательским интерфейсом вашего приложения на протяжении всего. Потенциально ваши пользователи могут даже не знать, что ваше приложение выполняет некоторую интенсивную и длительную работу в фоновом режиме — вся информация о базе данных просто появится в пользовательском интерфейсе вашего приложения, как по волшебству!
Из коробки Android предоставляет несколько инструментов, которые можно использовать для создания дополнительных потоков, в том числе Service
и IntentService
, но эти решения сложны в реализации и могут быстро привести к сложному, подробному коду. Кроме того, если вы не реализуете многопоточность правильно, вы можете оказаться в приложении, которое теряет память и выдает всевозможные ошибки.
Чтобы сделать многопоточность на Android еще более вызывающей головную боль, основной поток пользовательского интерфейса Android — это единственный поток, который может обновлять пользовательский интерфейс вашего приложения. Если вы хотите обновить пользовательский интерфейс своего приложения с результатом работы, выполненной в любом другом потоке , то обычно вам нужно создать Handler
в основном потоке пользовательского интерфейса, а затем использовать этот Handler
для передачи данных из фонового потока в Основная тема. Это означает больше кода, больше сложности и больше возможностей для ошибок проникать в ваш проект.
Но в RxJava есть два оператора, которые могут помочь вам избежать этой сложности и ошибок.
Обратите внимание, что вы используете эти операторы в сочетании с Schedulers
, которые по сути являются компонентами, которые позволяют указывать потоки . Пока просто думайте о планировщике как о синониме слова thread .
-
subscribeOn(Scheduler)
: по умолчаниюObservable
отправляет свои данные в поток, в котором была объявлена подписка, т. е. где вы.subscribe
метод.subscribe
. В Android это, как правило, основной поток пользовательского интерфейса. Вы можете использовать операторsubscribeOn()
чтобы определить другойScheduler
котором должен выполнятьсяObservable
и отправлять свои данные. -
observeOn(Scheduler)
: вы можете использовать этот оператор для перенаправления излучений вашегоObservable
в другойScheduler
, эффективно изменяя поток, в который отправляются уведомленияObservable
, и, соответственно, поток, в котором используются его данные.
RxJava поставляется с несколькими планировщиками, которые вы можете использовать для создания различных потоков, в том числе:
-
Schedulers.io()
: предназначен для использования в задачах, связанных с вводом-выводом. -
Schedulers.computation()
: предназначен для использования в вычислительных задачах. По умолчанию количество потоков в планировщике вычислений ограничено количеством процессоров, доступных на вашем устройстве. -
Schedulers.newThread()
: создает новый поток.
Теперь у вас есть обзор всех движущихся частей, давайте посмотрим на некоторые примеры использования subscribeOn()
и observeOn()
и увидим некоторые планировщики в действии.
subscribeOn()
В Android вы, как правило, будете использовать subscribeOn()
и сопровождающий Scheduler
чтобы изменить поток, в котором выполняется некоторая длительная или интенсивная работа, поэтому нет риска заблокировать основной поток пользовательского интерфейса. Например, вы можете решить импортировать большой объем данных в планировщик io()
или выполнить некоторые вычисления в планировщике computation()
.
В следующем коде мы создаем новый поток, в котором Observable
выполнит свои операции и выдаст значения 1
, 2
и 3
.
1
2
3
|
Observable.just(1, 2, 3)
.subscribeOn(Schedulers.newThread())
.subscribe(Observer);
|
Хотя это все, что вам нужно для создания потока и начала генерирования данных в этом потоке, вам может потребоваться подтверждение того, что эта наблюдаемая действительно работает в новом потоке. Один из способов — напечатать название потока, который ваше приложение использует в Android Studio. Logcat Monitor.
Для удобства в предыдущем посте « Начало работы с RxJava» мы создали приложение, которое отправляет сообщения в Logcat Monitor на разных этапах в течение жизненного цикла Observable, поэтому мы можем повторно использовать большую часть этого кода.
Откройте проект, который вы создали в этом посте, и настройте свой код так, чтобы он использовал вышеприведенную Observable
качестве исходного Observable
. Затем добавьте оператор subscribeOn()
и укажите, что сообщения, отправляемые в Logcat, должны содержать имя текущего потока.
Ваш готовый проект должен выглядеть примерно так:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
import android.support.v7.app.AppCompatActivity;
import android.os.Bundle;
import android.util.Log;
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
public class MainActivity extends AppCompatActivity {
public static final String TAG = «MainActivity»;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
Observable.just(1, 2, 3)
.subscribeOn(Schedulers.newThread())
.subscribe(Observer);
}
Observer<Integer> Observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, «onSubscribe» + Thread.currentThread().getName());
}
@Override
public void onNext(Integer value) {
Log.e(TAG, «onNext: » + value + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
Log.e(TAG, «onError: «);
}
@Override
public void onComplete() {
Log.e(TAG, «onComplete: All Done!» + Thread.currentThread().getName());
}
};
}
|
Убедитесь, что Logcat Monitor в Android Studio открыт (выбрав вкладку Android Monitor , а затем Logcat ), а затем запустите свой проект на физическом устройстве Android или AVD. Вы должны увидеть следующий вывод в Logcat Monitor:
Здесь вы можете видеть, что .subscribe
вызывается в основном потоке пользовательского интерфейса, но наблюдаемый работает в совершенно другом потоке.
Оператор subscribeOn()
будет иметь одинаковый эффект независимо от того, где вы помещаете его в наблюдаемую цепочку; однако вы не можете использовать несколько операторов subscribeOn()
в одной цепочке. Если вы включаете более одного subscribeOn()
, ваша цепочка будет использовать только метод subscribeOn()
, ближайший к наблюдаемому источнику.
observeOn()
В отличие от subscribeOn()
, где вы observeOn()
в вашу цепочку observeOn()
, как следует, так как этот оператор изменяет только поток, который используется наблюдаемыми, которые появляются вниз по течению .
Например, если вы вставили следующее в свою цепочку, то все наблюдаемые, которые появляются в цепочке с этого момента, будут использовать новую нить.
1
|
.observeOn(Schedulers.newThread())
|
Эта цепочка будет продолжать работать в новом потоке до тех пор, пока не встретит другой оператор observeOn()
, после чего она переключится на поток, указанный этим оператором. Вы можете контролировать поток, в который конкретные наблюдаемые отправляют свои уведомления, вставляя в вашу цепочку несколько операторов observeOn()
.
При разработке приложений для Android вы, как правило, будете использовать observeOn()
для отправки результатов работы, выполненной в фоновых потоках, в основной поток пользовательского интерфейса Android. Самый простой способ перенаправить выбросы в основной поток пользовательского интерфейса Android — это использовать AndroidSchedulers.mainThread Scheduler
, который входит в состав библиотеки RxAndroid, а не библиотеки RxJava.
Библиотека RxAndroid включает в себя специфичные для Android привязки для RxJava 2, что делает ее ценным дополнительным ресурсом для разработчиков Android (и кое-что мы рассмотрим более подробно в следующем посте этой серии).
Чтобы добавить RxAndroid в свой проект, откройте файл build.gradle уровня модуля и добавьте последнюю версию библиотеки в раздел зависимостей. На момент написания статьи последняя версия RxAndroid была 2.0.1, поэтому я добавляю следующее:
1
2
3
4
5
6
|
dependencies {
…
…
…
compile ‘io.reactivex.rxjava2:rxandroid:2.0.1’
}
|
После добавления этой библиотеки в ваш проект вы можете указать, что результаты наблюдаемого должны отправляться в основной поток пользовательского интерфейса вашего приложения, используя одну строку кода:
1
|
.observeOn(AndroidSchedulers.mainThread())
|
Учитывая, что взаимодействие с основным потоком пользовательского интерфейса вашего приложения занимает полную страницу официальных документов по Android, это огромное улучшение, которое потенциально может сэкономить вам много времени при создании многопоточных приложений Android.
Основной недостаток RxJava
Хотя у RxJava есть что предложить разработчикам Android, ни одна технология не идеальна, и у RxJava есть одна серьезная ошибка, которая может привести к сбою вашего приложения.
По умолчанию RxJava использует рабочий процесс на основе push: данные создаются восходящим потоком с помощью Observable
, а затем передаются вниз по потоку назначенному Observer
. Основная проблема, связанная с рабочим процессом на основе push, заключается в том, насколько просто для производителя (в данном случае, Observable
) слишком быстро испускать элементы для обработки потребителем ( Observer
).
Болтливый Observable
и медленный Observer
могут быстро привести к отставанию неиспользованных элементов, что может поглотить системные ресурсы и даже может привести к OutOfMemoryException
. Эта проблема известна как противодавление .
Если вы подозреваете, что в вашем приложении возникает противодавление, есть несколько возможных решений, в том числе использование оператора для уменьшения количества производимых элементов.
Создание периодов выборки с помощью sample()
и throttlefirst()
Если Observable
объект испускает большое количество элементов, то для назначенного Observer
может быть необязательно получать каждый из этих элементов.
Если вы можете безопасно игнорировать некоторые выбросы в Observable
, то есть несколько операторов, которые вы можете использовать для создания периодов выборки, а затем конкретные значения, выбранные в эти периоды:
- Оператор
sample()
проверяет выходные данные Observable с указанными вами интервалами, а затем принимает самый последний элемент, который был передан в течение этого периода выборки. Например, если вы.sample(5, SECONDS)
в свой проект, то наблюдатель получит последнее значение, которое было отправлено в течение каждого.sample(5, SECONDS)
интервала. - Оператор
throttleFirst()
принимает первое значение, которое было передано в течение периода выборки. Например, если вы.throttlefirst(5, SECONDS)
то наблюдатель получит первое значение, которое испускается в течение каждого.throttlefirst(5, SECONDS)
интервала.
Пакетные выбросы с buffer()
Если вы не можете безопасно пропустить какие-либо выбросы, тогда вы все равно сможете снять давление с борющегося Observer
, сгруппировав выбросы по партиям, а затем отправив их в массовом порядке . Обработка пакетных выбросов, как правило, более эффективна, чем обработка нескольких выбросов по отдельности, поэтому такой подход должен повысить скорость потребления.
Вы можете создавать пакетные выбросы с помощью оператора buffer()
. Здесь мы используем buffer()
для пакетирования всех элементов, выпущенных за три секунды:
1
2
3
|
Observable.range(0, 10)
.buffer(3, SECONDS)
.subscribe(System.out::println);
|
Кроме того, вы можете использовать buffer()
для создания партии, состоящей из определенного количества выбросов. Например, здесь мы говорим, что buffer()
объединяет выбросы в группы по четыре:
1
2
3
|
Observable.range(0, 10)
.buffer(4)
.subscribe(System.out::println);
|
Замена наблюдаемых на текучие
Альтернативный метод уменьшения количества выбросов заключается в замене Observable
, вызывающего проблемы с Flowable
.
В RxJava 2 команда RxJava решила разделить стандарт Observable
на два типа: обычный вид, который мы рассматривали на протяжении всей этой серии, и Flowable
s.
Flowable
s функционирует почти так же, как Observable
s, но с одним существенным отличием: Flowable
s отправляет столько элементов, сколько запрашивает наблюдатель. Если у вас есть Observable
который испускает больше элементов, чем может назначить назначенный наблюдатель, вы можете вместо этого рассмотреть возможность перехода на Flowable
.
Прежде чем вы сможете начать использовать Flowable
в своих проектах, вам нужно добавить следующую инструкцию импорта:
1
|
import io.reactivex.Flowable;
|
Затем вы можете создавать Flowable
используя точно такие же методы, которые использовались для создания Observable
. Например, каждый из следующих фрагментов кода создаст Flowable
, способный выдавать данные:
1
2
3
|
Flowable<String> flowable = Flowable.fromArray(new String[] {«south», «north», «west», “east”});
…
flowable.subscribe()
|
1
2
3
|
Flowable<Integer> flowable = Flowable.range(0, 20);
…
flowable.subscribe()
|
В этот момент у вас может возникнуть вопрос: зачем мне использовать Observable
s, когда я могу просто использовать Flowable
s и не беспокоиться о противодавлении? Ответ заключается в том, что Flowable
влечет за собой больше накладных расходов, чем обычный Observable
, поэтому в интересах создания высокопроизводительного приложения вы должны придерживаться Observable
s, если не подозреваете, что ваше приложение борется с противодавлением.
одиночный разряд
Flowable
— не единственный вариант Observable
который вы найдете в RxJava, поскольку библиотека также включает в себя класс Single
.
Singles
полезны, когда вам просто нужно указать одно значение. В этих сценариях создание Observable
может показаться излишним, но Single
разработан так, чтобы просто выдавать одно значение и затем завершаться, либо вызывая:
-
onSuccess()
:Single
генерирует свое единственное значение. -
onError()
: еслиSingle
не может выдать свой элемент, он передаст этот метод в результатеThrowable
.
Single
вызовет только один из этих методов, а затем немедленно прекратит работу.
Давайте рассмотрим пример Single
в действии — опять же, чтобы сэкономить время, мы повторно используем код:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;
import io.reactivex.Single;
import io.reactivex.SingleObserver;
import io.reactivex.disposables.Disposable;
public class MainActivity extends AppCompatActivity {
public static final String TAG = «MainActivity»;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
Single.just(«Hello World»)
.subscribe(getSingleObserver());
}
private SingleObserver<String> getSingleObserver() {
return new SingleObserver<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, «onSubscribe»);
}
@Override
public void onSuccess(String value) {
Log.e(TAG, » onSuccess : » + value);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, «onError: «);
}
};
}
}
|
Запустите свой проект на AVD или физическом устройстве Android, и вы увидите следующий вывод в Logcat Monitor Android Studio:
Если вы передумаете и захотите конвертировать Single
в Observable
в любой момент, тогда снова RxJava имеет все необходимые вам операторы, включая:
-
mergeWith()
: объединяет несколькоSingles
в однуObservable
. -
concatWith()
:concatWith()
элементы, испускаемые несколькимиconcatWith()
, для формированияObservable
излучения. -
toObservable()
: преобразуетSingle
вObservable
который испускает элемент, который был первоначально сгенерирован синглом, и затем завершается.
Резюме
В этой статье мы рассмотрели некоторые операторы RxJava, которые можно использовать для создания и управления несколькими потоками, без сложности и вероятности ошибок, которые традиционно сопровождают многопоточность в Android. Мы также увидели, как вы можете использовать библиотеку RxAndroid для связи с важнейшим основным потоком пользовательского интерфейса Android, используя одну строку кода, и как гарантировать, что обратное давление не станет проблемой в вашем приложении.
В этой серии мы несколько раз касались библиотеки RxAndroid, но эта библиотека содержит специфичные для Android привязки RxJava, которые могут быть неоценимы при работе с RxJava на платформе Android, поэтому в последнем посте этой серии мы будем смотреть на библиотеку RxAndroid более подробно.
А пока ознакомьтесь с некоторыми другими нашими статьями о кодировании для Android!