В этой статье будут затронуты несколько тем: реактивное программирование, HTTP, анализ JSON и интеграция с социальным API. Все в одном сценарии использования: мы будем загружать и обрабатывать новые события meetup.com в режиме реального времени с помощью неблокирующей библиотеки RxNetty , сочетая мощь инфраструктуры Netty и гибкость библиотеки RxJava . Meetup предоставляет общедоступный потоковый API, который в режиме реального времени передает каждую отдельную Meetup, зарегистрированную по всему миру. Просто зайдите на stream.meetup.com/2/open_events и посмотрите, как медленно появляются фрагменты JSON на вашем экране. Каждый раз, когда кто-то создает новое событие, самосодержащий JSON отправляется с сервера в ваш браузер. Это означает, что такой запрос никогда не заканчивается, вместо этого мы продолжаем получать частичные данные столько, сколько мы хотим. Мы уже рассматривали подобный сценарий в Превращении Twitter4J в Наблюдаемую RxJava . Каждое новое событие встречи публикует отдельный документ JSON, похожий на этот (многие детали опущены):
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
|
{ "id" : "219088449", "name" : "Silver Wings Brunch", "time" : 1421609400000, "mtime" : 1417814004321, "duration" : 900000, "rsvp_limit" : 0, "status" : "upcoming", "group" : { "name" : "Former Flight Attendants South Orange and North San Diego Co", "state" : "CA" ... }, "venue" : { "address_1" : "26860 Ortega Highway", "city" : "San Juan Capistrano", "country" : "US" ... }, "venue_visibility" : "public", "visibility" : "public", "yes_rsvp_count" : 1 ...} |
Каждый раз, когда наше HTTP-соединение с длинным опросом (с Transfer-Encoding: chunked response header) отправляет такой фрагмент JSON, мы хотим его проанализировать и как-то пройти дальше. Мы ненавидим обратные вызовы, поэтому RxJava кажется разумной альтернативой (подумайте: Observable<Event> ).
Шаг 1: Получение необработанных данных с помощью RxNetty
Мы не можем использовать обычный HTTP-клиент, поскольку он сфокусирован на семантике запрос-ответ. Здесь нет ответа, мы просто оставляем открытое соединение навсегда и потребляем данные, когда оно приходит. RxJava имеет готовую библиотеку RxApacheHttp , но она принимает тип содержимого text/event-stream . Вместо этого мы будем использовать довольно низкоуровневую универсальную библиотеку RxNetty. Это оболочка для Netty (да!) И способна реализовывать произвольные TCP / IP (включая HTTP) и UDP клиенты и серверы. Если вы не знаете Netty, он ориентирован на пакет, а не на поток, поэтому мы можем ожидать одно событие Netty для каждого нажатия Meetup. API, конечно, не прост, но имеет смысл, как только вы это сделаете:
|
01
02
03
04
05
06
07
08
09
10
11
|
HttpClient<ByteBuf, ByteBuf> httpClient = RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder("stream.meetup.com", 443) .pipelineConfigurator(new HttpClientPipelineConfigurator<>()) .withSslEngineFactory(DefaultFactories.trustAll()) .build(); final Observable<HttpClientResponse> responses = httpClient.submit(HttpClientRequest.createGet("/2/open_events"));final Observable byteBufs = responses.flatMap(AbstractHttpContentHolder::getContent);final Observable chunks = byteBufs.map(content -> content.toString(StandardCharsets.UTF_8)); |
Сначала мы создаем HttpClient и настраиваем SSL (имейте в виду, что trustAll() отношении серверных сертификатов, вероятно, не лучший trustAll() параметр). Позже мы submit() GET-запрос и получаем Observable<HttpClientResponse<ByteBuf>> взамен. ByteBuf — это абстракция Netty для ByteBuf байтов, отправленных или полученных по проводам. Эта заметка немедленно расскажет нам о каждой части данных, полученных от Meetup. После извлечения ByteBuf из ответа мы превращаем его в String содержащую вышеупомянутый JSON. Пока все хорошо, это работает.
Шаг 2. Выравнивание пакетов с документами JSON
Netty очень мощная, потому что она не скрывает сложность, присущую неплотным абстракциям. Каждый раз, когда что-то принимается по протоколу TCP / IP, мы получаем уведомление. Вы можете полагать, что когда сервер отправляет 100 байтов, Netty на стороне клиента уведомит нас об этих полученных 100 байтах. Однако стек TCP / IP может свободно разделять и объединять данные, которые вы отправляете по проводам, тем более что предполагается, что это поток, поэтому то, как он разбивается на пакеты, не имеет значения. Это предостережение в значительной степени объясняется в документации Netty . Что это значит для нас? Когда Meetup отправляет одно событие, мы можем получить только одну String в виде chunks . Но точно так же он может быть разделен на произвольное количество пакетов, таким образом, chunks будут испускать несколько String . Еще хуже, если Meetup отправляет два события сразу за другим, они могут помещаться в один пакет. В этом случае chunks будут генерировать одну String с двумя независимыми документами JSON. На самом деле мы не можем предполагать какое-либо выравнивание между строками JSON и полученными сетевыми пакетами. Все, что мы знаем, это то, что отдельные документы JSON, представляющие события, разделяются символами новой строки. Удивительно, RxJavaString официального дополнения RxJavaString есть метод именно для этого:
|
1
|
Observable jsonChunks = StringObservable.split(chunks, "\n"); |
На самом деле есть еще более простой StringObservable.byLine(chunks) , но он использует зависящий от платформы конец строки. Что делает split() лучше всего объясняется в официальной документации :
Теперь мы можем безопасно проанализировать каждую String , jsonChunks :
Шаг 3: Разбор JSON
Интересно, что этот шаг не так прост. Признаюсь, мне нравились времена WSDL, потому что я мог легко и предсказуемо генерировать модель Java, которая следует контракту веб-сервиса. JSON, особенно учитывая незначительное проникновение на рынок JSON-схемы , по сути является Диким Западом интеграции. Обычно вы остаетесь с неофициальной документацией или образцами запросов и ответов. Нет информации о типе или формате, поля обязательны для заполнения и т. Д. Кроме того, поскольку я неохотно работаю с картами карт (привет, коллеги-программисты Clojure), чтобы работать с REST-сервисами на основе JSON, я сам должен написать POJO для отображения. Ну, есть обходные пути. Сначала я взял один репрезентативный пример JSON, созданный потоковым API Meetup, и поместил его в src/main/json/meetup/event.json . Затем я использовал jsonschema2pojo-maven-plugin (также существуют версии Gradle и Ant ). Название плагина сбивает с толку, он также может работать с примером JSON, а не только со схемой, для создания моделей Java:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
<plugin> <groupId>org.jsonschema2pojo</groupId> <artifactId>jsonschema2pojo-maven-plugin</artifactId> <version>0.4.7</version> <configuration> <sourceDirectory>${basedir}/src/main/json/meetup</sourceDirectory> <targetPackage>com.nurkiewicz.meetup.generated</targetPackage> <includeHashcodeAndEquals>true</includeHashcodeAndEquals> <includeToString>true</includeToString> <initializeCollections>true</initializeCollections> <sourceType>JSON</sourceType> <useCommonsLang3>true</useCommonsLang3> <useJodaDates>true</useJodaDates> <useLongIntegers>true</useLongIntegers> <outputDirectory>target/generated-sources</outputDirectory> </configuration> <executions> <execution> <id>generate-sources</id> <phase>generate-sources</phase> <goals> <goal>generate</goal> </goals> </execution> </executions></plugin> |
На этом этапе Maven создаст совместимые с Джексоном Event.java , Venue.java , Group.java и т. Д.
|
1
2
3
4
5
6
7
|
private Event parseEventJson(String jsonStr) { try { return objectMapper.readValue(jsonStr, Event.class); } catch (IOException e) { throw new UncheckedIOException(e); }} |
Это просто работает, сладкий
|
1
|
final Observable<event> events = jsonChunks.map(this::parseEventJson);</event> |
Шаг 4: ??? [1]
Шаг 5: ПРИБЫЛЬ !!!
Имея Observable<Event> мы можем реализовать несколько действительно интересных вариантов использования. Хотите найти имена всех встреч в Польше, которые были только что созданы? Конечно!
|
1
2
3
4
5
|
events .filter(event -> event.getVenue() != null) .filter(event -> event.getVenue().getCountry().equals("pl")) .map(Event::getName) .forEach(System.out::println); |
Ищете статистику, сколько событий создается в минуту? Нет проблем!
|
1
2
3
4
|
events .buffer(1, TimeUnit.MINUTES) .map(List::size) .forEach(count -> log.info("Count: {}", count)); |
Или, может быть, вы хотите постоянно искать самые дальние встречи в будущем, пропуская их ближе, чем уже найденные?
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
|
events .filter(event -> event.getTime() != null) .scan(this::laterEventFrom) .distinct() .map(Event::getTime) .map(Instant::ofEpochMilli) .forEach(System.out::println); //... private Event laterEventFrom(Event first, Event second) { return first.getTime() > second.getTime() ? first : second;} |
Этот код отфильтровывает события без известного времени, генерирует либо текущее, либо предыдущее ( scan() ), в зависимости от того, какое событие было позже, отфильтровывает дубликаты и отображает время. Эта крошечная программа, работающая в течение нескольких минут, уже нашла одну только что созданную встречу, запланированную на ноябрь 2015 года — и на момент написания этой статьи это декабрь 2014 года. Возможности бесконечны.
Я надеюсь, что дал вам хорошее представление о том, как вы можете легко объединить различные технологии: реактивное программирование для написания сверхбыстрого сетевого кода, безопасный для анализа тип JSON без стандартного кода и RxJava для быстрой обработки потоков событий. Наслаждайтесь!
| Ссылка: | Доступ к потоковому API Meetup с помощью RxNetty от нашего партнера по JCG Томаша Нуркевича из блога Java и соседей . |
