
Хотя API REST Twitter подходит для многих приложений, если вам нужны немедленные обновления и доступ к более широкому массиву уведомлений, API потоковой передачи Twitter необходим. Например, только потоковый API сообщит вам, когда другой пользователь добавит в избранное один из ваших твитов.
Использование потокового API требует постоянного постоянного соединения между вашим веб-сервером и Twitter. Этот тип реализации может быть незнаком для многих разработчиков PHP. Как только поступают твиты, Twitter уведомляет ваш сервер в режиме реального времени, позволяя вам сохранять их в своей базе данных без задержки опроса REST API. Использование потокового API также не подпадает под ограничения скорости API Twitter.
Вот визуализация того, как это работает:

Существует три варианта API потоковой передачи Twitter:
- Общественный поток . Это позволяет вашему приложению отслеживать общедоступные данные в Twitter, такие как общедоступные твиты, фильтры хэштегов и т. Д.
- Пользовательский поток . Это позволяет отслеживать поток твитов пользователя в режиме реального времени. Третья часть этой серии будет посвящена потоку пользователей.
- Потоки сайта . Потоки сайта позволяют вашему приложению отслеживать каналы Twitter в реальном времени для большого количества пользователей.
Работа вашей потоковой реализации заключается в том, чтобы как можно быстрее регистрировать входящие события и обрабатывать их в фоновом режиме, используя REST API для сбора более глубоких данных. Потоки сайта требуют предварительного одобрения от Twitter , которые, скорее всего, зарезервированы для крупных компаний и разработчиков.
К счастью, есть бесплатная библиотека с открытым исходным кодом Phirehose , которая реализует большинство требований API потоковой передачи. В этом руководстве будет описано, как интегрировать Phirehose в наше приложение Birdcage с открытым исходным кодом.
Библиотека Phirehose
Phirehose — это фантастическая PHP-реализация с открытым исходным кодом требований API Twitter Stream, написанная Фенн Бэйли . Как он описывает это, Phirehose предназначен для:
- обеспечить простой интерфейс для API Twitter Streaming для приложений PHP
- соблюдать рекомендации API потоков для обработки ошибок, переподключения и т. д.
- поощрять хорошо себя вести потоковые клиенты API
- работать независимо от расширений PHP (т.е. общей памяти, PCNTL и т. д.)
Я обнаружил, что библиотека работает без нареканий. Здесь больше документации по Phirehose .
Он предназначен для поддержания связи с Твиттером и реагирования на ленту данных Твиттера при работе в течение неограниченного времени без перерыва. Он не предназначен для выполнения подробной обработки твитов и гидратации данных, которые мы описали во второй части этой серии . Это можно сделать отдельно.
Запущенный пожарный шланг на неопределенный срок
Как правило, вы не можете запустить типичную веб-задачу cron как неопределенную операцию keep-alive. Лучше создать демон командной строки.
Одной из мощных функций, предлагаемых Yii, является возможность запуска консольных приложений из командной строки. Это позволит нам запустить приложение командной строки keep-alive, которое использует всю созданную нами среду Birdcage PHP и MySQL.
Создание консольной команды Yii
 В каталоге /app/ вне корневого каталога, доступного через Интернет, мы добавим файл stream.php который запускает команду потоковой консоли Phirehose: 
| 1 2 3 4 5 6 | <?php defined(‘YII_DEBUG’) or define(‘YII_DEBUG’,true); $yii=dirname(__FILE__).’/../framework/yii.php’; $config=dirname(__FILE__).’/protected/config/main.php’; require_once($yii); Yii::createConsoleApplication($config)->run(); | 
  Далее мы StreamCommand.php фактический командный файл StreamCommand.php в StreamCommand.php /app/protected/commands : 
| 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 | <?php class StreamCommand extends CConsoleCommand {     // test with php ./app/stream.php Stream     public function run($args)     {         // get Twitter user account keys         $result = Account::model()->findByPk(1);         $c = new Consumer($result[‘oauth_token’],$result[‘oauth_token_secret’],Phirehose::METHOD_USER);         // load Twitter App keys         $app = UserSetting::model()->loadPrimarySettings();         $c->consumerKey = $app[‘twitter_key’];         $c->consumerSecret = $app[‘twitter_secret’];         $c->consume();     } } | 
Он запустит процесс Phirehose, Consumer, используя наше приложение Twitter и пользовательские ключи.
Примечание. В целях примера потоковой передачи Birdcage мы предполагаем, что была зарегистрирована только одна учетная запись Twitter и жесткий код загружает учетные данные, например account_id = 1.
  Интегрирование Phirehose 
  Чтобы интегрировать Phirehose в Birdcage, я переместил OAuthPhirehose.php и UserstreamPhirehose.php в UserstreamPhirehose.php /app/protected/components .  В моем main.php конфигурации main.php я добавил phirehose в список загруженных компонентов: 
| 1 2 3 4 5 6 7 | ‘preload’=>array(      ‘log’,      ‘bootstrap’,      ‘mailgun’,      ‘phirehose’,      ‘advanced’      ), | 
Затем я создал миграцию базы данных, чтобы создать таблицу для хранения необработанных данных из потока Twitter:
| 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | class m140919_193106_create_stream_table extends CDbMigration {      protected $MySqlOptions = ‘ENGINE=InnoDB CHARSET=utf8 COLLATE=utf8_unicode_ci’;      public $tablePrefix;      public $tableName;      public function before() {        $this->tablePrefix = Yii::app()->getDb()->tablePrefix;        if ($this->tablePrefix <> »)          $this->tableName = $this->tablePrefix.’stream’;      }        public function safeUp()     {       $this->before();       $this->createTable($this->tableName, array(                ‘id’ => ‘pk’,                ‘tweet_id’ => ‘bigint(20) unsigned NOT NULL’,                ‘code’ => ‘text NULL’,                ‘is_processed’ => ‘tinyint default 0’,                ‘created_at’ => ‘DATETIME NOT NULL DEFAULT 0’,                ‘modified_at’ => ‘TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP’,                  ), $this->MySqlOptions);                  $this->createIndex(‘tweet_id’, $this->tableName , ‘tweet_id’, true);     } | 
  Я также создал новую модель под названием Consumer.php которая расширяет OauthPhirehose требуемым методом enqueueStatus . 
Мы хотим минимизировать объем обработки, которую необходимо выполнить в режиме реального времени. По сути, мы просто хотим записать данные, полученные из Twitter, в нашу базу данных — и ничего больше. Мы можем выполнять другую обработку в наших собственных фоновых задачах, не замедляя потоковое соединение Phirehose. Моя функция просто берет данные твита из входящего потока и сохраняет их в таблице потоков:
| 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 | <?php  class Consumer extends OauthPhirehose  {    // This function is called automatically by the Phirehose class    // when a new tweet is received with the JSON data in $status    public function enqueueStatus($status) {      $stream_item = json_decode($status);      if (!(isset($stream_item->id_str))) { return;}      $s = new Stream;      $s->tweet_id = $stream_item->id_str;      $s->code = base64_encode(serialize($stream_item));      $s->is_processed=0;      $s->created_at = new CDbExpression(‘NOW()’);      $s->modified_at =new CDbExpression(‘NOW()’);      $s->save();      var_dump($stream_item);    }  }  ?> | 
  Мы будем полагаться на фоновые задачи, выполняемые нашим DaemonController для обработки данных в модели Bircdcage Tweet.  Это описано далее ниже. 
Активация Phirehose
Вы можете протестировать Phirehose, используя консольную команду PHP:
 php ./app/stream.php Stream  
Twitter отправит поток информации о подписчике для учетной записи пользователя, а затем данные в реальном времени по мере поступления.
Чтобы активировать Phirehose как консольную команду keep-alive, всегда включенную, мы будем использовать команду nohup , например, no hangup, и перенаправим вывод в dev / null:
 nohup php ./app/stream.php Stream > /dev/null 2>&1& 
Ubuntu ответит идентификатором вашего процесса для будущего мониторинга и завершения:
 [1] 9768 
Если вы хотите проверить, что процесс запущен, отсканируйте список задач на наличие идентификатора задания:
 ps -e all | grep 9768 
Вы должны увидеть что-то вроде этого:
 0  1000  9768  9743  20  0 273112 16916 poll_s S   pts/0    0:00 php ./app/stream.php Stream  
И вы можете прекратить Phirehose, убив идентификатор задания:
 kill 9768 
По моему опыту, в течение последних двух недель Phirehose безупречно работал с этой техникой.
  Обработка потоковых данных 
Нам также необходимо создать фоновый процесс в Birdcage, который будет обрабатывать потоковые данные в наших таблицах Tweet, Mention, URL и Hashtag — как если бы они были получены из REST API.
  Измените настройки файла twitter.ini для использования потоков: 
| 1 | twitter_stream = true | 
И мы можем использовать ту же задачу cron из второй части для запуска этой операции:
| 01 02 03 04 05 06 07 08 09 10 11 12 | # To define the time you can provide concrete values for # minute (m), hour (h), day of month (dom), month (mon), # and day of week (dow) or use ‘*’ in these fields (for ‘any’).# # Notice that tasks will be started based on the cron’s system # daemon’s notion of time and timezones. # # For example, you can run a backup of all your user accounts # at 5 am every week with: # 0 5 * * 1 tar -zcf /var/backups/home.tgz /home/ # # mh dom mon dow command */5 * * * * wget -O /dev/null http://birdcage.yourdomain.com/daemon/index | 
  Затем, когда вызывается DaemonController, он активирует метод процесса Stream model() : 
| 1 2 3 4 5 6 7 8 | public function actionIndex() {      // if not using twitter streams, we’ll process tweets by REST API      if (!Yii::app()->params[‘twitter_stream’]) {        Tweet::model()->getStreams();      } else {        Stream::model()->process();      }  } | 
Метод process распаковывает данные закодированного потока и анализирует каждую запись так же, как мы это делали с контентом из REST API:
| 01 02 03 04 05 06 07 08 09 10 11 | public function process() {      // get unprocessed tweets from stream engine      // to do      $account_id = 1;      $items = Stream::model()->unprocessed()->findAll();      foreach ($items as $i) {        $tweet = unserialize(base64_decode($i[‘code’]));      Tweet::model()->parse($account_id,$tweet);      $this->setStatus($i[‘id’],self::STREAM_PROCESSED);      }    } | 
Birdcage в настоящее время игнорирует данные из потока, которые не являются твитами, например, уведомления, прямые сообщения и т. Д. Я оставлю это вам для расширения или вы можете проверить мое расширенное приложение, Birdhouse .
В заключение
Я надеюсь, что вы нашли эту серию из трех частей Twitter API информативной и полезной. К настоящему времени вы узнали о OAuth, REST API, Streaming API, создании базы данных для Twitter, обработке временной шкалы с обоими типами API, правильном подсчете символов в твитах и их публикации и многом другом.
Пожалуйста, оставьте любые комментарии, исправления или дополнительные идеи ниже. Вы можете просмотреть мои другие учебники Tuts + на моей странице автора или подписаться на меня в Twitter @reifman .