Статьи

API источника данных Spark: расширение механизма Spark SQL Query

В моем последнем посте, Apache Spark как механизм распределенного SQL , мы объяснили, как мы можем использовать SQL для запроса наших данных, хранящихся в Hadoop. Наш движок способен читать CSV-файлы из распределенной файловой системы, автоматически обнаруживая схему из файлов и выставляя их в виде таблиц через мета-хранилище Hive. Все это было сделано для того, чтобы иметь возможность подключать стандартные клиенты SQL к нашему движку и исследовать наш набор данных без ручного определения схемы наших файлов, избегая работы ETL.

Spark предоставляет платформу, которая может быть расширена, и мы расширим ее возможности, расширив некоторые из ее функций.

API источника данных Spark

API источника данных позволяет нам управлять структурированными данными в любом формате. В Spark уже встроены некоторые стандартные структуры, такие как Avro и Parquet, но третьи лица создали новые читатели для CSV, JSON и других, расширив этот API. Сегодня мы собираемся создать свой собственный.

У нас есть две причины для расширения API.

Во-первых, нам нужна библиотека, способная считывать наш прежний формат и преобразовывать наш текущий источник данных в новый, более простой в использовании.

Во-вторых, мы хотим поделиться этой библиотекой со всеми нашими приложениями, которые используют наши данные, избегая сложной упаковки приложений, которые должны совместно использоваться для достижения той же цели.

Источник данных

Наш источник данных состоит из набора файлов, каждый из которых сам по себе является сущностью. Для этого примера мы определили простой формат, где каждый файл представляет собой текстовый файл, содержащий информацию о пользователе, каждое поле за строкой. Давайте посмотрим пример файла.

1
2
3
4
pepe
20
Miami
Cube

Этот файл представляет пользователя «pepe», которому 20 лет, он живет в Майами и родился на Кубе.

В реальном мире формат может быть настолько сложным, насколько мы хотим, но процесс, который мы собираемся объяснить, не изменится.

Каждый файл имеет одинаковый формат, и у нас их миллионы. Мы также хотим выставить их для запроса в SQL.

Наша реализация

Чтобы расширить API источника данных, нам нужно реализовать определенные классы из среды Spark, чтобы можно было загружать и использовать нашу специальную программу чтения.

Давайте начнем с создания приложения Spark в качестве точки входа в наш пример. Мы можем сделать это, следуя сообщениям SBT, Scala и Spark .

Первое, что нам нужно сделать после создания приложения, это связать правильные библиотеки Spark. Мы собираемся запустить примеры на Spark 1.5.1, и наш файл sbt определен следующим образом.

1
name := "spark-datasource" version := "1.0" scalaVersion := "2.11.7" libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "1.5.1" libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "1.5.1"

Создание нашей схемы

Начальной точкой расширения API источника данных является класс RelationProvider. Класс RelationProvider будет использоваться для создания необходимых отношений наших данных.

Нам также нужно смешать черту SchemaRelationProvider, которая позволяет нам создавать схемы, которые мы хотим.

Нам нужно создать класс с именем DefaultSource, и Spark будет искать его в заданном пакете. Класс DefaultSource расширяет RelationProvider и смешивает SchemaRelationProvider

Наш код выглядит следующим образом:

1
class DefaultSource extends RelationProvider with SchemaRelationProvider {   override def createRelation(sqlContext: SQLContext, parameters: Map[String, String])     : BaseRelation = {     createRelation(sqlContext, parameters, null)   }   override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]     , schema: StructType)     : BaseRelation = {     parameters.getOrElse("path", sys.error("'path' must be specified for our data."))     return new LegacyRelation(parameters.get("path").get, schema)(sqlContext)   } }

В коде мы в основном создаем объект LegacyRelation, который определил отношение, которое мы хотим создать. Подумайте об отношении как о наборе кортежей с известной схемой.

Давайте посмотрим, как реализован наш класс Relation.

1
class LegacyRelation(location: String, userSchema: StructType) (@transient val sqlContext: SQLContext)   extends BaseRelation        with Serializable {   override def schema: StructType = {     if (this.userSchema != null) {       return this.userSchema     }     else {       return StructType(Seq(StructField("name", StringType, true),                              StructField("age", IntegerType, true)))     }   } }

Здесь мы переопределяем функцию схемы, поэтому она возвращает схему, которую мы хотим. В этом примере мы знаем схему наших данных, но здесь мы можем сделать все, что захотим, чтобы получить требуемую схему. Если бы данные были CSV, мы могли бы вывести схему, используя заголовки файла, или выполнить любые другие необходимые нам операции.

Обратите внимание, что нам нужны только поля имени и возраста, а не весь контент наших объектов.

Следующим шагом является проверка того, что мы получаем правильную схему, и мы можем сделать это, добавив следующий код в наше приложение.

1
object app {   def main(args: Array[String]) {     val config = new SparkConf().setAppName("testing provider")     val sc = new SparkContext(config)     val sqlContext = new SQLContext(sc)        val df = sqlContext               .read               .format("com.nico.datasource.dat")               .load("/Users/anicolaspp/data/")                       df.printSchema()   } }

Этот код создает SparkContext и SQLContext из него. Используя SQLContext, мы устанавливаем формат, передавая имя пакета (Помните, что Spark будет искать этот пакет для класса DefaultSource). Затем мы загружаем данные по указанному пути с помощью нашего провайдера в DataFrame.

1
df.printSchema()

напечатает схему, которую мы определили, и результат должен выглядеть следующим образом.

1
root  |-- name: string (nullable = true)  |-- age: integer (nullable = true)

На данный момент, мы только создали схему, которую мы хотим, но нет ничего, что говорит о том, как подготовить данные и как структурировать их в нашей определенной схеме.

Чтение данных в нашу схему

Для чтения из нашего источника данных наш класс LegacyRelation должен смешивать черту TableScan. В TableScan есть метод, который нам нужно реализовать со следующей сигнатурой:

1
def buildScan(): RDD[Row]

Метод buildScan должен возвращать все строки из нашего источника данных. В нашем конкретном случае каждая строка будет выбранным содержимым каждого файла. Давайте посмотрим на нашу реализацию buildScan.

1
override def buildScan(): RDD[Row] = {     val rdd = sqlContext                 .sparkContext                 .wholeTextFiles(location)                 .map(x => x._2)        val rows = rdd.map(file => {       val lines = file.split("\n")       Row.fromSeq(Seq(lines(0), lines(1)))     })     rows   }

Здесь мы используем метод wholeTextFiles, который читает весь файл (каждый файл является сущностью), читает первые две строки (единственные поля, которые нам нужны) и создает строку из каждой из них. В результате получается коллекция строк, каждая из которых создается с использованием только той части файла, которая нам нужна.

Этого будет достаточно, чтобы изменить наше приложение, чтобы оно распечатывало содержимое нашего источника данных. Приложение теперь выглядит следующим образом.

1
object app {   def main(args: Array[String]) {     val config = new SparkConf().setAppName("testing provider")     val sc = new SparkContext(config)     val sqlContext = new SQLContext(sc)        val df = sqlContext               .read               .format("com.nico.datasource.dat")               .load("/Users/anicolaspp/data/")                       df.show()   } }

Несмотря на то, что мы читаем нужный формат в фрейм данных, нет информации о типах полей наших данных. Наша схема по определению поддерживает разные типы данных, но мы не применяем их принудительно.

Давайте изменим наш метод buildScan, чтобы он выводил информацию о типе при создании каждой строки.

1
override def buildScan(): RDD[Row] = {     val schemaFields = schema.fields     val rdd = sqlContext                 .sparkContext                 .wholeTextFiles(location)                 .map(x => x._2)          val rows = rdd.map(file => {       val lines = file.split("\n")              val typedValues = lines.zipWithIndex.map {         case (value, index) => {           val dataType = schemaFields(index).dataType           castValue(value, dataType)         }     nbsp;  }       Row.fromSeq(typedValues)     })          rows   }       private def castValue(value: String, toType: DataType) = toType match {     case _: StringType      => value     case _: IntegerType     => value.toInt   }

Здесь единственное изменение заключается в том, что мы приводим каждое значение, прочитанное из наших файлов, к его правильному типу, выведенному из объекта schema.fields. В нашем конкретном случае нас интересует только то, что имя — String, а age — Integer, но, опять же, мы могли бы быть очень креативными на этом этапе.

Теперь наш последний класс LegacyRelation будет выглядеть следующим образом.

1
class LegacyRelation(location: String, userSchema: StructType)   (@transient val sqlContext: SQLContext)   extends BaseRelation       with TableScan with Serializable {   override def schema: StructType = {     if (this.userSchema != null) {       return this.userSchema     }     else {       return StructType(Seq(StructField("name", StringType, true),                              StructField("age", IntegerType, true)))     }   }   private def castValue(value: String, toType: DataType) = toType match {     case _: StringType      => value     case _: IntegerType     => value.toInt   }   override def buildScan(): RDD[Row] = {     val schemaFields = schema.fields     val rdd = sqlContext               .sparkContext               .wholeTextFiles(location)               .map(x => x._2)                    val rows = rdd.map(file => {       val lines = file.split("\n")       val typedValues = lines.zipWithIndex.map{         case (value, index) => {           val dataType = schemaFields(index).dataType           castValue(value, dataType)         }       }       Row.fromSeq(typedValues)     })     rows   }

Теперь мы можем загрузить наши данные в DataFrame и зарегистрировать их для использования клиентами SQL, как мы объясняли в нашем предыдущем посте. Наше приложение так же просто, как мы показываем.

1
object app {   def main(args: Array[String]) {     val config = new SparkConf().setAppName("testing provider")     val sc = new SparkContext(config)     val sqlContext = new SQLContext(sc)     val df = sqlContext               .read               .format("com.nico.datasource.dat")               .load("/Users/anicolaspp/data/")        df.registerTempTable("users")     sqlContext.sql("select name from users").show()   } }

Мы показали достаточно для чтения пользовательского формата во фрейм данных, чтобы мы могли воспользоваться преимуществами API DataFrame, но можно сделать еще больше.

API источника данных предлагает не только функциональные возможности для чтения данных, но и для записи их в произвольном формате. Эта функциональность очень мощная, если мы хотим преобразовать набор данных из формата в другой. Давайте посмотрим, как мы добавим эти возможности в наш существующий драйвер.

Написание форматтера

Предположим, мы хотим сохранить наши данные, чтобы их можно было прочитать из других стандартных систем. Мы собираемся загрузить наш пользовательский источник данных и создать CSV-подобный вывод из него.

Чтобы поддерживать вызовы сохранения из API, наш класс DefaultSource должен смешиваться с признаком CreatableRelationProvider. Эта черта имеет метод createRelation, который мы должны реализовать, давайте рассмотрим его.

1
override def createRelation(sqlContext: SQLContext, mode: SaveMode,      parameters: Map[String, String], data: DataFrame): BaseRelation = {          saveAsCsvFile(data, parameters.get("path").get)     createRelation(sqlContext, parameters, data.schema)   }      def saveAsCsvFile(data: DataFrame, path: String) = {     val dataCustomRDD = data.rdd.map(row => {       val values = row.toSeq.map(value => value.toString)       values.mkString(",")     })     dataCustomRDD.saveAsTextFile(path)   }

Мы в основном сохраняем наш фрейм данных в виде CSV-файла и затем возвращаем отношение с известной схемой.

Метод saveAsCsvFile создает RDD [String] с нашими данными, отформатированными как CSV, затем сохраняет его по указанному пути. Для простоты мы не включили заголовки в наши выходные файлы, но помните, что мы можем делать все, что нам нужно, для вывода данных в требуемом формате.

Весь код нашего класса DefaultSource следующий.

1
class DefaultSource extends RelationProvider      with SchemaRelationProvider      with CreatableRelationProvider {   override def createRelation(sqlContext: SQLContext,      parameters: Map[String, String]): BaseRelation = {                  createRelation(sqlContext, parameters, null)   }   override def createRelation(sqlContext: SQLContext,      parameters: Map[String, String], schema: StructType): BaseRelation = {              parameters.getOrElse("path", sys.error("'path' must be specified for CSV data."))         return new LegacyRelation(parameters.get("path").get, schema)(sqlContext)   }   def saveAsCsvFile(data: DataFrame, path: String) = {     val dataCustomRDD = data.rdd.map(row => {       val values = row.toSeq.map(value => value.toString)       values.mkString(",")     })     dataCustomRDD.saveAsTextFile(path)   }   override def createRelation(sqlContext: SQLContext, mode: SaveMode,      parameters: Map[String, String], data: DataFrame): BaseRelation = {              saveAsCsvFile(data, parameters.get("path").get)         createRelation(sqlContext, parameters, data.schema)   } }

Чтобы сохранить исходные данные в формате CSV, мы модифицируем наше приложение следующим образом.

1
object app {   def main(args: Array[String]) {     val config = new SparkConf().setAppName("testing provider")     val sc = new SparkContext(config)     val sqlContext = new SQLContext(sc)          val df = sqlContext               .read               .format("com.nico.datasource.dat")               .load("/Users/anicolaspp/data/")             df.write       .format("com.nico.datasource.dat")       .save("/Users/anicolaspp/data/output")   } }

Обратите внимание, что каждый раз, когда мы читаем / записываем наши данные, нам нужно указать имя пакета, в котором находится наш класс DefaultSource.

Теперь мы можем упаковать нашу библиотеку и включить ее в любой проект, в котором нам нужно использовать источник данных, который мы описали. Многие другие библиотеки создаются для поддержки всех возможных форматов, которые мы можем себе представить, и теперь вы можете создать свою собственную, чтобы внести свой вклад в сообщество или просто использовать в своих собственных проектах.

окончания

Мы видели, как загружать данные из пользовательского формата во фреймы данных с помощью API источника данных Spark. Мы также рассмотрели классы, участвующие в процессе, особенно то, как Spark использует наш DefaultSource из нашего пакета для выполнения необходимых операций. Мы также реализовали форматер вывода, чтобы наши фреймы данных могли быть сохранены, как нам нравится.

С API Data Source мы можем сделать гораздо больше, но найти нужную документацию было довольно сложно. Я считаю, что можно создать лучшую документацию, особенно для тех частей API, которые очень полезны при их расширении.

Несмотря на то, что в нашем примере показано, как расширить API источника данных для поддержки простого формата, его можно изменить для чтения и записи более сложных типов, таких как двоичные объекты.

Возможность интеграции наших собственных типов данных в Spark делает его одной из лучших платформ для обработки данных.

В мире Hadoop мы можем найти множество инструментов, которые разделяют цели и функции, но ни один из них не является настолько гибким и универсальным, как Spark. Это делает Spark очень желательным в этой области. Если нас интересует среда обработки, способная работать в безграничных условиях, то Apache Spark — это то, что нужно.