Apache Spark с Cassandra — мощная комбинация в конвейерах обработки данных. В этом посте мы создадим приложение Scala с комбо Spark Cassandra и запросим данные о битвах из Игры престолов. Теперь мы не собираемся делать какие-либо прогнозы шоу! Но мы покажем наиболее агрессивных королей, а также королей, на которых больше всего напали. Таким образом, у вас есть, что Goin для вас.
обзор
Нашей основной задачей являются технические особенности интеграции Spark Cassandra со Scala. Для этого мы загрузим Cassandra с данными о битве с Game of Thrones, а затем запросим их у Spark с помощью Scala. Мы будем использовать Spark из оболочки, а также развертывать программу Spark Driver в кластере. У нас будут примеры демонстрации класса случая Scala благодаря соединителю DataStax, а также использованию SparkSQL для создания DataFrames. Мы также смешиваем некоторые настройки sbt.
Внизу этого поста в разделе «Ресурсы» находятся скринкасты и соответствующие ссылки.
Предполагаемая аудитория этого учебника Spark Cassandra — это тот, у кого есть опыт работы со Scala и Apache Spark. Если вы хотите быстро и эффективно достичь этого уровня, ознакомьтесь с нашим учебным курсом Apache Spark с Scala по требованию .
Предпосылки
- Apache Cassandra (см. Ресурсы ниже)
- Загруженные данные Game of Thrones (см. Ресурсы ниже)
- Apache Spark
Контур
- Импортировать данные в Кассандру
- Написать код Scala
- Тестирование кода Spark Cassandra в оболочке SBT
- Разверните Spark Cassandra в Spark Cluster с помощью SBT и
spark-submit
Пример Spark Cassandra
Часть 1: Подготовьте Кассандру
Давайте импортируем боевые данные GOT в Кассандру. Для простоты я собираюсь использовать локально работающий экземпляр Cassandra. Я запустил Cassandra со скриптом bin / cassandra на моем Mac. (используйте cassandra.bat в Windows, но вы уже это знали.)
Затем подключитесь к Cassandra с помощью cqlsh и создайте пространство ключей для использования. Этот урок создает пространство клавиш «gameofthrones»:
|
1
|
CREATE KEYSPACE gameofthrones WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1}; |
Отсюда мы создаем таблицу для данных битвы.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
CREATE KEYSPACE gameofthrones WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1};CREATE TABLE gameofthrones.battles (name TEXT,year INT,battle_number INT,attacker_king TEXT,defender_king TEXT,attacker_1 TEXT,attacker_2 TEXT,attacker_3 TEXT,attacker_4 TEXT,defender_1 TEXT,defender_2 TEXT,defender_3 TEXT,defender_4 TEXT,attacker_outcome TEXT,battle_type TEXT,major_death TEXT,major_capture TEXT,attacker_size TEXT,defender_size TEXT,attacker_commander TEXT,defender_commander TEXT,summer TEXT,location TEXT,region TEXT,note TEXT,PRIMARY KEY(battle_number)); |
Затем импортируйте данные сражений, используя Cassandra COPY показанную ниже. (см. раздел Ресурсы ниже, где можно скачать данные).
Кстати, мне нужно было запустить скрипт Perl для обновления кодировок конца строки с Mac на Unix для файла CSV, используя perl -pi -e 's/\r/\n/g . Ваш пробег может варьироваться.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
COPY battles (name,year,battle_number,attacker_king,defender_king,attacker_1,attacker_2,attacker_3,attacker_4,defender_1,defender_2,defender_3,defender_4,attacker_outcome,battle_type,major_death,major_capture,attacker_size,defender_size,attacker_commander,defender_commander,summer,location,region,note)FROM 'battles.csv' // update this location as necessaryWITH HEADER = true; |
Это завершает часть 1. Давайте перейдем к части 2, где мы напишем некоторый код Scala.
Часть 2: Spark Cassandra Scala Code
(Примечание: весь следующий пример кода, если он доступен на Github. Ссылка в разделе Ресурсы ниже.)
Для начала давайте разберем каркасную структуру проекта —
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
|
mkdir got-battles // name it anything you'd like cd got-battles // if you named it got-battles mkdir src mkdir src/main mkdir src/main/scala mkdir src/main/scala/com mkdir src/main/scala/com/supergloo mkdir project |
Далее мы добавим несколько файлов для sbt и плагина sbt-assembly.
Сначала файл сборки для sbt
файл got-battles / build.sbt:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
name := "spark-cassandra-example" version := "1.0" assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false) // https://groups.google.com/a/lists.datastax.com/forum/#!topic/spark-connector-user/5muNwRaCJnUassemblyMergeStrategy in assembly <<= (assemblyMergeStrategy in assembly) { (old) => { case PathList("META-INF", "io.netty.versions.properties") => MergeStrategy.last case x => old(x) }} scalaVersion := "2.10.6" libraryDependencies ++= Seq(// use provided line when building assembly jar// "org.apache.spark" %% "spark-sql" % "1.6.1" % "provided",// comment above line and uncomment the following to run in sbt "org.apache.spark" %% "spark-sql" % "1.6.1", "com.datastax.spark" %% "spark-cassandra-connector" % "1.5.0") |
и файл 1-строки got-battles / project / assembly.sbt:
|
1
|
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.12.0") |
А теперь давайте создадим код драйвера Spark в got-battles / src / main / scala / com / supergloo под названием SparkCassandra.scala
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
|
package com.supergloo import org.apache.spark.{SparkConf, SparkContext}import org.apache.spark.sql.SQLContextimport org.apache.spark.sql.functions._ import com.datastax.spark.connector._import com.datastax.spark.connector.rdd._import org.apache.spark.sql.cassandra._ /** * Simple Spark Cassandra * One example with Scala case class marshalling * Another example using Spark SQL */object SparkCassandra { case class Battle( battle_number: Integer, year: Integer, attacker_king: String, defender_king: String ) def main(args: Array[String]) { val conf = new SparkConf().setAppName("SparkCassandraExampleApp") if (args.length > 0) conf.setMaster(args(0)) // in case we're running in sbt shell such as: run local[5] conf.set("spark.cassandra.connection.host", "127.0.0.1") // so yes, I'm assuming Cassandra is running locally here. // adjust as needed val sc = new SparkContext(conf) // Spark Cassandra Example one which marshalls to Scala case classes val battles:Array[Battle] = sc.cassandraTable[Battle]("gameofthrones", "battles"). select("battle_number","year","attacker_king","defender_king").toArray battles.foreach { b: Battle => println("Battle Number %s was defended by %s.".format(b.battle_number, b.defender_king)) } // Spark Cassandra Example Two - Create DataFrame from Spark SQL read val sqlContext = new SQLContext(sc) val df = sqlContext.read .format("org.apache.spark.sql.cassandra") .options(Map( "table" -> "battles", "keyspace" -> "gameofthrones" )) .load() df.show // Game of Thrones Battle analysis // Who were the most aggressive kings? (most attacker_kings) val countsByAttack = df.groupBy("attacker_king").count().sort(desc("count")) countsByAttack.show() // Which kings were attacked the most? (most defender_kings) val countsByDefend = df.groupBy("defender_king").count().sort(desc("count")) countsByDefend.show() sc.stop() }} |
Часть 3: Запуск Spark Cassandra Scala Code из консоли SBT
Запустите консоль sbt через sbt . Когда все будет готово, вы можете выполнить команду run с аргументом для вашего местоположения Spark Master; т.е. запустить локально [ 5 ]
(Опять же, в конце этого поста есть скринкаст, в котором показан пример выполнения этой команды. См. Раздел Ресурсы ниже.)
В зависимости от уровня вашего журнала, вы должны увидеть различные выходные данные SparkCassandra из кода SparkCassandra. Эти консольные выходы от Cassandra — показатель успеха. О да. Скажи это сейчас со мной. О даааааа
Запуск кода в консоли sbt — это удобный способ быстро вносить и тестировать изменения. Когда я разрабатывал этот код, в одном окне был открыт терминал, а в другом — редактор. Всякий раз, когда я изменял и сохранял исходный код Scala, я мог просто перезапускаться в консоли sbt.
Итак, предположим, мы достигли точки, когда захотели развернуть эту программу Spark. Давайте узнаем в следующем разделе.
Часть 4: Сборка кода Spark Cassandra Scala и развертывание в кластере Spark
Чтобы собрать jar и развернуть его в кластере Spark, нам нужно внести небольшое изменение в наш файл build.sbt . Как вы могли заметить из приведенного выше кода, в файле есть комментарии, которые указывают, что необходимо изменить. Мы должны раскомментировать эту строку:
|
1
|
// "org.apache.spark" %% "spark-sql" % "1.6.1" % "provided", |
и закомментируйте эту строку:
|
1
|
"org.apache.spark" %% "spark-sql" % "1.6.1", |
затем мы можем запустить sbt assembly из командной строки, чтобы создать развертываемый jar- sbt assembly Spark. Если вы используете пример файла build.sbt то получится target / scala-2.10 / spark-cassandra-example-assembly-1.0.jar
Для развертывания используйте spark-submit с соответствующими аргументами; т.е.
|
1
|
spark-submit --class "com.supergloo.SparkCassandra" --master spark://todd-mcgraths-macbook-pro.local:7077 ./target/scala-2.10/spark-cassandra-example-assembly-1.0.jarspark-submit --class "com.supergloo.SparkCassandra" --master spark://todd-mcgraths-macbook-pro.local:7077 ./target/scala-2.10/spark-cassandra-example-assembly-1.0.jar |
Вывод
Так что ты думаешь? Когда вы запускаете код, вы можете увидеть самых агрессивных королей и королей, на которых больше всего напали. Не отдавая этого, я думаю, можно спорить, стоит ли связывать Мэнса Рейдера с Ренли Баратеоном в списке наиболее атакованных. Но на самом деле не в этом суть этого урока. Что касается кода и настройки, есть ли у вас какие-либо вопросы, мнения или предложения для следующих шагов?
Спарк Кассандра Учебное пособие Screencast
В следующем скриншоте я расскажу о шагах, описанных в этом руководстве. Оставайтесь с нами, потому что в конце скриншота есть кадры. Потому что я имею в виду, почему бы и нет.
Spark Cassandra Учебное пособие
- Весь исходный код, включая файл battles.csv, который я очистил с помощью сценария perl, описанного выше в примере кода Apache Spark Cassandra
- https://github.com/datastax/spark-cassandra-connector
- Фреймы данных с Cassandra Connector
- Данные игры престолов: https://github.com/chrisalbon/war_of_the_five_kings_dataset
| Ссылка: | Apache Spark, Cassandra и Game of Thrones от нашего партнера по JCG Тодда МакГрата в блоге Supergloo . |