Статьи

Создание с помощью API Twitter: использование потоков в реальном времени

Конечный продукт
Что вы будете создавать

Хотя API REST Twitter подходит для многих приложений, если вам нужны немедленные обновления и доступ к более широкому массиву уведомлений, API потоковой передачи Twitter необходим. Например, только потоковый API сообщит вам, когда другой пользователь добавит в избранное один из ваших твитов.

Использование потокового API требует постоянного постоянного соединения между вашим веб-сервером и Twitter. Этот тип реализации может быть незнаком для многих разработчиков PHP. Как только поступают твиты, Twitter уведомляет ваш сервер в режиме реального времени, позволяя вам сохранять их в своей базе данных без задержки опроса REST API. Использование потокового API также не подпадает под ограничения скорости API Twitter.

Вот визуализация того, как это работает:

Как работает Twitter Streaming API

Существует три варианта API потоковой передачи Twitter:

  1. Общественный поток . Это позволяет вашему приложению отслеживать общедоступные данные в Twitter, такие как общедоступные твиты, фильтры хэштегов и т. Д.
  2. Пользовательский поток . Это позволяет отслеживать поток твитов пользователя в режиме реального времени. Третья часть этой серии будет посвящена потоку пользователей.
  3. Потоки сайта . Потоки сайта позволяют вашему приложению отслеживать каналы Twitter в реальном времени для большого количества пользователей.

Работа вашей потоковой реализации заключается в том, чтобы как можно быстрее регистрировать входящие события и обрабатывать их в фоновом режиме, используя REST API для сбора более глубоких данных. Потоки сайта требуют предварительного одобрения от Twitter , которые, скорее всего, зарезервированы для крупных компаний и разработчиков.

К счастью, есть бесплатная библиотека с открытым исходным кодом Phirehose , которая реализует большинство требований API потоковой передачи. В этом руководстве будет описано, как интегрировать Phirehose в наше приложение Birdcage с открытым исходным кодом.

Phirehose — это фантастическая PHP-реализация с открытым исходным кодом требований API Twitter Stream, написанная Фенн Бэйли . Как он описывает это, Phirehose предназначен для:

  • обеспечить простой интерфейс для API Twitter Streaming для приложений PHP
  • соблюдать рекомендации API потоков для обработки ошибок, переподключения и т. д.
  • поощрять хорошо себя вести потоковые клиенты API
  • работать независимо от расширений PHP (т.е. общей памяти, PCNTL и т. д.)

Я обнаружил, что библиотека работает без нареканий. Здесь больше документации по Phirehose .

Он предназначен для поддержания связи с Твиттером и реагирования на ленту данных Твиттера при работе в течение неограниченного времени без перерыва. Он не предназначен для выполнения подробной обработки твитов и гидратации данных, которые мы описали во второй части этой серии . Это можно сделать отдельно.

Как правило, вы не можете запустить типичную веб-задачу cron как неопределенную операцию keep-alive. Лучше создать демон командной строки.

Одной из мощных функций, предлагаемых Yii, является возможность запуска консольных приложений из командной строки. Это позволит нам запустить приложение командной строки keep-alive, которое использует всю созданную нами среду Birdcage PHP и MySQL.

В каталоге /app/ вне корневого каталога, доступного через Интернет, мы добавим файл stream.php который запускает команду потоковой консоли Phirehose:

1
2
3
4
5
6
<?php
defined(‘YII_DEBUG’) or define(‘YII_DEBUG’,true);
$yii=dirname(__FILE__).’/../framework/yii.php’;
$config=dirname(__FILE__).’/protected/config/main.php’;
require_once($yii);
Yii::createConsoleApplication($config)->run();

Далее мы StreamCommand.php фактический командный файл StreamCommand.php в StreamCommand.php /app/protected/commands :

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
<?php
class StreamCommand extends CConsoleCommand
{
    // test with php ./app/stream.php Stream
    public function run($args)
    {
        // get Twitter user account keys
        $result = Account::model()->findByPk(1);
        $c = new Consumer($result[‘oauth_token’],$result[‘oauth_token_secret’],Phirehose::METHOD_USER);
        // load Twitter App keys
        $app = UserSetting::model()->loadPrimarySettings();
        $c->consumerKey = $app[‘twitter_key’];
        $c->consumerSecret = $app[‘twitter_secret’];
        $c->consume();
    }
}

Он запустит процесс Phirehose, Consumer, используя наше приложение Twitter и пользовательские ключи.

Примечание. В целях примера потоковой передачи Birdcage мы предполагаем, что была зарегистрирована только одна учетная запись Twitter и жесткий код загружает учетные данные, например account_id = 1.

Чтобы интегрировать Phirehose в Birdcage, я переместил OAuthPhirehose.php и UserstreamPhirehose.php в UserstreamPhirehose.php /app/protected/components . В моем main.php конфигурации main.php я добавил phirehose в список загруженных компонентов:

1
2
3
4
5
6
7
‘preload’=>array(
     ‘log’,
     ‘bootstrap’,
     ‘mailgun’,
     ‘phirehose’,
     ‘advanced’
     ),

Затем я создал миграцию базы данных, чтобы создать таблицу для хранения необработанных данных из потока Twitter:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class m140919_193106_create_stream_table extends CDbMigration
{
     protected $MySqlOptions = ‘ENGINE=InnoDB CHARSET=utf8 COLLATE=utf8_unicode_ci’;
     public $tablePrefix;
     public $tableName;
 
     public function before() {
       $this->tablePrefix = Yii::app()->getDb()->tablePrefix;
       if ($this->tablePrefix <> »)
         $this->tableName = $this->tablePrefix.’stream’;
     }
 
       public function safeUp()
    {
      $this->before();
      $this->createTable($this->tableName, array(
               ‘id’ => ‘pk’,
               ‘tweet_id’ => ‘bigint(20) unsigned NOT NULL’,
               ‘code’ => ‘text NULL’,
               ‘is_processed’ => ‘tinyint default 0’,
               ‘created_at’ => ‘DATETIME NOT NULL DEFAULT 0’,
               ‘modified_at’ => ‘TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP’,
                 ), $this->MySqlOptions);
                 $this->createIndex(‘tweet_id’, $this->tableName , ‘tweet_id’, true);
 
    }

Я также создал новую модель под названием Consumer.php которая расширяет OauthPhirehose требуемым методом enqueueStatus .

Мы хотим минимизировать объем обработки, которую необходимо выполнить в режиме реального времени. По сути, мы просто хотим записать данные, полученные из Twitter, в нашу базу данных — и ничего больше. Мы можем выполнять другую обработку в наших собственных фоновых задачах, не замедляя потоковое соединение Phirehose. Моя функция просто берет данные твита из входящего потока и сохраняет их в таблице потоков:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
<?php
 class Consumer extends OauthPhirehose
 {
   // This function is called automatically by the Phirehose class
   // when a new tweet is received with the JSON data in $status
   public function enqueueStatus($status) {
     $stream_item = json_decode($status);
     if (!(isset($stream_item->id_str))) { return;}
     $s = new Stream;
     $s->tweet_id = $stream_item->id_str;
     $s->code = base64_encode(serialize($stream_item));
     $s->is_processed=0;
     $s->created_at = new CDbExpression(‘NOW()’);
     $s->modified_at =new CDbExpression(‘NOW()’);
     $s->save();
     var_dump($stream_item);
   }
 }
 ?>

Мы будем полагаться на фоновые задачи, выполняемые нашим DaemonController для обработки данных в модели Bircdcage Tweet. Это описано далее ниже.

Вы можете протестировать Phirehose, используя консольную команду PHP:

php ./app/stream.php Stream

Twitter отправит поток информации о подписчике для учетной записи пользователя, а затем данные в реальном времени по мере поступления.

Чтобы активировать Phirehose как консольную команду keep-alive, всегда включенную, мы будем использовать команду nohup , например, no hangup, и перенаправим вывод в dev / null:

nohup php ./app/stream.php Stream > /dev/null 2>&1&

Ubuntu ответит идентификатором вашего процесса для будущего мониторинга и завершения:

[1] 9768

Если вы хотите проверить, что процесс запущен, отсканируйте список задач на наличие идентификатора задания:

ps -e all | grep 9768

Вы должны увидеть что-то вроде этого:

0 1000 9768 9743 20 0 273112 16916 poll_s S pts/0 0:00 php ./app/stream.php Stream

И вы можете прекратить Phirehose, убив идентификатор задания:

kill 9768

По моему опыту, в течение последних двух недель Phirehose безупречно работал с этой техникой.

Нам также необходимо создать фоновый процесс в Birdcage, который будет обрабатывать потоковые данные в наших таблицах Tweet, Mention, URL и Hashtag — как если бы они были получены из REST API.

Измените настройки файла twitter.ini для использования потоков:

1
twitter_stream = true

И мы можем использовать ту же задачу cron из второй части для запуска этой операции:

01
02
03
04
05
06
07
08
09
10
11
12
# To define the time you can provide concrete values for
# minute (m), hour (h), day of month (dom), month (mon),
# and day of week (dow) or use ‘*’ in these fields (for ‘any’).#
# Notice that tasks will be started based on the cron’s system
# daemon’s notion of time and timezones.
#
# For example, you can run a backup of all your user accounts
# at 5 am every week with:
# 0 5 * * 1 tar -zcf /var/backups/home.tgz /home/
#
# mh dom mon dow command
*/5 * * * * wget -O /dev/null http://birdcage.yourdomain.com/daemon/index

Затем, когда вызывается DaemonController, он активирует метод процесса Stream model() :

1
2
3
4
5
6
7
8
public function actionIndex() {
     // if not using twitter streams, we’ll process tweets by REST API
     if (!Yii::app()->params[‘twitter_stream’]) {
       Tweet::model()->getStreams();
     } else {
       Stream::model()->process();
     }
 }

Метод process распаковывает данные закодированного потока и анализирует каждую запись так же, как мы это делали с контентом из REST API:

01
02
03
04
05
06
07
08
09
10
11
public function process() {
     // get unprocessed tweets from stream engine
     // to do
     $account_id = 1;
     $items = Stream::model()->unprocessed()->findAll();
     foreach ($items as $i) {
       $tweet = unserialize(base64_decode($i[‘code’]));
     Tweet::model()->parse($account_id,$tweet);
     $this->setStatus($i[‘id’],self::STREAM_PROCESSED);
     }
   }

Birdcage в настоящее время игнорирует данные из потока, которые не являются твитами, например, уведомления, прямые сообщения и т. Д. Я оставлю это вам для расширения или вы можете проверить мое расширенное приложение, Birdhouse .

Я надеюсь, что вы нашли эту серию из трех частей Twitter API информативной и полезной. К настоящему времени вы узнали о OAuth, REST API, Streaming API, создании базы данных для Twitter, обработке временной шкалы с обоими типами API, правильном подсчете символов в твитах и ​​их публикации и многом другом.

Пожалуйста, оставьте любые комментарии, исправления или дополнительные идеи ниже. Вы можете просмотреть мои другие учебники Tuts + на моей странице автора или подписаться на меня в Twitter @reifman .