Учебники

Apache Flume — получение данных из Twitter

Используя Flume, мы можем получать данные из различных служб и транспортировать их в централизованные хранилища (HDFS и HBase). В этой главе объясняется, как получать данные из службы Twitter и сохранять их в HDFS с помощью Apache Flume.

Как обсуждалось в Flume Architecture, веб-сервер генерирует данные журнала, и эти данные собираются агентом в Flume. Канал буферизует эти данные в приемник, который в конечном итоге передает их в централизованные хранилища.

В примере, приведенном в этой главе, мы создадим приложение и получим от него твиты, используя экспериментальный источник Twitter, предоставленный Apache Flume. Мы будем использовать канал памяти для буферизации этих твитов, а приемник HDFS для отправки этих твитов в HDFS.

Получить данные

Чтобы получить данные из Twitter, нам нужно будет выполнить следующие шаги:

  • Создать приложение для Twitter
  • Установить / запустить HDFS
  • Настроить Flume

Создание приложения Twitter

Чтобы получать твиты из Twitter, нужно создать приложение для Twitter. Следуйте инструкциям ниже, чтобы создать приложение для Twitter.

Шаг 1

Чтобы создать приложение для Twitter, нажмите следующую ссылку https://apps.twitter.com/ . Войдите в свой аккаунт Twitter. У вас будет окно управления приложениями Twitter, где вы можете создавать, удалять и управлять приложениями Twitter.

Окно управления приложениями

Шаг 2

Нажмите на кнопку « Создать новое приложение» . Вы будете перенаправлены в окно, где вы получите форму заявки, в которой вы должны будете заполнить свои данные, чтобы создать приложение. При заполнении адреса сайта укажите полный шаблон URL, например, http://example.com.

Создать приложение

Шаг 3

Заполните детали, примите Соглашение с разработчиком, когда закончите, нажмите кнопку « Создать приложение в Твиттере», которая находится внизу страницы. Если все идет хорошо, приложение будет создано с указанными деталями, как показано ниже.

Приложение создано

Шаг 4

Под ключами и вкладкой токены доступа внизу страницы вы можете увидеть кнопку под названием Создать мой токен доступа . Нажмите на него, чтобы сгенерировать токен доступа.

Ключевые токены доступа

Шаг 5

Наконец, нажмите на кнопку Test OAuth , которая находится в правой верхней части страницы. Это приведет к появлению страницы с вашим ключом потребителя, секретом потребителя, токеном доступа и секретом токена доступа . Скопируйте эти детали. Они полезны для настройки агента в Flume.

OAuth Tool

Запуск HDFS

Поскольку мы храним данные в HDFS, нам нужно установить / проверить Hadoop. Запустите Hadoop и создайте в нем папку для хранения данных Flume. Выполните шаги, приведенные ниже, перед настройкой Flume.

Шаг 1. Установите / проверьте Hadoop

Установите Hadoop . Если Hadoop уже установлен в вашей системе, проверьте установку с помощью команды версии Hadoop, как показано ниже.

$ hadoop version 

Если ваша система содержит Hadoop и вы установили переменную path, вы получите следующий вывод:

Hadoop 2.6.0 
Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r 
e3496499ecb8d220fba99dc5ed4c99c8f9e33bb1 
Compiled by jenkins on 2014-11-13T21:10Z 
Compiled with protoc 2.5.0 
From source with checksum 18e43357c8f927c0695f1e9522859d6a 
This command was run using /home/Hadoop/hadoop/share/hadoop/common/hadoop-common-2.6.0.jar

Шаг 2: Запуск Hadoop

Просмотрите каталог sbin Hadoop и запустите yarn и Hadoop dfs (распределенную файловую систему), как показано ниже.

cd /$Hadoop_Home/sbin/ 
$ start-dfs.sh 
localhost: starting namenode, logging to
   /home/Hadoop/hadoop/logs/hadoop-Hadoop-namenode-localhost.localdomain.out 
localhost: starting datanode, logging to 
   /home/Hadoop/hadoop/logs/hadoop-Hadoop-datanode-localhost.localdomain.out 
Starting secondary namenodes [0.0.0.0] 
starting secondarynamenode, logging to 
   /home/Hadoop/hadoop/logs/hadoop-Hadoop-secondarynamenode-localhost.localdomain.out
  
$ start-yarn.sh 
starting yarn daemons 
starting resourcemanager, logging to 
   /home/Hadoop/hadoop/logs/yarn-Hadoop-resourcemanager-localhost.localdomain.out 
localhost: starting nodemanager, logging to 
   /home/Hadoop/hadoop/logs/yarn-Hadoop-nodemanager-localhost.localdomain.out 

Шаг 3: Создайте каталог в HDFS

В Hadoop DFS вы можете создавать каталоги с помощью команды mkdir . Просмотрите его и создайте каталог с именем twitter_data в нужном пути, как показано ниже.

$cd /$Hadoop_Home/bin/ 
$ hdfs dfs -mkdir hdfs://localhost:9000/user/Hadoop/twitter_data 

Конфигурирование Flume

Мы должны настроить источник, канал и приемник, используя файл конфигурации в папке conf . В примере, приведенном в этой главе, используется экспериментальный источник, предоставленный Apache Flume, под названием « Twitter 1% Firehose Memory channel» и приемник HDFS.

Твиттер 1% Firehose Source

Этот источник очень экспериментален. Он подключается к 1% -ному примеру Twitter Firehose с использованием потокового API и непрерывно загружает твиты, преобразует их в формат Avro и отправляет события Avro в нисходящий приемник Flume.

Мы получим этот источник по умолчанию вместе с установкой Flume. Файлы jar, соответствующие этому источнику, могут быть расположены в папке lib, как показано ниже.

Twitter Jar Files

Установка пути к классам

Установите переменную classpath для папки lib Flume в файле Flume-env.sh, как показано ниже.

export CLASSPATH=$CLASSPATH:/FLUME_HOME/lib/* 

Для этого источника требуются такие сведения, как ключ потребителя, секрет потребителя, токен доступа и секрет токена доступа приложения Twitter. При настройке этого источника вы должны указать следующие свойства:

  • каналы

  • Тип источника: org.apache.flume.source.twitter.TwitterSource

  • consumerKey — ключ пользователя OAuth

  • consumerSecret — OAuth, секрет пользователя

  • accessToken — токен доступа OAuth

  • accessTokenSecret — Секретный токен OAuth

  • maxBatchSize — максимальное количество сообщений в Твиттере, которое должно быть в пакете твиттера. Значение по умолчанию 1000 (необязательно).

  • maxBatchDurationMillis — Максимальное количество миллисекунд ожидания до закрытия пакета. Значение по умолчанию 1000 (необязательно).

каналы

Тип источника: org.apache.flume.source.twitter.TwitterSource

consumerKey — ключ пользователя OAuth

consumerSecret — OAuth, секрет пользователя

accessToken — токен доступа OAuth

accessTokenSecret — Секретный токен OAuth

maxBatchSize — максимальное количество сообщений в Твиттере, которое должно быть в пакете твиттера. Значение по умолчанию 1000 (необязательно).

maxBatchDurationMillis — Максимальное количество миллисекунд ожидания до закрытия пакета. Значение по умолчанию 1000 (необязательно).

канал

Мы используем канал памяти. Чтобы настроить канал памяти, необходимо указать значение для типа канала.

  • тип — содержит тип канала. В нашем примере типом является MemChannel .

  • Емкость — это максимальное количество событий, хранящихся в канале. Его значение по умолчанию — 100 (необязательно).

  • TransactionCapacity — это максимальное количество событий, которые канал принимает или отправляет. Его значение по умолчанию — 100 (необязательно).

тип — содержит тип канала. В нашем примере типом является MemChannel .

Емкость — это максимальное количество событий, хранящихся в канале. Его значение по умолчанию — 100 (необязательно).

TransactionCapacity — это максимальное количество событий, которые канал принимает или отправляет. Его значение по умолчанию — 100 (необязательно).

Раковина HDFS

Этот приемник записывает данные в HDFS. Чтобы настроить этот приемник, вы должны предоставить следующую информацию.

  • канал

  • тип — hdfs

  • hdfs.path — путь к каталогу в HDFS, в котором должны храниться данные.

канал

тип — hdfs

hdfs.path — путь к каталогу в HDFS, в котором должны храниться данные.

И мы можем предоставить некоторые дополнительные значения в зависимости от сценария. Ниже приведены необязательные свойства приемника HDFS, которые мы настраиваем в нашем приложении.

  • fileType — это необходимый формат файла нашего HDFS. SequenceFile, DataStream и CompressedStream являются тремя типами, доступными для этого потока. В нашем примере мы используем DataStream .

  • writeFormat — может быть либо текстовым, либо записываемым.

  • batchSize — это число событий, записанных в файл перед его сбросом в HDFS. Его значение по умолчанию равно 100.

  • rollsize — размер файла для запуска броска. Это значение по умолчанию составляет 100.

  • rollCount — количество событий, записанных в файл перед его прокруткой. Значение по умолчанию 10.

fileType — это необходимый формат файла нашего HDFS. SequenceFile, DataStream и CompressedStream являются тремя типами, доступными для этого потока. В нашем примере мы используем DataStream .

writeFormat — может быть либо текстовым, либо записываемым.

batchSize — это число событий, записанных в файл перед его сбросом в HDFS. Его значение по умолчанию равно 100.

rollsize — размер файла для запуска броска. Это значение по умолчанию составляет 100.

rollCount — количество событий, записанных в файл перед его прокруткой. Значение по умолчанию 10.

Пример — файл конфигурации

Ниже приведен пример файла конфигурации. Скопируйте этот контент и сохраните как twitter.conf в папке conf Flume.

# Naming the components on the current agent. 
TwitterAgent.sources = Twitter 
TwitterAgent.channels = MemChannel 
TwitterAgent.sinks = HDFS
  
# Describing/Configuring the source 
TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterAgent.sources.Twitter.consumerKey = Your OAuth consumer key
TwitterAgent.sources.Twitter.consumerSecret = Your OAuth consumer secret 
TwitterAgent.sources.Twitter.accessToken = Your OAuth consumer key access token 
TwitterAgent.sources.Twitter.accessTokenSecret = Your OAuth consumer key access token secret 
TwitterAgent.sources.Twitter.keywords = tutorials point,java, bigdata, mapreduce, mahout, hbase, nosql
  
# Describing/Configuring the sink 

TwitterAgent.sinks.HDFS.type = hdfs 
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://localhost:9000/user/Hadoop/twitter_data/
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream 
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text 
TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0 
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000 
 
# Describing/Configuring the channel 
TwitterAgent.channels.MemChannel.type = memory 
TwitterAgent.channels.MemChannel.capacity = 10000 
TwitterAgent.channels.MemChannel.transactionCapacity = 100
  
# Binding the source and sink to the channel 
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sinks.HDFS.channel = MemChannel 

выполнение

Просмотрите домашний каталог Flume и запустите приложение, как показано ниже.

$ cd $FLUME_HOME 
$ bin/flume-ng agent --conf ./conf/ -f conf/twitter.conf 
Dflume.root.logger=DEBUG,console -n TwitterAgent

Если все пойдет хорошо, начнется потоковая передача твитов в HDFS. Ниже приведен снимок окна командной строки при получении твитов.

Извлечение твитов

Проверка HDFS

Вы можете получить доступ к веб-интерфейсу администрирования Hadoop, используя приведенный ниже URL-адрес.

http://localhost:50070/ 

Нажмите на раскрывающееся меню « Утилиты» в правой части страницы. Вы можете увидеть два варианта, как показано на снимке экрана, приведенном ниже.

Проверка HDFS

Нажмите Обзор файловой системы и введите путь к каталогу HDFS, в котором вы сохранили твиты. В нашем примере путь будет / user / Hadoop / twitter_data / . Затем вы можете увидеть список файлов журнала Twitter, хранящихся в HDFS, как показано ниже.