Этот пост поможет вам начать использовать Apache Spark DataFrames со Scala в песочнице MapR. Новый Спарк DataFrames API разработан , чтобы сделать большую обработку данных по табличным данным легче. Spark DataFrame — это распределенная коллекция данных, организованная в именованные столбцы, которая предоставляет операции для фильтрации, группировки или вычисления агрегатов, и может использоваться с Spark SQL. Фреймы данных могут быть построены из файлов структурированных данных, существующих СДР, таблиц в Hive или внешних баз данных. В этом посте вы узнаете, как:
- Загрузить данные в Spark DataFrames
- Исследуйте данные с помощью Spark SQL
Этот пост предполагает базовое понимание концепций Spark. Если вы еще не читали учебник по началу работы со Spark в MapR Sandbox , было бы неплохо сначала прочитать его.
Программное обеспечение
Этот учебник будет работать на песочнице MapR v5.0, которая включает в себя Spark 1.3
- Вы можете скачать код и данные для запуска этих примеров здесь:
- Примеры в этом посте можно запустить в spark-shell после запуска с помощью команды spark-shell.
- Вы также можете запустить код как отдельное приложение, как описано в руководстве по началу работы с Spark в MapR Sandbox .
Примеры наборов данных
Мы будем использовать два примера наборов данных — один из онлайн-аукционов eBay и один из системы отчетности о преступлениях SFPD .
Набор данных онлайн-аукциона eBay содержит следующие поля данных:
auctionid — уникальный идентификатор аукциона
ставка — прокси ставка помещается на цену
bidtime — время (в днях)что заявка была размещена, с самого начала аукциона
участника торгов — eBay имя пользователя Претендента
bidderrate — eBay рейтинг обратной связи из претендент
openbid — набор открытия торгов продавца
цена — цена закрытиячто проданная деталь для (эквивалента второго наивысочайшей ставки + приращение)
В таблице ниже показаны поля данных с некоторыми примерами данных:
Используя Spark DataFrames, мы исследуем данные с такими вопросами, как:
- Сколько аукционов было проведено?
- Сколько ставок было сделано на предмет?
- Какое минимальное, максимальное и среднее количество ставок на единицу товара?
- Показать предложения с ценой> 100
В таблице ниже показаны поля данных SFPD с некоторыми примерами данных:
Используя Spark DataFrames, мы изучим данные SFPD с помощью следующих вопросов:
- Каковы 10 лучших резолюций?
- Сколько существует категорий?
- Каковы 10 лучших категорий инцидентов?
Загрузка данных в Spark DataFrames
Войдите в MapR Sandbox, как описано в разделе Начало работы с Spark в MapR Sandbox , используя идентификатор пользователя user01, пароль mapr. Скопируйте файлы примеров данных в домашнюю директорию песочницы / user / user01 с помощью scp. Начните искровую оболочку с: $ spark-shell
Сначала мы импортируем некоторые пакеты и создадим экземпляр sqlContext, который является точкой входа для работы со структурированными данными (строками и столбцами) в Spark и позволяет создавать объекты DataFrame. (В фрагментах кода ниже вывод показан в комментариях)
// SQLContext entry point for working with structured data
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
// Import Spark SQL data types and Row.
import org.apache.spark.sql._
Ниже мы загружаем данные из файла ebay.csv в Resilient Distributed Dataset (RDD). СДР могут иметь преобразования и действия; действие first () возвращает первый элемент в RDD, который является строкой “8213034705,95,2.927373,jake7870,0,95,117.5,xbox,3”
// load the data into a new RDD
val ebayText = sc.textFile("ebay.csv")
// Return the first element in this RDD
ebayText.first()
// res6: String = 8213034705,95,2.927373,jake7870,0,95,117.5,xbox,3
Ниже мы используем класс case Scala для определения схемы аукциона, соответствующей файлу ebay.csv. Затем преобразования map () применяются к каждому элементу ebayText для создания СДР ebay объектов Auction.
//define the schema using a case class
case class Auction(auctionid: String, bid: Float, bidtime: Float, bidder: String, bidderrate: Integer, openbid: Float, price: Float, item: String, daystolive: Integer)
// create an RDD of Auction objects
val ebay = ebayText.map(_.split(",")).map(p => Auction(p(0),p(1).toFloat,p(2).toFloat,p(3),p(4).toInt,p(5).toFloat,p(6).toFloat,p(7),p(8).toInt ))
Действие ebay RDD first () возвращает первый элемент в RDD, Auction = Auction (8213034705, 95.0, 2.927373, jake7870, 0, 95.0, 117.5, xbox, 3).
// Return the first element in this RDD
ebay.first()
//res7: Auction = Auction(8213034705,95.0,2.927373,jake7870,0,95.0,117.5,xbox,3)
// Return the number of elements in the RDD
ebay.count()
//res8: Long = 10654
DataFrame — это распределенная коллекция данных, организованная в именованные столбцы. Spark SQL поддерживает автоматическое преобразование RDD, содержащей классы дел, в DataFrame с помощью метода toDF ():
// change ebay RDD of Auction objects to a DataFrame
val auction = ebay.toDF()
Предыдущие преобразования RDD также можно записать в одну строку, например:
val auction = sc.textFile("ebay.csv").map(_.split(",")).map(p =>
Auction(p(0),p(1).toFloat,p(2).toFloat,p(3),p(4).toInt,p(5).toFloat,p(6).toFloat,p(7),p(8).toInt )).toDF()
Исследуйте и запросите данные аукциона eBay с помощью Spark DataFrames
DataFrames предоставляют предметно-ориентированный язык для управления структурированными данными в Scala, Java и Python; Ниже приведены некоторые примеры с аукционом DataFrame. Действие showFrame show () отображает 20 верхних строк в табличной форме.
// Display the top 20 rows of DataFrame
auction.show()
// auctionid bid bidtime bidder bidderrate openbid price item daystolive
// 8213034705 95.0 2.927373 jake7870 0 95.0 117.5 xbox 3
// 8213034705 115.0 2.943484 davidbresler2 1 95.0 117.5 xbox 3 …
DataFrame printSchema () Печатает схему на консоли в древовидном формате
// Return the schema of this DataFrame
auction.printSchema()
//root
// |-- auctionid: string (nullable = true)
// |-- bid: float (nullable = false)
// |-- bidtime: float (nullable = false)
// |-- bidder: string (nullable = true)
// |-- bidderrate: integer (nullable = true)
// |-- openbid: float (nullable = false)
// |-- price: float (nullable = false)
// |-- item: string (nullable = true)
// |-- daystolive: integer (nullable = true)
После того, как экземпляр данных создан, вы можете запросить его с помощью SQL-запросов. Вот несколько примеров запросов с использованием Scala DataFrame API:
// How many auctions were held?
auction.select("auctionid").distinct.count
// Long = 627
// How many bids per item?
auction.groupBy("auctionid", "item").count.show
//auctionid item count
//3016429446 palm 10
//8211851222 xbox 28
//3014480955 palm 12
// What's the min number of bids per item? what's the average? what's the max?
auction.groupBy("item", "auctionid").count.agg(min("count"), avg("count"),max("count")).show
// MIN(count) AVG(count) MAX(count)
// 1 16.992025518341308 75
// Get the auctions with closing price > 100
val highprice= auction.filter("price > 100")
// highprice: org.apache.spark.sql.DataFrame = [auctionid: string, bid: float, bidtime: float, bidder: // string, bidderrate: int, openbid: float, price: float, item: string, daystolive: int]
// display dataframe in a tabular format
highprice.show()
// auctionid bid bidtime bidder bidderrate openbid price item daystolive
// 8213034705 95.0 2.927373 jake7870 0 95.0 117.5 xbox 3
// 8213034705 115.0 2.943484 davidbresler2 1 95.0 117.5 xbox 3
Вы можете зарегистрировать DataFrame как временную таблицу с заданным именем, а затем выполнить операторы SQL, используя методы sql, предоставляемые sqlContext. Вот несколько примеров запросов с использованием sqlContext :
// register the DataFrame as a temp table
auction.registerTempTable("auction")
// SQL statements can be run
// How many bids per auction?
val results =sqlContext.sql("SELECT auctionid, item, count(bid) FROM auction GROUP BY auctionid, item")
// display dataframe in a tabular format
results.show()
// auctionid item count
// 3016429446 palm 10
// 8211851222 xbox 28. . .
val results =sqlContext.sql("SELECT auctionid, MAX(price) FROM auction GROUP BY item,auctionid")
results.show()
// auctionid c1
// 3019326300 207.5
// 8213060420 120.0 . . .
Загрузка данных SFPD в кадры данных Spark с использованием библиотеки синтаксического анализа csv
Теперь мы загрузим набор данных SFPD в фрейм данных Spark, используя библиотеку разбора spark-csv из Databricks. Вы можете использовать эту библиотеку в оболочке Spark, указав —packages com.databricks: spark-csv_2.10: 1.0.3 при запуске оболочки, как показано ниже:
$ spark-shell --packages com.databricks:spark-csv_2.10:1.0.3
Операция загрузки проанализирует файл sfpd.csv и вернет фрейм данных, используя первую строку заголовка файла для имен столбцов.
import sqlContext.implicits._
import org.apache.spark.sql._
// Return the dataset specified by data source as a DataFrame, use the header for column names
val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> "sfpd.csv", "header" -> "true"))
Операция take возвращает указанное количество строк в DataFame.
// Return the first n rows in the DataFrame
df.take(1)
// res4: Array[org.apache.spark.sql.Row] = Array([150467944,LARCENY/THEFT,GRAND THEFT FROM LOCKED AUTO,Thursday,05/28/2015,23:59,TENDERLOIN,NONE,TAYLOR ST / OFARRELL ST,-122.411328369311,37.7859963050476,(37.7859963050476, -122.411328369311),15046794406244])
// Print the schema to the console in a tree format
df.printSchema()
//root
// |-- IncidntNum: string (nullable = true)
// |-- Category: string (nullable = true)
// |-- Descript: string (nullable = true)
// |-- DayOfWeek: string (nullable = true)
// |-- Date: string (nullable = true)
// |-- Time: string (nullable = true)
// |-- PdDistrict: string (nullable = true)
// |-- Resolution: string (nullable = true)
// |-- Address: string (nullable = true)
// |-- X: string (nullable = true)
// |-- Y: string (nullable = true)
// |-- Location: string (nullable = true)
// |-- PdId: string (nullable = true)
// display dataframe in a tabular format
df.show()
//IncidntNum Category Descript DayOfWeek Date Time PdDistrict Resolution Address X Y Location PdId
//150467944 LARCENY/THEFT GRAND THEFT FROM ... Thursday 05/28/2015 23:59 TENDERLOIN NONE TAYLOR ST / OFARR... -122.411328369311 37.7859963050476 (37.7859963050476... 15046794406244
Вот несколько примеров запросов с использованием sqlContext:
// how many categories are there?
df.select("Category").distinct.count
// res5: Long = 39
// register as a temp table inorder to use sql
df.registerTempTable("sfpd")
// How many categories are there
sqlContext.sql("SELECT distinct Category FROM sfpd").collect().foreach(println)
// [ASSAULT]
// [MISSING PERSON]
// [TREA] . . .
// What are the top 10 Resolutions ?
sqlContext.sql("SELECT Resolution , count(Resolution) as rescount FROM sfpd group by Resolution order by rescount desc limit 10").collect().foreach(println)
// [NONE,1063775]
// [ARREST, BOOKED,414219]
// [ARREST, CITED,154033] . . .
// What are the top 10 most incident Categories?
val t = sqlContext.sql("SELECT Category , count(Category) as catcount FROM sfpd group by Category order by catcount desc limit 10")
t.show()
// Category catcount
// LARCENY/THEFT 353793
// OTHER OFFENSES 253627
// NON-CRIMINAL 186272. . .
// The results of SQL queries are DataFrames and support RDD operations.
// The columns of a row in the result can be accessed by ordinal
t.map(t => "column 0: " + t(0)).collect().foreach(println)
// column 0: LARCENY/THEFT
// column 0: OTHER OFFENSES
// column 0: NON-CRIMINAL
// column 0: ASSAULT …
Физический план для фреймов данных
Оптимизатор Catalyst запрос создает физический план выполнения для DataFrames , как показано на рисунке ниже:
Печать физического плана на консоль
Фреймы данных разработаны для того, чтобы принимать запросы SQL, построенные на них, и оптимизировать выполнение в виде последовательностей Spark Jobs по мере необходимости. Вы можете распечатать физический план для DataFrame, используя операцию объяснения, как показано ниже:
// Prints the physical plan to the console for debugging purpose
auction.select("auctionid").distinct.explain()
// == Physical Plan ==
// Distinct false
// Exchange (HashPartitioning [auctionid#0], 200)
// Distinct true
// Project [auctionid#0]
// PhysicalRDD //[auctionid#0,bid#1,bidtime#2,bidder#3,bidderrate#4,openbid#5,price#6,item#7,daystolive#8], MapPartitionsRDD[11] at mapPartitions at ExistingRDD.scala:37
Резюме
В этом посте вы узнали, как загружать данные в Spark DataFrames и исследовать данные с помощью Spark SQL. Если у вас есть дополнительные вопросы или вы хотите поделиться информацией о том, как вы используете Spark DataFrames, добавьте свои комментарии в раздел ниже.