Статьи

Аналитика с Apache Spark. Учебное пособие. Часть 2. Spark SQL

Spark , очень мощный инструмент для аналитики в реальном времени, очень популярен. В первой части этой серии статей о Spark мы представили Spark. Мы рассмотрели историю Spark и объяснилиRDD(которые используются для разделения данных в кластере Spark). Мы также рассмотрели экосистему Apache Spark.

Во второй части представлена ​​важная часть экосистемы Spark, а именно  Spark SQL и DataFrames . Этот туториал покажет, как использовать  Spark и Spark SQL с Cassandra . Если вы пропустили первую часть этой серии, ознакомьтесь  с разделом Введение в Apache Spark, часть 1, аналитика в реальном времени .

Apache Spark  является естественным преемником и дополнением  Hadoop  и продолжает тенденцию BigData. Spark предоставляет простой в использовании API для выполнения больших распределенных заданий для анализа данных. Это быстрее, чем другие виды аналитики, поскольку многое можно сделать в памяти. Apache Spark  предоставляет мощные возможности BigData в руки простых смертных разработчиков для анализа данных в реальном времени. Spark SQL  является примером простого в использовании, но мощного API, предоставляемого  Apache Spark .

Название изображения

Spark SQL

Spark SQL  позволяет легко выполнять запросы SQL и hiveQL. (Обратите внимание, что  hiveQL взят из Apache Hive, который представляет собой систему хранилища данных, построенную на основе Hadoop для обеспечения аналитики BigData .)  Spark SQL  может находить таблицы и метаданные, не выполняя никакой дополнительной работы. Spark SQL  предоставляет возможность запрашивать структурированные данные внутри Spark, используя либо SQL, либо знакомый API DataFrame (RDD). Вы можете использовать  Spark SQL  с вашим любимым языком; Java, Scala, Python и R.

Запрос данных с помощью Java

String query = "SELECT * FROM table";

ResultSet results = session.execute(query); 

В основе  Spark SQL  лежит то, что называется  DataFrame . DataFrame просто хранит данные как набор строк, и каждому столбцу в строке присваивается имя. С DataFrames вы можете легко выбирать, строить и фильтровать данные.

Вы можете использовать DataFrames для ввода и вывода данных, например, вы можете смонтировать следующие форматы данных в виде таблиц и начать выполнять операции с ними из коробки, используя DataFrames в  Spark SQL :

  • РДД
  • JSON
  • улей
  • Паркет
  • MySQL
  • HDFS
  • S3
  • JDBC
  • и более …

После того, как вы прочитали данные, с помощью DataFrames вы можете легко фильтровать данные, выбирать столбец, считать, усреднять и объединять данные из  разных источников .

Если вы планируете читать и записывать данные для анализа,  Spark SQL  может автоматизировать этот процесс и упростить его для вас.

Продемонстрируем, как использовать  Spark SQL  и  DataFrames  в  оболочке Python Spark,  на следующем примере. Мы извлечем данные истории коммитов для  QBit, Java Microservices Lib  от Github. Загрузите его в  Spark , затем поиграйте с данными, вот шаги:

Запустите оболочку Python Spark со своим терминалом:

Запустите оболочку Python Spark

cd spark-1.5.0-bin-hadoop2.4
./bin/pyspark


15/08/22 22:30:40 INFO BlockManagerMaster: Registered BlockManager
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.5.0
      /_/

Using Python version 2.7.5 (default, Mar  9 2014 22:15:05)
SparkContext available as sc, HiveContext available as sqlContext.

Извлеките историю коммитов для QBit из github в файл с именем test.log: 

Извлечь историю коммитов в файл журнала

git log > test.log

Поскольку в этот раз мы используем Python, давайте сначала создадим test.log в качестве RDD, назовем его textFile и выполним некоторые операции над ним:

Создать текстовый файл RDD из test.log

textFile = sc.textFile("../qbit/test.log")

Теперь у нас есть RDD с именем textFile, который разбит на строки текста, давайте посчитаем строки в этом RDD:

Подсчитать строки в текстовом файле RDD

textFile.count()

5776

Мы получили 5776 строк. Давайте отфильтруем все строки, в которых есть слово commit:

Отфильтруйте строки с помощью Commit

linesWithCommit = textFile.filter(lambda line: "commit" in line)

Достаточно поиграть с RDD, так как мы делали это в предыдущем примере, мы просто хотели продемонстрировать, как легко это сделать с Python.

Теперь, чтобы использовать Dataframe, давайте извлечем файл истории журнала из github как тип JSON и вызовем файл sparktest.json:

Вытащить историю коммитов из github как JSON

git log  --pretty=format:'{"commit":"%H","author":"%an","author_email":"%ae","date":"%ad","message":"%f"}' > sparktest.json

В начале работы  Spark SQL  нам нужен sqlContext. SqlContext может быть сделан с помощью SparkContext:

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

Внутри оболочки sqlContext будет существовать как sqlContext, точно так же SparkContext будет существовать как SC из коробки, поэтому нет необходимости создавать его.

Теперь давайте загрузим данные JSON в spark как DataFrame с именем dataframe:

Загрузите данные JSON в DataFrame.

dataframe = sqlContext.load("../qbit/sparktest.json", "json")

Когда мы загружаем данные, вы просто вызываете load () для sqlContext с указанием директории и типа файла в качестве параметров. Spark автоматически определит все столбцы и их имена для данных. Чтобы убедиться, что все работает как надо, выведите схему:

Распечатать схему для датафрейма

dataframe.printSchema()

root
 |-- author: string (nullable = true)
 |-- author_email: string (nullable = true)
 |-- commit: string (nullable = true)
 |-- date: string (nullable = true)
 |-- message: string (nullable = true)

Эта корневая карта покажет имена и типы столбцов для каждой строки. Каждая строка в этом примере представляет один коммит на Github для проекта  QBit Microservices Lib  . Как только мы это получим, мы можем начать играть с данными.

Например, мы можем получить первый коммит в файле, который представляет последний коммит на github:

Получение последнего коммита для аналитики

dataframe.first()

Row(author=u'Richard Hightower', author_email=u'[email protected]', 
commit=u'696a94f80d1eedae97175f76b9139a340fab1a27', 
date=u'Wed Aug 19 17:51:11 2015 -0700', 
message=u'Merge-pull-request-359-from-advantageous-add_better_uri_param_handling')

Мы можем выделить весь столбец и показать его содержимое. Например, давайте выберем столбец автора и покажем последние 20 участников в  QBit Microservices Lib , по умолчанию  Spark  покажет последние 20:

Аналитика с Spark SQL — выберите автора столбца и покажите последние 20

dataframe.select("author").show()

+-----------------+
|           author|
+-----------------+
|Richard Hightower|
|   Rick Hightower|
|   Rick Hightower|
|Richard Hightower|
|   Rick Hightower|
|Richard Hightower|
|   Rick Hightower|
|Geoffrey Chandler|
|Geoffrey Chandler|
|Richard Hightower|
|Richard Hightower|
|Richard Hightower|
|Richard Hightower|
|Richard Hightower|
|Richard Hightower|
|   Rick Hightower|
|   Rick Hightower|
|   Rick Hightower|
|   Rick Hightower|
|   Rick Hightower|
+-----------------+

Вы можете показать более или менее просто, установив параметр show () в нужное число, давайте покажем последних 5 авторов, которые внесли свой вклад в  QBit Microservices Lib :

Выберите автора столбца и покажите последние 5

dataframe.select("author").show(5)

+-----------------+
|           author|
+-----------------+
|Richard Hightower|
|   Rick Hightower|
|   Rick Hightower|
|Richard Hightower|
|   Rick Hightower|
+-----------------+

Задумайтесь об этом на мгновение. Здесь мы взяли некоторые довольно неструктурированные данные. В этом случае мы взяли некоторые  журналы git commit  из проекта, и мы можем немедленно начать выполнять запросы к нему. А теперь представьте, что делаете это с тысячами проектов, возможно, против каждого git-репозитория в большой компании. И представьте, если нам нужно было часто проводить какой-то анализ, чтобы вместо индексации этих данных для анализа мы просто использовали наш  кластер Spark  для обработки тонны неструктурированных данных. Вы можете начать видеть мощь Spark как  платформы для анализа данных в  реальном времени , которая проста в использовании, масштабируема и мощна.

Давайте выберем столбец даты и покажем последние 20 дат принятия:

Выберите дату столбца и покажите последние 20 дат принятия

dataframe.select("date").show()

+--------------------+
|                date|
+--------------------+
|Wed Aug 19 17:51:...|
|Wed Aug 19 17:37:...|
|Wed Aug 19 16:59:...|
|Wed Aug 19 14:47:...|
|Wed Aug 19 14:42:...|
|Wed Aug 19 13:05:...|
|Wed Aug 19 11:59:...|
|Mon Aug 17 10:18:...|
|Mon Aug 17 10:17:...|
|Mon Aug 17 00:46:...|
|Sun Aug 16 23:52:...|
|Sun Aug 16 23:33:...|
|Sun Aug 16 23:05:...|
|Sun Aug 16 23:03:...|
|Sun Aug 16 22:33:...|
|Thu Aug 13 21:20:...|
|Thu Aug 13 21:15:...|
|Thu Aug 13 20:31:...|
|Thu Aug 13 20:05:...|
|Thu Aug 13 20:04:...|
+--------------------+

Давайте посчитаем количество коммитов, выполненных на  QBit Microservices Lib  из dataframe, подсчитав строки:

Количество совершенных коммитов на QBit Microservice Lib

dataframe.count()

914

914 — это число коммитов, мы также можем видеть это на Github.

Мы также можем использовать фильтры в DataFrames, например, мы можем видеть, сколько коммитов было сделано Ричардом Хайтауэром и Джеффри Чандлером:

Отфильтруйте коммиты Ричарда Хайтауэра и посчитайте их

dataframe.filter(dataframe.author =="Richard Hightower").count()

708

Ричард Хайтауэр сделал 708  коммитов .

Отфильтруйте коммиты, сделанные Джеффри Чендлером, и посчитайте их

dataframe.filter(dataframe.author =="Geoffrey Chandler").count()

102

102 коммита были сделаны  Джеффри Чендлером .

В предыдущем примере мы создали DataFrame из файла данных JSON. Вы также можете сделать DataFrame из RDD двумя различными способами:

  • Если столбцы и их типы неизвестны до времени выполнения, вы можете создать схему и применить ее к СДР.
  • Если столбцы и их типы известны, вы можете использовать метод, который называется отражением.

Для простоты, чтобы создать RDD, давайте использовать файл people.txt, предоставленный Spark, он содержит только три имени, их возраст разделен запятой. Расположен в следующем каталоге: ~ / spark / examples / src / main / resources / people.txt. Шаги кодирования будут хорошо прокомментированы, чтобы понять их.

People.txt Listing

Michael, 29
Andy, 30
Justin, 19

Создайте схему и примените ее к текстовому файлу RDD

# Import data types
from pyspark.sql.types import *


# Create a RDD from `people.txt`
# then convert each line to a tuple.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: (p[0], p[1].strip()))

# encode the schema in a string.
schemaString = "name age"

# Create a type fields
fields = [StructField(field_name, StringType(), True) \
            for field_name in schemaString.split()]

# Create the schema
schema = StructType(fields)

# Apply the schema to the RDD.
schemaPeople = sqlContext.createDataFrame(people, schema)

# In order to query data you need
# to register the DataFrame as a table.
schemaPeople.registerTempTable("people")

# Using sql query all the name from the table
results = sqlContext.sql("SELECT name FROM people")

# The results of SQL queries are RDDs
# and support all the normal RDD operations.
names = results.map(lambda p: "Name: " + p.name)
for name in names.collect():
  print name

Будет производить следующее:

Вывод

Name: Michael
Name: Andy
Name: Justin

Которые действительно все имена.

Вы можете видеть, что легко взять неструктурированные данные и дать им достаточно структуры, чтобы начать запрашивать их. Spark  может даже разделить данные между узлами кластера и выполнять  анализ параллельно . Вы можете начать видеть привлекательность Apache Spark в качестве быстрого и общего механизма для крупномасштабной обработки данных для  аналитики в реальном времени  и специальной  быстрой аналитики .

Давайте рассмотрим метод отражения для проведения аналитики.

Использование метода отражения для аналитики с Spark SQL

# First we need to import the following Row class
from pyspark.sql import SQLContext, Row

# Create a RDD peopleAge,
# when this is done the RDD will
# be partitioned into three partitions
peopleAge = sc.textFile("examples/src/main/resources/people.txt")

# Since name and age are separated by a comma let's split them
parts = peopleAge.map(lambda l: l.split(","))

# Every line in the file will represent a row
# with 2 columns name and age.
# After this line will have a table called people
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

# Using the RDD create a DataFrame
schemaPeople = sqlContext.createDataFrame(people)

# In order to do sql query on a dataframe,
# you need to register it as a table
schemaPeople.registerTempTable("people")

# Finally we are ready to use the DataFrame.
# Let's query the adults that are aged between 21 and 50
adults = sqlContext.sql("SELECT name FROM people \
       WHERE age >= 21 AND age <= 50")

# loop through names and ages
adults = adults.map(lambda p: "Name: " + p.name)
for Adult in adults.collect():
  print Adult

Получите:

Вывод

Name: Michael
Name: Andy

Которые действительно в возрасте от 21 до 50 лет.

Spark, SparkSQL работает с Кассандрой

Spark работает с Кассандрой

Допустим, вы хотите создать программу с  Java,  которая использует  Spark и Cassandra . Вот шаги, которые позволяют  Apache Spark  работать с  Apache Cassandra :

Сначала нам нужно импортировать следующие зависимости:

  • искровая Кассандра connector_2.10: 1.1.1-RC4′
  • искровым Cassandra-разъем-java_2.10: 1.1.1′
  • искровой streaming_2.10: 1.5.0′

Используя Gradle:

Пример сборки Gradle для аналитики с использованием Spark SQL и Cassandra

dependencies {


    //Spark and Cassandra connector to work with java

   compile 'com.datastax.spark:spark-cassandra-connector_2.10:1.1.1-rc4'
    compile 'com.datastax.spark:spark-cassandra-connector-java_2.10:1.1.1'
    compile 'org.apache.spark:spark-streaming_2.10:1.5.0'
}

Далее мы настраиваем   конфигурации Spark . SparkConf предназначен для настройки свойств, таких как мастер Spark и имя приложения, а также произвольных пар ключ-значение, таких как spark.cassandra.connection.host, с помощью метода set ().

Spark master  — это менеджер кластера для подключения, некоторые из разрешенных URL-адресов:

  • local (Запустите Spark локально с одним рабочим потоком, как мы используем в этом примере)
  • local [K] Запустите Spark локально с K потоками, обычно k настраивается так, чтобы соответствовать количеству ядер на вашей машине
  • spark: // HOST: PORT (подключение к данному мастеру кластера. Порт должен совпадать с вашим мастером, по умолчанию 7077)

Чтобы соединить Spark с  Cassandra,  нам нужно установить spark.cassandra.connection.host на хост Sparks Masters, который в данном случае является нашим локальным хостом; Вот конфигурация Spark:

SparkConf conf = new SparkConf();
  ...
        conf.setAppName("TODO spark and cassandra");
        conf.setMaster("local");
        conf.set("spark.cassandra.connection.host", "localhost");

Теперь мы готовы создать схему, то есть создать  пространство ключей  и  таблицу  в  Cassandra, в  которой будут храниться наши данные:

Создайте экземпляр соединителя CassandraConnector и создайте список ключей Keyspace и таблицу Todolisttable в Cassandra

private void createSchema(JavaSparkContext sc) {

        CassandraConnector connector = 
                   CassandraConnector.apply(sc.getConf());
        try (Session session = connector.openSession()) {

            session.execute(deletekeyspace);
            session.execute(keyspace);
            session.execute("USE todolist");
            session.execute(table);
            session.execute(tableRDD);

        }
 }

Как вы можете видеть выше, мы создаем экземпляр коннектора CassandraConnector и выполняем CQL (Cassandra Query Language). Мы рассмотрим эту тему более подробно в другой статье — скоро.

Используемые CQL-команды Cassandra

/* Delete keyspace todolist if exists. */
String deletekeyspace = "DROP KEYSPACE IF EXISTS todolist";

/* Create keyspace todolist. */
String keyspace = "CREATE KEYSPACE IF NOT EXISTS todolist" +
  " WITH replication = {'class': 'SimpleStrategy'," +
  " 'replication_factor':1}";

/* Create table todolisttable. */
String table = "CREATE TABLE todolist.todolisttable(" +
            + " id text PRIMARY KEY, "
            + " description text, "
            + " category text, "
            + " date timestamp )";

/* Create table temp. */
String tableRDD = "CREATE TABLE todolist.temp(id text PRIMARY KEY, "
            + "description text, "
            + "category text )";

Теперь у нас есть две таблицы: todolisttable и temp, давайте загрузим некоторые данные в todolisttable, используя  Cassandra CQL  для загрузки некоторых элементов todo:

private void loadData(JavaSparkContext sc) {

        CassandraConnector connector = CassandraConnector.apply(sc.getConf());

        try (Session session = connector.openSession()) {
            session.execute(task1);
            session.execute(task2);
            session.execute(task3);
            session.execute(task4);
            session.execute(task5);
            session.execute(task6);
            session.execute(task7);

        }

Вот элементы задач, которые загружаются в Cassandra, после чего следуют команды CQL.

элементы todo, которые загружаются с Cassandra CQL-командами в Spark

TodoItem item = new TodoItem("George", "Buy a new computer", "Shopping");
   TodoItem item2 = new TodoItem("John", "Go to the gym", "Sport");
    TodoItem item3 = new TodoItem("Ron", "Finish the homework", "Education");
    TodoItem item4 = new TodoItem("Sam", "buy a car", "Shopping");
    TodoItem item5 = new TodoItem("Janet", "buy groceries", "Shopping");
    TodoItem item6 = new TodoItem("Andy", "go to the beach", "Fun");
    TodoItem item7 = new TodoItem("Paul", "Prepare lunch", "Coking");

//index data
    String task1 = "INSERT INTO todolisttable (ID, Description, Category, Date)"

            + item.toString();

    String task2 = "INSERT INTO todolisttable (ID, Description, Category, Date)"

            + item2.toString();

    String task3 = "INSERT INTO todolisttable (ID, Description, Category, Date)"

            + item3.toString();

    String task4 = "INSERT INTO todolisttable (ID, Description, Category, Date)"

            + item4.toString();

    String task5 = "INSERT INTO todolisttable (ID, Description, Category, Date)"

            + item5.toString();

    String task6 = "INSERT INTO todolisttable (ID, Description, Category, Date)"

            + item6.toString();

    String task7 = "INSERT INTO todolisttable (ID, Description, Category, Date)"

            + item7.toString();

И запросить данные из todolisttable Кассандры:

Запрос данных из списка дел Кассандры

private void queryData(JavaSparkContext sc) {

        CassandraConnector connector = 
                   CassandraConnector.apply(sc.getConf());
        try (Session session = connector.openSession()) {

            ResultSet results = session.execute(query);

            System.out.println("Query all results from cassandra:\n" + results.all());

        }

    }

Чтобы получить доступ к данным из таблицы Кассандры в виде Spark RDD:

Доступ к данным из таблицы Кассандры в виде искры RDD

public  void accessTableWitRDD(JavaSparkContext sc){

        JavaRDD<String> cassandraRDD = javaFunctions(sc).cassandraTable("todolist", "todolisttable")
                .map(new Function<CassandraRow, String>() {
                    @Override
                    public String call(CassandraRow cassandraRow) throws Exception {
                        return cassandraRow.toString();
                    }
                });

    }

Для того, чтобы читать таблицы с Кассандры как RDD, мы используем cassandraTable («пространство ключей», «таблица») метод. За cassandraTable метод работы нам нужно обернуть sparkcontextсо специальной оберткой, и мы используем javaFunctions () способ сделать это.

Для этого СДР тип данных CassandraRow

Чтобы распечатать этот СДР:

Распечатайте данные Spark RDD

System.out.println("\nData as CassandraRows from a RDD: \n" + StringUtils.join(cassandraRDD.toArray(), "\n"));

Мы также можем сохранять RDD в Cassandra так же легко, как и читать, для этого создадим RDD типа TodoItem и заполнить его некоторыми данными, а затем сохранить его в температура стол в Кассандре:

Создайте СДР со списком предметов и сохраните их в Cassandra

public void saveRDDToCass(JavaSparkContext sc) {
        List<TodoItem> todos = Arrays.asList(
        new TodoItem("George", "Buy a new computer", "Shopping"),
        new TodoItem("John", "Go to the gym", "Sport"),
        new TodoItem("Ron", "Finish the homework", "Education"),
        new TodoItem("Sam", "buy a car", "Shopping"),
        new TodoItem("Janet", "buy groceries", "Shopping"),
        new TodoItem("Andy", "go to the beach", "Fun"),
        new TodoItem("Paul", "Prepare lunch", "Coking")
        );
        JavaRDD<TodoItem> rdd = sc.parallelize(todos);
        javaFunctions(rdd).writerBuilder("todolist", "temp", mapToRow(TodoItem.class)).saveToCassandra();

Выше мы только что создали список массивов TodoItem затем создал Spark RDD рдд со всеми данными, используя Распараллеливать метод, а затем сохранен рдд в пространство клавиш список дели стол температура вызывая writerBuilder метод на завернутый рдд,

Чтобы убедиться, что рдд был сохранен в таблице температура в Кассандре, давайте запросить температураиз Кассандры:

Запрос температура стол из Кассандры

String query1 = "SELECT * FROM todolist.temp";

ResultSet results1 = session.execute(query1);
System.out.println("\nQuery all results from temp" +
 " table after saving a RDD into Cassandra:\n" +
 results1.all());

В конце мы предоставим полный список кодов. Также будет   раздел Run it со всей инструкцией о том, как извлечь код из Github и как запустить его на вашем компьютере.

Spark SQL работает с Кассандрой

Spark SQL  позволяет запрашивать структурированные данные, такие как RDD и любые сохраненные данные на Cassandra, чтобы использовать  Spark SQL,  нам нужно сделать следующее:

  • Создайте SQLContext. (SQLContext оборачивает SparkContext)
  • Загрузка данных в формате паркета (формат паркета представляет собой столбчатое хранилище; это означает, что таблицы данных структурированы как разделы столбцов данных, а не строк данных).
  • После загрузки данных у нас будет DataFrame,
  • Эта дополнительная информация позволяет запрашивать данные после регистрации данных в виде таблицы с использованием SQL
  • SQL-запросы имеют объекты типа строки
  • SQL-запрос — очень мощный инструмент

Обратите внимание, что  искра DataFrame имеет все функции как обычный  Spark РДДплюс дополнительные метаданные об именах и типах столбцов в  наборе данных .

Полезная информация о  Spark SQL :

  • Spark SQL может кэшировать таблицы в памяти
  • Когда вы запрашиваете с SQL, результаты RDD
  • Чтение данных в паркетах: столбчатое хранилище помогает избежать ненужных данных
  • СДР могут храниться в файлах паркета
  • JSON объекты могут быть преобразованы в DataFrame используя jsonRDD

Напомним, что РДД обеспечивает параллелизм. СДР представляет собой  отказоустойчивые распределенные данные . РДД  является основным компонентом  Спарк . СДР (устойчивые распределенные данные) — это представление данных. СДР — это данные, которые могут быть разбиты на кластеры (если хотите, данные будут защищены). Разбиение позволяет выполнять задачи параллельно. Чем больше у вас разделов, тем больше параллелизма вы можете сделать.

Паркет  является столбчатым форматом. Паркет поддерживается другими системами обработки данных, такими как Hive. Apache Parquet  является частью экосистемы Hadoop. Паркет предназначен для кросс-языковой, кросс-обработки данных, столбчатого формата данных. Spark SQL  может читать и записывать файлы Parquet. Эти паркет сохраняет схему данных.

Теперь давайте продемонстрируем, как использовать  Spark SQL  в Java на примере элемента todo.

Сначала нам нужно импортировать искровым SQL зависимость в нашем файле Gradle:

Используйте зависимость Spark SQL в Gradle

dependencies {
 compile 'org.apache.spark:spark-sql_2.10:1.5.0'

}

Затем создайте конфигурацию Spark для соединения с Cassandra:

Конфигурация искры для Кассандры

SparkConf conf = new SparkConf();

conf.setAppName("TODO sparkSQL and cassandra");
conf.setMaster("local");
conf.set("spark.cassandra.connection.host", "localhost");

Создать контекст Spark (JavaSparkContext).

Создать контекст Spark

JavaSparkContext sc = new JavaSparkContext(conf);

The SQLContext используется для подключения к  Cassandra  с помощью SQL:

Создайте Spark SQL Context

SQLContext sqlContext = new SQLContext(sc);

SQLContext позволяет регистрировать СДР и выполнять операции запросов с использованием Spark SQL.

Давайте создадим СДР (рдд) и загрузить в него некоторые данные (TodoItems):

RDD загрузка TodoItems

 List<TodoItem> todos = Arrays.asList(
                new TodoItem("George", "Buy a new computer", "Shopping"),
                new TodoItem("John", "Go to the gym", "Sport"),
                new TodoItem("Ron", "Finish the homework", "Education"),
                new TodoItem("Sam", "buy a car", "Shopping"),
                new TodoItem("Janet", "buy groceries", "Shopping"),
                new TodoItem("Andy", "go to the beach", "Fun"),
                new TodoItem("Paul", "Prepare lunch", "Cooking")
        );
        JavaRDD<TodoItem> rdd = sc.parallelize(todos);

Обратите внимание, что мы Распараллеливать данные Todo среди кластера Spark.  JavaRDD производится из context.parallelize,

Затем создайте DataFrame из sqlContext:

Создайте DataFrame из sqlContext

DataFrame dataframe = sqlContext.createDataFrame(rdd, TodoItem.class);

Обратите внимание, что он получил схему из класса Java TodoItem.class,

Затем зарегистрируйте его как таблицу с именем сделать:

Зарегистрируйте DataFrame как таблицу с именем сделать

sqlContext.registerDataFrameAsTable(dataframe, "todo");

Это позволит нам выполнять запросы к  DataFrame,  используя имя todo.

Теперь мы готовы выполнить все операции, предлагаемые  Spark SQL , давайте сначала посчитаем, сколько элементов todo, это также загрузит данные в память, чтобы ускорить выполнение запросов:

Получение количества элементов TODO с помощью DataFrame

System.out.println("Total number of TodoItems = [" + rdd.count() + "]\n");

И, наконец, давайте запросим данные с помощью SQL:

Показать и выбрать TodoItems в DataFrame с помощью Spark SQL

 DataFrame result = sqlContext.sql("SELECT * from todo");

        System.out.println("Show the DataFrame result:\n");
        result.show();

        System.out.println("Select the id column and show its contents:\n");
        result.select("id").show();

Чтобы получить код и инструкции по его запуску, перейдите к Запустить его раздел в конце. Вот полный список кодов для этого примера.

Полный код

SparkApp.java Линстинг

package com.example;

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import com.datastax.spark.connector.cql.CassandraConnector;
import com.datastax.spark.connector.japi.CassandraRow;
import org.apache.commons.lang.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

//import org.apache.cassandra.cql.BatchStatement;


/**
 * Created by fadi on 5/18/15.
 */
public class SparkApp implements Serializable {


    static final Logger logger = LoggerFactory.getLogger(SparkApp.class);

    TodoItem item = new TodoItem("George", "Buy a new computer", "Shopping");
    TodoItem item2 = new TodoItem("John", "Go to the gym", "Sport");
    TodoItem item3 = new TodoItem("Ron", "Finish the homework", "Education");
    TodoItem item4 = new TodoItem("Sam", "buy a car", "Shopping");
    TodoItem item5 = new TodoItem("Janet", "buy groceries", "Shopping");
    TodoItem item6 = new TodoItem("Andy", "go to the beach", "Fun");
    TodoItem item7 = new TodoItem("Paul", "Prepare lunch", "Coking");



    String keyspace = "CREATE KEYSPACE IF NOT EXISTS todolist  WITH replication = {'class': 'SimpleStrategy', 'replication_factor':1}";

    //index data
    String task1 = "INSERT INTO todolisttable (ID, Description, Category, Date)"

            + item.toString();

    String task2 = "INSERT INTO todolisttable (ID, Description, Category, Date)"

            + item2.toString();

    String task3 = "INSERT INTO todolisttable (ID, Description, Category, Date)"

            + item3.toString();

    String task4 = "INSERT INTO todolisttable (ID, Description, Category, Date)"

            + item4.toString();

    String task5 = "INSERT INTO todolisttable (ID, Description, Category, Date)"

            + item5.toString();

    String task6 = "INSERT INTO todolisttable (ID, Description, Category, Date)"

            + item6.toString();

    String task7 = "INSERT INTO todolisttable (ID, Description, Category, Date)"

            + item7.toString();


    //delete keyspace
    String deletekeyspace = "DROP KEYSPACE IF EXISTS todolist";

    //delete table
    String deletetable = "DROP TABLE todolisttable";

    //create table
    String table = "CREATE TABLE todolist.todolisttable(id text PRIMARY KEY, "
            + "description text, "
            + "category text, "
            + "date timestamp )";

    String tableRDD = "CREATE TABLE todolist.temp(id text PRIMARY KEY, "
            + "description text, "
            + "category text )";

    //Query all data
    String query = "SELECT * FROM todolist.todolisttable";

    String query1 = "SELECT * FROM todolist.temp";
    //Update table
    String update = "UPDATE todolisttable SET Category='Fun',Description='Go to the beach' WHERE ID='Ron'";

    //Deleting data where the index id = George
    String delete = "DELETE FROM todolisttable WHERE ID='George'";

    //Deleting all data
    String deleteall = "TRUNCATE todolisttable";

//---------------------------------------------------------------------------------


    private transient SparkConf conf;

    private SparkApp(SparkConf conf) {
        this.conf = conf;
    }

    private void run() {
        JavaSparkContext sc = new JavaSparkContext(conf);
        createSchema(sc);
        loadData(sc);
        saveRDDToCassandra(sc);
        queryData(sc);
        accessTableWitRDD(sc);

        sc.stop();

    }



    private void createSchema(JavaSparkContext sc) {

        CassandraConnector connector = CassandraConnector.apply(sc.getConf());
        try (Session session = connector.openSession()) {

            session.execute(deletekeyspace);
            session.execute(keyspace);
            session.execute("USE todolist");
            session.execute(table);
            session.execute(tableRDD);


        }
    }

    private void loadData(JavaSparkContext sc) {

        CassandraConnector connector = CassandraConnector.apply(sc.getConf());

        try (Session session = connector.openSession()) {
            session.execute(task1);
            session.execute(task2);
            session.execute(task3);
            session.execute(task4);
            session.execute(task5);
            session.execute(task6);
            session.execute(task7);

        }



    }
    private void queryData(JavaSparkContext sc) {

        CassandraConnector connector = CassandraConnector.apply(sc.getConf());
        try (Session session = connector.openSession()) {

            ResultSet results = session.execute(query);

            System.out.println("\nQuery all results from cassandra's todolisttable:\n" + results.all());

            ResultSet results1 = session.execute(query1);

            System.out.println("\nSaving RDD into a temp table in casssandra then query all results from cassandra:\n" + results1.all());


        }

    }

    public  void accessTableWitRDD(JavaSparkContext sc){

        JavaRDD<String> cassandraRDD = javaFunctions(sc).cassandraTable("todolist", "todolisttable")
                .map(new Function<CassandraRow, String>() {
                    @Override
                    public String call(CassandraRow cassandraRow) throws Exception {
                        return cassandraRow.toString();
                    }
                });
        System.out.println("\nReading Data from todolisttable in Cassandra with a RDD: \n" + StringUtils.join(cassandraRDD.toArray(), "\n"));


        // javaFunctions(cassandraRDD).writerBuilder("todolist", "todolisttable", mapToRow(String.class)).saveToCassandra();
    }


    public void saveRDDToCassandra(JavaSparkContext sc) {
        List<TodoItem> todos = Arrays.asList(
                new TodoItem("George", "Buy a new computer", "Shopping"),
                new TodoItem("John", "Go to the gym", "Sport"),
                new TodoItem("Ron", "Finish the homework", "Education"),
                new TodoItem("Sam", "buy a car", "Shopping"),
                new TodoItem("Janet", "buy groceries", "Shopping"),
                new TodoItem("Andy", "go to the beach", "Fun"),
                new TodoItem("Paul", "Prepare lunch", "Coking")
        );
        JavaRDD<TodoItem> rdd = sc.parallelize(todos);
        javaFunctions(rdd).writerBuilder("todolist", "temp", mapToRow(TodoItem.class)).saveToCassandra();



    }



//----------------------------------------------------------------------------------------------------------------------------

    public static void main( String args[] )


    {

        SparkConf conf = new SparkConf();

        conf.setAppName("TODO spark and cassandra");
        conf.setMaster("local");
        conf.set("spark.cassandra.connection.host", "localhost");


        SparkApp app = new SparkApp(conf);
        app.run();

    }
}

Список SparkSQLApp.java

package com.example;


import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

import java.util.Arrays;
import java.util.List;

/**
 * Created by fadi on 6/14/15.
 */





public class SparkSQLApp {

    private transient SparkConf conf;

    private SparkSQLApp(SparkConf conf) {
        this.conf = conf;
    }


    private void run() {
        JavaSparkContext sc = new JavaSparkContext(conf);
         SQLContext sqlContext = new SQLContext(sc);

        createDataframe(sc, sqlContext);

        querySQLData(sqlContext);

        sc.stop();

    }

    public void createDataframe(JavaSparkContext sc, SQLContext sqlContext ) {
        List<TodoItem> todos = Arrays.asList(
                new TodoItem("George", "Buy a new computer", "Shopping"),
                new TodoItem("John", "Go to the gym", "Sport"),
                new TodoItem("Ron", "Finish the homework", "Education"),
                new TodoItem("Sam", "buy a car", "Shopping"),
                new TodoItem("Janet", "buy groceries", "Shopping"),
                new TodoItem("Andy", "go to the beach", "Fun"),
                new TodoItem("Paul", "Prepare lunch", "Cooking")
        );
        JavaRDD<TodoItem> rdd = sc.parallelize(todos);

        DataFrame dataframe =   sqlContext.createDataFrame(rdd, TodoItem.class);
        sqlContext.registerDataFrameAsTable(dataframe, "todo");

        System.out.println("Total number of TodoItems = [" + rdd.count() + "]\n");

    }


    public void querySQLData(SQLContext sqlContext) {

        DataFrame result = sqlContext.sql("SELECT * from todo");

        System.out.println("Show the DataFrame result:\n");
        result.show();

        System.out.println("Select the id column and show its contents:\n");
        result.select("id").show();


    }

    public static void main( String args[] )


    {

        SparkConf conf = new SparkConf();

        conf.setAppName("TODO sparkSQL and cassandra");
        conf.setMaster("local");
        conf.set("spark.cassandra.connection.host", "localhost");


        SparkSQLApp app = new SparkSQLApp(conf);
        app.run();

    }
}

Листинг Todoitem.java

package com.example;


import java.io.Serializable;
import java.time.LocalDateTime;

public class TodoItem implements Serializable {


    private String id;
    private String description;
    private String category;
    private final LocalDateTime date = LocalDateTime.now();


    public TodoItem(String id, String description, String category) {
        this.id = id;
        this.description = description;
        this.category = category;


    }

    public String getId(){
        return this.id;
    }

    public  String getDescription(){
        return this.description;
    }

    public String getCategory(){
        return this.category;
    }

    public void setId(String id) {
        this.id = id;
    }

    public void setDescription(String description) {
        this.description = description;
    }

    public void setCategory(String category) {
        this.category = category;
    }


    @Override
    public String toString() {
        return  "VALUES ( " + "'" + this.id +"'" + ", " + "'" + this.description +"'" + ", " + "'" + this.category +"'" +", "  + "'" + date +"'" + ")";

    }
}

Листинг build.gradle


apply plugin: 'idea'
apply plugin: 'java'
apply plugin: 'jetty'
apply plugin: 'application'

applicationName = 'todocass'

applicationDefaultJvmArgs = ["-Dlogback.configurationFile=etc/todosolr/logging.xml"]

sourceCompatibility = 1.8
version = '1.0'

repositories {
    mavenLocal()
    mavenCentral()
}

task runSpark(type: JavaExec, dependsOn: 'classes') {
    main = "com.example.SparkApp"
    classpath = sourceSets.main.runtimeClasspath

}

task runSparkSQL(type: JavaExec, dependsOn: 'classes') {
    main = "com.example.SparkSQLApp"
    classpath = sourceSets.main.runtimeClasspath
}


dependencies {


    //spark and cassandra connector to work with java

    compile 'com.datastax.spark:spark-cassandra-connector_2.10:1.1.1-rc4'
    compile 'com.datastax.spark:spark-cassandra-connector-java_2.10:1.1.1'
    compile 'org.apache.spark:spark-streaming_2.10:1.5.0'

    compile 'org.apache.spark:spark-sql_2.10:1.5.0'

    //logback dependencies
    compile 'ch.qos.logback:logback-core:1.1.3'
    compile 'ch.qos.logback:logback-classic:1.1.3'
    compile 'org.slf4j:slf4j-api:1.7.12'
}

//Install/copy tasks
task copyDist(type: Copy) {
    dependsOn "installApp"
    from "$buildDir/install/todocass"
    into 'opt/todocass'
}

task copyLog(type: Copy) {
    from "src/main/resources/logback.xml"
    into "etc/todocass/"
}


task copyLogToImage(type: Copy) {
    from "src/main/resources/logback.xml"
    into "image-todo-cass/etc"
}
task copyDistToImage(type: Copy) {
    dependsOn "installApp"
    from "$buildDir/install/todocass"
    into "$projectDir/image-todo-cass/opt/todocass"
}

Запустить его

Первый запуск Кассандры:

cd ~/cassandra
bin/cassandra -f

Получить код:

git clone https://github.com/MammatusTech/Spark-Course.git

Затем создайте Spark-Course:

cd Spark-Course
gradle clean build

Сначала запустите SparkApp, это пример работы Spark с Cassandra:

gradle runSpark

Вы должны увидеть следующее:

Query all results from cassandra's todolisttable:
[Row[George, Shopping, Mon Jun 15 13:36:07 PDT 2015, Buy a new computer], Row[Janet, Shopping, Mon Jun 15 13:36:07 PDT 2015, buy groceries], Row[John, Sport, Mon Jun 15 13:36:07 PDT 2015, Go to the gym], Row[Paul, Coking, Mon Jun 15 13:36:07 PDT 2015, Prepare lunch], Row[Ron, Education, Mon Jun 15 13:36:07 PDT 2015, Finish the homework], Row[Andy, Fun, Mon Jun 15 13:36:07 PDT 2015, go to the beach], Row[Sam, Shopping, Mon Jun 15 13:36:07 PDT 2015, buy a car]]

Saving RDD into a temp table in casssandra then query all results from cassandra:
[Row[George, Shopping, Buy a new computer], Row[Janet, Shopping, buy groceries], Row[John, Sport, Go to the gym], Row[Paul, Coking, Prepare lunch], Row[Ron, Education, Finish the homework], Row[Andy, Fun, go to the beach], Row[Sam, Shopping, buy a car]]

Reading Data from todolisttable in Cassandra with a RDD:
CassandraRow{id: Paul, category: Coking, date: 2015-06-15 13:36:07-0700, description: Prepare lunch}
CassandraRow{id: Sam, category: Shopping, date: 2015-06-15 13:36:07-0700, description: buy a car}
CassandraRow{id: Ron, category: Education, date: 2015-06-15 13:36:07-0700, description: Finish the homework}
CassandraRow{id: Janet, category: Shopping, date: 2015-06-15 13:36:07-0700, description: buy groceries}
CassandraRow{id: John, category: Sport, date: 2015-06-15 13:36:07-0700, description: Go to the gym}
CassandraRow{id: George, category: Shopping, date: 2015-06-15 13:36:07-0700, description: Buy a new computer}
CassandraRow{id: Andy, category: Fun, date: 2015-06-15 13:36:07-0700, description: go to the beach}

Затем запустите SparkSQLAPP, это пример Spark SQL, работающего с Cassandra:

gradle runSparkSQL:

Вы должны увидеть следующее:

Total number of TodoItems = [7]

Show the DataFrame result:

+---------+-------------------+------+
| category|        description|    id|
+---------+-------------------+------+
| Shopping| Buy a new computer|George|
|    Sport|      Go to the gym|  John|
|Education|Finish the homework|   Ron|
| Shopping|          buy a car|   Sam|
| Shopping|      buy groceries| Janet|
|      Fun|    go to the beach|  Andy|
|  Cooking|      Prepare lunch|  Paul|
+---------+-------------------+------+

Select the id column and show its contents:

+------+
|    id|
+------+
|George|
|  John|
|   Ron|
|   Sam|
| Janet|
|  Andy|
|  Paul|
+------+

Вывод

Мы показали, что  Spark  — очень мощный инструмент для аналитики в реальном времени. Мы представляем важную часть экосистемы Spark, а именно  Spark SQL и DataFrames . DataFrames основываются на RDD, чтобы обеспечить разделы данных, которые могут обрабатываться параллельно.

Мы также продемонстрировали, как использовать  Spark и Spark SQL с Cassandra .

Apache Spark  оказался естественным преемником Hadoop и продолжает тенденцию BigData. Он хорошо работает в мире Hadoop и может быстро перейти к анализу данных BigData. Spark  предоставляет простой в использовании API для выполнения больших распределенных заданий для анализа данных. Apache Spark  предоставляет мощные возможности BigData в руки простых смертных разработчиков для анализа данных в реальном времени. Spark SQL  является примером простого в использовании, но мощного API, предоставляемого  Apache Spark .

Ссылки