Статьи

Превращение Twitter4J в наблюдаемую RxJava

Twitter4J – это Java-оболочка вокруг Twitter API . В то время как Twitter поддерживает простые взаимодействия запрос-ответ, в этой статье мы рассмотрим потоковые API . В отличие от модели запрос-ответ, которая всегда инициируется клиентом, потоковый API передает данные с сервера Twitter клиентам, как только они становятся доступны. Конечно, в случае с Twitter мы говорим о твитах, которые называются Status в API. осло-1

Вопрос в том, как бы вы разработали Java API для потоковой передачи? Не удивительно: обратные вызовы, обратные вызовы везде !

1
2
3
4
5
6
7
8
9
import twitter4j.*;
  
TwitterStream twitter = new TwitterStreamFactory().getInstance();
twitter.addListener(new StatusAdapter() {
  public void onStatus(Status status) {
    System.out.println(status.getUser().getName() + " : " + status.getText());
  }
});
twitter.sample();

Скажем, в дополнение к этому API мы хотели бы посчитать, сколько сообщений мы получаем в секунду. Много случайных сложностей проникает в:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
final AtomicInteger countPerSecond = new AtomicInteger();
  
twitter.addListener(new StatusAdapter() {
  public void onStatus(Status status) {
    countPerSecond.incrementAndGet();
  }
});
twitter.sample();
  
Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() {
  @Override
  public void run() {
    final int count = countPerSecond.getAndSet(0);
    log.debug("Tweets/second: {}", count);
  }
}, 1, 1, SECONDS);

Нам нужен ScheduledExecutorService и очень осторожно относимся к безопасности потоков. Более того, этот подход не масштабируется, так как требует кода, созданного вручную, для каждого случая использования, который мы можем себе представить, например, для регулирования, объединения или накопления. Оказывается, что связывание потокового API Twitter4J (и практически любого API на основе обратного вызова в этом отношении) с Observable в RxJava довольно простое и значительно упростит дальнейшие решения.

Прежде чем мы рассмотрим, как создать новый Observable представляющий поток сообщений Twitter поверх API Twitter4J, давайте предположим, что он у нас уже есть:

1
Observable<Status> twitter = twitterObservable();  //to be implemented

Observable<Status> twitter – это поток объектов Status где каждый такой объект представляет собой один твит. Как мы решаем нашу начальную проблему подсчета твитов в секунду ( tps )?

1
2
3
Observable<Integer> tpsStream = twitter.
    buffer(1, TimeUnit.SECONDS).
    map(list -> list.size());

Это было просто! Мы берем начальный поток твитов и буферизируем их каждую секунду. По истечении одной секунды запускается только одно событие, содержащее List<Status> созданный за этот период времени. Позже мы преобразуем List в Integer , взяв его size() . Вот и все! tpsStream будет tpsStream одно число в секунду, представляющее количество твитов в секунду. Если мы внезапно осознали, что наша система перегружена этим числом, мы можем легко сэмплировать поток и выбрать только его подмножество. Например, мы хотим получать не более одного твита каждые 100 миллисекунд:

1
twitter.sample(100, MILLISECONDS)

Существует более ста доступных операторов, похожих на buffer() и sample() но я надеюсь, что вы поняли идею. Теперь, когда мы видим, насколько полезен Observable<Status> , давайте реализуем его. При определении Observable нам нужно предоставить два обработчика: один описывает, что происходит, когда клиент подписывается на данный Observable и, опционально, – как обрабатывать отписку:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
public Observable<Status> twitterObservable() {
  return Observable.create(subscriber -> {
    final TwitterStream twitterStream = new TwitterStreamFactory().getInstance();
    twitterStream.addListener(new StatusAdapter() {
      public void onStatus(Status status) {
        subscriber.onNext(status);
      }
      public void onException(Exception ex) {
        subscriber.onError(ex);
      }
    });
    twitterStream.sample();
    return Subscriptions.create(() -> {
      twitterStream.cleanUp();
    });
  });
}

Довольно много кода написано на Java 8 (Scala и Groovy одинаково хорошо работают с RxJava). Обратный вызов, предоставляемый Observable.create() , выполняется каждый раз, когда кто-то подписывается на Observable . Оказывается, что все приведенные ниже примеры никогда не запускают этот обработчик, потому что RxJava очень ленив по своей природе, поэтому он не будет подключаться к Twitter без крайней необходимости. Например, twitter.filter(...) вернет новый Observable только с подмножеством твитов, соответствующих определенным критериям. Но до тех пор, пока вы физически не подписываетесь (используя twitter.subscribe() ) на эту Observable , на самом деле ничего не произойдет. В приведенном ниже примере соединение откладывается до тех пор, пока мы не вызовем метод subscribe() . После этого извлекается текст каждого #java твита, и если он содержит хештег #java – он будет напечатан. Все это происходит асинхронно, и весь оператор не блокируется:

1
2
3
4
twitter.
  map(Status::getText).
  filter(text -> text.contains("#java")).
  subscribe(System.out::println);

Subscriptions.create() также принимает обработчик – и, как вы можете догадаться, он сообщает, что должно произойти, когда клиент больше не интересуется Observable<Status> .

Twitter4J – это всего лишь пример того, как вы можете адаптировать API на основе обратного вызова в Observable . Другие примеры включают входящие сетевые пакеты, сообщения JMS или изменения файловой системы. Во всех случаях сценарий один и тот же.

Ссылка: Превращение Twitter4J в RxJava Observable от нашего партнера по JCG Томаша Нуркевича из блога Java и соседей .