Первоначально написано Майкл Nitschinger
В этом блоге объясняется, почему мы выбираем RxJava в качестве одного из неотъемлемых компонентов нашего нового Java SDK.
мотивация
Существует множество способов разработки API, и у каждого есть свой набор преимуществ (и недостатков). В процессе разработки наших совершенно новых API-интерфейсов одним из главных вопросов было то, как мы представляем его пользователю.
Один вопрос, который мы не должны были задавать себе, был: он должен быть синхронным или асинхронным? Мы твердо убеждены, что асинхронные API-интерфейсы — это единственный разумный способ получить производительность и масштабируемость, которые вам часто нужны, а также гораздо проще перейти от асинхронной синхронизации к другой, чем наоборот. Текущий стабильный SDK (1.4.3 на момент написания статьи) уже интенсивно использует Futures различными способами для предоставления асинхронных ответов, и это восходит к 2006/7 году, когда spymemcached изначально ввел эту концепцию в свой API.
Хорошо известно, что интерфейс Java Future очень ограничен по сравнению с другими решениями (такими как Scala Futures). Кроме того, это также немного сложнее для кода, если вам нужно построить асинхронные потоки данных, где одно вычисление зависит от другого, и вы хотите, чтобы все это было асинхронным. В последних версиях мы добавили поддержку слушателей, которые немного улучшают ситуацию, но все еще не являются идеальным решением.
За последние несколько лет появились другие библиотеки и шаблоны, которые мы внимательно следили. Одна из зрелых концепций известна как Reactive Extensions, происходящих из Microsoft и .NET. Он основан на идее, что приложения должны быть ориентированы на события и реагировать на эти события асинхронно. Он определяет очень богатый набор операторов для того, что вы можете делать с данными (модифицировать, комбинировать, фильтровать и так далее). Недавно Netflix перенес его на Java и назвал его RxJava (обратите внимание, что хотя проект в настоящее время находится в пространстве имен Netflix, он будет перемещен в «io.reactivex» раньше, чем позже). Он очень стабилен и также предоставляет адаптеры для других языков JVM, таких как Scala, Groovy и JRuby, что хорошо согласуется с нашими планами по расширению поддержки.
Концепт
Основная идея Rx вращается вокруг Observables и ее наблюдателей. Если вы не сталкивались с этой концепцией, вы можете думать о Observable как об асинхронном двоюродном брате, основанном на толчке (или, более формально, называемом двойным), в Iterable. Более конкретно, вот их отношение:
Мероприятие | Повторяемый (тянуть) | Наблюдаемый (толчок) |
---|---|---|
восстановить данные | T следующий () | onNext (Т) |
обнаружить ошибку | исключение | OnError (Исключение) |
полный | возвращается | OnCompleted () |
Каждый раз, когда данные помещаются в Observable, каждое наблюдение, подписанное на него, получает данные в своем методе onNext (). Если наблюдаемое в конце концов завершено (что не всегда должно быть так). вызывается метод onCompleted. Теперь в любом месте процесса, если происходит ошибка, вызывается метод onError и Observable также считается завершенным.
Если вам нравится грамматика, контракт выглядит так:
OnNext* (OnCompleted | OnError)?
В частности, обратите внимание, что нет различий, если возвращается только 1 или N данных, это обычно можно сделать из методов, которые вы вызываете, и того, как они документированы. В любом случае это не меняет ваш процесс программирования. Поскольку это немного абстрактно, давайте рассмотрим конкретный пример. В классе CouchbaseCluster есть метод openBucket, который инициализирует все необходимые ресурсы, а затем возвращает экземпляр Bucket для работы с вами. Теперь вы можете представить, что открытие сокетов, получение конфигурации и т. Д. Занимает некоторое время, так что это идеальный кандидат. API блокировки будет выглядеть так:
interface Cluster { Bucket openBucket(String name, String password); }
Как мы можем сделать это асинхронным? Нам нужно обернуть это в Observable:
interface Cluster { Observable<Bucket> openBucket(String name, String password); }
Итак, теперь мы возвращаем наблюдаемую, которая в конечном итоге вернется с экземпляром корзины, который мы можем использовать. Давайте добавим наблюдателя:
cluster.openBucket().subscribe(new Observer<Bucket>() { @Override public void onCompleted() { System.out.println("Observable done!"); } @Override public void onError(Throwable e) { System.err.println("Something happened"); e.printStackTrace(); } @Override public void onNext(Bucket bucket) { System.out.println("Received bucket: " + bucket); } });
Обратите внимание, что эти методы вызываются в другом потоке, поэтому если вы оставите код, подобный этому, и выйдете из основного потока позже, вы, вероятно, ничего не увидите. Хотя теперь вы можете написать весь остальной код в методе onNext, это, вероятно, не лучший способ сделать это. Поскольку корзина
хочет открыть заранее, вы можете заблокировать ее, а затем продолжить работу с остальным кодом. Каждая наблюдаемая может быть преобразована в наблюдаемую блокировку, которая выглядит как итерируемая:
BlockingObservable<Bucket> blockingObservable = cluster.openBucket().toBlocking();
Вы найдете много методов для итерации по полученному thata блокирующим способом, но есть и сокращенные методы, если вы ожидаете только одно единственное значение (что, как мы знаем, имеет место для нас):
Bucket bucket = cluster.openBucket().toBlocking().single();
Внутри у нас происходит то, что значение, вызываемое в onNext, сохраняется для нас и возвращается после вызова onComplete. если вызывается onError, метод throwable генерируется напрямую, и вы можете его поймать.
Объединяющие API
Теперь то, что вы уже видели, едва касается поверхности. Открытие ковша вполне может быть выполнено и с помощью одного Future <Bucket>. Observables начинают сиять, когда вам нужно работать с более чем одним возвращенным результатом. В этом случае Future <T> больше не отвечает требованиям, а Future <Collection <T >> или что-то подобное не имеет такого же контракта. Поскольку Observables подразумевают, что может быть возвращено более одного T, API могут выглядеть одинаково, даже если иногда возвращается один, а иногда более одного Ts.
Опять же, давайте посмотрим на конкретный пример. SDK предоставляет метод get, который возвращает один документ. Это выглядит так:
interface Bucket { Observable<JsonDocument> get(String id); }
Но мы также поддерживаем Querying (Views, N1QL), который потенциально возвращает более одного результата (или даже никакого). Благодаря контракту Observable мы можем создать API-интерфейс следующим образом:
interface Bucket { Observable<ViewResult> query(ViewQuery query); }
Увидеть? Контракт неявно говорит: «Если вы передадите запрос, вы получите N ViewResults обратно», поскольку вы знаете, как должен вести себя Observable. И для большей картины, есть еще больше методов, которые интуитивно ведут себя так, как вы ожидаете.
interface Bucket { <D extends Document<?>> Observable<D> insert(D document); <D extends Document<?>> Observable<D> upsert(D document); <D extends Document<?>> Observable<D> replace(D document); Observable<ViewResult> query(ViewQuery query); Observable<QueryResult> query(Query query); Observable<QueryResult> query(String query); Observable<Boolean> flush(); }
Асинхронизируйте мой поток данных!
До сих пор мы видели, что Observables могут сделать для нас и как они помогают нам в создании связных, простых и в то же время асинхронных API. Но Observables действительно сияют своими аспектами сочетаемости. С помощью Observables вы можете многое сделать, и мы не можем рассказать обо всех в этом посте. RxJava имеет очень хорошую справочную документацию, которую можно найти здесь, так что проверьте ее. Он использует мраморные диаграммы, чтобы показать, как работают асинхронные потоки данных, а также то, что мы хотим предоставить как часть нашей документации в будущем.
Давайте рассмотрим практический пример: вы хотите загрузить документ из couchbase (который представляет собой полноценный JSON-объект с подробностями пользователя), но вы просто хотите сделать что-то с именем ниже в вашем коде. Мы можем использовать функцию map для сопоставления из JsonDocument с именем String:
bucket .get("user::1") .map(new Func1<JsonDocument, String>() { @Override public String call(JsonDocument jsonDocument) { return jsonDocument.content().getString("firstname"); } }) .subscribe(new Action1<String>() { @Override public void call(String firstname) { System.out.println(firstname); } });
Здесь есть два важных аспекта: каждый цепочечный метод также выполняется асинхронно, поэтому он не блокирует исходящий поток. Как только вызов get против couchbase возвращается, мы сопоставляем имя из документа JSON и, наконец, распечатываем его. Вам не нужно предоставлять полноценный Observer, если вас интересует только значение onNext, вы можете просто реализовать его (как показано здесь). Смотрите перегруженные методы для большего количества примеров.
Также обратите внимание, что здесь я намеренно показываю анонимные классы в стиле Java 6/7. Мы также поддерживаем Java 8, но об этом позже. Теперь, как мы можем расширить эту цепочку, если мы хотим распечатать имя, только если оно начинается с «а»?
bucket .get("user::1") .map(new Func1<JsonDocument, String>() { @Override public String call(JsonDocument jsonDocument) { return jsonDocument.content().getString("firstname"); } }) .filter(new Func1<String, Boolean>() { @Override public Boolean call(String s) { return s.startsWith("a"); } }) .subscribe(new Action1<String>() { @Override public void call(String firstname) { System.out.println(firstname); } });
Конечно, достаточно простого оператора if, но вы можете представить, что ваш код для фильтрации может быть гораздо более сложным (и, вероятно, вызывать что-то еще). В качестве последнего примера преобразования наблюдаемых, мы собираемся сделать кое-что, что происходит очень часто: вы загружаете документ, изменяете его содержимое, а затем сохраняете его обратно в couchbase:
bucket .get("user::1") .map(new Func1<JsonDocument, JsonDocument>() { @Override public JsonDocument call(JsonDocument original) { original.content().put("firstname", "SomethingElse"); return original; } }) .flatMap(new Func1<JsonDocument, Observable<JsonDocument>>() { @Override public Observable<JsonDocument> call(JsonDocument modified) { return bucket.replace(modified); } }).subscribe();
FlatMap behaves very much like map, the difference is that it returns an observable itself, so its perfectly suited to map over asynchronous operations.
One other aspect is that with Observables, sophisticated error handling is right at your fingertips. Let’s implement an example which applies a timeout of 2 seconds and if the call does not return hands back something else instead:
bucket .get("user::1") .timeout(2, TimeUnit.SECONDS) .onErrorReturn(new Func1<Throwable, JsonDocument>() { @Override public JsonDocument call(Throwable throwable) { return JsonDocument.create("user::anonymous", JsonObject.empty().put("firstname", "john-doe")); } });
Here a dummy document is returned (pretending some reasonable defaults for our example) if the get call does not return in 2 seconds. This is just a simple example, but you can do a lot with exceptions, like retrying, branching out to other observables and so forth. Please refer to the official documentation (and Rx’s documentation) for how to use them properly.
Wait, there is more
There are much more features available, like combining (merging, zipping, concat) of different observables, batching the results up in time intervals, do side-effects and others. Once you get over the initial (small) hurdle of understanding the concept, it feels very natural and we promise you don’t want to go back (if we are wrong, though, you can always block on an Observable or convert it into a future).
RxJava also has decent Java 8 support, so if you are a lucky one who is able to use it in their projects already you can simplify an example from above to this:
bucket .get("user::1") .map(jsonDocument -> jsonDocument.content().getString("firstname")) .filter(s -> s.startsWith("a")) .subscribe(System.out::println);
Neat, right? RxJava also provides different language adaptors on top of it, at the time of writing Scala, Clojure, Groovy, JRuby and Kotlin. They can be used to provide even more language-specific integration and we are also planning to use some of them to enhance couchbase support for each of those languages as we see demand. Our topmost priority aside from the java SDK is definitely Scala, so be on the lookout for some announcements sooner than later!
We hope that you are now as excited as we are and looking forward to your feedback and questions through the usual channels!