Статьи

Программирование потока данных с помощью Straw

Поток данных — это модель программирования, существующая с момента появления компьютеров. Несмотря на то, что программирование потоков данных в течение долгого времени затихало в неведении, оно обретает новую жизнь благодаря текущему взрыву сервисов реального времени в масштабах сети и является естественным решением для многих из этих сложных задач, стоящих перед ними.

Поток данных — это простая концепция. Небольшие узлы кода получают ввод, обрабатывают его и выводят результаты. Узлы соединены вместе, выходы на входы, образуя топологию обработки. Используя поток данных, можно легко решать сложные проблемы, системы легче масштабировать и повышать их устойчивость, и вы можете более эффективно использовать свои вычислительные ресурсы.

Straw — это инфраструктура Node.js, которая позволяет реализовать обработку потоков данных в вашем приложении. Straw был создан для обработки будущих рыночных данных в режиме реального времени и может обрабатывать около 4000 сообщений в секунду в довольно скромной системе. На производстве было обработано много миллиардов сообщений.

Эта статья познакомит вас с Straw и покажет, как использовать Twitter Firehose для получения данных из твитов. Поскольку Firehose — это непрерывный поток сообщений, он идеально подходит для обработки с помощью Straw. Мы будем использовать бесплатную публичную версию, которая имеет небольшой процент от всех твитов. Несмотря на это, этого будет достаточно.

Вступление

В Straw вы определяете топологию узлов. Каждый узел имеет вход и ноль или более выходов. Узлы могут получать сообщения. Когда узел получает сообщение, он обрабатывает его с помощью предоставленной пользователем функции. Эта функция может выводить сообщения, которые будут получены любыми подключенными узлами.

Мы собираемся написать несколько узлов — один для использования необработанных данных из Firehose и извлечения интересующих нас битов, один для отправки каждого из этих битов на узел, который их анализирует, и фактические узлы анализа. Оттуда мы отправим данные на сервер Express и через WebSockets для нашей клиентской визуализации. Есть много вещей, на которые стоит обратить внимание, поэтому вы должны установить демонстрационное приложение Haystack на свой локальный компьютер.

После того, как вы поймете, как части сочетаются друг с другом, вы должны углубиться в эту базовую демонстрацию — раскрутить репо и посмотреть, как полнофункционально вы можете его сделать. Вам нужно будет установить Redis . Вам также потребуется Bower, который можно установить с помощью следующей команды.

npm install -g bower 

Как только все необходимое программное обеспечение установлено, клонируйте стог сена с помощью следующих команд.

 git clone https://github.com/simonswain/haystack cd haystack npm install bower install 

Бегущий пожарный шланг

Для доступа к Twiter Firehose вам необходимо получить учетные данные API, создав приложение в Twitter . Это позволит вашей локальной копии Haystack подключиться к API Twitter и передавать потоковые данные из Firehose. Приложение, которое вы создадите, потребует только разрешения на чтение. После создания перейдите на вкладку «Ключи API» и скопируйте значения.

Haystack поставляется с примером файла конфигурации. Скопируйте его и введите свои учетные данные из Twitter:

 exports.twitter = { consumer<em>key: '{put yours here}', consumer</em>secret: '{put yours here}', access<em>token</em>key: '{put yours here}', access<em>token</em>secret: '{put yours here}' } 

Ваша локальная копия Haystack должна быть готова к работе. Haystack состоит из двух частей — топологии Straw для потока данных и сервера Express для веб-интерфейса. Чтобы запустить его, вам нужно открыть две отдельные оболочки. Сначала откройте оболочку и запустите топологию, используя следующую команду.

 node run 

Вы должны увидеть некоторые результаты при запуске топологии, затем список @usernames при появлении твитов. Затем откройте другую оболочку и запустите сервер Express, используя эту команду:

 node server.js 

Далее посетите сайт по http://localhost:3000 . Вы увидите экран с картой мира, пингующей твиты по мере их появления, гистограмму языков и верхние хэштеги. Все это будет обновляться в режиме реального времени.

Haystack Screenshot

Изучение соломенной топологии

Давайте посмотрим на поток данных и код, чтобы это произошло. run.js загружает нашу соломенную топлоги. Когда мы создаем нашу топологию, мы передаем ей объект, описывающий нужные нам узлы и то, как они связаны друг с другом. В следующем фрагменте показано, что узел consume-firehose имеет выход, подключенный к consume-firehose называемому raw-tweets , а узел с именем route-tweets получает свой вход от этого канала. Это означает, что любые сообщения, выводимые с помощью consume-firehose будут передаваться в route-tweets и т. Д. Через топологию. Мы также передаем детали API для Twitter в узел, чтобы он знал, какие учетные данные использовать. Вы можете передать в узел все, что захотите.

 var topo = new straw.topology({ 'consume-firehose': { 'node': __dirname + '/nodes/consume-firehose.js', 'output': 'raw-tweets', 'twitter': config.twitter }, 'route-tweets': { 'node': __dirname + '/nodes/route-tweets.js', 'input': 'raw-tweets', 'outputs': { 'geo': 'client-geo', 'lang': 'lang', 'text': 'text' } }, ... 

По соглашению мы храним код для наших узлов в каталоге nodes . Нам нужно указать абсолютный путь к каждому узлу, поэтому мы используем переменную __dirname нашего скрипта для генерации этого.

Вы можете заметить, что consume-firehose не имеет ввода. Это потому, что он фактически вводит сообщения в топологию. Также обратите внимание, что у route-tweets есть три выхода. Это позволяет ему выборочно отправлять сообщения различным нисходящим узлам.

Упрощенная версия узла consume-firehose выглядит следующим образом:

 // nodes/consume-firehose.js var straw = require('straw'); var Twitter = require('twitter'); module.exports = straw.node.extend({ initialize: function(opts, done) { this.twit = new Twitter(opts.twitter); process.nextTick(done); }, run: function(done) { var self = this; this.twit.stream('statuses/sample', function(stream) { stream.on('data', function(data) { // process data then output it self.output(data); }); }); done(false); } }); 

Здесь есть два метода. initialize() вызывается при первом создании узла. Он создает наш клиент Twitter с использованием учетных данных, которые мы передали. Второй метод, run() , вызывается при запуске топологии и связывает обратный вызов для входящих твитов, который выводит сообщение в нашу топологию (через канал raw-tweets мы создали ранее).

route-tweets — хороший пример простого узла:

 var straw = require('straw'); module.exports = straw.node.extend({ initialize: function(opts, done) { var self = this; process.nextTick(done); }, process: function(x, done) { var self = this; if (x.hasOwnProperty('geo') && x.geo && x.geo.hasOwnProperty('type') && x.geo.type == 'Point') { console.log('@' + x.user.screen_name); self.output('geo', x.geo.coordinates); } self.output('lang', x.lang); self.output('text', { lang: x.lang, text: x.text }); done(); } }); 

Метод process() вызывается всякий раз, когда приходит сообщение. Он проверяет сообщение (которое в основном представляет собой твит и его метаданные в JSON) и выводит его части на выходные данные, которые мы настроили. Не все твиты содержат данные о геолокации, поэтому мы проверяем, присутствуют ли они, и выполняем хитрый console.log() чтобы дать общее представление о действиях в нашей топологии.

Деструктурированные твиты направляются на обработку в несколько разных узлов. Straw запускает каждый узел в отдельном Unix-процессе, поэтому эффективно эта последующая работа выполняется одновременно. Поскольку Redis используется для связи, вы можете запускать свои узлы на разных машинах, если хотите.

Узел catch-langs

Мы могли бы получать огромный объем входящих данных. Мы будем публиковать обновления почти в режиме реального времени для наших веб-клиентов, но мы не хотим бомбардировать их каждым приходящим сообщением. catch-langs решает эту проблему путем подсчета входящих языков, а затем периодически выводит общее количество на счет. Когда этот узел запущен, он устанавливает интервал для управления эмиттером:

 run: function(done) { var self = this; var fn = function() { self.ping(); }; this.timer = setInterval(fn, this.opts.interval); done(false); } 

Когда приходят сообщения, мы увеличиваем количество для этого языка и отмечаем, что количество изменилось:

 process: function(x, done) { var self = this; if (!this.langs.hasOwnProperty(x)) { this.langs[x] = 0; } this.langs[x] ++; this.total++; this.changed = true; done(); } 

Каждый раз, когда срабатывает интервальный таймер, если наши счетчики изменились, мы выдаем наши итоги:

 ping: function() { var self = this; var msg; if (!this.changed) { return; } this.changed = false; msg = {}; _.each(this.langs, function(x, i) { msg[i] = (x / self.total); }); this.output(msg); } 

Экспресс-сервер

До сих пор мы использовали данные из Twitter, разобрали их и получили некоторые метрики из них. Чтобы получить данные для наших конечных пользователей, мы должны извлечь их из топологии, отправить их через WebSocket и отобразить их. Это где сервер на основе Express приходит.

Посмотрите на server.js . Это довольно стандартное, минимальное приложение Express. Он использует Socket.IO как простой способ доставки данных в режиме реального времени. Вы также можете посмотреть на sock.js как на более современную альтернативу.

Интересной частью server.js является использование функции Straw, которая называется Tap. Тэп позволяет нам использовать топологию, чтобы мы могли передавать данные с нее. Если вы посмотрите на наше определение топологии в run.js , то увидите, что есть каналы для client-langs и нескольких других узлов, но нет подключенных к ним потребителей. Они есть для нашего Tap, чтобы подключиться.

Внутри server.js у нас есть такой код (здесь немного упрощенный):

 var straw = require('straw'); var langs = new straw.tap({ 'input':'client-langs', }); langs.on('message', function(msg) { langs = msg; io.sockets.emit('langs', msg); }); 

Это включает в себя библиотеку Straw, создает из нее новый Tap, который подключен к client-langs , и связывает обработчик событий, который будет вызываться при получении сообщений по этому каналу. Когда сообщение получено, мы отправляем его с помощью Socket.IO. На стороне клиента все очень просто. Смотрите следующий код в public/js/haystack.js .

 var socket = io.connect('http://localhost:3000'); socket.on('langs', function (data) { // do something }); 

Всякий раз, когда сообщение получено, на клиент запускается обратный вызов с любой полезной нагрузкой, которую мы предоставили в data . Это используется для рисования нашей гистограммы языков, количества хэштегов и геолокированных пингов на карте.

Вывод

В заключение Haystack представляет собой компактный пример того, как использовать поток данных для обработки входящих сообщений. Это хороший пример того, как Straw используется в реальном мире. Поскольку каждый узел запускается в своем собственном процессе, его можно масштабировать без особых проблем. Но я думаю, что большее преимущество заключается в том, насколько легко можно разбить вашу проблему на маленькие, простые шаги.

Не стесняйтесь использовать Haystack в качестве основы для вашего приложения. Haystack легко расширить, добавив больше узлов обработки и визуализаций для них. Сделайте репо, и если у вас получится что-нибудь классное, отправьте запрос на извлечение — давайте посмотрим, насколько всеобъемлющим мы можем это сделать.