Статьи

Поиск в Твиттере в реальном времени через Websocket или Comet

В настоящее время Twitter поддерживает потоковый API, описанный ниже:

«API потоковой передачи Twitter обеспечивает высокопроизводительный доступ практически в реальном времени к различным подмножествам открытых и защищенных данных Twitter».

К сожалению, нет такого API, когда пришло время выполнять поиск в реальном времени. Благодаря недавно выпущенной версии Atmosphere Framework 0.6.2 давайте создадим такой API менее чем за 25 строк. В качестве дополнения, давайте воспользуемся плагином Atmosphere JQuery, чтобы выбрать лучший транспорт для взаимодействия с этим API: Websocket, http-streaming или long-polling.

Сторона сервера

Что нам нужно сделать:

  1. Укажите URI, который можно использовать для отправки поисковых запросов (хэштег)
  2. Основываясь на ключевом слове / хэштеге, подключайте и опрашивайте API поиска в Twitter каждую секунду (или больше), чтобы получить результат JSON. Поскольку мы хотим поддерживать тысячи клиентов, давайте удостоверимся, что никогда не блокируем ожидание результата, а вместо этого будем полностью асинхронными.
  3. Трансляция / Отослать обратно объект JSON к нашему набору приостановленных ответов. Под приостановленным здесь я подразумеваю соединение, которое использует длинный опрос, потоковую передачу http или Websocket. Как вам может быть известно, Atmosphere скрывает такие детали в вашем приложении, поэтому вам не нужно фокусироваться на том, как работает транспорт, а на самом приложении.
  4. Укажите URI для остановки поиска в реальном времени.

Теперь давайте построим это. Если вы не можете ждать, вы можете прочитать весь код здесь и здесь . Для нашего приложения давайте используем модули атмосферы-аннотации и атмосферы-джерси:

@Path("/search/{tagid}")
@Singleton
public class TwitterFeed {

    private final AsyncHttpClient asyncClient = new AsyncHttpClient();
    private final ConcurrentHashMap<String, Future<?>> futures
                 = new ConcurrentHashMap<String, Future<?>>();

    @GET
    public SuspendResponse<String>
             search(final @PathParam("tagid") Broadcaster feed
                    final @PathParam("tagid") String tagid) {

Мы хотим, чтобы наш класс вызывался, когда запрос принимает форму / search / {hashtag}, а хэштегом может быть все, что вы хотите найти. Затем мы определим наш первый метод, попросив Джерси добавить Broadcaster и хэштег (tagid) из URL. Для тех из вас, кто не знаком с концепцией Atmosphere, Broadcaster — это объект, который можно использовать для трансляции / отправки событий в браузер. Broadcaster содержит список соединений, которые были приостановлены, независимо от используемого транспорта: длинный опрос, потоковая передача или веб-сокет. Следовательно, вещатель может использоваться для трансляции событий в реальном времени. Для нашего текущего приложения мы создадим один Broadcaster для каждого хэштега. Следующим шагом является настройка нашего Broadcaster для опроса API поиска в Twitter, просто выполнив:

if (feed.getAtmosphereResources().size() == 0) {
   Future<?> future = feed.scheduleFixedBroadcast(new Callable<String>() {

           private final AtomicReference<String> refreshUrl = new AtomicReference<String>("");

           public String call() throws Exception {
              String query = null;
              if (!refreshUrl.get().isEmpty()) {
                  query = refreshUrl.get();
              } else {
                  query = "?q=" + tagid;
              }
              asyncClient.prepareGet(
                  "http://search.twitter.com/search.json"  + query)
                       .execute(new AsyncCompletionHandler <Integer>()) {

                          @Override
                          public Object onCompleted(Response response) throws Exception {
                            String s = response.getResponseBody();
                            JSONObject json = new JSONObject(s);
                            refreshUrl.set(json.getString("refresh_url"));
                            feed.broadcast(s).get();
                            return response.getStatusCode();
                          }
                    });
                    return "OK";
                }
            }, 1, TimeUnit.SECONDS);

            futures.put(tagid, future);
        }

Сначала мы запрашиваем у нашего Broadcaster (канал), чтобы узнать, есть ли уже соединение, которое запрашивало поиск в реальном времени. Если нет, то мы вызываем Broadcaster.scheduleFixedBroadcast (..) с Callable. Это Callable будет выполняться каждую секунду. Внутри вызываемого объекта мы используем другой активный проект с открытым исходным кодом AsynHttpClient , который позволяет выполнять вызываемый вызов асинхронно, например, мы отправляем запрос, но не блокируем ожидание ответа.

AsyncHttpClient позаботится о обратном вызове AsyncCompletionHandler, как только API Twitter отправит нам полный ответ. Мы могли бы передать ответ, но чтобы упростить пример, мы просто используем AsyncCompletionHandler, который буферизует весь ответ JSON.

Из объекта JSON мы получаем значение refresh_url, которое мы будем использовать в следующий раз, когда будем запрашивать API-интерфейс поиска в Twitter, чтобы получать только новые результаты вместо всего набора. Далее нам просто нужно сказать Атмосфере приостановить соединение и использовать этот вещатель:

return new SuspendResponse.SuspendResponseBuilder<String>()
                .broadcaster(feed)
                .outputComments(true)
                .addListener(new EventsLogger())
                .build();

Наконец, мы просто передаем результат как есть, чтобы наш клиент мог использовать JSON для чтения ответа. Мы сохраняем возвращенное будущее, поэтому позже мы можем остановить трансляцию в реальном времени:

    @GET
    @Path("stop")
    public String stopSearch(
           final @PathParam("tagid") Broadcaster feed,
           final @PathParam("tagid") String tagid) {                 

            // Resume all connections associated with an hashtag
            feed.resumeAll();
            futures.get(tagid).cancel(true);
            return "DONE";
        }
    }

Чтобы остановить поиск в реальном времени, мы просто выдаем / search / {# hashtag} / stop . Это все, что нам нужно сделать на стороне сервера. Наше серверное приложение теперь может получать запросы Websocket, long-polling или http-streaming.

Клиентская сторона

На стороне клиента вы можете использовать любую существующую библиотеку Javascript, поддерживающую Websocket или Comet, для запроса API в реальном времени. Это так же просто, как

/ search / # hashtag                 для подписки на обновления в реальном времени (попробуйте с WebSocket!)

/ search / # hashtag / stop,     чтобы остановить обновление в реальном времени

Но самый простой способ — использовать JQuery-плагин Atmosphere (конечно :-)), который поддерживает Websocket и Comet. Еще важнее то, что плагин может определять оптимальный транспорт для использования на основе того, что поддерживает клиент и сервер. Например, если приложение развернуто в Tomcat и вы используете Chrome, вы можете позволить плагину найти лучший транспорт или указать тот, который вы хотите использовать. Это так просто, как:

   $.atmosphere.subscribe(document.location.toString() + 'search/' + hashtag              
              callback,
              $.atmosphere.request = {transport: 'Websocket'});

Выше мы вызываем метод подписки, передавая URL-адрес, содержащий хэштег, обратный вызов и некоторые свойства запроса. Плагин вызывает Callback, как только поиск в реальном времени начинается на сервере. Обратный вызов выглядит так:

function callback(response) {
   if (response.transport != 'polling' &&
           response.state != 'connected' &&
           response.state != 'closed') {

           if (response.status == 200) {
               var data = response.responseBody;

               try {
                    var result =  $.parseJSON(incompleteMessage + data);  
                    incompleteMessage = "";

                     var i = 0;
                     for (i = result.results.length -1 ; i > -1; i--){
                          $('ul').prepend($('<li></li>').text("["
                               + response.transport + "] "
                               + result.results[i].from_user + " "
                               + result.results[i].text));
                   }
              } catch (err) {
                    incompleteMessage = data;
              }

Важный фрагмент кода выше — это parseJSON (incompleteMessage + data); , Размер данных, возвращаемых сервером, может различаться в зависимости от сервера, поэтому мы не можем получить весь объект JSON за один вызов обратного вызова. Поэтому нам нужно обернуть этот вызов внутри try / catch (так как parseJSON потерпит неудачу) и убедиться, что в следующий раз при вызове обратного вызова мы добавим ранее полученные данные. Этот сценарий произойдет, только если вы будете искать популярный хэштег и вы получите большое количество ответов (попробуйте #Nordiques !!!).

Вот и все на стороне клиента. Вы можете скачать образец здесь и развернуть его на любом веб-сервере, поддерживающем комету и / или веб-сокет, или ни на одном из них !! Интерфейс прост и демонстрирует автоматическое обнаружение транспорта.

Любой классный и лучше разработанный интерфейс приветствуется ? Наконец, вы можете получить больше информации об этом образце, прочитав мой доклад по JavaOne.

По любым вопросам или для загрузки клиентской и серверной среды Atmosphere перейдите на наш основной сайт и воспользуйтесь нашим форумом Nabble (подписка не требуется) или следите за командой или мной и пишите там свои вопросы! Вы также можете проверить код на Github .

 Контент от jfarcand.wordpress.com