Статьи

Использование Apache Spark DataFrames для обработки табличных данных

Этот пост поможет вам начать использовать Apache Spark DataFrames со Scala в песочнице MapR. Новый Спарк DataFrames API  разработан , чтобы сделать большую обработку данных по табличным данным легче. Spark DataFrame — это распределенная коллекция данных, организованная в именованные столбцы, которая предоставляет операции для фильтрации, группировки или вычисления агрегатов, и может использоваться с Spark SQL. Фреймы данных могут быть построены из файлов структурированных данных, существующих СДР, таблиц в Hive или внешних баз данных. В этом посте вы узнаете, как:

  • Загрузить данные в Spark DataFrames
  • Исследуйте данные с помощью Spark SQL

Этот пост предполагает базовое понимание концепций Spark. Если вы еще не читали учебник по  началу работы со Spark в MapR Sandbox , было бы неплохо сначала прочитать его.

Программное обеспечение

Этот учебник будет работать на песочнице MapR v5.0, которая включает в себя Spark 1.3

  • Вы можете скачать код и данные для запуска этих примеров здесь:

  • Примеры в этом посте можно запустить в spark-shell после запуска с помощью команды spark-shell.
  • Вы также можете запустить код как отдельное приложение, как описано в руководстве по  началу работы с Spark в MapR Sandbox .

Примеры наборов данных

Мы будем использовать два примера наборов данных — один из  онлайн-аукционов eBay  и один из  системы отчетности о преступлениях SFPD .

Набор данных онлайн-аукциона eBay содержит следующие поля данных:

auctionid  — уникальный идентификатор аукциона
ставка  — прокси ставка помещается на цену
bidtime  — время (в днях)что заявка была размещена, с самого начала аукциона
участника торгов  — eBay имя пользователя Претендента
bidderrate  — eBay рейтинг обратной связи из претендент
openbid  — набор открытия торгов продавца
цена  — цена закрытиячто проданная деталь для (эквивалента второго наивысочайшей ставки + приращение)

В таблице ниже показаны поля данных с некоторыми примерами данных: 

Используя Spark DataFrames, мы исследуем данные с такими вопросами, как:

  • Сколько аукционов было проведено?
  • Сколько ставок было сделано на предмет?
  • Какое минимальное, максимальное и среднее количество ставок на единицу товара?
  • Показать предложения с ценой> 100

В таблице ниже показаны поля данных SFPD с некоторыми примерами данных: 

Используя Spark DataFrames, мы изучим данные SFPD с помощью следующих вопросов:

  • Каковы 10 лучших резолюций?
  • Сколько существует категорий?
  • Каковы 10 лучших категорий инцидентов?

Загрузка данных в Spark DataFrames

Войдите в MapR Sandbox, как описано в разделе  Начало работы с Spark в MapR Sandbox , используя идентификатор пользователя user01, пароль mapr. Скопируйте файлы примеров данных в домашнюю директорию песочницы / user / user01 с помощью scp. Начните искровую оболочку с: 
$ spark-shell

Сначала мы импортируем некоторые пакеты и создадим экземпляр sqlContext, который является точкой входа для работы со структурированными данными (строками и столбцами) в Spark и позволяет создавать объекты DataFrame. (В фрагментах кода ниже вывод показан в комментариях)

//  SQLContext entry point for working with structured data
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
// Import Spark SQL data types and Row.
import org.apache.spark.sql._

Ниже мы загружаем данные из файла ebay.csv в Resilient Distributed Dataset (RDD). СДР могут иметь преобразования и действия; действие first () возвращает первый элемент в RDD, который является строкой “8213034705,95,2.927373,jake7870,0,95,117.5,xbox,3”

// load the data into a  new RDD
val ebayText = sc.textFile("ebay.csv")

// Return the first element in this RDD
ebayText.first()
// res6: String = 8213034705,95,2.927373,jake7870,0,95,117.5,xbox,3

Ниже мы используем класс case Scala для определения схемы аукциона, соответствующей файлу ebay.csv. Затем преобразования map () применяются к каждому элементу ebayText для создания СДР ebay объектов Auction.

//define the schema using a case class
case class Auction(auctionid: String, bid: Float, bidtime: Float, bidder: String, bidderrate: Integer, openbid: Float, price: Float, item: String, daystolive: Integer)

// create an RDD of Auction objects 
val ebay = ebayText.map(_.split(",")).map(p => Auction(p(0),p(1).toFloat,p(2).toFloat,p(3),p(4).toInt,p(5).toFloat,p(6).toFloat,p(7),p(8).toInt ))

Действие ebay RDD first () возвращает первый элемент в RDD, Auction = Auction (8213034705, 95.0, 2.927373, jake7870, 0, 95.0, 117.5, xbox, 3).

// Return the first element in this RDD
ebay.first()
//res7: Auction = Auction(8213034705,95.0,2.927373,jake7870,0,95.0,117.5,xbox,3)
// Return the number of elements in the RDD
ebay.count()
//res8: Long = 10654

DataFrame — это распределенная коллекция данных, организованная в именованные столбцы. Spark SQL поддерживает автоматическое преобразование RDD, содержащей классы дел, в DataFrame с помощью метода toDF ():

// change ebay RDD of Auction objects to a DataFrame
val auction = ebay.toDF()

Предыдущие преобразования RDD также можно записать в одну строку, например:

val auction = sc.textFile("ebay.csv").map(_.split(",")).map(p => 
Auction(p(0),p(1).toFloat,p(2).toFloat,p(3),p(4).toInt,p(5).toFloat,p(6).toFloat,p(7),p(8).toInt )).toDF()

Исследуйте и запросите данные аукциона eBay с помощью Spark DataFrames

DataFrames предоставляют предметно-ориентированный язык для управления структурированными данными в Scala, Java и Python; Ниже приведены некоторые примеры с аукционом DataFrame. Действие showFrame show () отображает 20 верхних строк в табличной форме.

// Display the top 20 rows of DataFrame 
auction.show()
// auctionid  bid   bidtime  bidder         bidderrate openbid price item daystolive
// 8213034705 95.0  2.927373 jake7870       0          95.0    117.5 xbox 3
// 8213034705 115.0 2.943484 davidbresler2  1          95.0    117.5 xbox 3 …

DataFrame printSchema () Печатает схему на консоли в древовидном формате

// Return the schema of this DataFrame
auction.printSchema()
//root
// |-- auctionid: string (nullable = true)
// |-- bid: float (nullable = false)
// |-- bidtime: float (nullable = false)
// |-- bidder: string (nullable = true)
// |-- bidderrate: integer (nullable = true)
// |-- openbid: float (nullable = false)
// |-- price: float (nullable = false)
// |-- item: string (nullable = true)
// |-- daystolive: integer (nullable = true)

После того, как экземпляр данных создан, вы можете запросить его с помощью SQL-запросов. Вот несколько примеров запросов с использованием Scala DataFrame API:

// How many auctions were held?
auction.select("auctionid").distinct.count
// Long = 627

// How many bids per item?
auction.groupBy("auctionid", "item").count.show
//auctionid  item    count
//3016429446 palm    10
//8211851222 xbox    28
//3014480955 palm    12

// What's the min number of bids per item? what's the average? what's the max? 
auction.groupBy("item", "auctionid").count.agg(min("count"), avg("count"),max("count")).show

// MIN(count) AVG(count)        MAX(count)
// 1  16.992025518341308 75

// Get the auctions with closing price > 100
val highprice= auction.filter("price > 100")
// highprice: org.apache.spark.sql.DataFrame = [auctionid: string, bid: float, bidtime: float, bidder: // string, bidderrate: int, openbid: float, price: float, item: string, daystolive: int]

// display dataframe in a tabular format
highprice.show()
// auctionid  bid   bidtime  bidder         bidderrate openbid price item daystolive
// 8213034705 95.0  2.927373 jake7870       0          95.0    117.5 xbox 3        
// 8213034705 115.0 2.943484 davidbresler2  1          95.0    117.5 xbox 3

Вы можете зарегистрировать DataFrame как временную таблицу с заданным именем, а затем выполнить операторы SQL, используя методы sql, предоставляемые sqlContext. Вот несколько примеров запросов с использованием  sqlContext :

// register the DataFrame as a temp table 
auction.registerTempTable("auction")
// SQL statements can be run 
// How many  bids per auction?
val results =sqlContext.sql("SELECT auctionid, item,  count(bid) FROM auction GROUP BY auctionid, item")
// display dataframe in a tabular format
results.show()
// auctionid  item    count
// 3016429446 palm    10
// 8211851222 xbox    28. . . 

val results =sqlContext.sql("SELECT auctionid, MAX(price) FROM auction  GROUP BY item,auctionid")
results.show()
// auctionid  c1
// 3019326300 207.5
// 8213060420 120.0 . . . 

Загрузка данных SFPD в кадры данных Spark с использованием библиотеки синтаксического анализа csv

Теперь мы загрузим набор данных SFPD в фрейм данных Spark, используя  библиотеку разбора spark-csv  из Databricks. Вы можете использовать эту библиотеку в оболочке Spark, указав —packages com.databricks: spark-csv_2.10: 1.0.3 при запуске оболочки, как показано ниже:

$ spark-shell --packages com.databricks:spark-csv_2.10:1.0.3

Операция загрузки проанализирует файл sfpd.csv и вернет фрейм данных, используя первую строку заголовка файла для имен столбцов.

import sqlContext.implicits._
import org.apache.spark.sql._

//  Return the dataset specified by data source as a DataFrame, use the header for column names
val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> "sfpd.csv", "header" -> "true"))

Операция take возвращает указанное количество строк в DataFame.

// Return the first n rows in the DataFrame
df.take(1)

// res4: Array[org.apache.spark.sql.Row] = Array([150467944,LARCENY/THEFT,GRAND THEFT FROM LOCKED AUTO,Thursday,05/28/2015,23:59,TENDERLOIN,NONE,TAYLOR ST / OFARRELL ST,-122.411328369311,37.7859963050476,(37.7859963050476, -122.411328369311),15046794406244])

// Print the schema to the console in a tree format
df.printSchema()
//root
// |-- IncidntNum: string (nullable = true)
// |-- Category: string (nullable = true)
// |-- Descript: string (nullable = true)
// |-- DayOfWeek: string (nullable = true)
// |-- Date: string (nullable = true)
// |-- Time: string (nullable = true)
// |-- PdDistrict: string (nullable = true)
// |-- Resolution: string (nullable = true)
// |-- Address: string (nullable = true)
// |-- X: string (nullable = true)
// |-- Y: string (nullable = true)
// |-- Location: string (nullable = true)
// |-- PdId: string (nullable = true)

// display dataframe in a tabular format
df.show()
//IncidntNum Category Descript DayOfWeek Date Time PdDistrict Resolution Address X Y Location PdId
//150467944  LARCENY/THEFT GRAND THEFT FROM ... Thursday  05/28/2015 23:59 TENDERLOIN NONE           TAYLOR ST / OFARR... -122.411328369311 37.7859963050476 (37.7859963050476... 15046794406244

Вот несколько примеров запросов с использованием sqlContext:

// how many categories are there?
df.select("Category").distinct.count
// res5: Long = 39
// register as a temp table inorder to use sql 
df.registerTempTable("sfpd")

// How many categories are there
sqlContext.sql("SELECT distinct Category FROM sfpd").collect().foreach(println)

// [ASSAULT]
// [MISSING PERSON]
// [TREA] . . .
// What are the top 10 Resolutions ?
sqlContext.sql("SELECT Resolution , count(Resolution) as rescount FROM sfpd group by Resolution order by rescount desc limit 10").collect().foreach(println)
// [NONE,1063775]
// [ARREST, BOOKED,414219]
// [ARREST, CITED,154033] . . .
// What are the top 10 most incident Categories?
val t =  sqlContext.sql("SELECT Category , count(Category) as catcount FROM sfpd group by Category order by catcount desc limit 10")

t.show()
// Category       catcount
// LARCENY/THEFT  353793
// OTHER OFFENSES 253627
// NON-CRIMINAL   186272. . .

// The results of SQL queries are DataFrames and support RDD operations.
// The columns of a row in the result can be accessed by ordinal
t.map(t => "column 0: " + t(0)).collect().foreach(println)
// column 0: LARCENY/THEFT
// column 0: OTHER OFFENSES
// column 0: NON-CRIMINAL
// column 0: ASSAULT …

Физический план для фреймов данных

Оптимизатор Catalyst запрос создает физический план выполнения  для DataFrames , как показано на рисунке ниже: 

Печать физического плана на консоль

Фреймы данных разработаны для того, чтобы принимать запросы SQL, построенные на них, и оптимизировать выполнение в виде последовательностей Spark Jobs по мере необходимости. Вы можете распечатать физический план для DataFrame, используя операцию объяснения, как показано ниже:

//  Prints the physical plan to the console for debugging purpose
auction.select("auctionid").distinct.explain()

// == Physical Plan ==
// Distinct false
// Exchange (HashPartitioning [auctionid#0], 200)
//  Distinct true
//   Project [auctionid#0]
 //   PhysicalRDD   //[auctionid#0,bid#1,bidtime#2,bidder#3,bidderrate#4,openbid#5,price#6,item#7,daystolive#8], MapPartitionsRDD[11] at mapPartitions at ExistingRDD.scala:37

Резюме

В этом посте вы узнали, как загружать данные в Spark DataFrames и исследовать данные с помощью Spark SQL. Если у вас есть дополнительные вопросы или вы хотите поделиться информацией о том, как вы используете Spark DataFrames, добавьте свои комментарии в раздел ниже.