Или приключение по чтению данных от Кассандры.
обзор
Давайте сначала попробуем определить, что означает реактивный с точки зрения программирования.
Функциональное реактивное программирование — это парадигма программирования реактивного программирования с использованием строительных блоков функционального программирования.
Функциональное программирование — это парадигма программирования, стиль построения структуры и элементов компьютерных программ, который рассматривает вычисления как оценку математических функций, которые избегают состояния и изменчивых данных. Функциональное программирование подчеркивает функции, которые дают результаты, которые зависят только от их входных данных, а не от состояния программы.
Как мы можем заниматься функциональным программированием на Java? Java — это объектно-ориентированный язык программирования, где изменчивое состояние присутствует везде.
Любой Java-разработчик по всему миру использовал любой из интерфейсов:
java.lang.Runnable, java.util.Comparator, java.util.concurrent.Callable или java.awt.event.ActionListener. Все эти интерфейсы имеют только один объявленный метод. Эти интерфейсы известны как Single Abstract Methods или SAM. Популярный способ их использования — создание анонимных внутренних классов.
01
02
03
04
05
06
07
08
09
10
|
public class RunnableTest { public static void main(Sting[] args){ new Thread( new Runnable(){ @Override public void run(){ System.out.println( "A new thread is running ..." ); } }).start(); } } |
Функциональное программирование на Java сложно, так как функция не включена в спецификацию языка. Это станет проще в Java 8 с введением «лямбды». Но как мы можем сделать функциональное программирование на Java?
Давайте посмотрим на простой пример.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
|
@FunctionalInterface public interface Worker { public void doWork(); } public class FunctionalWorker { public static void main(String[] args){ // anonymous inner class way execute( new Worker(){ @Override public void doWork() { System.out.println ( "working ..." ); } }); // lambda's way execute(() -> System.out.println( "working in lambda's way ..." )); } public static void execute(Worker worker){ worker.doWork(); } } |
Реактивное программирование — это парадигма программирования, ориентированная на потоки данных и распространение изменений. Например, в настройках императивного программирования a: = b + c будет означать, что a присваивается результат b + c в момент вычисления выражения. Более поздние значения b или c могут быть изменены без влияния на a. В реактивном программировании значение a будет автоматически обновляться на основе новых значений.
Итак, мы должны хорошо понимать, что такое функционально-реактивное программирование, так что давайте пойдем и создадим прототип …
Чтение данных реактивно с Кассандры
Cassandra — одно из хранилищ NoSql, которое пользуется большой популярностью.
Давайте представим, что мы должны построить сервис Avatar. Этот сервис будет хранить метаинформацию аватаров и контент непосредственно в Кассандре.
Используемый нами драйвер java предоставляет нам поддержку для асинхронного запроса cassandra с помощью метода executeAsync () . Вызов этого метода вернет Future. Как все мы знаем, Java-фьючерсы блокируемы и не могут быть составлены.
Итак, у нас есть асинхронная поддержка, но нам все еще нужен способ, чтобы иметь возможность читать ее реактивно …
Netflix построил и позже открыл исходный код библиотеки RxJava, которая обеспечивает функциональное реактивное программирование для Java (плюс другие языки JVM).
Функциональный реактив предлагает эффективное выполнение и компоновку, предоставляя набор операторов, способных фильтровать, выбирать, преобразовывать, комбинировать и составлять Observable.
Тип данных Observable можно рассматривать как «push» эквивалентный Iterable, который «pull». При использовании Iterable потребитель извлекает значения из блока производителя и потока, пока эти значения не поступят. В отличие от типа Observable, производитель отправляет значения потребителю, когда они доступны. Этот подход является более гибким, поскольку значения могут поступать синхронно или асинхронно.
Тип Observable добавляет две отсутствующие семантики в шаблон Gang of Four Observer , которые доступны в типе Iterable:
- Способность производителя сигнализировать потребителю, что больше нет доступных данных.
- Способность производителя сигнализировать потребителю, что произошла ошибка.
С помощью этих двух простых дополнений мы объединили типы Iterable и Observable. Единственная разница между ними — это направление, в котором передаются данные. Это очень важно, потому что теперь любая операция, которую мы выполняем на Iterable, также может быть выполнена на Observable.
Давайте посмотрим, как мы можем объединить выполнение асинхронных запросов RxJava и Cassandra для создания Observable.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
|
package net.devsprint.reactive.cassandra; import java.util.concurrent.ExecutorService; import rx.Observable; import rx.Observer; import rx.Subscription; import rx.subscriptions.Subscriptions; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Session; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; /** * Wraps an async execution of Datastax Java driver into an observable. * */ public class ObservableCassandra { public static Observable executeAsync( final Session execution, final String query, final ExecutorService executorService) { return Observable.create( new Observable.OnSubscribeFunc() { @Override public Subscription onSubscribe( final Observer observer) { try { Futures.addCallback(execution.executeAsync(query), new FutureCallback() { @Override public void onSuccess(ResultSet result) { observer.onNext(result); observer.onCompleted(); } @Override public void onFailure(Throwable t) { observer.onError(t); } }, executorService); } catch (Throwable e) { // If any Throwable can be thrown from // executeAsync observer.onError(e); } return Subscriptions.empty(); } }); } } |
Метод executeAsync () возвращает прослушиваемое будущее Guava . Добавление обратного вызова в этом будущем позволяет нам правильно реализовать интерфейс Observer .
Простой сервис может быть реализован следующим образом:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
package net.devsprint.reactive.cassandra; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import rx.Observable; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Session; public class AvatarService { private static final String QUERY = "select * avatars" ; private static final ExecutorService executorService = Executors .newFixedThreadPool(Runtime.getRuntime().availableProcessors()); private final Session session; public AvatarService(Session session) { this .session = session; } Observable getAvatars() { return ObservableCassandra .executeAsync(session, QUERY, executorService); } } |
Предполагая, что запрос тяжелый, мы предоставляем отдельный контекст выполнения, в котором будет выполняться обратный вызов.
С этими двумя классами у нас есть служба Avatar, которая может быть запущена, но она ничего не сделает. Он начнет получать данные с Кассандры только тогда, когда будет хотя бы один подписчик. Полный пример можно найти на Reactive Cassandra .