Статьи

Реактивное программирование Kotlin с RxJava и RxKotlin

Став официально поддерживаемым языком для разработки под Android, популярность Kotlin среди разработчиков под Android быстро растет, и Google сообщает о 6-кратном увеличении числа приложений, созданных с помощью Kotlin.

Если вы ранее использовали RxJava или RxAndroid и хотите переключиться на Kotlin, или хотите начать реактивное программирование с Kotlin, это руководство для вас. Мы рассмотрим основы создания RxJava 2.0 Observers , Observables и потоков данных в Kotlin, а затем рассмотрим, как можно обрезать тонну шаблонного кода из ваших проектов, комбинируя RxJava с функциями расширения Kotlin.

Использование RxJava с Kotlin может помочь вам создавать высокореактивные приложения с меньшим количеством кода, но ни один язык программирования не идеален, поэтому я также поделюсь обходным решением проблемы преобразования SAM, с которой сталкиваются многие разработчики, когда они впервые начинают использовать RxJava 2.0 с Kotlin.

Чтобы подвести итоги, мы создадим приложение, которое демонстрирует, как вы можете использовать RxJava для решения некоторых проблем, с которыми вы сталкиваетесь в реальных проектах Android.

Если это ваш первый вкус RxJava, то по пути я также предоставлю всю справочную информацию, необходимую для понимания основных понятий RxJava. Даже если вы никогда раньше не экспериментировали с RxJava, к концу этой статьи у вас будет четкое понимание того, как использовать эту библиотеку в ваших проектах, и вы создадите несколько рабочих приложений, используя RxJava, RxKotlin, RxAndroid и RxBinding.

RxJava — это реализация библиотеки ReactiveX с открытым исходным кодом, которая помогает создавать приложения в стиле реактивного программирования. Хотя RxJava предназначен для обработки синхронных и асинхронных потоков данных, он не ограничивается «традиционными» типами данных. Определение «данных» в RxJava довольно широкое и включает в себя такие вещи, как кэши, переменные, свойства и даже события пользовательского ввода, такие как щелчки и пролистывания. То, что ваше приложение не обрабатывает огромные числа и не выполняет сложные преобразования данных, не означает, что оно не может извлечь выгоду из RxJava!

Чтобы немного узнать об использовании RxJava для приложений Android, вы можете посмотреть некоторые другие мои посты здесь на Envato Tuts +.

  • Android SDK
    Начните с RxJava 2 для Android
  • Android SDK
    Приложения RxJava 2 для Android: RxBinding и RxLifecycle

Так как же работает RxJava?

RxJava расширяет шаблон проектирования программного обеспечения Observer, который основан на концепции Observers и Observables. Чтобы создать базовый конвейер данных RxJava, вам необходимо:

  • Создайте Observable.
  • Дайте наблюдаемой некоторые данные для излучения.
  • Создать Обозревателя.
  • Подпишите Наблюдателя на Обсерваторию.

Как только в Observable появится хотя бы один Observer, он начнет передавать данные. Каждый раз, когда Observable испускает часть данных, он уведомляет назначенного им Observer, вызывая метод onNext() , и затем Observer обычно выполняет некоторые действия в ответ на эту передачу данных. Когда Observable закончит передачу данных, он уведомит Обозревателя, вызвав onComplete() . Затем Observable завершится, и поток данных закончится.

Если возникает исключение, то onError() , и Observable немедленно прекращает работу, не отправляя больше данных и не вызывая onComplete() .

Но RxJava — это не просто передача данных из Observable в Observer! RxJava имеет огромный набор операторов, которые вы можете использовать для фильтрации, объединения и преобразования этих данных. Например, представьте, что в вашем приложении есть кнопка « Оплатить сейчас» , которая обнаруживает события onClick , и вы беспокоитесь, что нетерпеливый пользователь может нажать кнопку несколько раз, в результате чего ваше приложение будет обрабатывать несколько платежей.

RxJava позволяет вам преобразовывать эти события onClick в поток данных, которым вы затем можете манипулировать, используя различные операторы RxJava. В этом конкретном примере вы можете использовать оператор debounce() для фильтрации выбросов данных, которые происходят в быстрой последовательности, поэтому, даже если пользователь нажмет кнопку Pay Now , ваше приложение будет регистрировать только один платеж.

Мы видели, как RxJava может помочь вам решить конкретную проблему в конкретном приложении, но что он вообще может предложить для проектов Android?

RxJava может помочь упростить ваш код, давая вам возможность написать то, чего вы хотите достичь, вместо того, чтобы писать список инструкций, с которыми ваше приложение должно работать. Например, если вы хотите игнорировать все выбросы данных, которые происходят в течение одного 500-миллисекундного периода, вы должны написать:

1
.debounce(500, TimeUnit.MILLISECONDS)

Кроме того, поскольку RxJava обрабатывает почти все как данные, он предоставляет шаблон, который можно применять к широкому кругу событий: создать Observable, создать Observer, подписать Observer на Observable, промыть и повторить. Этот формульный подход приводит к гораздо более простому, удобочитаемому коду.

Другим важным преимуществом для разработчиков Android является то, что RxJava может избавить большую часть боли от многопоточности на Android. Сегодняшние мобильные пользователи ожидают, что их приложения смогут работать в многозадачном режиме, даже если это будет так просто, как загрузка данных в фоновом режиме, оставаясь при этом реагирующей на пользовательский ввод.

В Android есть несколько встроенных решений для создания и управления несколькими потоками, но ни одно из них не является особенно простым в реализации, и они могут быстро привести к сложному, подробному коду, который трудно читать и подвержен ошибкам.

В RxJava вы создаете и управляете дополнительными потоками, используя комбинацию операторов и планировщиков. Вы можете легко изменить поток, в котором выполняется работа, используя оператор subscribeOn плюс планировщик. Например, здесь мы планируем работу, которая будет выполняться в новом потоке:

1
.subscribeOn(Schedulers.newThread())

Вы можете указать, где должны быть опубликованы результаты этой работы, используя оператор observeOn . Здесь мы публикуем результаты в важнейшем основном потоке пользовательского интерфейса Android с помощью планировщика AndroidSchedulers.mainThread , который доступен как часть библиотеки RxAndroid:

1
.observeOn(AndroidSchedulers.mainThread())

По сравнению со встроенными в Android решениями многопоточности подход RxJava гораздо более лаконичен и понятен.

Опять же, вы можете узнать больше о том, как работает RxJava, и о преимуществах добавления этой библиотеки в ваш проект, в моей статье « Начало работы с RxJava 2 для Android» .

Поскольку Kotlin на 100% совместим с Java, вы можете без проблем использовать большинство библиотек Java в своих проектах Kotlin, и библиотека RxJava не является исключением.

Существует специальная библиотека RxKotlin , которая является оболочкой Kotlin вокруг обычной библиотеки RxJava. Эта обертка предоставляет расширения, которые оптимизируют RxJava для среды Kotlin и могут еще больше уменьшить объем стандартного кода, который вам нужно написать.

Поскольку вы можете использовать RxJava в Kotlin без необходимости использования RxKotlin, мы будем использовать RxJava в этой статье, если не указано иное.

Observers и Observables являются строительными блоками RxJava, поэтому давайте начнем с создания:

  • Простой Observable, который генерирует короткий поток данных в ответ на событие нажатия кнопки.
  • Наблюдаемая, которая реагирует на эти данные, печатая различные сообщения в Logcat Android Studio.

Создайте новый проект с настройками по вашему выбору, но убедитесь, что вы установили флажок Включить поддержку Kotlin при появлении запроса. Затем откройте файл build.gradle вашего проекта и добавьте библиотеку RxJava в качестве зависимости проекта:

1
2
3
4
5
6
7
8
dependencies {
  implementation fileTree(dir: ‘libs’, include: [‘*.jar’])
  implementation «org.jetbrains.kotlin:kotlin-stdlib-jdk7:$kotlin_version»
  implementation ‘androidx.appcompat:appcompat:1.0.0-alpha1’
  implementation ‘androidx.constraintlayout:constraintlayout:1.1.0’
  implementation ‘io.reactivex.rxjava2:rxjava:2.1.9’
 
}

Затем откройте файл activity_main.xml вашего проекта и добавьте кнопку, которая запустит поток данных:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
<?xml version=»1.0″ encoding=»utf-8″?>
<LinearLayout xmlns:android=»http://schemas.android.com/apk/res/android»
  xmlns:tools=»http://schemas.android.com/tools»
  android:layout_width=»match_parent»
  android:layout_height=»match_parent»
  android:orientation=»vertical»
  tools:context=».MainActivity» >
 
  <Button
      android:id=»@+id/button»
      android:layout_width=»wrap_content»
      android:layout_height=»wrap_content»
      android:text=»Start RxJava stream» />
 
</LinearLayout>

Существует несколько различных способов создания Observable, но одним из самых простых является использование оператора just() для преобразования объекта или списка объектов в Observable.

В следующем коде мы создаем Observable ( myObservable ) и даем ему элементы 1, 2, 3, 4 и 5 для излучения. Мы также создаем Observer ( myObserver ), подписываем его на myObservable , а затем говорим ему печатать сообщение в Logcat каждый раз, когда он получает новую эмиссию.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import androidx.appcompat.app.AppCompatActivity
import android.os.Bundle
import android.util.Log
import io.reactivex.Observable
import io.reactivex.Observer
import io.reactivex.disposables.Disposable
import kotlinx.android.synthetic.main.activity_main.*
 
class MainActivity : AppCompatActivity() {
 
  private var TAG = «MainActivity»
 
  override fun onCreate(savedInstanceState: Bundle?) {
      super.onCreate(savedInstanceState)
      setContentView(R.layout.activity_main)
 
//Start the stream when the button is clicked//
 
      button.setOnClickListener { startRStream() }
 
  }
 
  private fun startRStream() {
 
//Create an Observable//
 
      val myObservable = getObservable()
 
//Create an Observer//
 
      val myObserver = getObserver()
 
//Subscribe myObserver to myObservable//
 
      myObservable
              .subscribe(myObserver)
  }
 
  private fun getObserver(): Observer<String> {
      return object : Observer<String> {
          override fun onSubscribe(d: Disposable) {
          }
 
//Every time onNext is called, print the value to Android Studio’s Logcat//
 
          override fun onNext(s: String) {
              Log.d(TAG, «onNext: $s»)
          }
 
//Called if an exception is thrown//
 
          override fun onError(e: Throwable) {
              Log.e(TAG, «onError: » + e.message)
          }
 
//When onComplete is called, print the following to Logcat//
 
          override fun onComplete() {
              Log.d(TAG, «onComplete»)
          }
      }
  }
 
//Give myObservable some data to emit//
 
  private fun getObservable(): Observable<String> {
      return Observable.just(«1», «2», «3», «4», «5»)
  }
}

Теперь вы можете проверить это приложение:

  • Установите свой проект на физический смартфон или планшет Android или виртуальное устройство Android (AVD).
  • Нажмите кнопку запуска потока RxJava .
  • Откройте монитор Logcat в Android Studio, выбрав вкладку « Монитор Android » (где курсор расположен на следующем снимке экрана), а затем выберите вкладку « Logcat ».

В этот момент Observable начнет излучать свои данные, а Observer напечатает свои сообщения в Logcat. Ваш вывод Logcat должен выглядеть примерно так:

Check Android Studios Logcat Monitor

Вы можете скачать этот проект с GitHub, если вы хотите попробовать его сами.

Теперь, когда мы увидели, как настроить простой конвейер RxJava в Kotlin, давайте посмотрим, как вы можете добиться этого за меньшее количество кода, используя функции расширения RxKotlin.

Чтобы использовать библиотеку RxKotlin, вам нужно добавить ее как зависимость проекта:

01
02
03
04
05
06
07
08
09
10
11
12
dependencies {
  implementation fileTree(dir: ‘libs’, include: [‘*.jar’])
  implementation «org.jetbrains.kotlin:kotlin-stdlib-jdk7:$kotlin_version»
  implementation ‘androidx.appcompat:appcompat:1.0.0-alpha1’
  implementation ‘androidx.constraintlayout:constraintlayout:1.1.0’
  implementation ‘io.reactivex.rxjava2:rxjava:2.1.9’
 
//Add the following//
 
  implementation ‘io.reactivex.rxjava2:rxkotlin:2.2.0’
 
}

В следующем примере мы используем функцию расширения toObservable() чтобы преобразовать List в Observable. Мы также используем функцию расширения subscribeBy() , поскольку она позволяет нам создавать Observer, используя именованные аргументы, что приводит к более четкому коду.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import io.reactivex.rxkotlin.subscribeBy
import io.reactivex.rxkotlin.toObservable
import kotlinx.android.synthetic.main.activity_main.*
 
class MainActivity : AppCompatActivity() {
 
  override fun onCreate(savedInstanceState: Bundle?) {
      super.onCreate(savedInstanceState)
      setContentView(R.layout.activity_main)
 
//Start the stream when the button is clicked//
 
      button.setOnClickListener { startRStream() }
 
  }
 
  private fun startRStream() {
 
      val list = listOf(«1», «2», «3», «4», «5»)
 
//Apply the toObservable() extension function//
 
      list.toObservable()
 
//Construct your Observer using the subscribeBy() extension function//
 
              .subscribeBy(
 
                      onNext = { println(it) },
                      onError = { it.printStackTrace() },
                      onComplete = { println(«onComplete!») }
 
              )
  }
}

Вот вывод, который вы должны увидеть:

Каждый раз, когда вызывается onNext, эмиссия данных печатается в Android Studios Logcat.

RxKotlin также предоставляет важный обходной путь для проблемы преобразования SAM, которая может возникнуть, когда в данном методе Java есть несколько перегрузок параметров SAM. Эта неоднозначность SAM приводит в замешательство компилятор Kotlin, так как он не может определить, какой интерфейс он должен конвертировать, и в результате ваш проект не удастся скомпилировать.

Эта неоднозначность SAM является особой проблемой при использовании RxJava 2.0 с Kotlin, поскольку многие операторы RxJava принимают несколько типов, совместимых с SAM.

Давайте посмотрим на проблему преобразования SAM в действии. В следующем коде мы используем оператор zip() для объединения вывода двух Observables:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import androidx.appcompat.app.AppCompatActivity
import android.os.Bundle
import io.reactivex.Observable
import kotlinx.android.synthetic.main.activity_main.*
 
class MainActivity : AppCompatActivity() {
 
  override fun onCreate(savedInstanceState: Bundle?) {
      super.onCreate(savedInstanceState)
      setContentView(R.layout.activity_main)
 
//Start the stream when the button is clicked//
 
      button.setOnClickListener { startRStream() }
       
  }
 
  private fun startRStream() {
 
      val numbers = Observable.range(1, 6)
 
      val strings = Observable.just(«One», «Two», «Three»,
 
              «Four», «Five», «Six» )
 
      val zipped = Observable.zip(strings, numbers) { s, n -> «$s $n» }
      zipped.subscribe(::println)
  }
}

Это заставит компилятор Kotlin выдавать ошибку вывода типа. Однако RxKotlin предоставляет вспомогательные методы и функции расширения для затронутых операторов, включая Observables.zip() , который мы используем в следующем коде:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import io.reactivex.Observable
import io.reactivex.rxkotlin.Observables
import kotlinx.android.synthetic.main.activity_main.*
 
class MainActivity : AppCompatActivity() {
 
  override fun onCreate(savedInstanceState: Bundle?) {
      super.onCreate(savedInstanceState)
      setContentView(R.layout.activity_main)
 
//Start the stream when the button is clicked//
 
      button.setOnClickListener { startRStream() }
 
  }
 
  private fun startRStream() {
 
      val numbers = Observable.range(1, 6)
 
      val strings = Observable.just(«One», «Two», «Three»,
 
              «Four», «Five», «Six» )
 
      val zipped = Observables.zip(strings, numbers) { s, n -> «$s $n» }
      zipped.subscribe(::println)
  }
 
 
}

Вот вывод этого кода:

Переключитесь на оператор Observableszip, и компилятор Kotlin больше не будет выдавать ошибку вывода типа

В этом руководстве я показал вам, как начать использовать библиотеку RxJava в ваших проектах Kotlin, включая использование ряда дополнительных вспомогательных библиотек, таких как RxKotlin и RxBinding. Мы рассмотрели, как вы можете создавать простые наблюдатели и наблюдаемые в Kotlin, вплоть до оптимизации RxJava для платформы Kotlin, используя функции расширения.

До сих пор мы использовали RxJava для создания простых Observable, которые генерируют данные, и Observers, которые печатают эти данные в Logcat в Android Studio — но вы не будете использовать RxJava в реальном мире!

В следующем посте мы рассмотрим, как RxJava может помочь в решении реальных проблем, с которыми вы столкнетесь при разработке приложений для Android. Мы будем использовать RxJava с Kotlin для создания классического экрана регистрации.