Учебники

Spark SQL – DataFrames

DataFrame – это распределенная коллекция данных, которая организована в именованные столбцы. Концептуально это эквивалентно реляционным таблицам с хорошими методами оптимизации.

DataFrame может быть создан из массива различных источников, таких как таблицы Hive, файлы структурированных данных, внешние базы данных или существующие RDD. Этот API был разработан для современных приложений Big Data и Data Science, основанных на DataFrame в R Programming и Pandas в Python .

Особенности DataFrame

Вот несколько характерных особенностей DataFrame:

  • Возможность обработки данных размером от килобайта до петабайта от одного узла к большому кластеру.

  • Поддерживает различные форматы данных (Avro, CSV, эластичный поиск и Cassandra) и системы хранения (HDFS, таблицы HIVE, MySQL и т. Д.).

  • Современная оптимизация и генерация кода с помощью оптимизатора Spark SQL Catalyst (структура преобразования дерева).

  • Может быть легко интегрирован со всеми инструментами и средами больших данных через Spark-Core.

  • Предоставляет API для программирования на Python, Java, Scala и R.

Возможность обработки данных размером от килобайта до петабайта от одного узла к большому кластеру.

Поддерживает различные форматы данных (Avro, CSV, эластичный поиск и Cassandra) и системы хранения (HDFS, таблицы HIVE, MySQL и т. Д.).

Современная оптимизация и генерация кода с помощью оптимизатора Spark SQL Catalyst (структура преобразования дерева).

Может быть легко интегрирован со всеми инструментами и средами больших данных через Spark-Core.

Предоставляет API для программирования на Python, Java, Scala и R.

SQLContext

SQLContext является классом и используется для инициализации функций Spark SQL. Объект класса SparkContext (sc) необходим для инициализации объекта класса SQLContext.

Следующая команда используется для инициализации SparkContext через spark-shell.

$ spark-shell

По умолчанию объект SparkContext инициализируется именем sc при запуске spark-shell.

Используйте следующую команду для создания SQLContext.

scala> val sqlcontext = new org.apache.spark.sql.SQLContext(sc)

пример

Давайте рассмотрим пример записей сотрудников в файле JSON с именем employee.json . Используйте следующие команды для создания DataFrame (df) и прочитайте документ JSON с именем employee.json со следующим содержимым.

employee.json – Поместите этот файл в каталог, где находится текущий указатель scala> .

{
   {"id" : "1201", "name" : "satish", "age" : "25"}
   {"id" : "1202", "name" : "krishna", "age" : "28"}
   {"id" : "1203", "name" : "amith", "age" : "39"}
   {"id" : "1204", "name" : "javed", "age" : "23"}
   {"id" : "1205", "name" : "prudvi", "age" : "23"}
}

Операции с фреймами данных

DataFrame предоставляет предметно-ориентированный язык для управления структурированными данными. Здесь мы приведем несколько основных примеров обработки структурированных данных с использованием DataFrames.

Выполните шаги, приведенные ниже, для выполнения операций DataFrame –

Прочитайте документ JSON

Сначала мы должны прочитать документ JSON. Исходя из этого, сгенерируйте DataFrame с именем (dfs).

Используйте следующую команду, чтобы прочитать документ JSON с именем employee.json . Данные отображаются в виде таблицы с полями – id, name и age.

scala> val dfs = sqlContext.read.json("employee.json")

Вывод – Имена полей берутся автоматически из employee.json .

dfs: org.apache.spark.sql.DataFrame = [age: string, id: string, name: string]

Показать данные

Если вы хотите увидеть данные в DataFrame, используйте следующую команду.

scala> dfs.show()

Вывод. Вы можете просматривать данные о сотрудниках в табличном формате.

<console>:22, took 0.052610 s
+----+------+--------+
|age | id   |  name  |
+----+------+--------+
| 25 | 1201 | satish |
| 28 | 1202 | krishna|
| 39 | 1203 | amith  |
| 23 | 1204 | javed  |
| 23 | 1205 | prudvi |
+----+------+--------+

Используйте метод printSchema

Если вы хотите увидеть структуру (схему) объекта DataFrame, используйте следующую команду.

scala> dfs.printSchema()

Выход

root
   |-- age: string (nullable = true)
   |-- id: string (nullable = true)
   |-- name: string (nullable = true)

Использовать метод выбора

Используйте следующую команду для извлечения имени -колонки среди трех столбцов из DataFrame.

scala> dfs.select("name").show()

Выход – Вы можете увидеть значения столбца имени .

<console>:22, took 0.044023 s
+--------+
|  name  |
+--------+
| satish |
| krishna|
| amith  |
| javed  |
| prudvi |
+--------+

Использовать фильтр возраста

Используйте следующую команду для поиска сотрудников, возраст которых превышает 23 (возраст> 23).

scala> dfs.filter(dfs("age") > 23).show()

Выход

<console>:22, took 0.078670 s
+----+------+--------+
|age | id   | name   |
+----+------+--------+
| 25 | 1201 | satish |
| 28 | 1202 | krishna|
| 39 | 1203 | amith  |
+----+------+--------+

Используйте метод groupBy

Используйте следующую команду для подсчета числа сотрудников одного возраста.

scala> dfs.groupBy("age").count().show()

Выход – два сотрудника в возрасте 23 года.

<console>:22, took 5.196091 s
+----+-----+
|age |count|
+----+-----+
| 23 |  2  |
| 25 |  1  |
| 28 |  1  |
| 39 |  1  |
+----+-----+

Выполнение SQL-запросов программно

SQLContext позволяет приложениям программно выполнять запросы SQL при выполнении функций SQL и возвращает результат в виде DataFrame.

Как правило, в фоновом режиме SparkSQL поддерживает два разных метода для преобразования существующих RDD в DataFrames –

Этот метод использует отражение для создания схемы RDD, которая содержит объекты определенных типов.

Второй метод создания DataFrame – через программный интерфейс, который позволяет создать схему и затем применить ее к существующему RDD.