Этот пост является результатом моих усилий, направленных на то, чтобы показать коллеге, как получить необходимую информацию, используя возможности потоковой передачи и краткий API-интерфейс Apache Spark . В этом посте вы узнаете, как выполнять простую, но очень интересную аналитику, которая поможет вам решить реальные проблемы, анализируя конкретные области социальной сети.
Использование подмножества потока Twitter было идеальным выбором для этой демонстрации, поскольку в нем было все, что нам нужно: бесконечный и непрерывный источник данных, который был готов для изучения.
Поток искр, минимизированный
Spark Streaming очень хорошо объясняется здесь и в главе 6 книги «Начало работы с Apache Spark», поэтому мы собираемся пропустить некоторые подробности о Streaming API и перейти к настройке нашего приложения.
Настройка нашего приложения
Давайте посмотрим, как подготовить наше приложение, прежде чем делать что-то еще.
01
02
03
04
05
06
07
08
09
10
11
12
|
val config = new SparkConf().setAppName( "twitter-stream-sentiment" ) val sc = new SparkContext(config) sc.setLogLevel( "WARN" ) val ssc = new StreamingContext(sc, Seconds( 5 )) System.setProperty( "twitter4j.oauth.consumerKey" , "consumerKey" ) System.setProperty( "twitter4j.oauth.consumerSecret" , "consumerSecret" ) System.setProperty( "twitter4j.oauth.accessToken" , accessToken) System.setProperty( "twitter4j.oauth.accessTokenSecret" , "accessTokenSecret" ) val stream = TwitterUtils.createStream(ssc, None) |
Здесь мы создали Spark Context sc и установили уровень журнала на WARN, чтобы устранить шумный журнал, который генерирует Spark. Мы также создали потоковый контекст ssc, используя sc . Затем мы настраиваем наши учетные данные Twitter (перед этим нам нужно было выполнить следующие шаги ), которые мы получили с веб-сайта Twitter. Теперь начинается настоящее веселье .
Что сейчас в тренде в Twitter?
Легко узнать, что творится в Твиттере в любой момент; это просто вопрос подсчета появления каждого тега в потоке. Давайте посмотрим, как Spark позволяет нам делать эту операцию.
01
02
03
04
05
06
07
08
09
10
11
|
val tags = stream.flatMap { status => status.getHashtagEntities.map(_.getText) } tags.countByValue() .foreachRDD { rdd => val now = org.joda.time.DateTime.now() rdd .sortBy(_._2) .map(x => (x, now)) .saveAsTextFile(s "~/twitter/$now" ) } |
Сначала мы получили теги из твитов, посчитали, сколько раз они появились (тег), и отсортировали их по количеству. После этого мы сохранили результат, чтобы указать на него Splunk (или любой другой инструмент в этом отношении). Мы могли бы создать несколько интересных панелей, используя эту информацию, чтобы отслеживать самые популярные хэштеги. Основываясь на этой информации, мой коллега мог создавать кампании и использовать эти популярные теги, чтобы привлечь большую аудиторию.
Анализ твитов
Теперь мы хотим добавить функциональность, чтобы получить общее мнение о том, что люди думают о наборе тем. Ради этого примера, скажем, мы хотим узнать мнение твитов о больших данных и еде , двух очень не связанных темах.
Существует несколько API для анализа настроений из твитов, но мы собираемся использовать интересную библиотеку из Stanford Natural Language Processing Group для извлечения соответствующих настроений .
В нашем файле build.sbt нам нужно добавить соответствующие зависимости.
1
|
libraryDependencies += "edu.stanford.nlp" % "stanford-corenlp" % "3.5.1" libraryDependencies += "edu.stanford.nlp" % "stanford-corenlp" % "3.5.1" classifier "models" |
Теперь нам нужно выбрать только те твиты, которые нам действительно нужны, путем фильтрации потока с использованием определенного хэштега (#) . Такая фильтрация довольно проста благодаря унифицированному API Spark.
Посмотрим как.
1
2
3
4
|
val tweets = stream.filter {t => val tags = t.getText.split( " " ).filter(_.startsWith( "#" )).map(_.toLowerCase) tags.contains( "#bigdata" ) && tags.contains( "#food" ) } |
Здесь мы получаем все теги в каждом твите, проверяя, что он был помечен с помощью #bigdata и #food .
Как только у нас появятся наши твиты, извлечь соответствующие чувства довольно просто. Давайте определим функцию, которая извлекает чувства из содержимого Tweet, чтобы мы могли включить их в наш конвейер.
1
|
def detectSentiment(message: String): SENTIMENT_TYPE |
Мы собираемся использовать эту функцию, предполагая, что она делает то, что должна, и мы поставим ее реализацию в конце, так как это не тема этого поста. Чтобы понять, как это работает, давайте построим несколько тестов вокруг него.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
|
it( "should detect not understood sentiment" ) { detectSentiment( "" ) should equal (NOT_UNDERSTOOD) } it( "should detect a negative sentiment" ) { detectSentiment( "I am feeling very sad and frustrated." ) should equal (NEGATIVE) } it( "should detect a neutral sentiment" ) { detectSentiment( "I'm watching a movie" ) should equal (NEUTRAL) } it( "should detect a positive sentiment" ) { detectSentiment( "It was a nice experience." ) should equal (POSITIVE) } it( "should detect a very positive sentiment" ) { detectSentiment( "It was a very nice experience." ) should equal (VERY_POSITIVE) } |
Этих тестов должно быть достаточно, чтобы показать, как работает DetectiveSentiment .
Давайте посмотрим на пример.
1
2
3
4
5
6
|
val data = tweets.map { status => val sentiment = SentimentAnalysisUtils.detectSentiment(status.getText) val tags = status.getHashtagEntities.map(_.getText.toLowerCase) (status.getText, sentiment.toString, tags) } |
data представляет DStream Tweets, который мы хотим, связанные настроения и хэштеги в Tweet (здесь мы должны найти теги, которые мы использовали для фильтрации).
Взаимодействие SQL
Теперь мы хотим сопоставить данные настроений с внешним набором данных, который мы можем запросить с помощью SQL. Для моего коллеги имеет смысл присоединиться к потоку Twitter с его другим набором данных.
Давайте посмотрим, как мы могли этого добиться.
1
2
3
4
5
6
7
|
val sqlContext = new SQLContext(sc) import sqlContext.implicits._ data.foreachRDD { rdd => rdd.toDF().registerTempTable( "sentiments" ) } |
Мы преобразовали наш поток в другое представление ( DataFrame ), которое также поддерживается всеми концепциями Spark (устойчивым, распределенным, очень быстрым), и представили его в виде таблицы, чтобы мой коллега мог использовать свой любимый SQL для запросов к различным источникам.
Настроение таблицы (которое мы определили из нашего DataFrame) будет запрашиваться как любая другая таблица в его системе. Другая возможность состоит в том, что мы можем запрашивать другие источники данных (Cassandra, Xmls или наши собственные файлы в двоичном формате), используя Spark SQL, и скрещивать их с потоком.
Вы можете узнать больше информации по этой теме здесь и здесь .
Пример запроса DataFrame показан ниже.
1
|
sqlContext.sql( "select * from sentiments" ).show() |
Оконные операции
Spark Streaming имеет возможность оглядываться назад в потоке, функциональность, которой не хватает большинству потоковых движков (если у них есть эта функциональность, ее очень сложно реализовать).
Чтобы реализовать оконную операцию, вам нужно будет проверить поток, но это простая задача. Вы найдете больше информации об этом здесь .
Вот небольшой пример такой операции:
1
|
tags .window(Minutes( 1 )) . (...) |
Вывод
Хотя наши примеры довольно просты, мы смогли решить реальную проблему с помощью Spark. Теперь у нас есть возможность определять актуальные темы в Твиттере, что помогает нам ориентироваться и расширять нашу аудиторию. В то же время мы можем получить доступ к различным наборам данных с помощью одного набора инструментов, таких как SQL.
Очень интересные результаты пришли от #bigdata и #food одновременно. Возможно, люди чирикают о больших данных во время обеда — кто знает?
Ссылка: | Spark Streaming и анализ настроений в Twitter от нашего партнера JCG Чейза Хули в блоге Mapr . |