Статьи

Введение в функционально-реактивное программирование с RxJS

Эта статья была рецензирована Морицем Крёгером , Бруно Мота и Вилданом Софтиком . Спасибо всем рецензентам SitePoint за то, что сделали контент SitePoint как можно лучше!

Прежде чем мы углубимся в тему, мы должны ответить на ключевой вопрос: что такое реактивное программирование? На сегодняшний день самым популярным ответом является то, что реактивное программирование — это программирование с одновременными потоками данных. Большую часть времени мы обнаружим, что слово concurrent заменено асинхронным, однако позже мы увидим, что поток не должен быть асинхронным.

Легко видеть, что подход «все является потоком» может применяться непосредственно к нашим задачам программирования. В конце концов, процессор — это не что иное, как устройство, которое обрабатывает поток информации, состоящий из инструкций и данных. Наша цель — наблюдать за этим потоком и преобразовывать его в случае конкретных данных.

Принципы реактивного программирования не являются совершенно новыми для JavaScript. У нас уже есть такие вещи, как привязка свойств, шаблон EventEmitter или потоки Node.js. Иногда элегантность этих методов сопровождается снижением производительности, чрезмерно сложными абстракциями или проблемами с отладкой. Обычно эти недостатки минимальны по сравнению с преимуществами нового уровня абстракции. Наши минимальные примеры, конечно, не будут отражать обычное применение, но будут как можно более краткими и краткими.

Без лишних слов давайте запачкаем руки, поиграв с библиотекой Reactive Extensions for JavaScript (RxJS). RxJS часто использует цепочку, которая является популярной техникой, также используемой в других библиотеках, таких как jQuery. Руководство по объединению методов (в контексте Ruby) доступно в SitePoint .

Примеры потоков

Прежде чем мы углубимся в RxJS, мы должны перечислить несколько примеров для работы позже. Это также завершит введение в реактивное программирование и потоки в целом.

В целом, мы можем выделить два вида потоков: внутренние и внешние. В то время как первое может считаться искусственным и находится под нашим контролем, второе происходит из источников, находящихся вне нашего контроля. Внешние потоки могут быть вызваны (прямо или косвенно) из нашего кода.

Обычно потоки не ждут нас. Они случаются, справимся ли мы с ними или нет. Например, если мы хотим наблюдать за машинами на дороге, мы не сможем перезапустить поток машин. Поток происходит независимо от того, наблюдаем мы это или нет. В терминологии Rx мы называем это горячей наблюдаемой . Rx также вводит холодные наблюдаемые , которые ведут себя больше как стандартные итераторы, так что информация из потока состоит из всех элементов для каждого наблюдателя.

Следующие изображения иллюстрируют некоторые внешние виды потоков. Мы видим, что упоминаются (ранее запущенные) запросы и обычно настраиваемые веб-хуки, а также события пользовательского интерфейса, такие как взаимодействие мыши или клавиатуры. Наконец, мы также можем получать данные от устройств, например, датчиков GPS, акселерометра или других датчиков.

Изображение, показывающее различные типы потоков

Изображение также содержало один поток, отмеченный как Сообщения . Сообщения могут появляться в нескольких формах. Одна из самых простых форм — это общение между нашим сайтом и другим сайтом. Другие примеры включают общение с WebSockets или веб-работниками. Давайте посмотрим пример кода для последнего.

Код работника представлен ниже. Код пытается найти простые числа от 2 до 10 10 . Как только число найдено, результат сообщается.

 (function (start, end) { var n = start - 1; while (n++ < end) { var k = Math.sqrt(n); var found = false; for (var i = 2; !found && i <= k; ++i) { found = n % i === 0; } if (!found) { postMessage(n.toString()); } } })(2, 1e10); 

Классически веб-работник (предполагается, что он находится в файле prime.js ) включается следующим образом. Для краткости мы пропускаем проверки поддержки веб-работника и законности возвращаемого результата.

 var worker = new Worker('prime.js'); worker.addEventListener('message', function (ev) { var primeNumber = ev.data * 1; console.log(primeNumber); }, false); 

Более подробную информацию о веб-работниках и многопоточности с помощью JavaScript можно найти в статье Parallel JavaScript with Parallel.js .

Рассматривая приведенный выше пример, мы знаем, что простые числа следуют асимптотическому распределению среди натуральных чисел. Для x на ∞ получаем распределение x / log(x) . Это означает, что мы увидим больше чисел в начале. Здесь чеки также намного дешевле (т. Е. В начале мы получаем гораздо больше простых чисел за единицу времени, чем позже).

Это можно проиллюстрировать простой временной осью и блобами для результатов:

Шариковая диаграмма, показывающая распределение простых чисел

Несвязанный, но похожий пример можно привести, посмотрев на ввод пользователя в поле поиска. Первоначально пользователь может с энтузиазмом вводить что-то для поиска; однако, чем более конкретным становится его запрос, тем больше становится разница во времени между нажатиями клавиш. Предоставление возможности показывать результаты в реальном времени, безусловно, желательно, чтобы помочь пользователю сузить свой запрос. Однако мы не хотим выполнять запрос на каждое нажатие клавиши, тем более что первые будут выполняться очень быстро, без размышлений или необходимости специализироваться.

В обоих сценариях ответ заключается в агрегировании предыдущих событий за определенный промежуток времени. Разница между двумя описанными сценариями заключается в том, что простые числа всегда должны показываться после заданного временного интервала (т. Е. Некоторые из простых чисел просто потенциально задерживаются при представлении). В отличие от этого, поисковый запрос будет запускать новый запрос только в том случае, если в течение указанного интервала не было нажатий клавиш. Следовательно, таймер сбрасывается после обнаружения нажатия клавиши.

RxJS на помощь

Rx — это библиотека для составления асинхронных и событийных программ с использованием наблюдаемых коллекций. Он хорошо известен своим декларативным синтаксисом и компоновкой, в то же время вводя простую модель обработки времени и ошибок. Думая о наших предыдущих примерах, мы особенно заинтересованы в обработке времени. Тем не менее, мы увидим, что в RxJS можно извлечь гораздо больше пользы.

Основными строительными блоками RxJS являются наблюдаемые (производители) и наблюдатели (потребители). Мы уже упоминали два типа наблюдаемых:

  • Горячие наблюдаемые появляются, даже когда мы не подписаны на них (например, события пользовательского интерфейса).
  • Холодные наблюдаемые начинают давить только тогда, когда мы подписываемся. Они начнутся заново, если мы подпишемся снова.

Холодные наблюдаемые обычно относятся к массивам или отдельным значениям, которые были преобразованы для использования в RxJS. Например, следующий код создает холодную наблюдаемую, которая дает только одно значение перед завершением:

 var observable = Rx.Observable.create(function (observer) { observer.onNext(42); observer.onCompleted(); }); 

Мы также можем вернуть функцию, содержащую логику очистки, из наблюдаемой функции создания.

Подписка на наблюдаемое не зависит от вида наблюдаемого. Для обоих типов мы можем предоставить три функции, которые удовлетворяют базовому требованию грамматики уведомлений, состоящей из onNext , onError и onCompleted . onNext вызов onNext является обязательным.

 var subscription = observable.subscribe( function (value) { console.log('Next: %s.', value); }, function (ev) { console.log('Error: %s!', ev); }, function () { console.log('Completed!'); } ); subscription.dispose(); 

Рекомендуется прекратить подписку, используя метод dispose . Это выполнит все необходимые шаги очистки. В противном случае может быть возможно предотвратить очистку неиспользуемых ресурсов сборкой мусора.

Без subscribe наблюдаемая, содержащаяся в переменной observable является просто холодной наблюдаемой. Тем не менее, также возможно преобразовать его в горячую последовательность (т. Е. Мы выполняем псевдоподписку), используя метод publish .

 var hotObservable = observable.publish(); 

Некоторые из помощников, содержащихся в RxJS, имеют дело только с преобразованием существующих структур данных. В JavaScript мы можем различать три из них:

  1. Обещания о возвращении единичных асинхронных результатов,
  2. Функции для единичных результатов и
  3. Генераторы для предоставления итераторов.

Последний является новым с ES6 и может быть заменен массивами (даже если это плохая замена и должен рассматриваться как одно значение) для ES5 или старше.

RxJS теперь вводит тип данных для обеспечения асинхронной поддержки нескольких (возвращаемых) значений. Поэтому четыре квадранта теперь заполнены.

Диаграмма, классифицирующая четыре типа данных RxJS

В то время как итераторы должны быть извлечены, значения наблюдаемых сдвигаются. Примером может служить поток событий, в котором мы не можем принудительно вызвать следующее событие. Мы можем только ждать уведомления о цикле событий.

 var array = [1,2,3,4,5]; var source = Rx.Observable.from(array); 

Большинство помощников, создающих или работающих с наблюдаемыми, также принимают планировщик, который контролирует, когда начинается подписка и когда публикуются уведомления. Мы не будем вдаваться в подробности, так как планировщик по умолчанию работает просто отлично для большинства практических целей.

Многие операторы в RxJS вводят параллелизм, такой как throttle , interval или delay . Теперь мы еще раз посмотрим на предыдущие примеры, где эти помощники становятся необходимыми.

Примеры

Во-первых, давайте посмотрим на наш генератор простых чисел. Мы хотели объединить результаты за определенный промежуток времени, чтобы пользовательский интерфейс (особенно в начале) не сталкивался с слишком большим количеством обновлений.

Здесь мы можем использовать buffer функцию RxJS в сочетании с ранее упомянутым помощником interval .

Результат должен быть представлен следующей диаграммой. Зеленые пятна появляются после определенного интервала времени (заданного временем, использованным для построения interval ). Буфер агрегирует все видимые синие пятна в течение такого интервала.

Шариковая диаграмма задержки простого числа

Кроме того, мы могли бы также представить map , которая помогает нам преобразовывать данные. Например, мы можем захотеть преобразовать полученные аргументы события, чтобы получить переданные данные в виде числа.

 var worker = new Worker('prime.js'); var observable = Rx.Observable.fromEvent(worker, 'message') .map(function (ev) { return ev.data * 1; }) .buffer(Rx.Observable.interval(500)) .where(function (x) { return x.length > 0; }) .map(function (x) { return x.length; }); 

Функция fromEvent создает наблюдаемые из любого объекта, используя стандартный шаблон fromEvent событий. buffer также будет возвращать массивы нулевой длины, поэтому мы ввели функцию where чтобы уменьшить поток до непустых массивов. Наконец, в этом примере нас интересует только количество сгенерированных простых чисел. Поэтому мы отображаем буфер для получения его длины.

Другим примером является окно поискового запроса, которое должно регулироваться для запуска запросов только после определенного времени простоя. Есть две функции, которые могут быть полезны в таком сценарии: Функция throttle выдает первую запись, замеченную в пределах определенного временного окна. Функция debounce возвращает последнюю запись в указанном временном окне. Временные окна также сдвигаются соответственно (то есть относительно первого / последнего элемента).

Мы хотим добиться поведения, отраженного на следующей диаграмме. Следовательно, мы собираемся использовать механизм debounce .

Шариковая диаграмма поискового запроса дроссельной заслонки

Мы хотим отбросить все предыдущие результаты и получить только последний до истечения временного окна. Предполагая, что поле ввода имеет query id, мы можем использовать следующий код:

 var q = document.querySelector('#query'); var observable = Rx.Observable.fromEvent(q, 'keyup') .debounce(300) .map(function (ev) { return ev.target.value; }) .where(function (text) { return text.length >= 3; }) .distinctUntilChanged() .map(searchFor) .switch() .where(function (obj) { return obj !== undefined; }); 

В этом коде окно установлено на 300 мс. Также мы ограничиваем запросы для значений как минимум 3 символами, которые отличаются от предыдущих запросов. Это устраняет ненужные запросы на входные данные, которые были только что исправлены путем ввода чего-либо и удаления его.

Во всем этом выражении есть две важные части. Одним из них является преобразование текста запроса в запрос с использованием searchFor , другим является функция switch () . Последний принимает любую функцию, которая возвращает вложенные наблюдаемые и выдает значения только из самой последней наблюдаемой последовательности.

Функция для создания запросов может быть определена следующим образом:

 function searchFor(text) { var xhr = new XMLHttpRequest(); xhr.open('GET', apibaseUrl + '?q=' + text, true); xhr.send(); return Rx.Observable.fromEvent(xhr, 'load').map(function (ev) { var request = ev.currentTarget; if (request.status === 200) { var response = request.responseText; return JSON.parse(response); } }); } 

Обратите внимание на вложенную наблюдаемость (которая может привести к undefined для недопустимых запросов), поэтому мы используем цепочку switch() и where() .

Выводы

RxJS делает реактивное программирование на JavaScript радостной реальностью. В качестве альтернативы есть также Bacon.js , который работает аналогично. Тем не менее, одна из лучших особенностей RxJS — это сам Rx, который доступен на многих платформах. Это делает переход на другие языки, платформы или системы довольно простым. Он также объединяет некоторые концепции реактивного программирования в наборе методов, которые являются краткими и составными. Кроме того, существует несколько очень полезных расширений, таких как RxJS-DOM , которые упрощают взаимодействие с DOM.

Где вы видите RxJS блеск?