Статьи

10 необходимых для понимания функций RxJS с примерами

Эта статья была рецензирована Флорианом Рапплом и Морицем Крёгером . Спасибо всем рецензентам SitePoint за то, что сделали контент SitePoint как можно лучше!

По мере роста интереса к функционально-реактивному программированию (FRP) RxJS стала одной из самых популярных библиотек JavaScript для программирования в этой парадигме. В этой статье мы рассмотрим то, что я считаю десятью функциями, которые должны знать RxJS.

Примечание. Эта статья предполагает знакомство с основами RxJS, как представлено в статье « Введение в функционально-реактивное программирование с использованием RxJS» .

Реактивное программирование

Реактивное программирование — это парадигма программирования, которая рассматривает потоки данных, называемые Observables, в качестве основных единиц программирования.

Потоки — или Observables , на языке RxJS — аналогичны прослушивателям событий: оба ждут, что что-то произойдет, и уведомляют вас, когда это происходит. Серия асинхронных уведомлений, которые вы получаете от слушателя onClick, является прекрасным примером потока данных.

Иными словами, Observable — это не что иное, как массив, который заполняется с течением времени .

Элементы этого массива могут поступать практически из любого места: файловая система, события DOM, вызовы API или даже преобразованные синхронные данные, например массивы. По сути, реактивное программирование — это не что иное, как использование Observables в качестве строительных блоков ваших программ.

Отношение к массивам

Массивы просты, потому что их содержимое является окончательным, если явно не изменено. В этом смысле нет ничего по существу временного в массиве.

Наблюдаемое, с другой стороны, определяется временем. Максимум, что вы можете знать о потоке, это то, что он получил [1, 2, 3] до сих пор. Вы не можете быть уверены, что когда-либо получите 4 или не получите — и это решает источник данных, а не ваша программа.

Взаимосвязь между потоками и массивами настолько глубока, что большинство реактивных расширений происходят из мира функционального программирования, где операции со списками — это хлеб с маслом.

Разогрев до RxJS

Рассмотрим слишком общее приложение для дел. Давайте посмотрим, как выглядит проблема отображения только имен незавершенных задач пользователя с помощью RxJS:

 const task_stream = // Makes a stream of all the tasks in the database getTasks(). // Get tasks only for this user filter((task) => task.user_id == user_id). // Get tasks that are incompleted filter((task) => !task.completed). // Only get name of task map((task) => task.name) /* Tasks look like this: task = { user_id : number, completed : boolean, name : string } */ 

Пока это не что иное, как массивы , но демонстрирует функциональный вкус реактивного программирования.

Его декларативный характер становится понятным с добавлением более сложных, «реальных» функций. Допустим, мы хотим:

  • Отклонить запрос в ответ на выбор пользователя для просмотра завершенных или незавершенных задач;
  • Отправляйте запрос на последний выбор только каждую секунду, чтобы не тратить пропускную способность, если пользователь быстро меняет свой выбор;
  • Повторите неудачные запросы до трех раз; и
  • Перерисовывать представление только тогда, когда сервер отправляет другой ответ с прошлого раза.
 const task_stream = parameter_stream. debounce(1000). map((parameter) => { getTasks(). retry(3). filter((task) => task.user_id === user_id). filter((task) => task.completed === parameter). map((task) => task.name) }). flatMap(Rx.Observable.from). distinctUntilChanged(). update() 

Шаг за шагом:

  • parameter_stream сообщает нам, хочет ли пользователь выполнить полные или неполные задачи, сохраняя выбор в parameter ;
  • debounce() гарантирует, что мы обращаем внимание только на последнее нажатие кнопки каждую секунду;
  • Раздел вокруг getTasks() делает то же самое, что и раньше;
  • differentUntilChanged distinctUntilChanged() гарантирует, что мы обращаем внимание только тогда, когда ответ сервера отличается от того, который был в прошлый раз; и
  • update() заботится об обновлении пользовательского интерфейса, чтобы отразить то, что мы получили от сервера.

Обработка логики debounce, retry и «различима до изменения» в императивном стиле на основе обратного вызова допустима, но она может быть как хрупкой, так и сложной.

Вывод заключается в том, что программирование с помощью RxJS позволяет:

  1. Декларативные программы;
  2. Расширяемые системы; и
  3. Простая и надежная обработка ошибок.

Мы познакомимся с каждой из функций в приведенном выше примере в нашем туре с помощью десяти обязательных функций RxJS.

Операции на простых потоках

Фундаментальные функции в простых потоках — те, которые передают простые значения, такие как строки — включают в себя:

За исключением take() и takeWhile() , они аналогичны функциям массива JavaScript более высокого порядка.

Мы применим каждый из них, решив примерную задачу: найти всех пользователей в нашей базе данных, которые имеют веб-сайт .com или .org, и вычислить среднюю длину имен их веб-сайтов.

JSONPlaceholder будет нашим источником пользователей. Вот JSON-представление пользовательских данных, с которыми мы будем работать.

1. Использование map () для преобразования данных

Использование map() в Observable идентично использованию его в массиве. Это:

  1. Принимает обратный вызов в качестве аргумента;
  2. Выполняет его для каждого элемента массива, к которому вы его вызывали; и
  3. Возвращает новый массив, в котором каждый элемент исходного массива заменяется результатом вызова функции обратного вызова.

Единственные различия при использовании map() в Observables заключаются в том, что:

  1. Вместо того, чтобы возвращать новый массив, он возвращает новый Observable; и
  2. Он выполняется каждый раз, когда Observable испускает новый предмет, вместо того, чтобы сразу и все сразу.

Мы можем использовать map() для преобразования нашего потока пользовательских данных в просто список имен их веб-сайтов:

 source. map((user) => user.website) 

Здесь мы использовали map чтобы «заменить» каждый пользовательский объект во входящем потоке на веб-сайт каждого пользователя.

RxJS также позволяет вам вызывать map() как select() . Оба имени относятся к одной и той же функции.

2. Фильтрация результатов

Как и map() , filter() во многом совпадает с Observables и с массивами. Чтобы найти каждого пользователя с адресом веб-сайта .net или .org, мы напишем:

 source. map((user) => user.website). filter((website) => (website.endsWith('net') || website.endsWith('org')); }) 

При этом выбираются только пользователи, чьи веб-сайты заканчиваются на «net» или «org».

filter() также есть псевдоним where() .

3. Сбор результатов с помощью метода less ()

reduce() позволяет нам использовать все наши индивидуальные значения и превращать их в один результат.

reduce() путаница в базовых операциях со списком, как правило, приводит к reduce() , поскольку, в отличие от filter() или map() , оно ведет себя по-разному от использования к использованию.

В общем случае, reduce() принимает коллекцию значений и превращает ее в одну точку данных. В нашем случае мы передадим ему поток имен веб-сайтов и будем использовать метод Redu reduce() чтобы преобразовать этот поток в объект, который подсчитывает количество найденных нами веб-сайтов и сумму длин их названий.

 source. map((user) => user.website). filter((website) => (website.endsWith('net') || website.endsWith('org'))). reduce((data, website) => { return { count : data.count += 1, name_length : data.name_length += website.length } }, { count : 0, name_length : 0 }) 

Здесь мы сокращаем наш поток до одного объекта, который отслеживает:

  1. Сколько сайтов мы видели; и
  2. Общая длина всех их имен.

Имейте в виду, что reduce() возвращает результат только тогда, когда источник Observable завершается. Если вы хотите узнать состояние аккумулятора каждый раз, когда поток получает новый элемент, используйте взамен scan() .

4. Ограничение результатов с помощью take ()

take() и takeWhile() основные функции на простых потоках.

take(n) читает n значений из потока, а затем отписывается.

Мы можем использовать scan() чтобы выдавать наш объект каждый раз, когда мы получаем веб-сайт, и take() только take() первые два значения.

 source. map((user) => user.website). filter((website) => (website.endsWith('net') || website.endsWith('org'))). scan((data, website) => { return { count : data.count += 1, name_length : data.name_length += website.length } }, { count : 0, name_length : 0 }). take(2); 

RxJS также предлагает takeWhile() , который позволяет вам принимать значения, пока не выполнится какой-либо логический тест. Мы можем написать вышеупомянутый поток с помощью takeWhile() следующим образом:

 source. map((user) => user.website). filter((website) => (website.endsWith('net') || website.endsWith('org'))). scan((data, website) => { return { count : data.count += 1, name_length : data.name_length += website.length } }, { count : 0, name_length : 0 }). takeWhile((data) => data.count < 3) 

Операции над потоками высшего порядка

Помимо того, что они работают с Observables, а не с массивами, эти функции практически идентичны привычным операциям со списком.

«[Если] вы знаете, как программировать на Arrays, используя дополнительные функции Array #, то вы уже знаете, как использовать RxJS!» ~ Документация RxJS

Как массивы могут содержать более сложные данные, чем простые значения, такие как массивы или объекты, так и Observables могут испускать данные более высокого порядка, такие как Promises или другие Observables. Здесь вступают в игру более специализированные инструменты.

5. Сжатие потоков с помощью flatMap ()

, , , На самом деле, мы уже используем один!

Мы сделали вызовы fromPromise() и flatMap() когда мы определили наш source поток:

 const source = // Take a Promise and convert it to an Observable Rx.Observable.fromPromise(makeRequest(ENDPOINT)) // Flatten Promise .flatMap(Rx.Observable.from); 

Это использует три части новой техники:

  1. fromPromise ;
  2. Rx.Observable.from ; и
  3. FlatMap .

Наблюдаемые от обещаний

Обещание представляет собой одно будущее значение, которое мы получим асинхронно — например, результат вызова сервера.

Одной из определяющих особенностей Обещания является то, что он представляет только одну будущую ценность. Он не может возвращать несколько асинхронных фрагментов данных; это то, что делают Observables, и это фундаментальное различие между ними.

Это означает, что когда мы используем Rx.Observable.fromPromise() , мы получаем Observable, который испускает одно значение — либо:

  1. Значение, которое обещает решить Обещание; или
  2. Ошибка, с которой Обещание отклоняется.

Когда Promise возвращает строку или число, нам не нужно делать ничего особенного. Но когда он возвращает массив, как это происходит в нашем случае, мы бы предпочли создать Observable, который испускает содержимое массива, а не сам массив как одно значение.

6. Использование flatMap ()

Этот процесс называется уплощением, о котором flatMap() . Он имеет ряд перегрузок , но мы будем использовать только самые простые и самые распространенные из них.

При использовании flatMap() мы:

  1. Вызовите flatMap() для Observable, который выдает однозначное разрешение или отклонение Promise; и
  2. Передайте ему функцию для создания нового Observable с помощью.

В нашем случае мы передаем Rx.Observable.from() , который создает последовательность из значений массива:

 Rx.Observable.from([1, 2, 3]). subscribe( onNext (value) => console.log(`Next: ${value}`)) // Prints: // Next: 1 // Next: 2 // Next: 3 

Это покрывает код нашей маленькой прелюдии:

 const source = // Create an Observable emitting the VALUE or REJECTION of a Promise... Rx.Observable.fromPromise(makeRequest(ENDPOINT)) // ...And turn it into a new Observable that emits every item of the // array the Promise resolves to. .flatMap(Rx.Observable.from) 

RxJS также имеет псевдоним для flatMap() : selectMany() .

Составление нескольких потоков

Часто у нас будет несколько потоков, которые нам нужно собрать. Есть много способов объединить потоки, но есть несколько, которые подходят больше, чем другие.

7. Объединение потоков с помощью concat () и merge ()

Конкатенация и слияние являются двумя наиболее распространенными способами объединения потоков.

Конкатенация создает новый поток, испуская значения первого потока до его завершения, а затем испуская значения второго потока.

Слияние создает новый поток из множества потоков, испуская значения из того потока, который активен

Подумайте о том, чтобы поговорить с двумя людьми одновременно в Facebook Messenger. concat() — это сценарий, в котором вы получаете сообщения от обоих, но завершите беседу с одним человеком, прежде чем ответить другому. merge() похоже на создание группового чата и одновременный прием обоих потоков сообщений.

 source1. concat(source2). subscribe( onNext(value) => console.log(`Next: ${value}`)) // Prints 'Source 1' values first, THEN 'Source 2' source1. merge(source2). subscribe( onNext(value) => console.log(`Next: ${value}`)) // INTERLEAVES 'Source 1' and 'Source 2' values 

Поток concat() сначала напечатает все значения из source1 , и только начнет печатать значения из source2 после завершения source1 .

Поток merge() будет печатать значения из source1 и source2 мере их получения: он не будет ждать завершения первого потока, прежде чем отправлять значения из второго.

8. Используя переключатель ()

Часто мы хотим слушать Observable испускающие Observables, но обращаем внимание только на последнее излучение источника.

Чтобы еще больше расширить аналогию с Facebook Messenger, switch() — это тот случай, когда вы. , , Ну, поменяйте, кому вы отвечаете, исходя из того, кто в данный момент отправляет сообщения.

Для этого RxJS предоставляет коммутатор .

Пользовательские интерфейсы предоставляют несколько хороших вариантов использования для switch() . Если наше приложение запускает запрос каждый раз, когда пользователь выбирает то, что он хочет найти, мы можем предположить, что он хочет видеть только результаты из последнего выбора. Итак, мы используем switch() для прослушивания только результата из последнего выбора.

Пока мы на этом, мы должны следить за тем, чтобы не тратить впустую полосу пропускания, просто нажимая на сервер для последнего выбора, который пользователь делает каждую секунду. Функция, которую мы используем для этого, называется debounce()

Если вы хотите пойти в другом направлении и соблюдать только первый выбор, вы должны использовать throttle () . У него такой же API, но поведение противоположное.

9. Координирующие потоки

Что если мы хотим разрешить пользователю выполнять поиск сообщения или пользователя с определенным идентификатором?

Для демонстрации мы создадим еще одно раскрывающееся меню и позволим пользователям выбирать идентификатор элемента, который они хотят получить.

Есть два сценария. Мы запускаем запрос, когда пользователь:

  1. Изменяет любой выбор; или
  2. Изменяет оба выбора.

Отвечать на изменения в любом потоке с помощьюlateLatest ()

В первом случае нам нужно создать поток, который запускает сетевой запрос:

  1. Какую бы конечную точку пользователь не выбрал последним; и
  2. Какой идентификатор пользователь выбрал последним.

, , , И делайте это всякий раз, когда пользователь обновляет любой выбор.

Это то, что combineLatest() для:

 // User's selection for either POSTS or USERS data const endpoint_stream = Rx.Observable.fromEvent(select_endpoint, 'click'). map(event => event.target). map(target => (target.options[target.selectedIndex].text.toLowerCase())); // Which item ID the user wants to retrieve const id_stream = Rx.Observable.fromEvent(select_id, 'click'). map(event => event.target). map(target => (target.options[target.selectedIndex].text)); // Emits a pair of the most recent selections from BOTH streams // when EITHER emits a value const complete_endpoint_stream = endpoint_stream.combineLatest(id_stream); 

Всякий раз, когда какой-либо из потоков выдает значение, combineLatest() , что combineLatest() принимает combineLatest() значение и combineLatest() его с последним элементом, который испускал другой поток, и испускает пару в массиве.

Это легче визуализировать на диаграмме:

 // stream1 : Emits 1 // stream2 : Emits 1 combined : Emits [1, 1] // stream2: Emits 2 combined : Emits [1, 2] // stream2: Emits 3 combined : Emits [1, 3] 

Отвечать только на изменения в обоих потоках с zip

Чтобы подождать, пока пользователь не обновит свой выбор для полей id и конечной точки, замените combineLatest() на zip() .

Опять же, это легче понять с помощью диаграммы:

 // stream1 : Emits A // stream2 : Emits 1 zipped : Emits [A, 1] // stream2: Emits 2 zipped : Emits NOTHING // stream2: Emits 3 zipped : Emits NOTHING // stream1: Emits B zipped : Emits [B, 2] // stream1: Emits C zipped : Emits [C, 3] 

В отличие от combineLatest() , zip() ждет, пока оба Observables не выпустят что-то новое, прежде чем выдать свой массив обновленных значений.

10. takeUntil

Наконец, takeUntil() позволяет нам прослушивать первый поток, пока второй не начнет излучать значения.

 source1. takeUntil(source2); 

Это полезно, когда вам нужно координировать потоки, но не обязательно объединять их.

Завершение

Простой факт добавления измерения времени к массивам открывает двери для совершенно нового мышления о программах.

RxJS — это гораздо больше, чем то, что мы видели здесь, но этого достаточно, чтобы удивительно далеко продвинуться.

Начните работать с RxJS Lite , держите документацию под рукой и потратьте время, чтобы испачкать руки. Прежде чем вы это знаете, все будет выглядеть как поток. , , Потому что все есть.