Apache Storm — Введение
Apache Storm — это распределенная система обработки больших данных в реальном времени. Storm предназначен для обработки большого количества данных отказоустойчивым и горизонтальным масштабируемым методом. Это потоковая структура данных, которая обладает высокой скоростью приема. Хотя Storm не имеет состояния, он управляет распределенной средой и состоянием кластера через Apache ZooKeeper. Это просто, и вы можете выполнять все виды манипуляций с данными в реальном времени параллельно.
Apache Storm продолжает оставаться лидером в области анализа данных в реальном времени. Storm прост в настройке, работе и гарантирует, что каждое сообщение будет обработано в топологии хотя бы один раз.
Apache Storm против Hadoop
В основном, Hadoop и Storm Framework используются для анализа больших данных. Оба они дополняют друг друга и отличаются по некоторым аспектам. Apache Storm выполняет все операции, кроме постоянства, в то время как Hadoop хорош во всем, но отстает в вычислениях в реальном времени. В следующей таблице сравниваются атрибуты Storm и Hadoop.
Гроза | Hadoop |
---|---|
Обработка потоков в реальном времени | Пакетная обработка |
Stateless | Stateful |
Архитектура Master / Slave с координацией на основе ZooKeeper. Главный узел называется nimbus, а подчиненные — супервизорами . | Архитектура «ведущий-ведомый» с / без координации на основе ZooKeeper. Главный узел — это средство отслеживания заданий, а подчиненный узел — средство отслеживания заданий . |
Процесс потоковой передачи Storm может получать доступ к десяткам тысяч сообщений в секунду в кластере. | Распределенная файловая система Hadoop (HDFS) использует инфраструктуру MapReduce для обработки огромного объема данных, который занимает минуты или часы. |
Топология шторма работает до тех пор, пока пользователь не отключится или не произойдет непредвиденный неисправимый сбой. | Задания MapReduce выполняются в последовательном порядке и в конечном итоге завершаются. |
Оба распределены и отказоустойчивы | |
Если нимб / супервизор умирает, перезапуск заставляет его продолжать с того места, где он остановился, следовательно, ничто не затрагивается. | Если JobTracker умирает, все запущенные задания теряются. |
Примеры использования Apache Storm
Apache Storm очень известен обработкой больших потоков данных в реальном времени. По этой причине большинство компаний используют Storm как неотъемлемую часть своей системы. Вот некоторые примечательные примеры:
Twitter — Twitter использует Apache Storm для своей линейки продуктов Publisher Analytics. «Publisher Analytics Products» обрабатывает все твиты и клики в платформе Twitter. Apache Storm глубоко интегрирован с инфраструктурой Twitter.
NaviSite — NaviSite использует Storm для системы мониторинга / аудита журнала событий. Все журналы, сгенерированные в системе, будут проходить через Шторм. Storm проверит сообщение на соответствие настроенному набору регулярных выражений, и если совпадение будет найдено, то это конкретное сообщение будет сохранено в базе данных.
Wego — Wego — это механизм метапоисков путешествий, расположенный в Сингапуре. Данные, связанные с поездками, поступают из многих источников по всему миру с разными сроками. Storm помогает Wego осуществлять поиск данных в реальном времени, устраняет проблемы с параллелизмом и находит наилучшее соответствие для конечного пользователя.
Преимущества Apache Storm
Вот список преимуществ, которые предлагает Apache Storm —
-
Storm является открытым исходным кодом, надежным и удобным для пользователя. Он может быть использован как в небольших компаниях, так и в крупных корпорациях.
-
Storm отказоустойчив, гибок, надежен и поддерживает любой язык программирования.
-
Позволяет обрабатывать поток в реальном времени.
-
Storm невероятно быстр, потому что он обладает огромной способностью обрабатывать данные.
-
Storm может поддерживать производительность даже при увеличении нагрузки путем линейного добавления ресурсов. Это очень масштабируемый.
-
Storm выполняет обновление данных, и реакция доставки от начала до конца в считанные секунды или минуты зависит от проблемы. У него очень низкая задержка.
-
Шторм обладает оперативным интеллектом.
-
Storm обеспечивает гарантированную обработку данных, даже если какой-либо из подключенных узлов в кластере умирает или сообщения теряются.
Storm является открытым исходным кодом, надежным и удобным для пользователя. Он может быть использован как в небольших компаниях, так и в крупных корпорациях.
Storm отказоустойчив, гибок, надежен и поддерживает любой язык программирования.
Позволяет обрабатывать поток в реальном времени.
Storm невероятно быстр, потому что он обладает огромной способностью обрабатывать данные.
Storm может поддерживать производительность даже при увеличении нагрузки путем линейного добавления ресурсов. Это очень масштабируемый.
Storm выполняет обновление данных, и реакция доставки от начала до конца в считанные секунды или минуты зависит от проблемы. У него очень низкая задержка.
Шторм обладает оперативным интеллектом.
Storm обеспечивает гарантированную обработку данных, даже если какой-либо из подключенных узлов в кластере умирает или сообщения теряются.
Apache Storm — Основные понятия
Apache Storm считывает необработанный поток данных в реальном времени с одного конца и пропускает их через последовательность небольших блоков обработки и выводит обработанную / полезную информацию на другом конце.
Следующая диаграмма изображает основную концепцию Apache Storm.
Давайте теперь подробнее рассмотрим компоненты Apache Storm —
Компоненты | Описание |
---|---|
Кортеж | Кортеж является основной структурой данных в Storm. Это список упорядоченных элементов. По умолчанию кортеж поддерживает все типы данных. Обычно он моделируется как набор значений, разделенных запятыми, и передается в кластер Storm. |
Поток | Поток — это неупорядоченная последовательность кортежей. |
Смерчи | Источник потока. Как правило, Storm принимает входные данные из необработанных источников данных, таких как Twitter Streaming API, очередь Apache Kafka, очередь Kestrel и т. Д. В противном случае вы можете написать носители для чтения данных из источников данных. «ISpout» является основным интерфейсом для реализации spouts. Некоторыми из определенных интерфейсов являются IRichSpout, BaseRichSpout, KafkaSpout и т. Д. |
Болты | Болты являются логическими единицами обработки. Носики передают данные в процесс обработки болтов и болтов и создают новый выходной поток. Болты могут выполнять операции фильтрации, агрегации, объединения, взаимодействия с источниками данных и базами данных. Болт получает данные и испускает один или несколько болтов. «IBolt» является основным интерфейсом для реализации болтов. Некоторые из распространенных интерфейсов — IRichBolt, IBasicBolt и т. Д. |
Давайте рассмотрим пример «анализа Twitter» в реальном времени и посмотрим, как его можно смоделировать в Apache Storm. Следующая диаграмма изображает структуру.
Входные данные для «Twitter Analysis» поступают из Twitter Streaming API. Spout будет читать твиты пользователей, используя Twitter Streaming API, и выводить их как поток кортежей. Один кортеж из носика будет иметь имя пользователя в твиттере и один твит в качестве значений, разделенных запятыми. Затем этот поток кортежей будет перенаправлен на Болт, и Болт разделит твит на отдельные слова, вычислит количество слов и сохранит информацию в сконфигурированном источнике данных. Теперь мы можем легко получить результат, запросив источник данных.
Топология
Изливы и болты соединены вместе и образуют топологию. Логика приложения реального времени задается внутри топологии Storm. Проще говоря, топология — это ориентированный граф, где вершины — это вычисления, а ребра — поток данных.
Простая топология начинается с носиков. Носик испускает данные на один или несколько болтов. Болт представляет узел в топологии, имеющий наименьшую логику обработки, и вывод болта может быть передан в другой болт в качестве ввода.
Storm постоянно поддерживает топологию, пока вы не уничтожите топологию. Основная задача Apache Storm — запускать топологию и одновременно запускать любое количество топологий.
Задачи
Теперь у вас есть общее представление о носиках и болтах. Они представляют собой наименьшую логическую единицу топологии, и топология строится с использованием одного излива и набора болтов. Они должны быть выполнены правильно в определенном порядке для успешной работы топологии. Выполнение каждой струи и удара Storm называется «Задачами». Говоря простыми словами, задание — это либо выполнение излива, либо засов. В каждый момент времени каждый излив и болт может иметь несколько экземпляров, работающих в нескольких отдельных потоках.
Рабочие
Топология работает распределенным образом на нескольких рабочих узлах. Storm равномерно распределяет задачи по всем рабочим узлам. Роль рабочего узла заключается в том, чтобы прослушивать задания и запускать или останавливать процессы при каждом поступлении нового задания.
Группировка потоков
Поток данных течет от носика к болту или от одного болта к другому болту. Группировка потоков управляет маршрутизацией кортежей в топологии и помогает нам понять поток кортежей в топологии. Есть четыре встроенных группировки, как описано ниже.
Shuffle Grouping
В случайном порядке одинаковое количество кортежей распределяется случайным образом по всем рабочим, выполняющим болты. Следующая диаграмма изображает структуру.
Группировка полей
Поля с одинаковыми значениями в кортежах сгруппированы, а оставшиеся кортежи хранятся снаружи. Затем кортежи с одинаковыми значениями поля отправляются тому же рабочему, выполняющему болты. Например, если поток сгруппирован по полю «word», то кортежи с одинаковой строкой «Hello» будут перемещены к одному и тому же работнику. На следующей диаграмме показано, как работает группировка полей.
Глобальная группировка
Все потоки можно сгруппировать и направить на один болт. Эта группировка отправляет кортежи, сгенерированные всеми экземплярами источника, одному целевому экземпляру (в частности, выберите работника с самым низким ID).
Все группировки
Вся группировка отправляет одну копию каждого кортежа всем экземплярам приемного болта. Этот вид группировки используется для отправки сигналов на болты. Вся группировка полезна для операций объединения.
Apache Storm — кластерная архитектура
Одним из главных достоинств Apache Storm является то, что он является отказоустойчивым и быстрым без распределенного приложения «Single Point of Failure» (SPOF). Мы можем установить Apache Storm на столько систем, сколько необходимо для увеличения емкости приложения.
Давайте посмотрим, как спроектирован кластер Apache Storm и его внутренняя архитектура. Следующая диаграмма изображает дизайн кластера.
Apache Storm имеет два типа узлов: Nimbus (главный узел) и Supervisor (рабочий узел). Нимб является центральным компонентом Apache Storm. Основная задача Nimbus — запуск топологии Storm. Nimbus анализирует топологию и собирает задачу для выполнения. Затем он распространит задачу среди доступного руководителя.
У супервайзера будет один или несколько рабочих процессов. Руководитель делегирует задачи рабочим процессам. Рабочий процесс порождает столько исполнителей, сколько необходимо и запускает задачу. Apache Storm использует внутреннюю распределенную систему обмена сообщениями для связи между нимбомом и супервизорами.
Компоненты | Описание |
---|---|
нимб | Nimbus — главный узел кластера Storm. Все остальные узлы в кластере называются рабочими узлами . Главный узел отвечает за распределение данных между всеми рабочими узлами, назначение задач рабочим узлам и мониторинг сбоев. |
Руководитель | Узлы, которые следуют инструкциям, данным нимбом, называются супервизорами. Супервизор имеет несколько рабочих процессов и управляет рабочими процессами для выполнения задач, назначенных нимбом. |
Рабочий процесс | Рабочий процесс будет выполнять задачи, связанные с определенной топологией. Рабочий процесс не будет запускать задачу сам по себе, вместо этого он создает исполнителей и просит их выполнить определенную задачу. Рабочий процесс будет иметь несколько исполнителей. |
душеприказчик | Исполнитель — не что иное, как единственный поток, порожденный рабочим процессом. Исполнитель выполняет одну или несколько задач, но только для определенного излива или затвора. |
задача | Задача выполняет фактическую обработку данных. Итак, это либо носик, либо болт. |
ZooKeeper Framework |
Apache ZooKeeper — это сервис, используемый кластером (группой узлов) для координации между собой и поддержания общих данных с помощью надежных методов синхронизации. Nimbus не имеет состояния, поэтому от ZooKeeper зависит состояние рабочего узла. ZooKeeper помогает руководителю взаимодействовать с нимбом. Он отвечает за поддержание состояния нимба и супервайзера. |
Apache ZooKeeper — это сервис, используемый кластером (группой узлов) для координации между собой и поддержания общих данных с помощью надежных методов синхронизации. Nimbus не имеет состояния, поэтому от ZooKeeper зависит состояние рабочего узла.
ZooKeeper помогает руководителю взаимодействовать с нимбом. Он отвечает за поддержание состояния нимба и супервайзера.
Шторм без гражданства по своей природе. Несмотря на то, что природа без сохранения состояния имеет свои недостатки, она на самом деле помогает Storm обрабатывать данные в реальном времени самым лучшим и быстрым способом.
Шторм не совсем без гражданства, хотя. Он хранит свое состояние в Apache ZooKeeper. Поскольку состояние доступно в Apache ZooKeeper, сбойный нимб можно перезапустить и заставить работать с того места, где он ушел. Обычно такие сервисные инструменты мониторинга, как monit , отслеживают Nimbus и перезапускают его в случае сбоя.
Apache Storm также имеет продвинутую топологию, называемую топологией Trident, с поддержкой состояний и предоставляет высокоуровневый API, такой как Pig. Мы обсудим все эти функции в следующих главах.
Apache Storm — рабочий процесс
Работающий кластер Storm должен иметь один венчик и одного или нескольких супервизоров. Другим важным узлом является Apache ZooKeeper, который будет использоваться для координации между нимбом и супервизорами.
Давайте теперь внимательно посмотрим на рабочий процесс Apache Storm —
-
Первоначально нимб будет ждать, когда ему будет представлена «Топология шторма».
-
После отправки топологии она обработает топологию и соберет все задачи, которые необходимо выполнить, и порядок, в котором задача должна быть выполнена.
-
Затем нимб будет равномерно распределять задачи по всем доступным руководителям.
-
В определенный промежуток времени все супервизоры отправят сердцебиение в нимб, чтобы сообщить, что они все еще живы.
-
Когда супервизор умирает и не посылает сердцебиение нимбу, тогда он назначает задачи другому супервизору.
-
Когда сам нимб умирает, супервайзеры будут работать над уже назначенной задачей без каких-либо проблем.
-
Как только все задачи будут выполнены, супервизор будет ждать поступления новой задачи.
-
В то же время, мертвый нимб будет автоматически перезапущен инструментами мониторинга сервиса.
-
Перезапущенный нимб продолжится с того места, где он остановился. Аналогично, мертвый супервизор также может быть перезапущен автоматически. Поскольку и венчик, и супервизор могут быть перезапущены автоматически, и оба будут продолжать работу, как и раньше, Storm гарантированно выполнит все задачи хотя бы один раз.
-
Как только все топологии обработаны, венчик ожидает поступления новой топологии, и аналогичным образом супервизор ожидает новых задач.
Первоначально нимб будет ждать, когда ему будет представлена «Топология шторма».
После отправки топологии она обработает топологию и соберет все задачи, которые необходимо выполнить, и порядок, в котором задача должна быть выполнена.
Затем нимб будет равномерно распределять задачи по всем доступным руководителям.
В определенный промежуток времени все супервизоры отправят сердцебиение в нимб, чтобы сообщить, что они все еще живы.
Когда супервизор умирает и не посылает сердцебиение нимбу, тогда он назначает задачи другому супервизору.
Когда сам нимб умирает, супервайзеры будут работать над уже назначенной задачей без каких-либо проблем.
Как только все задачи будут выполнены, супервизор будет ждать поступления новой задачи.
В то же время, мертвый нимб будет автоматически перезапущен инструментами мониторинга сервиса.
Перезапущенный нимб продолжится с того места, где он остановился. Аналогично, мертвый супервизор также может быть перезапущен автоматически. Поскольку и венчик, и супервизор могут быть перезапущены автоматически, и оба будут продолжать работу, как и раньше, Storm гарантированно выполнит все задачи хотя бы один раз.
Как только все топологии обработаны, венчик ожидает поступления новой топологии, и аналогичным образом супервизор ожидает новых задач.
По умолчанию в кластере Storm есть два режима:
-
Локальный режим — этот режим используется для разработки, тестирования и отладки, потому что это самый простой способ увидеть, как все компоненты топологии работают вместе. В этом режиме мы можем настроить параметры, которые позволяют нам видеть, как наша топология работает в различных средах конфигурации Storm. В локальном режиме штормовые топологии выполняются на локальной машине в одной виртуальной машине Java.
-
Рабочий режим. В этом режиме мы передаем нашу топологию в работающий штормовой кластер, который состоит из множества процессов, обычно выполняемых на разных машинах. Как обсуждалось в рабочем процессе шторма, работающий кластер будет работать бесконечно, пока не будет остановлен.
Локальный режим — этот режим используется для разработки, тестирования и отладки, потому что это самый простой способ увидеть, как все компоненты топологии работают вместе. В этом режиме мы можем настроить параметры, которые позволяют нам видеть, как наша топология работает в различных средах конфигурации Storm. В локальном режиме штормовые топологии выполняются на локальной машине в одной виртуальной машине Java.
Рабочий режим. В этом режиме мы передаем нашу топологию в работающий штормовой кластер, который состоит из множества процессов, обычно выполняемых на разных машинах. Как обсуждалось в рабочем процессе шторма, работающий кластер будет работать бесконечно, пока не будет остановлен.
Storm — распределенная система обмена сообщениями
Apache Storm обрабатывает данные в режиме реального времени и обычно поступает из системы очередей сообщений. Внешняя распределенная система обмена сообщениями предоставит входные данные, необходимые для вычислений в реальном времени. Spout будет считывать данные из системы обмена сообщениями, преобразовывать их в кортежи и вводить в Apache Storm. Интересным фактом является то, что Apache Storm использует собственную распределенную систему обмена сообщениями для связи между нимбом и супервизором.
Что такое распределенная система обмена сообщениями?
Распределенный обмен сообщениями основан на концепции надежной очереди сообщений. Сообщения помещаются в очередь асинхронно между клиентскими приложениями и системами обмена сообщениями. Распределенная система обмена сообщениями обеспечивает преимущества надежности, масштабируемости и постоянства.
Большинство шаблонов обмена сообщениями следуют модели публикации-подписки (просто Pub-Sub ), где отправители сообщений называются издателями, а те, кто хочет получать сообщения, называются подписчиками .
После того как сообщение было опубликовано отправителем, подписчики могут получить выбранное сообщение с помощью опции фильтрации. Обычно у нас есть два типа фильтрации: один — тематическая фильтрация, а другой — контентная фильтрация .
Обратите внимание, что модель pub-sub может общаться только через сообщения. Это очень слабо связанная архитектура; даже отправители не знают, кто их подписчики. Многие шаблоны сообщений позволяют брокеру сообщений обмениваться публикационными сообщениями для своевременного доступа многих подписчиков. Примером из реальной жизни является Dish TV, который публикует различные каналы, такие как спортивные состязания, фильмы, музыка и т. Д., И любой может подписаться на свой собственный набор каналов и получать их, когда доступны их подписанные каналы.
В следующей таблице описаны некоторые популярные системы обмена сообщениями с высокой пропускной способностью.
Распределенная система обмена сообщениями | Описание |
---|---|
Апач Кафка | Кафка была разработана в корпорации LinkedIn и позже стала подпроектом Apache. Apache Kafka основан на устойчивой, распределенной модели публикации и подписки. Кафка быстрая, масштабируемая и высокоэффективная. |
RabbitMQ | RabbitMQ — это приложение для распределенного надежного обмена сообщениями с открытым исходным кодом. Он прост в использовании и работает на всех платформах. |
JMS (служба сообщений Java) | JMS — это API с открытым исходным кодом, который поддерживает создание, чтение и отправку сообщений из одного приложения в другое. Он обеспечивает гарантированную доставку сообщений и следует модели публикации и подписки. |
ActiveMQ | Система обмена сообщениями ActiveMQ — это API JMS с открытым исходным кодом. |
ZeroMQ | ZeroMQ — это одноранговая обработка сообщений без посредников. Он предоставляет двухтактные шаблоны сообщений маршрутизатора и дилера. |
Пустельга | Kestrel — это быстрая, надежная и простая распределенная очередь сообщений. |
Комиссионный протокол
Thrift был построен в Facebook для разработки мультиязычных сервисов и удаленного вызова процедур (RPC). Позже это стало проектом Apache с открытым исходным кодом. Apache Thrift является языком определения интерфейса и позволяет легко определять новые типы данных и реализацию сервисов поверх определенных типов данных.
Apache Thrift также является коммуникационной средой, которая поддерживает встроенные системы, мобильные приложения, веб-приложения и многие другие языки программирования. Некоторые из ключевых особенностей, связанных с Apache Thrift, это его модульность, гибкость и высокая производительность. Кроме того, он может выполнять потоковую передачу, обмен сообщениями и RPC в распределенных приложениях.
Storm широко использует Thrift Protocol для внутренней коммуникации и определения данных. Топология шторма — это просто Thrift Structs . Storm Nimbus с топологией в Apache Storm — это сервис Thrift .
Apache Storm — Установка
Давайте теперь посмотрим, как установить фреймворк Apache Storm на ваш компьютер. Здесь есть три ступени майо —
- Установите Java в вашей системе, если у вас ее еще нет.
- Установите фреймворк ZooKeeper.
- Установите фреймворк Apache Storm.
Шаг 1 — Проверка установки Java
Используйте следующую команду, чтобы проверить, установлена ли у вас Java в вашей системе.
$ java -version
Если Java уже существует, вы увидите ее номер версии. Иначе, скачайте последнюю версию JDK.
Шаг 1.1 — Скачать JDK
Загрузите последнюю версию JDK по следующей ссылке — www.oracle.com
Последняя версия — JDK 8u 60, а файл — «jdk-8u60-linux-x64.tar.gz» . Загрузите файл на свой компьютер.
Шаг 1.2 — Извлечение файлов
Обычно файлы загружаются в папку загрузок . Распакуйте настройки tar с помощью следующих команд.
$ cd /go/to/download/path $ tar -zxf jdk-8u60-linux-x64.gz
Шаг 1.3 — Перейдите в каталог opt
Чтобы сделать Java доступным для всех пользователей, переместите извлеченный контент Java в папку «/ usr / local / java».
$ su password: (type password of root user) $ mkdir /opt/jdk $ mv jdk-1.8.0_60 /opt/jdk/
Шаг 1.4 — Установить путь
Чтобы установить переменные path и JAVA_HOME, добавьте следующие команды в файл ~ / .bashrc.
export JAVA_HOME =/usr/jdk/jdk-1.8.0_60 export PATH=$PATH:$JAVA_HOME/bin
Теперь примените все изменения в текущей работающей системе.
$ source ~/.bashrc
Шаг 1.5 — Альтернативы Java
Используйте следующую команду, чтобы изменить альтернативы Java.
update-alternatives --install /usr/bin/java java /opt/jdk/jdk1.8.0_60/bin/java 100
Шаг 1.6
Теперь проверьте установку Java с помощью команды проверки (java -version), описанной в шаге 1.
Шаг 2 — Установка ZooKeeper Framework
Шаг 2.1 — Скачать ZooKeeper
Чтобы установить ZooKeeper Framework на свой компьютер, перейдите по следующей ссылке и загрузите последнюю версию ZooKeeper http://zookeeper.apache.org/releases.html.
На данный момент последняя версия ZooKeeper — 3.4.6 (ZooKeeper-3.4.6.tar.gz).
Шаг 2.2 — Извлечение файла tar
Распакуйте файл tar, используя следующие команды:
$ cd opt/ $ tar -zxf zookeeper-3.4.6.tar.gz $ cd zookeeper-3.4.6 $ mkdir data
Шаг 2.3 — Создание файла конфигурации
Откройте файл конфигурации с именем «conf / zoo.cfg», используя команду «vi conf / zoo.cfg» и установив все следующие параметры в качестве отправной точки.
$ vi conf/zoo.cfg tickTime=2000 dataDir=/path/to/zookeeper/data clientPort=2181 initLimit=5 syncLimit=2
Как только файл конфигурации будет успешно сохранен, вы можете запустить сервер ZooKeeper.
Шаг 2.4 — Запустите ZooKeeper Server
Используйте следующую команду для запуска сервера ZooKeeper.
$ bin/zkServer.sh start
После выполнения этой команды вы получите ответ:
$ JMX enabled by default $ Using config: /Users/../zookeeper-3.4.6/bin/../conf/zoo.cfg $ Starting zookeeper ... STARTED
Шаг 2.5 — Запустите CLI
Используйте следующую команду, чтобы запустить CLI.
$ bin/zkCli.sh
После выполнения вышеуказанной команды вы будете подключены к серверу ZooKeeper и получите следующий ответ.
Connecting to localhost:2181 ................ ................ ................ Welcome to ZooKeeper! ................ ................ WATCHER:: WatchedEvent state:SyncConnected type: None path:null [zk: localhost:2181(CONNECTED) 0]
Шаг 2.6 — остановка сервера ZooKeeper
После подключения сервера и выполнения всех операций вы можете остановить сервер ZooKeeper с помощью следующей команды.
bin/zkServer.sh stop
Вы успешно установили Java и ZooKeeper на свой компьютер. Давайте теперь посмотрим, как установить платформу Apache Storm.
Шаг 3 — Установка Apache Storm Framework
Шаг 3.1 Скачать Storm
Чтобы установить среду Storm на свой компьютер, перейдите по следующей ссылке и загрузите последнюю версию Storm http://storm.apache.org/downloads.html.
На данный момент последней версией Storm является «apache-storm-0.9.5.tar.gz».
Шаг 3.2 — Извлечение файла tar
Распакуйте файл tar, используя следующие команды:
$ cd opt/ $ tar -zxf apache-storm-0.9.5.tar.gz $ cd apache-storm-0.9.5 $ mkdir data
Шаг 3.3 — Открыть файл конфигурации
Текущая версия Storm содержит файл «conf / storm.yaml», который настраивает демоны Storm. Добавьте следующую информацию в этот файл.
$ vi conf/storm.yaml storm.zookeeper.servers: - "localhost" storm.local.dir: “/path/to/storm/data(any path)” nimbus.host: "localhost" supervisor.slots.ports: - 6700 - 6701 - 6702 - 6703
После внесения всех изменений сохраните и вернитесь в терминал.
Шаг 3.4 — Запустите Нимб
$ bin/storm nimbus
Шаг 3.5 — Запустите супервизор
$ bin/storm supervisor
Шаг 3.6 Запустите интерфейс
$ bin/storm ui
После запуска приложения с пользовательским интерфейсом Storm введите URL-адрес http: // localhost: 8080 в своем любимом браузере, и вы увидите информацию о кластере Storm и его топологию. Страница должна выглядеть примерно так, как на следующем скриншоте.
Apache Storm — рабочий пример
Мы рассмотрели основные технические детали Apache Storm, и теперь пришло время написать несколько простых сценариев.
Сценарий — Анализатор журнала мобильных вызовов
Мобильный вызов и его продолжительность будут переданы в качестве ввода для Apache Storm, и Storm обработает и сгруппирует вызов между одним и тем же вызывающим абонентом и получателем и их общим количеством вызовов.
Создание носика
Носик является компонентом, который используется для генерации данных. По сути, носик будет реализовывать интерфейс IRichSpout. Интерфейс «IRichSpout» имеет следующие важные методы —
-
open — предоставляет носику среду для выполнения. Исполнители запустят этот метод для инициализации носика.
-
nextTuple — испускает сгенерированные данные через коллектор.
-
close — этот метод вызывается, когда носик отключается.
-
DeclareOutputFields — Объявляет схему вывода кортежа.
-
ack — подтверждает, что обрабатывается определенный кортеж
-
fail — указывает, что определенный кортеж не обработан и не подлежит повторной обработке.
open — предоставляет носику среду для выполнения. Исполнители запустят этот метод для инициализации носика.
nextTuple — испускает сгенерированные данные через коллектор.
close — этот метод вызывается, когда носик отключается.
DeclareOutputFields — Объявляет схему вывода кортежа.
ack — подтверждает, что обрабатывается определенный кортеж
fail — указывает, что определенный кортеж не обработан и не подлежит повторной обработке.
открыто
Подпись открытого метода выглядит следующим образом:
open(Map conf, TopologyContext context, SpoutOutputCollector collector)
-
conf — Предоставляет конфигурацию шторма для этого излива.
-
context — предоставляет полную информацию о месте излива в топологии, его идентификаторе задачи, входной и выходной информации.
-
сборщик — позволяет нам выпустить кортеж, который будет обработан болтами.
conf — Предоставляет конфигурацию шторма для этого излива.
context — предоставляет полную информацию о месте излива в топологии, его идентификаторе задачи, входной и выходной информации.
сборщик — позволяет нам выпустить кортеж, который будет обработан болтами.
nextTuple
Сигнатура метода nextTuple выглядит следующим образом:
nextTuple()
nextTuple () периодически вызывается из того же цикла, что и методы ack () и fail (). Он должен освободить управление потоком, когда нет работы, чтобы другие методы имели возможность вызываться. Поэтому первая строка nextTuple проверяет, завершена ли обработка. Если это так, он должен оставаться в режиме ожидания не менее одной миллисекунды, чтобы снизить нагрузку на процессор перед возвратом.
близко
Подпись метода close выглядит следующим образом:
close()
declareOutputFields
Подпись метода DeclareOutputFields выглядит следующим образом:
declareOutputFields(OutputFieldsDeclarer declarer)
Объявление — используется для объявления идентификаторов выходного потока, полей вывода и т. д.
Этот метод используется для указания схемы вывода кортежа.
извед
Суть метода ack заключается в следующем:
ack(Object msgId)
Этот метод подтверждает, что определенный кортеж был обработан.
потерпеть поражение
Сигнатура метода nextTuple выглядит следующим образом:
ack(Object msgId)
Этот метод сообщает, что определенный кортеж не был полностью обработан. Storm обработает определенный кортеж.
FakeCallLogReaderSpout
В нашем сценарии нам нужно собрать данные журнала вызовов. Информация из журнала звонков содержит.
- номер звонящего
- номер получателя
- продолжительность
Поскольку у нас нет информации о журналах вызовов в реальном времени, мы будем генерировать поддельные журналы вызовов. Поддельная информация будет создана с использованием класса Random. Полный код программы приведен ниже.
Кодирование — FakeCallLogReaderSpout.java
import java.util.*; //import storm tuple packages import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; //import Spout interface packages import backtype.storm.topology.IRichSpout; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; //Create a class FakeLogReaderSpout which implement IRichSpout interface to access functionalities public class FakeCallLogReaderSpout implements IRichSpout { //Create instance for SpoutOutputCollector which passes tuples to bolt. private SpoutOutputCollector collector; private boolean completed = false; //Create instance for TopologyContext which contains topology data. private TopologyContext context; //Create instance for Random class. private Random randomGenerator = new Random(); private Integer idx = 0; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.context = context; this.collector = collector; } @Override public void nextTuple() { if(this.idx <= 1000) { List<String> mobileNumbers = new ArrayList<String>(); mobileNumbers.add("1234123401"); mobileNumbers.add("1234123402"); mobileNumbers.add("1234123403"); mobileNumbers.add("1234123404"); Integer localIdx = 0; while(localIdx++ < 100 && this.idx++ < 1000) { String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4)); String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4)); while(fromMobileNumber == toMobileNumber) { toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4)); } Integer duration = randomGenerator.nextInt(60); this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration)); } } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("from", "to", "duration")); } //Override all the interface methods @Override public void close() {} public boolean isDistributed() { return false; } @Override public void activate() {} @Override public void deactivate() {} @Override public void ack(Object msgId) {} @Override public void fail(Object msgId) {} @Override public Map<String, Object> getComponentConfiguration() { return null; } }
Создание болта
Bolt — это компонент, который принимает кортежи в качестве входных данных, обрабатывает кортежи и создает новые кортежи в качестве выходных данных. Болты будут реализовывать интерфейс IRichBolt . В этой программе для выполнения операций используются два класса болтов CallLogCreatorBolt и CallLogCounterBolt .
Интерфейс IRichBolt имеет следующие методы —
-
подготовить — предоставляет болту среду для выполнения. Исполнители запустят этот метод для инициализации носика.
-
execute — обработать один кортеж ввода.
-
очистка — вызывается, когда затвор собирается отключиться.
-
DeclareOutputFields — Объявляет схему вывода кортежа.
подготовить — предоставляет болту среду для выполнения. Исполнители запустят этот метод для инициализации носика.
execute — обработать один кортеж ввода.
очистка — вызывается, когда затвор собирается отключиться.
DeclareOutputFields — Объявляет схему вывода кортежа.
Подготовить
Подпись метода подготовки выглядит следующим образом:
prepare(Map conf, TopologyContext context, OutputCollector collector)
-
conf — Предоставляет конфигурацию шторма для этого болта.
-
контекст — предоставляет полную информацию о месте болта в топологии, его идентификаторе задачи, информации ввода и вывода и т. д.
-
collector — позволяет нам генерировать обработанный кортеж.
conf — Предоставляет конфигурацию шторма для этого болта.
контекст — предоставляет полную информацию о месте болта в топологии, его идентификаторе задачи, информации ввода и вывода и т. д.
collector — позволяет нам генерировать обработанный кортеж.
выполнять
Подпись метода execute следующая:
execute(Tuple tuple)
Здесь кортеж — это входной кортеж, который нужно обработать.
Метод execute обрабатывает один кортеж за раз. Доступ к данным кортежа можно получить с помощью метода getValue класса Tuple. Нет необходимости немедленно обрабатывать входной кортеж. Несколько кортежей можно обрабатывать и выводить как один кортеж. Обработанный кортеж может быть передан с помощью класса OutputCollector.
уборка
Подпись метода очистки выглядит следующим образом:
cleanup()
declareOutputFields
Подпись метода DeclareOutputFields выглядит следующим образом:
declareOutputFields(OutputFieldsDeclarer declarer)
Здесь описатель параметров используется для объявления идентификаторов выходного потока, полей вывода и т. Д.
Этот метод используется для указания схемы вывода кортежа
Журнал звонков Creator Bolt
Болт создателя журнала вызовов получает кортеж журнала вызовов. Кортеж журнала вызовов содержит номер звонящего, номер получателя и продолжительность звонка. Этот болт просто создает новое значение, комбинируя номер звонящего и номер получателя. Формат нового значения — «Номер вызывающего абонента — Номер получателя», и оно называется новым полем «Вызов». Полный код приведен ниже.
Кодирование — CallLogCreatorBolt.java
//import util packages import java.util.HashMap; import java.util.Map; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; //import Storm IRichBolt package import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; //Create a class CallLogCreatorBolt which implement IRichBolt interface public class CallLogCreatorBolt implements IRichBolt { //Create instance for OutputCollector which collects and emits tuples to produce output private OutputCollector collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void execute(Tuple tuple) { String from = tuple.getString(0); String to = tuple.getString(1); Integer duration = tuple.getInteger(2); collector.emit(new Values(from + " - " + to, duration)); } @Override public void cleanup() {} @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("call", "duration")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } }
Счетчик звонков
Болт счетчика журнала вызовов принимает вызов и его продолжительность в виде кортежа. Этот болт инициализирует объект словаря (Map) в методе prepare. В методе execute он проверяет кортеж и создает новую запись в объекте словаря для каждого нового значения «call» в кортеже и устанавливает значение 1 в объекте словаря. Для уже доступной записи в словаре она просто увеличивает свое значение. Проще говоря, этот болт сохраняет вызов и его счет в объекте словаря. Вместо сохранения вызова и его количества в словаре, мы также можем сохранить его в источнике данных. Полный код программы выглядит следующим образом:
Кодирование — CallLogCounterBolt.java
import java.util.HashMap; import java.util.Map; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; public class CallLogCounterBolt implements IRichBolt { Map<String, Integer> counterMap; private OutputCollector collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.counterMap = new HashMap<String, Integer>(); this.collector = collector; } @Override public void execute(Tuple tuple) { String call = tuple.getString(0); Integer duration = tuple.getInteger(1); if(!counterMap.containsKey(call)){ counterMap.put(call, 1); }else{ Integer c = counterMap.get(call) + 1; counterMap.put(call, c); } collector.ack(tuple); } @Override public void cleanup() { for(Map.Entry<String, Integer> entry:counterMap.entrySet()){ System.out.println(entry.getKey()+" : " + entry.getValue()); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("call")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } }
Создание топологии
Топология Storm — это в основном структура Thrift. Класс TopologyBuilder предоставляет простые и легкие методы для создания сложных топологий. Класс TopologyBuilder имеет методы для установки spout (setSpout) и для установки болта (setBolt) . Наконец, TopologyBuilder имеет createTopology для создания топологии. Используйте следующий фрагмент кода для создания топологии —
TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout()); builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt()) .shuffleGrouping("call-log-reader-spout"); builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt()) .fieldsGrouping("call-log-creator-bolt", new Fields("call"));
Методы shuffleGrouping и fieldsGrouping помогают установить группировку потока для носика и болтов.
Локальный кластер
В целях разработки мы можем создать локальный кластер, используя объект «LocalCluster», а затем передать топологию, используя метод submitTopology класса «LocalCluster». Одним из аргументов для submitTopology является экземпляр класса Config. Класс «Config» используется для настройки параметров конфигурации перед отправкой топологии. Эта опция конфигурации будет объединена с конфигурацией кластера во время выполнения и отправлена всем задачам (носик и болт) методом подготовки. Как только топология будет передана в кластер, мы подождем 10 секунд, пока кластер вычислит представленную топологию, а затем остановим кластер, используя метод «shutdown» «LocalCluster». Полный код программы выглядит следующим образом:
Кодирование — LogAnalyserStorm.java
import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; //import storm configuration packages import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.topology.TopologyBuilder; //Create main class LogAnalyserStorm submit topology. public class LogAnalyserStorm { public static void main(String[] args) throws Exception{ //Create Config instance for cluster configuration Config config = new Config(); config.setDebug(true); // TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout()); builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt()) .shuffleGrouping("call-log-reader-spout"); builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt()) .fieldsGrouping("call-log-creator-bolt", new Fields("call")); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology()); Thread.sleep(10000); //Stop the topology cluster.shutdown(); } }
Сборка и запуск приложения
Полное приложение имеет четыре кода Java. Они —
- FakeCallLogReaderSpout.java
- CallLogCreaterBolt.java
- CallLogCounterBolt.java
- LogAnalyerStorm.java
Приложение может быть построено с помощью следующей команды —
javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java
Приложение можно запустить с помощью следующей команды —
java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserStorm
Выход
Как только приложение запустится, оно выведет полную информацию о процессе запуска кластера, обработке носика и болтов и, наконец, о процессе отключения кластера. В «CallLogCounterBolt» мы распечатали вызов и детали его подсчета. Эта информация будет отображаться на консоли следующим образом —
1234123402 - 1234123401 : 78 1234123402 - 1234123404 : 88 1234123402 - 1234123403 : 105 1234123401 - 1234123404 : 74 1234123401 - 1234123403 : 81 1234123401 - 1234123402 : 81 1234123403 - 1234123404 : 86 1234123404 - 1234123401 : 63 1234123404 - 1234123402 : 82 1234123403 - 1234123402 : 83 1234123404 - 1234123403 : 86 1234123403 - 1234123401 : 93
Языки не JVM
Штормовые топологии реализуются интерфейсами Thrift, что позволяет легко передавать топологии на любом языке. Storm поддерживает Ruby, Python и многие другие языки. Давайте посмотрим на привязку Python.
Python Binding
Python — это интерпретируемый, интерактивный, объектно-ориентированный и высокоуровневый язык программирования общего назначения. Storm поддерживает Python для реализации своей топологии. Python поддерживает генерацию, привязку, фиксацию и регистрацию.
Как известно, болты могут быть определены на любом языке. Болты, написанные на другом языке, выполняются как подпроцессы, и Storm связывается с этими подпроцессами с помощью сообщений JSON через stdin / stdout. Сначала возьмем образец болта WordCount, который поддерживает связывание Python.
public static class WordCount implements IRichBolt { public WordSplit() { super("python", "splitword.py"); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } }
Здесь класс WordCount реализует интерфейс IRichBolt и работает с реализацией Python, указанным в качестве аргумента супер-метода «splitword.py». Теперь создайте реализацию Python с именем «splitword.py».
import storm class WordCountBolt(storm.BasicBolt): def process(self, tup): words = tup.values[0].split(" ") for word in words: storm.emit([word]) WordCountBolt().run()
Это пример реализации для Python, который считает слова в данном предложении. Точно так же вы можете связать с другими языками поддержки.
Apache Storm — Трезубец
Трайдент — это продолжение Шторма. Как и Storm, Trident был также разработан Twitter. Основной причиной разработки Trident является предоставление высокоуровневой абстракции поверх Storm наряду с обработкой потоков с отслеживанием состояния и распределенными запросами с низкой задержкой.
Trident использует носик и болт, но эти низкоуровневые компоненты автоматически создаются Trident перед выполнением. У Trident есть функции, фильтры, объединения, группировка и агрегация.
Trident обрабатывает потоки как серию пакетов, которые называются транзакциями. Обычно размер этих небольших пакетов будет порядка тысяч или миллионов кортежей, в зависимости от входного потока. Таким образом, Trident отличается от Storm, который выполняет обработку по кортежу.
Концепция пакетной обработки очень похожа на транзакции базы данных. Каждой транзакции присваивается идентификатор транзакции. Транзакция считается успешной, как только вся ее обработка завершена. Однако сбой в обработке одного из кортежей транзакции приведет к повторной передаче всей транзакции. Для каждого пакета Trident будет вызывать beginCommit в начале транзакции и фиксировать в конце транзакции.
Топология трезубца
Trident API предоставляет простой вариант создания топологии Trident с использованием класса «TridentTopology». По сути, топология Trident получает входной поток из носика и выполняет упорядоченную последовательность операций (фильтрацию, агрегацию, группировку и т. Д.) В потоке. Storm Tuple заменяется на Trident Tuple, а болты заменяются операциями. Простая топология Trident может быть создана следующим образом:
TridentTopology topology = new TridentTopology();
Трайдент Туплс
Кортеж Trident — это именованный список значений. Интерфейс TridentTuple — это модель данных топологии Trident. Интерфейс TridentTuple — это основная единица данных, которая может обрабатываться топологией Trident.
Трезубец Носик
Носик Trident похож на носик Storm, с дополнительными опциями для использования функций Trident. На самом деле, мы все еще можем использовать IRichSpout, который мы использовали в топологии Storm, но он будет не транзакционным по своей природе, и мы не сможем использовать преимущества, предоставляемые Trident.
Основной носик, имеющий все функциональные возможности для использования функций Trident, является «ITridentSpout». Он поддерживает как транзакционную, так и непрозрачную транзакционную семантику. Другими источниками являются IBatchSpout, IPartitionedTridentSpout и IOpaquePartitionedTridentSpout.
В дополнение к этим универсальным носикам, Trident имеет множество примеров реализации носика trident. Одним из них является носик FeederBatchSpout, который мы можем использовать для простой отправки именованного списка кортежей трезубца, не беспокоясь о пакетной обработке, параллелизме и т. Д.
Создание FeederBatchSpout и подача данных могут быть выполнены, как показано ниже —
TridentTopology topology = new TridentTopology(); FeederBatchSpout testSpout = new FeederBatchSpout( ImmutableList.of("fromMobileNumber", "toMobileNumber", “duration”)); topology.newStream("fixed-batch-spout", testSpout) testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", 20)));
Трезубец Операции
Trident полагается на «Операцию Trident» для обработки входного потока кортежей трезубца. Trident API имеет ряд встроенных операций для обработки потоковой обработки от простого к сложному. Эти операции варьируются от простой проверки до сложной группировки и агрегации кортежей трезубца. Давайте пройдемся по самым важным и часто используемым операциям.
Фильтр
Фильтр — это объект, используемый для выполнения проверки ввода. Фильтр Trident получает подмножество полей кортежа trident в качестве входных данных и возвращает либо true, либо false, в зависимости от того, выполнены определенные условия или нет. Если true возвращается, тогда кортеж сохраняется в выходном потоке; в противном случае кортеж удаляется из потока. Фильтр будет в основном наследоваться от класса BaseFilter и реализовывать метод isKeep . Вот пример реализации операции фильтра —
public class MyFilter extends BaseFilter { public boolean isKeep(TridentTuple tuple) { return tuple.getInteger(1) % 2 == 0; } } input [1, 2] [1, 3] [1, 4] output [1, 2] [1, 4]
Функция фильтра может быть вызвана в топологии с использованием метода «каждый». Класс «Поля» может использоваться для указания входных данных (подмножество кортежа трезубца). Пример кода выглядит следующим образом —
TridentTopology topology = new TridentTopology(); topology.newStream("spout", spout) .each(new Fields("a", "b"), new MyFilter())
функция
Функция — это объект, используемый для выполнения простой операции с одним кортежем трезубца. Он принимает подмножество полей кортежа трезубца и испускает ноль или более новых полей кортежа трезубца.
Функция в основном наследуется от класса BaseFunction и реализует метод execute . Пример реализации приведен ниже —
public class MyFunction extends BaseFunction { public void execute(TridentTuple tuple, TridentCollector collector) { int a = tuple.getInteger(0); int b = tuple.getInteger(1); collector.emit(new Values(a + b)); } } input [1, 2] [1, 3] [1, 4] output [1, 2, 3] [1, 3, 4] [1, 4, 5]
Как и операция Filter, функция Function может вызываться в топологии с использованием каждого метода. Пример кода выглядит следующим образом —
TridentTopology topology = new TridentTopology(); topology.newStream("spout", spout) .each(new Fields(“a, b"), new MyFunction(), new Fields(“d")));
агрегирование
Агрегация — это объект, используемый для выполнения операций агрегации во входном пакете, разделе или потоке. Трайдент имеет три типа агрегации. Они заключаются в следующем —
-
агрегат — агрегирует каждую партию кортежа трезубца изолированно. В процессе агрегации кортежи первоначально перераспределяются с использованием глобальной группировки, чтобы объединить все разделы одного пакета в один раздел.
-
partitionAggregate — агрегирует каждый раздел вместо всего пакета трезубец кортежа. Выходные данные агрегата разделов полностью заменяют входной кортеж. Выходные данные агрегата раздела содержат один кортеж поля.
-
persistentaggregate — агрегирует все кортежи трезубца во всех пакетах и сохраняет результат либо в памяти, либо в базе данных.
агрегат — агрегирует каждую партию кортежа трезубца изолированно. В процессе агрегации кортежи первоначально перераспределяются с использованием глобальной группировки, чтобы объединить все разделы одного пакета в один раздел.
partitionAggregate — агрегирует каждый раздел вместо всего пакета трезубец кортежа. Выходные данные агрегата разделов полностью заменяют входной кортеж. Выходные данные агрегата раздела содержат один кортеж поля.
persistentaggregate — агрегирует все кортежи трезубца во всех пакетах и сохраняет результат либо в памяти, либо в базе данных.
TridentTopology topology = new TridentTopology(); // aggregate operation topology.newStream("spout", spout) .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”)) .aggregate(new Count(), new Fields(“count”)) // partitionAggregate operation topology.newStream("spout", spout) .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”)) .partitionAggregate(new Count(), new Fields(“count")) // persistentAggregate - saving the count to memory topology.newStream("spout", spout) .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”)) .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));
Операция агрегации может быть создана с использованием CombinerAggregator, ReducerAggregator или универсального интерфейса Aggregator. Агрегатор «count», используемый в приведенном выше примере, является одним из встроенных агрегаторов. Он реализован с использованием «CombinerAggregator». Реализация выглядит следующим образом:
public class Count implements CombinerAggregator<Long> { @Override public Long init(TridentTuple tuple) { return 1L; } @Override public Long combine(Long val1, Long val2) { return val1 + val2; } @Override public Long zero() { return 0L; } }
группирование
Операция группировки является встроенной операцией и может вызываться методом groupBy . Метод groupBy перераспределяет поток, выполняя partitionBy для указанных полей, а затем внутри каждого раздела группирует кортежи, чьи групповые поля равны. Обычно мы используем «groupBy» вместе с «persistentAggregate», чтобы получить группированную агрегацию. Пример кода выглядит следующим образом —
TridentTopology topology = new TridentTopology(); // persistentAggregate - saving the count to memory topology.newStream("spout", spout) .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”)) .groupBy(new Fields(“d”) .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));
Слияние и соединение
Слияние и объединение могут быть выполнены с использованием методов «слияния» и «соединения» соответственно. Слияние объединяет один или несколько потоков. Объединение аналогично объединению, за исключением того, что при объединении используется поле кортежа трезубца с обеих сторон для проверки и объединения двух потоков. Кроме того, объединение будет работать только на уровне партии. Пример кода выглядит следующим образом —
TridentTopology topology = new TridentTopology(); topology.merge(stream1, stream2, stream3); topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key", "a", "b", "c"));
Государственное обслуживание
Трайдент предоставляет механизм для поддержания государства. Информация о состоянии может храниться в самой топологии, в противном случае вы также можете хранить ее в отдельной базе данных. Причина заключается в том, чтобы поддерживать состояние, при котором во время обработки происходит сбой любого кортежа, а затем сбойный кортеж повторяется. Это создает проблему при обновлении состояния, поскольку вы не уверены, было ли ранее обновлено состояние этого кортежа или нет. Если перед обновлением состояния произошел сбой в кортеже, то повторная попытка кортежа сделает состояние стабильным. Однако если после обновления состояния произошел сбой в кортеже, повторная попытка того же кортежа снова увеличит число в базе данных и сделает состояние нестабильным. Чтобы убедиться, что сообщение обработано только один раз, необходимо выполнить следующие шаги:
-
Обработайте кортежи небольшими партиями.
-
Присвойте уникальный идентификатор каждой партии. Если партия повторяется, ей присваивается тот же уникальный идентификатор.
-
Обновления состояния заказываются между партиями. Например, обновление состояния второго пакета будет невозможно до тех пор, пока обновление состояния для первого пакета не будет завершено.
Обработайте кортежи небольшими партиями.
Присвойте уникальный идентификатор каждой партии. Если партия повторяется, ей присваивается тот же уникальный идентификатор.
Обновления состояния заказываются между партиями. Например, обновление состояния второго пакета будет невозможно до тех пор, пока обновление состояния для первого пакета не будет завершено.
Распределенный RPC
Распределенный RPC используется для запроса и получения результата из топологии Trident. Storm имеет встроенный распределенный RPC-сервер. Распределенный сервер RPC получает запрос RPC от клиента и передает его в топологию. Топология обрабатывает запрос и отправляет результат на распределенный RPC-сервер, который перенаправляется распределенным RPC-сервером клиенту. Распределенный RPC-запрос Trident выполняется как обычный RPC-запрос, за исключением того факта, что эти запросы выполняются параллельно.
Когда использовать Trident?
Как и во многих случаях использования, если требование обрабатывать запрос только один раз, мы можем достичь этого, написав топологию в Trident. С другой стороны, в случае Storm будет трудно добиться ровно однократной обработки. Следовательно, Trident будет полезен для тех случаев, когда вам требуется ровно один раз обработки. Trident подходит не для всех вариантов использования, особенно для высокопроизводительных, поскольку он добавляет сложности в Storm и управляет состоянием.
Рабочий пример Трайдента
Мы собираемся преобразовать наше приложение для анализа журнала вызовов, разработанное в предыдущем разделе, в среду Trident. Приложение Trident будет относительно простым по сравнению с обычным штормом благодаря высокоуровневому API. Storm в основном потребуется для выполнения любой из функций Function, Filter, Aggregate, GroupBy, Join и Merge в Trident. Наконец, мы запустим сервер DRPC, используя класс LocalDRPC, и произведем поиск по ключевому слову, используя метод execute класса LocalDRPC.
Форматирование информации о звонке
Целью класса FormatCall является форматирование информации о вызове, содержащей «Номер вызывающего абонента» и «Номер получателя». Полный код программы выглядит следующим образом:
Кодирование: FormatCall.java
import backtype.storm.tuple.Values; import storm.trident.operation.BaseFunction; import storm.trident.operation.TridentCollector; import storm.trident.tuple.TridentTuple; public class FormatCall extends BaseFunction { @Override public void execute(TridentTuple tuple, TridentCollector collector) { String fromMobileNumber = tuple.getString(0); String toMobileNumber = tuple.getString(1); collector.emit(new Values(fromMobileNumber + " - " + toMobileNumber)); } }
CSVSplit
Целью класса CSVSplit является разделение входной строки на основе «запятая (,)» и испускание каждого слова в строке. Эта функция используется для анализа входного аргумента распределенных запросов. Полный код выглядит следующим образом —
Кодирование: CSVSplit.java
import backtype.storm.tuple.Values; import storm.trident.operation.BaseFunction; import storm.trident.operation.TridentCollector; import storm.trident.tuple.TridentTuple; public class CSVSplit extends BaseFunction { @Override public void execute(TridentTuple tuple, TridentCollector collector) { for(String word: tuple.getString(0).split(",")) { if(word.length() > 0) { collector.emit(new Values(word)); } } } }
Log Analyzer
Это основное приложение. Сначала приложение будет инициализировать TridentTopology и информацию о вызывающем абоненте с помощью FeederBatchSpout . Поток топологии Trident может быть создан с использованием метода newStream класса TridentTopology. Точно так же поток DRPC топологии Trident может быть создан с использованием метода newDRCPStream класса TridentTopology. Простой сервер DRCP может быть создан с использованием класса LocalDRPC. LocalDRPC имеет метод execute для поиска по ключевому слову. Полный код приведен ниже.
Кодирование: LogAnalyserTrident.java
import java.util.*; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.LocalDRPC; import backtype.storm.utils.DRPCClient; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import storm.trident.TridentState; import storm.trident.TridentTopology; import storm.trident.tuple.TridentTuple; import storm.trident.operation.builtin.FilterNull; import storm.trident.operation.builtin.Count; import storm.trident.operation.builtin.Sum; import storm.trident.operation.builtin.MapGet; import storm.trident.operation.builtin.Debug; import storm.trident.operation.BaseFilter; import storm.trident.testing.FixedBatchSpout; import storm.trident.testing.FeederBatchSpout; import storm.trident.testing.Split; import storm.trident.testing.MemoryMapState; import com.google.common.collect.ImmutableList; public class LogAnalyserTrident { public static void main(String[] args) throws Exception { System.out.println("Log Analyser Trident"); TridentTopology topology = new TridentTopology(); FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("fromMobileNumber", "toMobileNumber", "duration")); TridentState callCounts = topology .newStream("fixed-batch-spout", testSpout) .each(new Fields("fromMobileNumber", "toMobileNumber"), new FormatCall(), new Fields("call")) .groupBy(new Fields("call")) .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count")); LocalDRPC drpc = new LocalDRPC(); topology.newDRPCStream("call_count", drpc) .stateQuery(callCounts, new Fields("args"), new MapGet(), new Fields("count")); topology.newDRPCStream("multiple_call_count", drpc) .each(new Fields("args"), new CSVSplit(), new Fields("call")) .groupBy(new Fields("call")) .stateQuery(callCounts, new Fields("call"), new MapGet(), new Fields("count")) .each(new Fields("call", "count"), new Debug()) .each(new Fields("count"), new FilterNull()) .aggregate(new Fields("count"), new Sum(), new Fields("sum")); Config conf = new Config(); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("trident", conf, topology.build()); Random randomGenerator = new Random(); int idx = 0; while(idx < 10) { testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", randomGenerator.nextInt(60)))); testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123403", randomGenerator.nextInt(60)))); testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123404", randomGenerator.nextInt(60)))); testSpout.feed(ImmutableList.of(new Values("1234123402", "1234123403", randomGenerator.nextInt(60)))); idx = idx + 1; } System.out.println("DRPC : Query starts"); System.out.println(drpc.execute("call_count","1234123401 - 1234123402")); System.out.println(drpc.execute("multiple_call_count", "1234123401 - 1234123402,1234123401 - 1234123403")); System.out.println("DRPC : Query ends"); cluster.shutdown(); drpc.shutdown(); // DRPCClient client = new DRPCClient("drpc.server.location", 3772); } }
Сборка и запуск приложения
Полное приложение имеет три кода Java. Они заключаются в следующем —
- FormatCall.java
- CSVSplit.java
- LogAnalyerTrident.java
Приложение может быть построено с помощью следующей команды —
javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java
Приложение можно запустить с помощью следующей команды —
java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserTrident
Выход
Как только приложение будет запущено, оно выведет полную информацию о процессе запуска кластера, обработке операций, информации о сервере DRPC и клиенте и, наконец, о процессе остановки кластера. Этот вывод будет отображаться на консоли, как показано ниже.
DRPC : Query starts [["1234123401 - 1234123402",10]] DEBUG: [1234123401 - 1234123402, 10] DEBUG: [1234123401 - 1234123403, 10] [[20]] DRPC : Query ends
Apache Storm в твиттере
Здесь, в этой главе, мы обсудим приложение Apache Storm в реальном времени. Мы увидим, как Storm используется в Twitter.
щебет
Twitter — это онлайн-сервис социальных сетей, который предоставляет платформу для отправки и получения пользовательских твитов. Зарегистрированные пользователи могут читать и публиковать твиты, но незарегистрированные пользователи могут читать только твиты. Hashtag используется для классификации твитов по ключевым словам, добавляя # перед соответствующим ключевым словом. Теперь давайте возьмем в режиме реального времени сценарий поиска наиболее используемого хэштега по теме.
Создание носика
Цель spout — как можно быстрее получать твиты от людей. Twitter предоставляет «Twitter Streaming API», инструмент, основанный на веб-сервисах, для получения твитов, отправленных людьми в режиме реального времени. API Twitter Streaming доступен на любом языке программирования.
twitter4j — это неофициальная библиотека Java с открытым исходным кодом, которая предоставляет модуль на основе Java для быстрого доступа к API потоковой передачи Twitter. twitter4j предоставляет основанную на слушателе структуру для доступа к твитам. Чтобы получить доступ к API потоковой передачи Twitter, нам нужно войти в учетную запись разработчика Twitter и получить следующие данные аутентификации OAuth.
- Customerkey
- CustomerSecret
- маркер доступа
- AccessTookenSecret
Storm предоставляет в своем стартовом наборе твиттер TwitterSampleSpout. Мы будем использовать его для получения твитов. Для носика требуются данные аутентификации OAuth и хотя бы ключевое слово. Носик будет генерировать твиты в реальном времени на основе ключевых слов. Полный код программы приведен ниже.
Кодирование: TwitterSampleSpout.java
import java.util.Map; import java.util.concurrent.LinkedBlockingQueue; import twitter4j.FilterQuery; import twitter4j.StallWarning; import twitter4j.Status; import twitter4j.StatusDeletionNotice; import twitter4j.StatusListener; import twitter4j.TwitterStream; import twitter4j.TwitterStreamFactory; import twitter4j.auth.AccessToken; import twitter4j.conf.ConfigurationBuilder; import backtype.storm.Config; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.utils.Utils; @SuppressWarnings("serial") public class TwitterSampleSpout extends BaseRichSpout { SpoutOutputCollector _collector; LinkedBlockingQueue<Status> queue = null; TwitterStream _twitterStream; String consumerKey; String consumerSecret; String accessToken; String accessTokenSecret; String[] keyWords; public TwitterSampleSpout(String consumerKey, String consumerSecret, String accessToken, String accessTokenSecret, String[] keyWords) { this.consumerKey = consumerKey; this.consumerSecret = consumerSecret; this.accessToken = accessToken; this.accessTokenSecret = accessTokenSecret; this.keyWords = keyWords; } public TwitterSampleSpout() { // TODO Auto-generated constructor stub } @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { queue = new LinkedBlockingQueue<Status>(1000); _collector = collector; StatusListener listener = new StatusListener() { @Override public void onStatus(Status status) { queue.offer(status); } @Override public void onDeletionNotice(StatusDeletionNotice sdn) {} @Override public void onTrackLimitationNotice(int i) {} @Override public void onScrubGeo(long l, long l1) {} @Override public void onException(Exception ex) {} @Override public void onStallWarning(StallWarning arg0) { // TODO Auto-generated method stub } }; ConfigurationBuilder cb = new ConfigurationBuilder(); cb.setDebugEnabled(true) .setOAuthConsumerKey(consumerKey) .setOAuthConsumerSecret(consumerSecret) .setOAuthAccessToken(accessToken) .setOAuthAccessTokenSecret(accessTokenSecret); _twitterStream = new TwitterStreamFactory(cb.build()).getInstance(); _twitterStream.addListener(listener); if (keyWords.length == 0) { _twitterStream.sample(); }else { FilterQuery query = new FilterQuery().track(keyWords); _twitterStream.filter(query); } } @Override public void nextTuple() { Status ret = queue.poll(); if (ret == null) { Utils.sleep(50); } else { _collector.emit(new Values(ret)); } } @Override public void close() { _twitterStream.shutdown(); } @Override public Map<String, Object> getComponentConfiguration() { Config ret = new Config(); ret.setMaxTaskParallelism(1); return ret; } @Override public void ack(Object id) {} @Override public void fail(Object id) {} @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("tweet")); } }
Болт считывателя хэштега
Твит , отправленный spout , будет перенаправлен в HashtagReaderBolt , который обработает твит и выдаст все доступные хэштеги. HashtagReaderBolt использует метод getHashTagEntities, предоставленный twitter4j. getHashTagEntities читает твит и возвращает список хэштегов. Полный код программы выглядит следующим образом:
Кодирование: HashtagReaderBolt.java
import java.util.HashMap; import java.util.Map; import twitter4j.*; import twitter4j.conf.*; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; public class HashtagReaderBolt implements IRichBolt { private OutputCollector collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void execute(Tuple tuple) { Status tweet = (Status) tuple.getValueByField("tweet"); for(HashtagEntity hashtage : tweet.getHashtagEntities()) { System.out.println("Hashtag: " + hashtage.getText()); this.collector.emit(new Values(hashtage.getText())); } } @Override public void cleanup() {} @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("hashtag")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } }
Счетчик болтов хештега
Переданный хэштег будет перенаправлен в HashtagCounterBolt . Этот болт обработает все хэштеги и сохранит каждый хэштег и его количество в памяти, используя объект Java Map. Полный код программы приведен ниже.
Кодирование: HashtagCounterBolt.java
import java.util.HashMap; import java.util.Map; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; public class HashtagCounterBolt implements IRichBolt { Map<String, Integer> counterMap; private OutputCollector collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.counterMap = new HashMap<String, Integer>(); this.collector = collector; } @Override public void execute(Tuple tuple) { String key = tuple.getString(0); if(!counterMap.containsKey(key)){ counterMap.put(key, 1); }else{ Integer c = counterMap.get(key) + 1; counterMap.put(key, c); } collector.ack(tuple); } @Override public void cleanup() { for(Map.Entry<String, Integer> entry:counterMap.entrySet()){ System.out.println("Result: " + entry.getKey()+" : " + entry.getValue()); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("hashtag")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } }
Отправка топологии
Отправка топологии является основным приложением. Топология Twitter состоит из TwitterSampleSpout , HashtagReaderBolt и HashtagCounterBolt . Следующий программный код показывает, как отправить топологию.
Кодирование: TwitterHashtagStorm.java
import java.util.*; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.topology.TopologyBuilder; public class TwitterHashtagStorm { public static void main(String[] args) throws Exception{ String consumerKey = args[0]; String consumerSecret = args[1]; String accessToken = args[2]; String accessTokenSecret = args[3]; String[] arguments = args.clone(); String[] keyWords = Arrays.copyOfRange(arguments, 4, arguments.length); Config config = new Config(); config.setDebug(true); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("twitter-spout", new TwitterSampleSpout(consumerKey, consumerSecret, accessToken, accessTokenSecret, keyWords)); builder.setBolt("twitter-hashtag-reader-bolt", new HashtagReaderBolt()) .shuffleGrouping("twitter-spout"); builder.setBolt("twitter-hashtag-counter-bolt", new HashtagCounterBolt()) .fieldsGrouping("twitter-hashtag-reader-bolt", new Fields("hashtag")); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("TwitterHashtagStorm", config, builder.createTopology()); Thread.sleep(10000); cluster.shutdown(); } }
Сборка и запуск приложения
Полное приложение имеет четыре кода Java. Они заключаются в следующем —
- TwitterSampleSpout.java
- HashtagReaderBolt.java
- HashtagCounterBolt.java
- TwitterHashtagStorm.java
Вы можете скомпилировать приложение, используя следующую команду —
javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/twitter4j/lib/*” *.java
Запустите приложение, используя следующие команды —
javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/twitter4j/lib/*”:. TwitterHashtagStorm <customerkey> <customersecret> <accesstoken> <accesstokensecret> <keyword1> <keyword2> … <keywordN>
Выход
Приложение напечатает текущий доступный хэштег и его количество. Вывод должен быть похож на следующее —
Result: jazztastic : 1 Result: foodie : 1 Result: Redskins : 1 Result: Recipe : 1 Result: cook : 1 Result: android : 1 Result: food : 2 Result: NoToxicHorseMeat : 1 Result: Purrs4Peace : 1 Result: livemusic : 1 Result: VIPremium : 1 Result: Frome : 1 Result: SundayRoast : 1 Result: Millennials : 1 Result: HealthWithKier : 1 Result: LPs30DaysofGratitude : 1 Result: cooking : 1 Result: gameinsight : 1 Result: Countryfile : 1 Result: androidgames : 1
Apache Storm в Yahoo! финансов
Yahoo! «Финансы» — ведущий интернет-сайт, посвященный деловым новостям и финансовым данным Это часть Yahoo! и предоставляет информацию о финансовых новостях, статистике рынка, данных о международном рынке и другую информацию о финансовых ресурсах, к которым имеет доступ каждый.
Если вы являетесь зарегистрированным Yahoo! пользователь, то вы можете настроить Yahoo! Финансировать, чтобы воспользоваться его определенными предложениями. Yahoo! Финансовый API используется для запроса финансовых данных из Yahoo!
Этот API-интерфейс отображает данные, которые задерживаются на 15 минут от реального времени, и обновляет свою базу данных каждые 1 минуту, чтобы получить доступ к текущей информации, связанной с запасами. Теперь давайте рассмотрим сценарий компании в режиме реального времени и посмотрим, как создать предупреждение, когда стоимость ее акций опустится ниже 100.
Создание носика
Цель носика состоит в том, чтобы получить подробную информацию о компании и сообщить цены на болты. Вы можете использовать следующий программный код для создания носика.
Кодирование: YahooFinanceSpout.java
import java.util.*; import java.io.*; import java.math.BigDecimal; //import yahoofinace packages import yahoofinance.YahooFinance; import yahoofinance.Stock; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.topology.IRichSpout; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; public class YahooFinanceSpout implements IRichSpout { private SpoutOutputCollector collector; private boolean completed = false; private TopologyContext context; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector){ this.context = context; this.collector = collector; } @Override public void nextTuple() { try { Stock stock = YahooFinance.get("INTC"); BigDecimal price = stock.getQuote().getPrice(); this.collector.emit(new Values("INTC", price.doubleValue())); stock = YahooFinance.get("GOOGL"); price = stock.getQuote().getPrice(); this.collector.emit(new Values("GOOGL", price.doubleValue())); stock = YahooFinance.get("AAPL"); price = stock.getQuote().getPrice(); this.collector.emit(new Values("AAPL", price.doubleValue())); } catch(Exception e) {} } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("company", "price")); } @Override public void close() {} public boolean isDistributed() { return false; } @Override public void activate() {} @Override public void deactivate() {} @Override public void ack(Object msgId) {} @Override public void fail(Object msgId) {} @Override public Map<String, Object> getComponentConfiguration() { return null; } }
Создание болта
Здесь цель «болта» — обработать цены данной компании, когда цены упадут ниже 100. Он использует объект Java Map для установки предупреждения об ограничении цены отсечения как истинного, когда цены на акции опускаются ниже 100; иначе ложь. Полный код программы выглядит следующим образом:
Кодирование: PriceCutOffBolt.java
import java.util.HashMap; import java.util.Map; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; public class PriceCutOffBolt implements IRichBolt { Map<String, Integer> cutOffMap; Map<String, Boolean> resultMap; private OutputCollector collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.cutOffMap = new HashMap <String, Integer>(); this.cutOffMap.put("INTC", 100); this.cutOffMap.put("AAPL", 100); this.cutOffMap.put("GOOGL", 100); this.resultMap = new HashMap<String, Boolean>(); this.collector = collector; } @Override public void execute(Tuple tuple) { String company = tuple.getString(0); Double price = tuple.getDouble(1); if(this.cutOffMap.containsKey(company)){ Integer cutOffPrice = this.cutOffMap.get(company); if(price < cutOffPrice) { this.resultMap.put(company, true); } else { this.resultMap.put(company, false); } } collector.ack(tuple); } @Override public void cleanup() { for(Map.Entry<String, Boolean> entry:resultMap.entrySet()){ System.out.println(entry.getKey()+" : " + entry.getValue()); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("cut_off_price")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } }
Отправка топологии
Это основное приложение, в котором YahooFinanceSpout.java и PriceCutOffBolt.java соединены вместе и создают топологию. Следующий программный код показывает, как вы можете отправить топологию.
Кодирование: YahooFinanceStorm.java
import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.topology.TopologyBuilder; public class YahooFinanceStorm { public static void main(String[] args) throws Exception{ Config config = new Config(); config.setDebug(true); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("yahoo-finance-spout", new YahooFinanceSpout()); builder.setBolt("price-cutoff-bolt", new PriceCutOffBolt()) .fieldsGrouping("yahoo-finance-spout", new Fields("company")); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("YahooFinanceStorm", config, builder.createTopology()); Thread.sleep(10000); cluster.shutdown(); } }
Сборка и запуск приложения
Полное приложение имеет три кода Java. Они заключаются в следующем —
- YahooFinanceSpout.java
- PriceCutOffBolt.java
- YahooFinanceStorm.java
Приложение может быть построено с помощью следующей команды —
javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/yahoofinance/lib/*” *.java
Приложение можно запустить с помощью следующей команды —
javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/yahoofinance/lib/*”:. YahooFinanceStorm
Выход
Вывод будет похож на следующее —
GOOGL : false AAPL : false INTC : true
Apache Storm — Приложения
Платформа Apache Storm поддерживает многие из лучших на сегодняшний день промышленных приложений. В этой главе мы дадим очень краткий обзор некоторых наиболее заметных приложений Storm.
Klout
Klout — это приложение, которое использует аналитику социальных сетей для ранжирования своих пользователей на основе социального влияния в Интернете с помощью Klout Score , который представляет собой числовое значение от 1 до 100. Klout использует встроенную абстракцию Trident Apache Storm для создания сложных топологий, которые передают данные.
Канал о погоде
Канал погоды использует топологию Storm для получения данных о погоде. Он связан с Твиттером, чтобы включить информирование о погоде в Твиттере и мобильных приложениях. OpenSignal — это компания, которая специализируется на картографировании беспроводного покрытия. StormTag и WeatherSignal — это погодные проекты, созданные OpenSignal. StormTag — это метеостанция Bluetooth, которая подключается к связке ключей. Данные о погоде, собранные устройством, отправляются на приложение WeatherSignal и серверы OpenSignal.
Телекоммуникационная индустрия
Поставщики телекоммуникационных услуг обрабатывают миллионы телефонных звонков в секунду. Они выполняют экспертизу при пропущенных звонках и плохом качестве звука. Подробные записи вызовов поступают со скоростью миллионов в секунду, и Apache Storm обрабатывает их в режиме реального времени и выявляет любые вызывающие беспокойство шаблоны. Анализ штормов может использоваться для постоянного улучшения качества вызовов.