Статьи

Краткий обзор улучшений производительности в Apache Drill 1.4

Сегодня мы рады сообщить, что Apache Drill 1.4 теперь доступен в MapR Distribution. Drill 1.4 — это готовая и поддерживаемая версия для MapR, которую можно скачать здесь и найти примечания к выпуску 1.4 здесь .

Основываясь на своей чрезвычайно гибкой и масштабируемой архитектуре, Drill 1.4 предоставляет множество новых функций, а также улучшений производительности запросов, что делает его очень важной вехой для сообщества Drill.

Вот список ключевых функций / улучшений, доступных в Drill 1.4.

  • Улучшено взаимодействие с Tableau благодаря более быстрым запросам Limit 0
  • Ускорение запросов метаданных (INFORMATION_SCHEMA) к схемам / таблицам Hive
  • Оптимизированное планирование и выполнение запросов благодаря расширенному сокращению разделов
  • Эффективное кэширование метаданных Parquet, ускорение запросов к большому количеству файлов
  • Улучшенные оконные функции, использование ресурсов и производительность
  • Табличные функции
  • Улучшен синтаксический анализ заголовка CSV
  • Новый и улучшенный драйвер MapR Drill JDBC

В этой записи блога я хочу специально представить краткий обзор нескольких недавних улучшений производительности, а именно сокращения разделов и кэширования метаданных Parquet, которые позволят вам достичь времени отклика с низкой задержкой в ​​ваших развертываниях Drill. Кэширование метаданных — это новая функция, добавленная в Drill 1.2, и отсечение разделов существовало с Drill 1.0, но с 1.4 обе эти функции стали намного эффективнее и охватывают широкий спектр вариантов использования.

Позвольте мне начать с некоторого фона. Drill предназначен для достижения интерактивной производительности на крупномасштабных наборах данных, содержащих широкий спектр типов данных и источников данных. Производительность в любом механизме запросов состоит из двух частей:

  1. Время, затрачиваемое на разбор запроса и создание наиболее оптимального плана запроса (он же время планирования запроса).
  2. Время, затрачиваемое на выполнение сгенерированного плана запроса на различных узлах кластера путем извлечения и обработки данных из базовой системы хранения (так называемое время выполнения запроса).

Ниже приведен список некоторых основных элементов и методов архитектуры Drill на каждом из этих этапов, которые позволяют Drill достигать интерактивной производительности. Как видите, как сокращение разделов, так и кэширование метаданных являются примерами методов оптимизации, которые применяются как часть планирования запросов.

performanceenhancements-блог-img1

Обрезка перегородок

Размеры набора данных в больших системах данных, таких как Hadoop, могут быть огромными, от терабайтов до петабайтов. В некоторых случаях наборы данных могут начинаться с малого, но клиенты выбирают Hadoop, поскольку ожидают, что объем данных будет расти значительно и довольно быстро. Сокращение секций позволяет обработчику запросов определять и извлекать наименьший необходимый набор данных для ответа на данный запрос. Чтение небольших данных означает меньшее количество циклов ввода-вывода и меньшее количество циклов ЦП для фактической обработки данных. Это стандартная методика, применяемая в традиционных системах СУБД / MPP для достижения производительности, но она становится гораздо более важной в контексте больших данных из-за больших объемов данных. Чтобы использовать сокращение разделов как часть запросов, данные должны быть организованы и разделены соответствующим образом на основе шаблонов запросов, которые вы ожидаете получить от пользователей.

Организация данных может выполняться во время приема или впоследствии как этап обработки с использованием различных экосистемных инструментов Hadoop, таких как Flume, Hive, Pig, или путем прямого приема через NFS, в случае MapR. Drill поддерживает удаление разделов с помощью различных типов плагинов для хранения. Сокращение разделов применяется при запросах файловых систем, основанных на структуре каталогов файлов и использовании информации о разделах таблиц метаданных Hive при запросах таблиц Hive. Сама Drill предоставляет возможность создавать секционированные данные как часть синтаксиса CREATE TABLE AS.

Вот пример разделения данных с использованием синтаксиса Drill SQL. Этот оператор преобразует образец набора данных JSON для бизнеса Yelp (который можно загрузить из Yelp) в формат Parquet. В рамках преобразования данные также разбиваются на три столбца: штат, город и звезды.

1
0: jdbc:drill:zk=local> create table dfs.tmp.businessparquet partition by (state,city,stars) as select state, city, stars, business_id, full_address, hours,name, review_count from `business.json`;

Результатом вышеприведенного оператора являются данные Parquet, которые были созданы в каталоге, соответствующем указанному рабочему пространству. В этом случае рабочее пространство dfs.tmp указывает на местоположение / tmp в файловой системе, а созданный каталог — / tmp / businessparquet, который представляет собой имя таблицы, указанное в предложении SQL.

Получим количество файлов, сгенерированных командой CTAS.

1
2
3
4
NRentachintala-MAC:businessparquet nrentachintala$ cd /tmp/businessparquet/
 
NRentachintala-MAC:businessparquet nrentachintala$ ls -l |wc -l
     652

Обратите внимание, что количество файлов, сгенерированных командой Drill CTAS, может быть настроено с различными параметрами в Drill; однако значение по умолчанию соответствует количеству различных комбинаций, которые будут иметь столбцы ключей разделов, указанные в CTAS. Например, следующий оператор SQL дает вам количество различных комбинаций столбцов ключей раздела.

1
2
3
4
5
6
0: jdbc:drill:zk=local> select count(*) from (select distinct state, city, stars from dfs.yelp.`business.json`) ;
+---------+
| EXPR$0  |
+---------+
| 652     |
+---------+

Теперь, когда данные Parquet разделены, запросы, поступающие с фильтрами по столбцам раздела (штат, город, звезды), могут использовать оптимизацию сокращения раздела; с диска считываются только соответствующие данные, а остальные разделы удаляются во время планирования.

Вы можете легко проверить, применяется ли сокращение раздела для данного запроса, запустив команду EXPLAIN PLAN для запроса или просмотрев профили в веб-интерфейсе Drill (который можно запустить из порта 8047 с узла Drillbit).

Давайте возьмем пару примеров запросов и посмотрим, применяется ли отсечение разделов с помощью веб-интерфейса.

Вот один запрос с фильтрами по двум столбцам раздела — штат и город.

01
02
03
04
05
06
07
08
09
10
11
12
0: jdbc:drill:zk=local> select name, city, stars from dfs.tmp.businessparquet where state='AZ' and city = 'Fountain Hills' limit 5;
 
+-----------------------------------------------+-----------------+--------+
|                     name                      |      city       | stars  |
+-----------------------------------------------+-----------------+--------+
| Fry's Food & Drug Stores                      | Fountain Hills  | 2.0    |
| Burger King                                   | Fountain Hills  | 2.0    |
| Francis & Sons Car Wash                       | Fountain Hills  | 2.0    |
| Kimmies                                       | Fountain Hills  | 2.0    |
| Le Baron Cleaners At Basha's Shopping Center  | Fountain Hills  | 3.5    |
+-----------------------------------------------+-----------------+--------+
5 rows selected (0.308 seconds)

Физический план запроса выглядит следующим образом в веб-интерфейсе для этого запроса. Обратите внимание на выделенное значение ‘numFiles’ в профиле. Это показывает, сколько файлов считывается с диска для обслуживания запроса. В этом случае 9 файлов из 652 считываются, потому что запрос применяет фильтры к столбцам штата и города, которые являются ключами разделов, и удаляет оставшиеся разделы данных. Проверка количества прочитанных файлов — это простой способ убедиться, что раздел применяется.

1
2
3
4
5
6
7
8
00-00    Screen : rowType = RecordType(ANY name, ANY city, ANY stars): rowcount = 5.0, cumulative cost = {129.5 rows, 501.5 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 731
00-01      Project(name=[$0], city=[$1], stars=[$2]) : rowType = RecordType(ANY name, ANY city, ANY stars): rowcount = 5.0, cumulative cost = {129.0 rows, 501.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 730
00-02        SelectionVectorRemover : rowType = RecordType(ANY name, ANY city, ANY stars): rowcount = 5.0, cumulative cost = {129.0 rows, 501.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 729
00-03          Limit(fetch=[5]) : rowType = RecordType(ANY name, ANY city, ANY stars): rowcount = 5.0, cumulative cost = {124.0 rows, 496.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 728
00-04            Limit(fetch=[5]) : rowType = RecordType(ANY name, ANY city, ANY stars): rowcount = 5.0, cumulative cost = {119.0 rows, 476.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 727
00-05              Project(name=[$2], city=[$1], stars=[$3]) : rowType = RecordType(ANY name, ANY city, ANY stars): rowcount = 114.0, cumulative cost = {114.0 rows, 456.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 726
00-06                Project(state=[$1], city=[$2], name=[$0], stars=[$3]) : rowType = RecordType(ANY state, ANY city, ANY name, ANY stars): rowcount = 114.0, cumulative cost = {114.0 rows, 456.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 725
00-07                  Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=/tmp/businessparquet/0_0_111.parquet], ReadEntryWithPath [path=/tmp/businessparquet/0_0_114.parquet], ReadEntryWithPath [path=/tmp/businessparquet/0_0_115.parquet], ReadEntryWithPath [path=/tmp/businessparquet/0_0_110.parquet], ReadEntryWithPath [path=/tmp/businessparquet/0_0_109.parquet], ReadEntryWithPath [path=/tmp/businessparquet/0_0_113.parquet], ReadEntryWithPath [path=/tmp/businessparquet/0_0_116.parquet], ReadEntryWithPath [path=/tmp/businessparquet/0_0_117.parquet], ReadEntryWithPath [path=/tmp/businessparquet/0_0_112.parquet]], selectionRoot=file:/tmp/businessparquet, numFiles=9, usedMetadataFile=false, columns=[`state`, `city`, `name`, `stars`]]]) : rowType = RecordType(ANY name, ANY state, ANY city, ANY stars): rowcount = 114.0, cumulative cost = {114.0 rows, 456.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 724

Теперь давайте расширим этот запрос, добавив еще один фильтр со столбцом звезд, который также является ключом раздела.

01
02
03
04
05
06
07
08
09
10
11
0: jdbc:drill:zk=local> select name, city, stars from dfs.tmp.businessparquet where state='AZ' and city = 'Fountain Hills' and stars= '3.5' limit 5;
+-----------------------------------------------+-----------------+--------+
|                     name                      |      city       | stars  |
+-----------------------------------------------+-----------------+--------+
| Le Baron Cleaners At Basha's Shopping Center  | Fountain Hills  | 3.5    |
| Euro Pizza Cafe                               | Fountain Hills  | 3.5    |
| Deluxe Nail & Spa                             | Fountain Hills  | 3.5    |
| Ha Ha China                                   | Fountain Hills  | 3.5    |
| Pony Express                                  | Fountain Hills  | 3.5    |
+-----------------------------------------------+-----------------+--------+
5 rows selected (0.342 seconds)

Обратите внимание, что физический план для этого запроса, как показано ниже, показывает «numFiles» как 1. Так что Drill должен был прочитать только 1 из 652 файлов, чтобы ответить на запрос. Чем больше фильтров на основе разделов у вас есть в запросе, тем больше запрос может быть направлен на очень конкретное подмножество данных. Это может привести к огромным улучшениям производительности. Обратите внимание, однако, что ваш запрос может быть чрезвычайно сложным, и в этом случае выигрыш в производительности, полученный при удалении разделов, может быть несопоставим со стоимостью обработки запроса. Однако в большинстве простых и средних запросов это будет очень полезно. Кроме того, наиболее важным аспектом использования сокращения разделов является определение общих шаблонов запросов и разделение данных соответствующим образом. Потратьте некоторое время на это, чтобы настроить развертывание.

1
2
3
4
5
6
7
00-00    Screen : rowType = RecordType(ANY name, ANY city, ANY stars): rowcount = 5.0, cumulative cost = {40.5 rows, 145.5 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1005
00-01      Project(name=[$0], city=[$1], stars=[$2]) : rowType = RecordType(ANY name, ANY city, ANY stars): rowcount = 5.0, cumulative cost = {40.0 rows, 145.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1004
00-02        SelectionVectorRemover : rowType = RecordType(ANY name, ANY city, ANY stars): rowcount = 5.0, cumulative cost = {40.0 rows, 145.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1003
00-03          Limit(fetch=[5]) : rowType = RecordType(ANY name, ANY city, ANY stars): rowcount = 5.0, cumulative cost = {35.0 rows, 140.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1002
00-04            Project(name=[$3], city=[$1], stars=[$2]) : rowType = RecordType(ANY name, ANY city, ANY stars): rowcount = 30.0, cumulative cost = {30.0 rows, 120.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1001
00-05              Project(state=[$1], city=[$2], stars=[$3], name=[$0]) : rowType = RecordType(ANY state, ANY city, ANY stars, ANY name): rowcount = 30.0, cumulative cost = {30.0 rows, 120.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1000
00-06                Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=/tmp/businessparquet/0_0_114.parquet]], selectionRoot=file:/tmp/businessparquet, numFiles=1, usedMetadataFile=false, columns=[`state`, `city`, `stars`, `name`]]]) : rowType = RecordType(ANY name, ANY state, ANY city, ANY stars): rowcount = 30.0, cumulative cost = {30.0 rows, 120.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 999

Кэширование метаданных паркета

Другая общая характеристика развертываний Hadoop — это количество файлов в файловой системе. Мы видели, как клиенты используют Drill для запроса от сотен тысяч до миллионов файлов, как для отчетов, так и для случаев использования ETL. Одной из отличительных возможностей Drill является его способность работать с форматами данных с самоописанием, такими как Parquet, и обнаруживать схему на лету. Parquet хранит метаданные о данных как часть нижнего колонтитула файла и включает в себя такую ​​информацию, как имена столбцов, типы данных, обнуляемость и другие характеристики столбцов, а также параметры в макете данных, такие как размер группы строк. Эта информация используется Drill как часть времени планирования. Хотя Drill имеет возможность обнаруживать эти метаданные во время запроса, это может быть дорогостоящей операцией для случаев использования, когда существует много файлов. Начиная с Drill 1.2, мы представили возможность кэшировать метаданные Parquet в Drill. После того, как метаданные кэшируются, они могут обновляться по мере необходимости, в зависимости от того, как часто наборы данных изменяются в среде.

Ниже приведена команда для использования метаданных кэша. Команду можно использовать для папки или отдельного файла.

1
2
3
4
5
6
7
0: jdbc:drill:zk=local> REFRESH TABLE METADATA dfs.tmp.BusinessParquet;
+-------+-----------------------------------------------------------+
|  ok   |                          summary                          |
+-------+-----------------------------------------------------------+
| true  | Successfully updated metadata for table BusinessParquet.  |
+-------+-----------------------------------------------------------+
1 row selected (0.455 seconds)

Профиль запроса в веб-интерфейсе пользователя или команда «Объяснить план» показывает, используется ли кеш метаданных для данного запроса.

1
0: jdbc:drill:zk=local> select name, city, stars from dfs.tmp.businessparquet where state='AZ' and city = 'Fountain Hills' and stars= '3.5' limit 5;

Обратите внимание, что выделенный «usedMetadataCacheFile = true» в следующем профиле означает, что для этой команды используется кэширование метаданных.

1
2
3
4
5
6
7
00-00    Screen : rowType = RecordType(ANY name, ANY city, ANY stars): rowcount = 5.0, cumulative cost = {40.5 rows, 145.5 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1279
00-01      Project(name=[$0], city=[$1], stars=[$2]) : rowType = RecordType(ANY name, ANY city, ANY stars): rowcount = 5.0, cumulative cost = {40.0 rows, 145.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1278
00-02        SelectionVectorRemover : rowType = RecordType(ANY name, ANY city, ANY stars): rowcount = 5.0, cumulative cost = {40.0 rows, 145.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1277
00-03          Limit(fetch=[5]) : rowType = RecordType(ANY name, ANY city, ANY stars): rowcount = 5.0, cumulative cost = {35.0 rows, 140.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1276
00-04            Project(name=[$3], city=[$1], stars=[$2]) : rowType = RecordType(ANY name, ANY city, ANY stars): rowcount = 30.0, cumulative cost = {30.0 rows, 120.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1275
00-05              Project(state=[$1], city=[$2], stars=[$3], name=[$0]) : rowType = RecordType(ANY state, ANY city, ANY stars, ANY name): rowcount = 30.0, cumulative cost = {30.0 rows, 120.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1274
00-06                Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=/tmp/BusinessParquet/0_0_114.parquet]], selectionRoot=/tmp/BusinessParquet, numFiles=1, usedMetadataFile=true, columns=[`state`, `city`, `stars`, `name`]]]) : rowType = RecordType(ANY name, ANY state, ANY city, ANY stars): rowcount = 30.0, cumulative cost = {30.0 rows, 120.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1273

Комбинация сокращения разделов и кэширования метаданных может привести к значительному повышению производительности для различных запросов, особенно в случае особых случаев использования запросов / отчетов. Мы предоставим более подробную информацию об этих оптимизациях, а также ряд других функций и рекомендаций по производительности Drill в последующих публикациях в блоге.

Более подробную информацию и документацию по функциям Drill 1.4 можно найти в документах MapR и Drill . Поздравляем сообщество Drill с очередной ключевой вехой. Удачного Бурения!

Вот несколько способов начать работу с Drill: