Статьи

Доступ к потоковому API Meetup с помощью RxNetty

В этой статье будут затронуты несколько тем: реактивное программирование, HTTP, анализ JSON и интеграция с социальным API. Все в одном сценарии использования: мы будем загружать и обрабатывать новые события meetup.com в режиме реального времени с помощью неблокирующей библиотеки RxNetty , сочетая мощь инфраструктуры Netty и гибкость библиотеки RxJava . Meetup предоставляет общедоступный потоковый API, который в режиме реального времени передает каждую отдельную Meetup, зарегистрированную по всему миру. Просто зайдите на stream.meetup.com/2/open_events и посмотрите, как медленно появляются фрагменты JSON на вашем экране. Каждый раз, когда кто-то создает новое событие, самосодержащий JSON отправляется с сервера в ваш браузер. Это означает, что такой запрос никогда не заканчивается, вместо этого мы продолжаем получать частичные данные столько, сколько мы хотим. Мы уже рассматривали подобный сценарий в Превращении Twitter4J в Наблюдаемую RxJava . Каждое новое событие встречи публикует отдельный документ JSON, похожий на этот (многие детали опущены):

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
{ "id" : "219088449",
  "name" : "Silver Wings Brunch",
  "time" : 1421609400000,
  "mtime" : 1417814004321,
  "duration" : 900000,
  "rsvp_limit" : 0,
  "status" : "upcoming",
  "group" : { "name" : "Former Flight Attendants South Orange and North San Diego Co",
              "state" : "CA"
              ...
  },
  "venue" : { "address_1" : "26860 Ortega Highway",
              "city" : "San Juan Capistrano",
              "country" : "US"
              ...
  },
  "venue_visibility" : "public",
  "visibility" : "public",
  "yes_rsvp_count" : 1
  ...
}

Каждый раз, когда наше HTTP-соединение с длинным опросом (с Transfer-Encoding: chunked response header) отправляет такой фрагмент JSON, мы хотим его проанализировать и как-то пройти дальше. Мы ненавидим обратные вызовы, поэтому RxJava кажется разумной альтернативой (подумайте: Observable<Event> ).

Шаг 1: Получение необработанных данных с помощью RxNetty

Мы не можем использовать обычный HTTP-клиент, поскольку он сфокусирован на семантике запрос-ответ. Здесь нет ответа, мы просто оставляем открытое соединение навсегда и потребляем данные, когда оно приходит. RxJava имеет готовую библиотеку RxApacheHttp , но она принимает тип содержимого text/event-stream . Вместо этого мы будем использовать довольно низкоуровневую универсальную библиотеку RxNetty. Это оболочка для Netty (да!) И способна реализовывать произвольные TCP / IP (включая HTTP) и UDP клиенты и серверы. Если вы не знаете Netty, он ориентирован на пакет, а не на поток, поэтому мы можем ожидать одно событие Netty для каждого нажатия Meetup. API, конечно, не прост, но имеет смысл, как только вы это сделаете:

01
02
03
04
05
06
07
08
09
10
11
HttpClient<ByteBuf, ByteBuf> httpClient = RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder("stream.meetup.com", 443)
        .pipelineConfigurator(new HttpClientPipelineConfigurator<>())
        .withSslEngineFactory(DefaultFactories.trustAll())
        .build();
  
final Observable<HttpClientResponse> responses =
    httpClient.submit(HttpClientRequest.createGet("/2/open_events"));
final Observable byteBufs =
    responses.flatMap(AbstractHttpContentHolder::getContent);
final Observable chunks =
    byteBufs.map(content -> content.toString(StandardCharsets.UTF_8));

Сначала мы создаем HttpClient и настраиваем SSL (имейте в виду, что trustAll() отношении серверных сертификатов, вероятно, не лучший trustAll() параметр). Позже мы submit() GET-запрос и получаем Observable<HttpClientResponse<ByteBuf>> взамен. ByteBuf — это абстракция Netty для ByteBuf байтов, отправленных или полученных по проводам. Эта заметка немедленно расскажет нам о каждой части данных, полученных от Meetup. После извлечения ByteBuf из ответа мы превращаем его в String содержащую вышеупомянутый JSON. Пока все хорошо, это работает.

Шаг 2. Выравнивание пакетов с документами JSON

Netty очень мощная, потому что она не скрывает сложность, присущую неплотным абстракциям. Каждый раз, когда что-то принимается по протоколу TCP / IP, мы получаем уведомление. Вы можете полагать, что когда сервер отправляет 100 байтов, Netty на стороне клиента уведомит нас об этих полученных 100 байтах. Однако стек TCP / IP может свободно разделять и объединять данные, которые вы отправляете по проводам, тем более что предполагается, что это поток, поэтому то, как он разбивается на пакеты, не имеет значения. Это предостережение в значительной степени объясняется в документации Netty . Что это значит для нас? Когда Meetup отправляет одно событие, мы можем получить только одну String в виде chunks . Но точно так же он может быть разделен на произвольное количество пакетов, таким образом, chunks будут испускать несколько String . Еще хуже, если Meetup отправляет два события сразу за другим, они могут помещаться в один пакет. В этом случае chunks будут генерировать одну String с двумя независимыми документами JSON. На самом деле мы не можем предполагать какое-либо выравнивание между строками JSON и полученными сетевыми пакетами. Все, что мы знаем, это то, что отдельные документы JSON, представляющие события, разделяются символами новой строки. Удивительно, RxJavaString официального дополнения RxJavaString есть метод именно для этого:

1
Observable jsonChunks = StringObservable.split(chunks, "\n");

На самом деле есть еще более простой StringObservable.byLine(chunks) , но он использует зависящий от платформы конец строки. Что делает split() лучше всего объясняется в официальной документации :

St.split

Теперь мы можем безопасно проанализировать каждую String , jsonChunks :

Шаг 3: Разбор JSON

Интересно, что этот шаг не так прост. Признаюсь, мне нравились времена WSDL, потому что я мог легко и предсказуемо генерировать модель Java, которая следует контракту веб-сервиса. JSON, особенно учитывая незначительное проникновение на рынок JSON-схемы , по сути является Диким Западом интеграции. Обычно вы остаетесь с неофициальной документацией или образцами запросов и ответов. Нет информации о типе или формате, поля обязательны для заполнения и т. Д. Кроме того, поскольку я неохотно работаю с картами карт (привет, коллеги-программисты Clojure), чтобы работать с REST-сервисами на основе JSON, я сам должен написать POJO для отображения. Ну, есть обходные пути. Сначала я взял один репрезентативный пример JSON, созданный потоковым API Meetup, и поместил его в src/main/json/meetup/event.json . Затем я использовал jsonschema2pojo-maven-plugin (также существуют версии Gradle и Ant ). Название плагина сбивает с толку, он также может работать с примером JSON, а не только со схемой, для создания моделей Java:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
<plugin>
    <groupId>org.jsonschema2pojo</groupId>
    <artifactId>jsonschema2pojo-maven-plugin</artifactId>
    <version>0.4.7</version>
    <configuration>
        <sourceDirectory>${basedir}/src/main/json/meetup</sourceDirectory>
        <targetPackage>com.nurkiewicz.meetup.generated</targetPackage>
        <includeHashcodeAndEquals>true</includeHashcodeAndEquals>
        <includeToString>true</includeToString>
        <initializeCollections>true</initializeCollections>
        <sourceType>JSON</sourceType>
        <useCommonsLang3>true</useCommonsLang3>
        <useJodaDates>true</useJodaDates>
        <useLongIntegers>true</useLongIntegers>
        <outputDirectory>target/generated-sources</outputDirectory>
    </configuration>
    <executions>
        <execution>
            <id>generate-sources</id>
            <phase>generate-sources</phase>
            <goals>
                <goal>generate</goal>
            </goals>
        </execution>
    </executions>
</plugin>

На этом этапе Maven создаст совместимые с Джексоном Event.java , Venue.java , Group.java и т. Д.

1
2
3
4
5
6
7
private Event parseEventJson(String jsonStr) {
    try {
        return objectMapper.readValue(jsonStr, Event.class);
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }
}

Это просто работает, сладкий

1
final Observable<event> events = jsonChunks.map(this::parseEventJson);</event>

Шаг 4: ??? [1]

Шаг 5: ПРИБЫЛЬ !!!

Имея Observable<Event> мы можем реализовать несколько действительно интересных вариантов использования. Хотите найти имена всех встреч в Польше, которые были только что созданы? Конечно!

1
2
3
4
5
events
        .filter(event -> event.getVenue() != null)
        .filter(event -> event.getVenue().getCountry().equals("pl"))
        .map(Event::getName)
        .forEach(System.out::println);

Ищете статистику, сколько событий создается в минуту? Нет проблем!

1
2
3
4
events
        .buffer(1, TimeUnit.MINUTES)
        .map(List::size)
        .forEach(count -> log.info("Count: {}", count));

Или, может быть, вы хотите постоянно искать самые дальние встречи в будущем, пропуская их ближе, чем уже найденные?

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
events
        .filter(event -> event.getTime() != null)
        .scan(this::laterEventFrom)
        .distinct()
        .map(Event::getTime)
        .map(Instant::ofEpochMilli)
        .forEach(System.out::println);
  
//...
  
private Event laterEventFrom(Event first, Event second) {
    return first.getTime() > second.getTime() ?
            first :
            second;
}

Этот код отфильтровывает события без известного времени, генерирует либо текущее, либо предыдущее ( scan() ), в зависимости от того, какое событие было позже, отфильтровывает дубликаты и отображает время. Эта крошечная программа, работающая в течение нескольких минут, уже нашла одну только что созданную встречу, запланированную на ноябрь 2015 года — и на момент написания этой статьи это декабрь 2014 года. Возможности бесконечны.

Я надеюсь, что дал вам хорошее представление о том, как вы можете легко объединить различные технологии: реактивное программирование для написания сверхбыстрого сетевого кода, безопасный для анализа тип JSON без стандартного кода и RxJava для быстрой обработки потоков событий. Наслаждайтесь!

Ссылка: Доступ к потоковому API Meetup с помощью RxNetty от нашего партнера по JCG Томаша Нуркевича из блога Java и соседей .