Статьи

Spark, Parquet и S3 — это сложно

(Версия этого поста была первоначально размещена в блоге AppsFlyer . Также выражаем особую благодарность Морри Фельдману и Майклу Спектору из группы данных AppsFlyer, которые выполнили большую часть работы по решению проблем, обсуждаемых в этой статье)

TL; DR; Комбинация Spark, Parquet и S3 (& Mesos) является мощной, гибкой и экономически эффективной аналитической платформой (и, кстати, альтернативой Hadoop). Однако сделать все эти технологии идеальными для совместной игры — непростая задача. В этом посте описываются проблемы, с которыми мы (AppsFlyer) столкнулись при создании нашей аналитической платформы на этих технологиях, и шаги, которые мы предприняли, чтобы смягчить их и заставить все это работать.

Spark становится ведущей альтернативой Map / Reduce по нескольким причинам, включая широкое применение различными дистрибутивами Hadoop, объединение пакетного и потокового обмена на одной платформе и растущую библиотеку интеграции машинного обучения (оба с точки зрения включенных алгоритмов). и интеграция с языками машинного обучения, а именно R и Python). В AppsFlyer мы уже некоторое время используем Spark в качестве основной платформы для ETL (извлечение, преобразование и загрузка) и аналитики. Недавним примером является новая версия нашего отчета о сохранении, которую мы недавно выпустили, в которой Spark использовал несколько потоков данных (> 1 ТБ в день) с ETL (главным образом очистка данных) и аналитикой (ступенька к полному обнаружению мошенничества с кликами) подготовить отчет.

пара

Одним из основных изменений, которые мы представили в этом отчете, является переход от создания файлов Sequence к использованию файлов Parquet . Parquet — это столбчатый формат данных, который, вероятно, является наилучшим на сегодняшний день вариантом для хранения долгосрочных больших данных в аналитических целях (если только вы не вложили значительные средства в Hive, где Orc — более подходящий формат). Преимущества файлов Parquet и Sequence — это производительность и сжатие, не теряя преимущества широкой поддержки инструментов с большими данными (Spark, Hive, Drill, Tajo, Presto и т. Д.).

Одним из относительно уникальных аспектов нашей инфраструктуры для больших данных является то, что мы не используем Hadoop (возможно, это тема отдельного поста). Мы используем Mesos в качестве менеджера ресурсов вместо YARN, и мы используем Amazon S3 вместо HDFS в качестве решения распределенного хранения. HDFS имеет несколько преимуществ по сравнению с S3, однако цена / выгода для поддержки длительно работающих кластеров HDFS в AWS по сравнению с использованием S3 огромна в пользу S3.

Тем не менее, сочетание Spark, Parquet и S3 поставило перед нами несколько задач, и в этом посте будут перечислены основные из них и решения, которые мы придумали, чтобы справиться с ними.

Паркет и Искра

Паркет и Спарк, похоже, уже давно были в любовно-ненавистных отношениях. С одной стороны, документация Spark рекламирует Parquet как один из лучших форматов для анализа больших данных (так оно и есть), а с другой стороны, поддержка Parquet в Spark является неполной и раздражающей в использовании. Вещи, безусловно, движутся в правильном направлении, но есть еще несколько причуд и подводных камней, на которые стоит обратить внимание.

Начнем с позитивной ноты: интеграция Spark и Parquet прошла долгий путь за последние несколько месяцев. Раньше нужно было прыгать через обручи, чтобы иметь возможность конвертировать существующие данные в паркет . Введение DataFrames в Spark значительно упростило этот процесс. Когда формат ввода поддерживается API DataFrame, например, вводом является JSON (встроенный) или Avro (который еще не встроен в Spark, но вы можете использовать библиотеку для его чтения), конвертация в Parquet — это всего лишь вопрос чтение входного формата с одной стороны и сохранение его как паркет с другой. Рассмотрим, например, следующий фрагмент в Scala:

1
2
3
4
val inputPath = "../data/json"
val outputPath = "../data/parquet"
val data = sqlContext.read.json(inputPath)
date.write.parquet(outputPath)

Даже когда вы обрабатываете формат, в котором схема не является частью данных, процесс преобразования довольно прост, поскольку Spark позволяет вам указать схему программным способом. Документация Spark довольно проста и содержит примеры на Scala, Java и Python . Кроме того, не слишком сложно определить схемы на других языках. Например, здесь (AppsFlyer) мы используем Clojure в качестве основного языка разработки, поэтому для этого мы разработали несколько вспомогательных функций. Пример кода ниже предоставляет детали:

Первое — это извлечь данные из любой имеющейся у нас структуры и указать схему, которая нам нравится. Приведенный ниже код берет запись события и извлекает из нее различные точки данных в вектор в форме [: значение столбца_обязательно_данные_данных_типа]. Обратите внимание, что тип данных является необязательным, поскольку по умолчанию он является строкой, если не указан.

01
02
03
04
05
06
07
08
09
10
(defn record-builder
  [event-record]
  (let [..
        raw-device-params (extract event-record "raw_device_params")
        result [...
                [:operator (get raw-device-params "operator")]
                [:model (get raw-device-params "model")]
                ...
                [:launch_counter counter DataTypes/LongType]]]
  result))

Следующим шагом является использование вышеупомянутой структуры для извлечения схемы и преобразования в строки DataFrame:

01
02
03
04
05
06
07
08
09
10
11
12
13
(defn extract-dataframe-schema
  [rec]
  (let [fields (reduce (fn [lst schema-line]
                         (let [k (first schema-line)
                               t (if (= (count schema-line) 3) (last schema-line) DataTypes/StringType) ]
                           (conj lst (DataTypes/createStructField (name k) t NULLABLE)))) [] rec)
        arr (ArrayList. fields)]
    (DataTypes/createStructType arr)))
 
(defn as-rows
  [rec]
  (let [values (object-array (reduce (fn [lst v] (conj lst v)) [] rec))]
    (RowFactory/create values)))

Наконец, мы применяем эти функции к СДР, конвертируем их во фрейм данных и сохраняем как паркет:

1
2
3
4
5
6
7
8
(let [..
     schema (trans/extract-dataframe-schema (record-builder nil))
     ..
     rdd (spark/map record-builder some-rdd-we-have)
     rows (spark/map trans/as-rows rdd)
     dataframe (spark/create-data-frame sql-context rows schema)
    ]
(spark/save-parquert dataframe output-path :overwrite))

Как упомянуто выше, для Паркет и Спарк дела идут хорошо, но дорога еще не ясна. Вот некоторые из проблем, с которыми мы столкнулись:

  • критическая ошибка в выпуске 1.4, когда состояние гонки при записи файлов паркета приводило к огромной потере данных на заданиях (эта ошибка исправлена ​​в 1.4.1 — так что если вы используете Spark 1.4 и обновление паркета вчера!)
  • Оптимизация фильтра при нажатии кнопки , которая по умолчанию отключена, поскольку Spark по-прежнему использует Parquet 1.6.0rc3, даже если 1.6.0 уже давно отсутствует (кажется, что Spark 1.5 будет использовать паркет 1.7.0, поэтому проблема будет решена)
  • Parquet не «изначально» поддерживается в Spark, вместо этого Spark полагается на поддержку Hadoop для формата паркета — это само по себе не проблема, но для нас это вызвало серьезные проблемы с производительностью, когда мы пытались использовать Spark и Parquet с S3 — подробнее об этом в следующем разделе

Паркет, Искра & S3

Amazon S3 (Simple Storage Services) — это решение для хранения объектов, которое относительно дешево в использовании. У него есть несколько недостатков по сравнению с «настоящей» файловой системой; основной из них является возможная согласованность, то есть изменения, сделанные одним процессом, не сразу видны другим приложениям. (Если вы используете EMR Amazon, вы можете использовать EMRFS «согласованное представление», чтобы преодолеть это.) Однако, если вы понимаете это ограничение, S3 по-прежнему является жизнеспособным источником ввода и вывода, по крайней мере для пакетных заданий.

Как упоминалось выше, Spark не имеет встроенной реализации S3 и использует классы Hadoop для абстрагирования доступа к данным в Parquet. Hadoop предоставляет 3 клиента файловой системы для S3:

  • Блочная файловая система S3 (схема URI вида «s3: // ..») который не работает со Spark
  • Собственная файловая система S3 (URI «s3n: // ..») — загрузите дистрибутив Spark, поддерживающий Hadoop 2. * и выше, если хотите использовать это (tl; dr — нет)
  • S3a — замена для S3n, которая снимает некоторые ограничения и проблемы S3n. Загрузите поддержку «Spark with Hadoop 2.6 и выше», чтобы использовать ее (tl; dr — вы хотите это, но перед тем, как ее можно будет использовать, нужно немного поработать)

Когда мы использовали Spark 1.3, мы столкнулись со многими проблемами, когда пытались использовать S3, поэтому мы начали использовать s3n — который работал по большей части, то есть мы выполняли и выполняли задания, но многие из них терпели неудачу с различным таймаутом чтения и неизвестным хостом. исключения. Глядя на задачи в рабочих местах, картина была еще более мрачной с высоким процентом отказов, что подтолкнуло нас к увеличению таймаутов и повторений до смешных уровней. Когда мы перешли на Spark 1.4.1, мы попробовали s3a. На этот раз мы заставили его работать. Первое, что нам нужно было сделать, это установить и spark.executor.extraClassPath, и spark.executor.extraDriverPath так, чтобы они указывали на jar-файлы aws-java-sdk и hadoop-aws, поскольку, по- видимому, оба отсутствуют в «Spark с Hadoop 2.6». построить . Естественно, мы использовали версию 2.6 для этих файлов, но затем мы столкнулись с этой маленькой проблемой . В реализации Hadoop 2.6 AWS есть ошибка, которая приводит к неожиданному разделению файлов S3 (например, 400 файловых заданий, выполненных с 18 миллионами задач), к счастью, используя Jar Hadoop AWS до версии 2.7.0 вместо версии 2.6, решил эту проблему. Итак, при этом все префиксы s3a работают без заминок (и обеспечивают лучшую производительность, чем s3n).

Поиск подходящей библиотеки S3 Hadoop способствует стабильности наших заданий, но независимо от того, какая библиотека S3 (s3n или s3a), производительность заданий Spark, использующих файлы Parquet, все еще была ужасной. Если посмотреть на пользовательский интерфейс Spark, то фактическая работа с данными казалась вполне разумной, но Spark потратил огромное количество времени, прежде чем фактически начать работу и после того, как работа была «завершена», прежде чем она фактически прекратилась. Нам нравится называть это явление «Паркетный налог».

Очевидно, что мы не могли жить с «Паркетным налогом», поэтому мы изучили файлы журналов наших работ и обнаружили несколько проблем. Этот первый имеет отношение ко времени запуска рабочих мест Parquet. Люди, создавшие Spark, понимали, что схема со временем может эволюционировать и предоставляет замечательную функцию для DataFrames, которая называется «объединение схем». Если вы посмотрите на схему в большом озере / водохранилище данных (или как оно называется сегодня), вы определенно можете ожидать, что схема со временем будет развиваться. Однако, если вы посмотрите на каталог, который является результатом одного задания, в схеме нет различий … Оказывается, что когда Spark инициализирует задание, он читает нижние колонтитулы всех файлов Parquet для выполнения слияния схемы. Вся эта работа выполняется с помощью драйвера, прежде чем какие-либо задачи будут переданы исполнителю, и может занять много минут, даже часов (например, у нас есть задания, которые возвращаются к полугодовым данным установки). Это не задокументировано, но, глядя на код Spark, вы можете переопределить это поведение, указав mergeSchema как false:

В Скала:

1
val file = sqx.read.option("mergeSchema", "false").parquet(path)

и в Clojure:

1
2
3
4
5
(-> ^SQLContext sqtx
    (.read)
    (.format "parquet")
    (.options (java.util.HashMap. {"mergeSchema" "false" "path" path}))
    (.load))

Обратите внимание, что это не работает в Spark 1.3. В Spark 1.4 он работает как положено, а в Spark 1.4.1 он заставляет Spark только смотреть на файл _common_metadata, который не является концом света, поскольку это небольшой файл, и в каждом каталоге есть только один из них. Тем не менее, это подводит нас к другому аспекту «налога на паркет» — задержкам с окончанием работы.

Отключение схемы слияния и управления схемой, используемой Spark, помогло сократить время запуска задания, но, как уже упоминалось, мы по-прежнему страдали от длительных задержек в конце задания. Мы уже знали об одной проблеме, связанной с Hadoop <-> S3, при использовании текстовых файлов. Hadoop, будучи неизменным, сначала записывает файлы во временный каталог, а затем копирует их. С S3 это не проблема, но операция копирования очень и очень дорогая. С помощью текстовых файлов DataBricks создала DirectOutputCommitter (вероятно, для своего предложения Spark SaaS). Замена выходного коммиттера для текстовых файлов довольно проста — вам просто нужно установить «spark.hadoop.mapred.output.committer.class» в конфигурации Spark, например:

1
(spark-conf/set "spark.hadoop.mapred.output.committer.class" "com.appsflyer.spark.DirectOutputCommitter")

Аналогичное решение существует для Parquet, и в отличие от решения для текстовых файлов, оно даже является частью дистрибутива Spark. Однако, чтобы усложнить задачу, вы должны настроить ее в конфигурации Hadoop, а не в конфигурации Spark. Чтобы получить конфигурацию Hadoop, сначала необходимо создать контекст Spark из конфигурации Spark, вызвать для него hadoopConfiguration, а затем установить «spark.sql.parquet.output.committer.class» следующим образом:

1
2
3
(let [ctx (spark/spark-context conf)
      hadoop-conf (.hadoopConfiguration ^JavaSparkContext ctx)]
     (.set hadoop-conf "spark.sql.parquet.output.committer.class" "org.apache.spark.sql.parquet.DirectParquetOutputCommitter"))

Использование DirectParquetOutputCommitter значительно снизило «налог на паркет», но мы все же обнаружили, что для выполнения некоторых работ требуется очень много времени. Опять же, проблема заключалась в предположениях файловой системы Spark и Hadoop, которые были виновниками. Помните, что «_common_metadata» Spark просматривает начало задания — хорошо, Spark тратит много времени в конце задания, создавая как этот файл, так и дополнительный файл MetaData с дополнительной информацией из файлов, находящихся в каталоге. Опять же, все это делается из одного места (водителя), а не обрабатывается исполнителями. Когда работа приводит к маленьким файлам (даже если их несколько тысяч), процесс занимает разумное время. Однако, когда работа приводит к большим файлам (например, когда мы принимаем полный день запуска приложений), это занимает более часа. Как и в случае с mergeSchema, решение состоит в том, чтобы управлять метаданными вручную, поэтому мы устанавливаем для «parquet.enable.summary-metadata» значение false (снова в конфигурации Hadoop и сами генерируем файл _common_metadata (для больших заданий).

Подводя итог, можно сказать, что Parquet и особенно Spark находятся в стадии разработки — создание передовых технологий для вас может быть сложной задачей и потребовать много усилий. Временами документация далека от совершенства, но, к счастью, все соответствующие технологии имеют открытый исходный код (даже Amazon SDK ), поэтому вы всегда можете погрузиться в отчеты об ошибках, код и т . Д. понять, как все работает на самом деле и найти решения, которые вам нужны. Кроме того, время от времени вы можете найти статьи и сообщения в блогах, которые объясняют, как преодолеть общие проблемы в технологиях, которые вы используете. Я надеюсь, что этот пост устраняет некоторые сложности интеграции Spark, Parquet и S3, которые, в конце концов, представляют собой все великолепные технологии с большим потенциалом.

Ссылка: Spark, Parquet и S3 — это сложно от нашего партнера JCG Арнона Ротема Гал Оз в блоге Cirrus Minor .