Статьи

Апачская искра, Кассандра и Игра престолов

Apache Spark с Cassandra — мощная комбинация в конвейерах обработки данных. В этом посте мы создадим приложение Scala с комбо Spark Cassandra и запросим данные о битвах из Игры престолов. Теперь мы не собираемся делать какие-либо прогнозы шоу! Но мы покажем наиболее агрессивных королей, а также королей, на которых больше всего напали. Таким образом, у вас есть, что Goin для вас.

обзор

Нашей основной задачей являются технические особенности интеграции Spark Cassandra со Scala. Для этого мы загрузим Cassandra с данными о битве с Game of Thrones, а затем запросим их у Spark с помощью Scala. Мы будем использовать Spark из оболочки, а также развертывать программу Spark Driver в кластере. У нас будут примеры демонстрации класса случая Scala благодаря соединителю DataStax, а также использованию SparkSQL для создания DataFrames. Мы также смешиваем некоторые настройки sbt.

Внизу этого поста в разделе «Ресурсы» находятся скринкасты и соответствующие ссылки.

Предполагаемая аудитория этого учебника Spark Cassandra — это тот, у кого есть опыт работы со Scala и Apache Spark. Если вы хотите быстро и эффективно достичь этого уровня, ознакомьтесь с нашим учебным курсом Apache Spark с Scala по требованию .

Предпосылки

  1. Apache Cassandra (см. Ресурсы ниже)
  2. Загруженные данные Game of Thrones (см. Ресурсы ниже)
  3. Apache Spark

Контур

  1. Импортировать данные в Кассандру
  2. Написать код Scala
  3. Тестирование кода Spark Cassandra в оболочке SBT
  4. Разверните Spark Cassandra в Spark Cluster с помощью SBT и spark-submit

Пример Spark Cassandra

Часть 1: Подготовьте Кассандру

Давайте импортируем боевые данные GOT в Кассандру. Для простоты я собираюсь использовать локально работающий экземпляр Cassandra. Я запустил Cassandra со скриптом bin / cassandra на моем Mac. (используйте cassandra.bat в Windows, но вы уже это знали.)

Затем подключитесь к Cassandra с помощью cqlsh и создайте пространство ключей для использования. Этот урок создает пространство клавиш «gameofthrones»:

1
CREATE KEYSPACE gameofthrones WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1};

Отсюда мы создаем таблицу для данных битвы.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
CREATE KEYSPACE gameofthrones WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1};
CREATE TABLE gameofthrones.battles (
name TEXT,
year INT,
battle_number INT,
attacker_king TEXT,
defender_king TEXT,
attacker_1 TEXT,
attacker_2 TEXT,
attacker_3 TEXT,
attacker_4 TEXT,
defender_1 TEXT,
defender_2 TEXT,
defender_3 TEXT,
defender_4 TEXT,
attacker_outcome TEXT,
battle_type TEXT,
major_death TEXT,
major_capture TEXT,
attacker_size TEXT,
defender_size TEXT,
attacker_commander TEXT,
defender_commander TEXT,
summer TEXT,
location TEXT,
region TEXT,
note TEXT,
PRIMARY KEY(battle_number)
);

Затем импортируйте данные сражений, используя Cassandra COPY показанную ниже. (см. раздел Ресурсы ниже, где можно скачать данные).
Кстати, мне нужно было запустить скрипт Perl для обновления кодировок конца строки с Mac на Unix для файла CSV, используя perl -pi -e 's/\r/\n/g . Ваш пробег может варьироваться.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
COPY battles (
name,
year,
battle_number,
attacker_king,
defender_king,
attacker_1,
attacker_2,
attacker_3,
attacker_4,
defender_1,
defender_2,
defender_3,
defender_4,
attacker_outcome,
battle_type,
major_death,
major_capture,
attacker_size,
defender_size,
attacker_commander,
defender_commander,
summer,
location,
region,
note)
FROM 'battles.csv'  // update this location as necessary
WITH HEADER = true;

Это завершает часть 1. Давайте перейдем к части 2, где мы напишем некоторый код Scala.

Часть 2: Spark Cassandra Scala Code

(Примечание: весь следующий пример кода, если он доступен на Github. Ссылка в разделе Ресурсы ниже.)

Для начала давайте разберем каркасную структуру проекта —

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
mkdir got-battles // name it anything you'd like
  
cd got-battles  // if you named it got-battles
  
mkdir src
  
mkdir src/main
  
mkdir src/main/scala
  
mkdir src/main/scala/com
  
mkdir src/main/scala/com/supergloo
  
mkdir project

Далее мы добавим несколько файлов для sbt и плагина sbt-assembly.
Сначала файл сборки для sbt
файл got-battles / build.sbt:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
name := "spark-cassandra-example"
  
version := "1.0"
  
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)
  
// https://groups.google.com/a/lists.datastax.com/forum/#!topic/spark-connector-user/5muNwRaCJnU
assemblyMergeStrategy in assembly <<= (assemblyMergeStrategy in assembly) {
  (old) => {
    case PathList("META-INF", "io.netty.versions.properties") => MergeStrategy.last
    case x => old(x)
  }
}
  
scalaVersion := "2.10.6"
  
resolvers += "jitpack" at "https://jitpack.io"
  
libraryDependencies ++= Seq(
// use provided line when building assembly jar
// "org.apache.spark" %% "spark-sql" % "1.6.1" % "provided",
// comment above line and uncomment the following to run in sbt
   "org.apache.spark" %% "spark-sql" % "1.6.1",
   "com.datastax.spark" %% "spark-cassandra-connector" % "1.5.0"
)

и файл 1-строки got-battles / project / assembly.sbt:

1
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.12.0")

А теперь давайте создадим код драйвера Spark в got-battles / src / main / scala / com / supergloo под названием SparkCassandra.scala

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package com.supergloo
  
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
  
import com.datastax.spark.connector._
import com.datastax.spark.connector.rdd._
import org.apache.spark.sql.cassandra._
  
  
/**
  * Simple Spark Cassandra
  * One example with Scala case class marshalling
  * Another example using Spark SQL
  */
object SparkCassandra {
  
  case class Battle(   
    battle_number: Integer,
    year: Integer,
    attacker_king: String,
    defender_king: String
  )
  
  def main(args: Array[String]) {
  
    val conf = new SparkConf().setAppName("SparkCassandraExampleApp")
  
    if (args.length > 0) conf.setMaster(args(0)) // in case we're running in sbt shell such as: run local[5]
  
    conf.set("spark.cassandra.connection.host", "127.0.0.1"// so yes, I'm assuming Cassandra is running locally here.
                                                              // adjust as needed
  
    val sc = new SparkContext(conf)
  
    // Spark Cassandra Example one which marshalls to Scala case classes
    val battles:Array[Battle] = sc.cassandraTable[Battle]("gameofthrones", "battles").
                                        select("battle_number","year","attacker_king","defender_king").toArray
     
    battles.foreach { b: Battle =>
        println("Battle Number %s was defended by %s.".format(b.battle_number, b.defender_king))
    }
  
  
    // Spark Cassandra Example Two - Create DataFrame from Spark SQL read
    val sqlContext = new SQLContext(sc)
  
    val df = sqlContext.read
              .format("org.apache.spark.sql.cassandra")
              .options(Map( "table" -> "battles", "keyspace" -> "gameofthrones" ))
              .load()
  
    df.show
  
    // Game of Thrones Battle analysis
  
    // Who were the most aggressive kings?  (most attacker_kings)
    val countsByAttack = df.groupBy("attacker_king").count().sort(desc("count"))
    countsByAttack.show()
  
    // Which kings were attacked the most?  (most defender_kings)
    val countsByDefend = df.groupBy("defender_king").count().sort(desc("count"))
    countsByDefend.show()
  
    sc.stop()
     
  }
}

Часть 3: Запуск Spark Cassandra Scala Code из консоли SBT

Запустите консоль sbt через sbt . Когда все будет готово, вы можете выполнить команду run с аргументом для вашего местоположения Spark Master; т.е. запустить локально [ 5 ]

(Опять же, в конце этого поста есть скринкаст, в котором показан пример выполнения этой команды. См. Раздел Ресурсы ниже.)

В зависимости от уровня вашего журнала, вы должны увидеть различные выходные данные SparkCassandra из кода SparkCassandra. Эти консольные выходы от Cassandra — показатель успеха. О да. Скажи это сейчас со мной. О даааааа

Запуск кода в консоли sbt — это удобный способ быстро вносить и тестировать изменения. Когда я разрабатывал этот код, в одном окне был открыт терминал, а в другом — редактор. Всякий раз, когда я изменял и сохранял исходный код Scala, я мог просто перезапускаться в консоли sbt.

Итак, предположим, мы достигли точки, когда захотели развернуть эту программу Spark. Давайте узнаем в следующем разделе.

Часть 4: Сборка кода Spark Cassandra Scala и развертывание в кластере Spark

Чтобы собрать jar и развернуть его в кластере Spark, нам нужно внести небольшое изменение в наш файл build.sbt . Как вы могли заметить из приведенного выше кода, в файле есть комментарии, которые указывают, что необходимо изменить. Мы должны раскомментировать эту строку:

1
// "org.apache.spark" %% "spark-sql" % "1.6.1" % "provided",

и закомментируйте эту строку:

1
"org.apache.spark" %% "spark-sql" % "1.6.1",

затем мы можем запустить sbt assembly из командной строки, чтобы создать развертываемый jar- sbt assembly Spark. Если вы используете пример файла build.sbt то получится target / scala-2.10 / spark-cassandra-example-assembly-1.0.jar

Для развертывания используйте spark-submit с соответствующими аргументами; т.е.

1
spark-submit --class "com.supergloo.SparkCassandra" --master spark://todd-mcgraths-macbook-pro.local:7077 ./target/scala-2.10/spark-cassandra-example-assembly-1.0.jarspark-submit --class "com.supergloo.SparkCassandra" --master spark://todd-mcgraths-macbook-pro.local:7077 ./target/scala-2.10/spark-cassandra-example-assembly-1.0.jar

Вывод

Так что ты думаешь? Когда вы запускаете код, вы можете увидеть самых агрессивных королей и королей, на которых больше всего напали. Не отдавая этого, я думаю, можно спорить, стоит ли связывать Мэнса Рейдера с Ренли Баратеоном в списке наиболее атакованных. Но на самом деле не в этом суть этого урока. Что касается кода и настройки, есть ли у вас какие-либо вопросы, мнения или предложения для следующих шагов?

Спарк Кассандра Учебное пособие Screencast

В следующем скриншоте я расскажу о шагах, описанных в этом руководстве. Оставайтесь с нами, потому что в конце скриншота есть кадры. Потому что я имею в виду, почему бы и нет.

Spark Cassandra Учебное пособие

  1. Весь исходный код, включая файл battles.csv, который я очистил с помощью сценария perl, описанного выше в примере кода Apache Spark Cassandra
  2. https://github.com/datastax/spark-cassandra-connector
  3. Фреймы данных с Cassandra Connector
  4. Данные игры престолов: https://github.com/chrisalbon/war_of_the_five_kings_dataset
Ссылка: Apache Spark, Cassandra и Game of Thrones от нашего партнера по JCG Тодда МакГрата в блоге Supergloo .