Статьи

Apache Spark как распределенный движок SQL

SQL здесь уже давно, и людям это нравится. Однако механизмы, обеспечивающие работу SQL, со временем менялись, чтобы решать новые проблемы и не отставать от требований потребителей.

Традиционные механизмы, такие как Microsoft SQL Server, имели некоторые проблемы с масштабируемостью, которые они решили с помощью временных и облачных решений. С другой стороны, другие были созданы с нуля для работы в распределенной среде, чтобы они могли поставить производительность на первое место в своем списке приоритетов.

Существует не инструмент для всех случаев использования. На самом деле, мы считаем, что инструменты создаются с учетом вариантов использования для решения конкретной проблемы. Затем они эволюционируют до более зрелой стадии, где их можно использовать для решения многих других проблем.

В традиционной среде SQL данные представлены в виде таблиц и взаимосвязей между ними, но этого представления иногда недостаточно, поэтому для решения этой проблемы были разработаны новые инструменты. Мы можем найти везде организации, которые не используют реляционные базы данных; вместо этого они предпочитают переходить на не-SQL.

Hadoop

В мире Hadoop у нас есть множество различных механизмов запросов; у каждого из них есть свои особенности, и каждый из них решает самые разные проблемы.

В любом дистрибутиве Hadoop мы можем найти Apache Hive , похожий на SQL инструмент, который предлагает инфраструктуру хранилища данных и возможности для запросов и анализа больших данных.

В зависимости от дистрибутива Hadoop, мы также можем найти Apache Impala и Apache Drill . Все они предлагают более или менее одинаковые возможности, разделяя общую цель. Мы можем использовать SQL или SQL-подобные языки для запроса данных, хранящихся в Hadoop. У них также есть свои ограничения и преимущества, о которых вы должны знать. Вот ссылка с более подробной информацией об этих технологиях.

Apache Spark

Apache Spark — это молниеносные кластерные вычисления, которые можно развернуть в кластере Hadoop или в автономном режиме. Он также может быть использован в качестве движка SQL, как и другие, которые мы упоминали. Spark, однако, предлагает некоторые преимущества по сравнению с предыдущими.

Spark предоставляет API-интерфейсы для различных языков, таких как Scala, Java, Python и R. Это делает его доступным для многих типов людей, таких как разработчики, исследователи данных и те, кто имеет опыт работы со статистикой.

Интерактивные алгоритмы легко реализуются в Spark, особенно машинного обучения.

Давайте рассмотрим пример использования Spark в качестве движка SQL.

Изучение нашего источника данных

Наш набор данных представляет собой простую папку с несколькими терабайтами в файлах в формате CSV, и каждый файл занимает около 40 МБ каждый. Размер файлов не влияет на производительность, поскольку они хранятся в кластере MapR . MapR заботится о проблеме небольших файлов Hadoop, как я объясню в этом посте .

Поскольку мы используем MapR, копировать файлы в кластер довольно просто, поскольку мы смонтировали том в нашу локальную файловую систему.

Чтобы смонтировать том MapR, мы запускаем эту команду:

1
sudo mount_nfs -o "hard,nolock" 10.21.112.209:/mapr/mapr.domain.com/datalake /Users/anicolaspp/mapr/

Теперь, если мы снова запустим команды POSIX в нашей локальной папке, они фактически будут выполняться в кластере MapR.

Подготовка среды для автоматического обнаружения схемы

Мы собираемся создать приложение Spark с использованием Scala, которое позволит нам выполнять операторы SQL над нашими данными, хранящимися в MapR Distribution.

В этом посте я объяснил, как создать приложение в Spark, и о предыдущих шагах, которые мы должны выполнить.

Наш класс приложения будет выглядеть следующим образом:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* Created by anicolaspp.
*/
import org.apache.spark
import org.apache.spark._
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
object app {
 def main(args: Array[String]) {
 val conf = new SparkConf().setAppName("testing")
 val sc = new SparkContext(conf)
 val sql = new HiveContext(sc)
 sql.setConf("hive.server2.thrift.port", "10001")
 val delimiter = "\t"
 val data = sc.textFile("datalake/myTestDataFolder/")
 val headers = data.first.split(delimiter)
 val schema = StructType(headers.map(h => StructField(h, StringType)))
 val rowRDD = data.map(p => Row.fromSeq(p.split(delimiter)))
>
 val dataFrame = sql.createDataFrame(rowRDD, schema)
 dataFrame.registerTempTable("someTableName")
 HiveThriftServer2.startWithContext(sql)
 while (true) {
 Thread.`yield`()
 }
 }
}

Давайте рассмотрим наш код.

Сначала мы создаем Spark Context на основе объекта Config.

1
2
3
val conf = new SparkConf().setAppName("testing")
val sc = new SparkContext(conf)
val sql = new HiveContext(sc)

Затем мы устанавливаем комиссионный порт, чтобы избежать конфликтов с другими компонентами, такими как Hive.

1
sql.setConf("hive.server2.thrift.port", "10001")

Теперь мы устанавливаем наш разделитель CSV, который в данном случае является символом табуляции. Мы также устанавливаем местоположение нашего набора данных, создавая Resilient Distributed Dataset (RDD), используя Spark Context (sc).

1
2
val delimiter = "\t"
val data = sc.textFile("datalake/myTestDataFolder/")

На данный момент мы хотим иметь возможность обслуживать наши данные, не беспокоясь о схеме нашего файла; нам нужна среда BI самообслуживания, как я объяснил здесь . Используя заголовки из наших файлов данных, мы можем создать схему автоматически, поэтому нам не нужно беспокоиться об изменениях схемы в будущем. Как только у нас есть схема, мы создаем DataFrame, который мы собираемся предоставить для запроса с использованием SQL.

1
2
3
4
val headers = data.first.split(delimiter)
val schema = StructType(headers.map(h => StructField(h, StringType)))
val rowRDD = data.map(p => Row.fromSeq(p.split(delimiter)))
val dataFrame = sql.createDataFrame(rowRDD, schema)

Отсутствует только одна часть, которая регистрирует наш набор данных в виде таблицы в мета-хранилище Hive; мы делаем это, делая:

1
2
dataFrame.registerTempTable("someTableName")
HiveThriftServer2.startWithContext(sql)

У нас есть цикл, чтобы просто поддерживать наше приложение. Обратите внимание, что преобразования RDD являются ленивыми, и они будут выполняться только при отправке запроса на выполнение.

Развертывание нашего приложения

Мы создаем и тестируем наше приложение, используя SBT, и полученный .jar можно скопировать в кластер так же, как мы копируем файлы в нашей локальной файловой системе.

1
cp pathToOurJar/app.jar /Users/anicolaspp/mapr/testing

Помните, что это возможно, потому что мы ранее смонтировали том MapR в нашей локальной файловой системе.

Теперь нам нужно отправить наше приложение в кластер, и мы делаем это с помощью команды spark-submit. Подробную документацию о подаче заявок Spark можно найти на веб-сайте Spark .

В нашем кластере мы запускаем:

1
/spark-submit --master yarn /mapr/mapr.domain.com/datalake/testing/testing_2.10-1.0.jar

Наше приложение должно начать работать на YARN, как мы указали при его отправке.

Наш SQL-движок готов к запросу, поэтому давайте продолжим и протестируем его.

Клиенты SQL

Простой способ протестировать наш движок SQL — запустить beeline , инструмент командной строки, который работает как клиент SQL.

Мы можем найти Билайн в папке Spark bin. Для начала набираем ./beeline.

В beeline нам нужно подключиться к конечной точке, которую мы определили в нашем приложении, поэтому мы запускаем:

1
!connect jdbc:hive2://localhost:10001

Мы должны быть готовы к выполнению операторов SQL, но давайте проверим, что мы можем видеть таблицу, которую мы зарегистрировали.

1
show tables;

Spark SQL вернет таблицу с зарегистрированными таблицами, включая ту, которую мы зарегистрировали в нашем приложении (someTableName).

Таким же образом мы можем подключиться с помощью других клиентов, таких как Microstrategy или Tableau. Мы попробовали оба варианта, и они оба могут создавать и выполнять запросы к таблицам, зарегистрированным приложениями Spark. Мы также можем комбинировать различные источники (Spark SQL, MS SQL Server, Hive, Impala и т. Д.), Что дает нам гибкость объединения реляционных источников с нереляционными данными.

Spark SQL работает достаточно хорошо и часто лучше, чем другие провайдеры в Hadoop, но имейте в виду, что производительность может ухудшаться при определенных условиях и случаях использования.

Почему Apache Spark

Конечно, Spark SQL предлагает некоторые функции, которые есть в других инструментах Hadoop. Однако возможность исследования сложных наборов данных довольно уникальна для Spark, поскольку мы можем кодировать пользовательские процессы сериализации / десериализации в нашем приложении. Используя Spark SQL, мы можем подключиться к любому источнику данных и представить его в виде таблиц для использования клиентами SQL. Это так же просто, как изменить способ подготовки данных в этих источниках, изменив сериализатор в нашем приложении.

окончания

Есть очень полезные инструменты, которые мы можем использовать в Hadoop для запроса данных в стиле SQL, и все они имеют свои преимущества. Модуль Spark SQL от Apache Spark предлагает некоторую гибкость, которой не хватает другим, сохраняя производительность в качестве одного из главных приоритетов.

Spark — не единственный инструмент, который вы можете использовать, но мы настоятельно рекомендуем вам включить его в решения для больших данных, где должны выполняться операторы SQL. Возможно, вам потребуется использовать различные инструменты, но Spark должен быть важной частью системы, которую вы строите.

Ссылка: Apache Spark как распределенный движок SQL от нашего партнера по JCG Николаса А. Переса в блоге Mapr .