В этой статье будут затронуты несколько тем: реактивное программирование, HTTP, анализ JSON и интеграция с социальным API. Все в одном сценарии использования: мы будем загружать и обрабатывать новые события meetup.com в режиме реального времени с помощью неблокирующей библиотеки RxNetty , сочетая мощь инфраструктуры Netty и гибкость библиотеки RxJava . Meetup предоставляет общедоступный потоковый API, который в режиме реального времени передает каждую отдельную Meetup, зарегистрированную по всему миру. Просто зайдите на stream.meetup.com/2/open_events и посмотрите, как медленно появляются фрагменты JSON на вашем экране. Каждый раз, когда кто-то создает новое событие, самосодержащий JSON отправляется с сервера в ваш браузер. Это означает, что такой запрос никогда не заканчивается, вместо этого мы продолжаем получать частичные данные столько, сколько мы хотим. Мы уже рассматривали подобный сценарий в Превращении Twitter4J в Наблюдаемую RxJava . Каждое новое событие встречи публикует отдельный документ JSON, похожий на этот (многие детали опущены):
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
|
{ "id" : "219088449" , "name" : "Silver Wings Brunch" , "time" : 1421609400000 , "mtime" : 1417814004321 , "duration" : 900000 , "rsvp_limit" : 0 , "status" : "upcoming" , "group" : { "name" : "Former Flight Attendants South Orange and North San Diego Co" , "state" : "CA" ... }, "venue" : { "address_1" : "26860 Ortega Highway" , "city" : "San Juan Capistrano" , "country" : "US" ... }, "venue_visibility" : "public" , "visibility" : "public" , "yes_rsvp_count" : 1 ... } |
Каждый раз, когда наше HTTP-соединение с длинным опросом (с Transfer-Encoding: chunked
response header) отправляет такой фрагмент JSON, мы хотим его проанализировать и как-то пройти дальше. Мы ненавидим обратные вызовы, поэтому RxJava кажется разумной альтернативой (подумайте: Observable<Event>
).
Шаг 1: Получение необработанных данных с помощью RxNetty
Мы не можем использовать обычный HTTP-клиент, поскольку он сфокусирован на семантике запрос-ответ. Здесь нет ответа, мы просто оставляем открытое соединение навсегда и потребляем данные, когда оно приходит. RxJava имеет готовую библиотеку RxApacheHttp , но она принимает тип содержимого text/event-stream
. Вместо этого мы будем использовать довольно низкоуровневую универсальную библиотеку RxNetty. Это оболочка для Netty (да!) И способна реализовывать произвольные TCP / IP (включая HTTP) и UDP клиенты и серверы. Если вы не знаете Netty, он ориентирован на пакет, а не на поток, поэтому мы можем ожидать одно событие Netty для каждого нажатия Meetup. API, конечно, не прост, но имеет смысл, как только вы это сделаете:
01
02
03
04
05
06
07
08
09
10
11
|
HttpClient<ByteBuf, ByteBuf> httpClient = RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder( "stream.meetup.com" , 443 ) .pipelineConfigurator( new HttpClientPipelineConfigurator<>()) .withSslEngineFactory(DefaultFactories.trustAll()) .build(); final Observable<HttpClientResponse> responses = httpClient.submit(HttpClientRequest.createGet( "/2/open_events" )); final Observable byteBufs = responses.flatMap(AbstractHttpContentHolder::getContent); final Observable chunks = byteBufs.map(content -> content.toString(StandardCharsets.UTF_8)); |
Сначала мы создаем HttpClient
и настраиваем SSL (имейте в виду, что trustAll()
отношении серверных сертификатов, вероятно, не лучший trustAll()
параметр). Позже мы submit()
GET-запрос и получаем Observable<HttpClientResponse<ByteBuf>>
взамен. ByteBuf
— это абстракция Netty для ByteBuf
байтов, отправленных или полученных по проводам. Эта заметка немедленно расскажет нам о каждой части данных, полученных от Meetup. После извлечения ByteBuf
из ответа мы превращаем его в String
содержащую вышеупомянутый JSON. Пока все хорошо, это работает.
Шаг 2. Выравнивание пакетов с документами JSON
Netty очень мощная, потому что она не скрывает сложность, присущую неплотным абстракциям. Каждый раз, когда что-то принимается по протоколу TCP / IP, мы получаем уведомление. Вы можете полагать, что когда сервер отправляет 100 байтов, Netty на стороне клиента уведомит нас об этих полученных 100 байтах. Однако стек TCP / IP может свободно разделять и объединять данные, которые вы отправляете по проводам, тем более что предполагается, что это поток, поэтому то, как он разбивается на пакеты, не имеет значения. Это предостережение в значительной степени объясняется в документации Netty . Что это значит для нас? Когда Meetup отправляет одно событие, мы можем получить только одну String
в виде chunks
. Но точно так же он может быть разделен на произвольное количество пакетов, таким образом, chunks
будут испускать несколько String
. Еще хуже, если Meetup отправляет два события сразу за другим, они могут помещаться в один пакет. В этом случае chunks
будут генерировать одну String
с двумя независимыми документами JSON. На самом деле мы не можем предполагать какое-либо выравнивание между строками JSON и полученными сетевыми пакетами. Все, что мы знаем, это то, что отдельные документы JSON, представляющие события, разделяются символами новой строки. Удивительно, RxJavaString
официального дополнения RxJavaString
есть метод именно для этого:
1
|
Observable jsonChunks = StringObservable.split(chunks, "\n" ); |
На самом деле есть еще более простой StringObservable.byLine(chunks)
, но он использует зависящий от платформы конец строки. Что делает split()
лучше всего объясняется в официальной документации :
Теперь мы можем безопасно проанализировать каждую String
, jsonChunks
:
Шаг 3: Разбор JSON
Интересно, что этот шаг не так прост. Признаюсь, мне нравились времена WSDL, потому что я мог легко и предсказуемо генерировать модель Java, которая следует контракту веб-сервиса. JSON, особенно учитывая незначительное проникновение на рынок JSON-схемы , по сути является Диким Западом интеграции. Обычно вы остаетесь с неофициальной документацией или образцами запросов и ответов. Нет информации о типе или формате, поля обязательны для заполнения и т. Д. Кроме того, поскольку я неохотно работаю с картами карт (привет, коллеги-программисты Clojure), чтобы работать с REST-сервисами на основе JSON, я сам должен написать POJO для отображения. Ну, есть обходные пути. Сначала я взял один репрезентативный пример JSON, созданный потоковым API Meetup, и поместил его в src/main/json/meetup/event.json
. Затем я использовал jsonschema2pojo-maven-plugin
(также существуют версии Gradle и Ant ). Название плагина сбивает с толку, он также может работать с примером JSON, а не только со схемой, для создания моделей Java:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
< plugin > < groupId >org.jsonschema2pojo</ groupId > < artifactId >jsonschema2pojo-maven-plugin</ artifactId > < version >0.4.7</ version > < configuration > < sourceDirectory >${basedir}/src/main/json/meetup</ sourceDirectory > < targetPackage >com.nurkiewicz.meetup.generated</ targetPackage > < includeHashcodeAndEquals >true</ includeHashcodeAndEquals > < includeToString >true</ includeToString > < initializeCollections >true</ initializeCollections > < sourceType >JSON</ sourceType > < useCommonsLang3 >true</ useCommonsLang3 > < useJodaDates >true</ useJodaDates > < useLongIntegers >true</ useLongIntegers > < outputDirectory >target/generated-sources</ outputDirectory > </ configuration > < executions > < execution > < id >generate-sources</ id > < phase >generate-sources</ phase > < goals > < goal >generate</ goal > </ goals > </ execution > </ executions > </ plugin > |
На этом этапе Maven создаст совместимые с Джексоном Event.java
, Venue.java
, Group.java
и т. Д.
1
2
3
4
5
6
7
|
private Event parseEventJson(String jsonStr) { try { return objectMapper.readValue(jsonStr, Event. class ); } catch (IOException e) { throw new UncheckedIOException(e); } } |
Это просто работает, сладкий
1
|
final Observable<event> events = jsonChunks.map( this ::parseEventJson);</event> |
Шаг 4: ??? [1]
Шаг 5: ПРИБЫЛЬ !!!
Имея Observable<Event>
мы можем реализовать несколько действительно интересных вариантов использования. Хотите найти имена всех встреч в Польше, которые были только что созданы? Конечно!
1
2
3
4
5
|
events .filter(event -> event.getVenue() != null ) .filter(event -> event.getVenue().getCountry().equals( "pl" )) .map(Event::getName) .forEach(System.out::println); |
Ищете статистику, сколько событий создается в минуту? Нет проблем!
1
2
3
4
|
events .buffer( 1 , TimeUnit.MINUTES) .map(List::size) .forEach(count -> log.info( "Count: {}" , count)); |
Или, может быть, вы хотите постоянно искать самые дальние встречи в будущем, пропуская их ближе, чем уже найденные?
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
|
events .filter(event -> event.getTime() != null ) .scan( this ::laterEventFrom) .distinct() .map(Event::getTime) .map(Instant::ofEpochMilli) .forEach(System.out::println); //... private Event laterEventFrom(Event first, Event second) { return first.getTime() > second.getTime() ? first : second; } |
Этот код отфильтровывает события без известного времени, генерирует либо текущее, либо предыдущее ( scan()
), в зависимости от того, какое событие было позже, отфильтровывает дубликаты и отображает время. Эта крошечная программа, работающая в течение нескольких минут, уже нашла одну только что созданную встречу, запланированную на ноябрь 2015 года — и на момент написания этой статьи это декабрь 2014 года. Возможности бесконечны.
Я надеюсь, что дал вам хорошее представление о том, как вы можете легко объединить различные технологии: реактивное программирование для написания сверхбыстрого сетевого кода, безопасный для анализа тип JSON без стандартного кода и RxJava для быстрой обработки потоков событий. Наслаждайтесь!
Ссылка: | Доступ к потоковому API Meetup с помощью RxNetty от нашего партнера по JCG Томаша Нуркевича из блога Java и соседей . |