Статьи

Объединяя Spark, Parquet и S3 как альтернативу Hadoop

TL; DR; Комбинация Spark, Parquet и S3 (& Mesos) представляет собой мощную, гибкую и экономически эффективную аналитическую платформу (и, кстати, альтернативу Hadoop). Однако сделать все эти технологии идеальными для совместной игры — непростая задача. В этом посте описываются проблемы, с которыми мы (AppsFlyer) столкнулись при создании нашей аналитической платформы на этих технологиях, и шаги, которые мы предприняли, чтобы смягчить их и заставить все это работать.

Spark становится ведущей альтернативой Map / Reduce по нескольким причинам, включая широкое применение различными дистрибутивами Hadoop, объединение пакетного и потокового обмена на одной платформе и растущую библиотеку интеграции машинного обучения (оба с точки зрения включенных алгоритмов). и интеграция с языками машинного обучения, а именно R и Python). В AppsFlyer мы уже некоторое время используем Spark в качестве основной платформы для ETL (Extract, Transform & Load) и аналитики. Недавним примером является новая версия нашего отчета  о хранении, которую мы недавно выпустили, в которой Spark использовал несколько потоков данных (> 1 ТБ в день) с ETL (главным образом очистка данных) и аналитикой (ступенькой к полному обнаружению мошенничества с кликами). подготовить отчет.

Одним из основных изменений, которые мы внесли в этот отчет, является переход от  использования файлов Sequence к использованию файлов Parquet . Parquet — это столбчатый формат данных, который, вероятно, является наилучшим на сегодняшний день вариантом для хранения долгосрочных больших данных в аналитических целях (если только вы не вложили значительные средства в Hive, где Orc является более подходящим форматом). Преимущества файлов Parquet и Sequence — это производительность и сжатие, не теряя преимущества широкой поддержки инструментов с большими данными (Spark, Hive, Drill, Tajo, Presto и т. Д.).

Одним из относительно уникальных аспектов нашей инфраструктуры для больших данных является то, что мы не используем Hadoop (возможно, это тема отдельного поста). Мы используем Mesos в качестве менеджера ресурсов вместо YARN, и мы используем Amazon S3 вместо HDFS в качестве решения распределенного хранения. HDFS имеет несколько преимуществ по сравнению с S3, однако цена / выгода для поддержки длительно работающих кластеров HDFS в AWS по сравнению с использованием S3 огромна в пользу S3.

Тем не менее, сочетание Spark, Parquet и S3 поставило перед нами несколько проблем, и в этом посте будут перечислены основные из них и решения, которые мы придумали для их решения.

Паркет и Искра

Паркет и Спарк, похоже, уже давно были в любовно-ненавистных отношениях. С одной стороны, документация Spark рекламирует Parquet как один из лучших форматов для анализа больших данных (так оно и есть), а с другой стороны, поддержка Parquet в Spark является неполной и раздражающей в использовании. Вещи, безусловно, движутся в правильном направлении, но есть еще несколько причуд и подводных камней, на которые стоит обратить внимание.

Начнем с позитивной ноты: интеграция Spark и Parquet прошла долгий путь за последние несколько месяцев. Раньше нужно было прыгать через обручи, чтобы иметь возможность конвертировать существующие данные в паркет . Введение DataFrames в Spark значительно упростило этот процесс. Когда формат ввода поддерживается API DataFrame, например, вводом является JSON (встроенный) или Avro (который еще не встроен в Spark, но вы  можете использовать библиотеку для его чтения), конвертация в Parquet — это всего лишь вопрос чтение входного формата с одной стороны и сохранение его как паркет с другой. Рассмотрим, например, следующий фрагмент в Scala:

val inputPath = "../data/json"
val outputPath = "../data/parquet"
val data = sqlContext.read.json(inputPath)
date.write.parquet(outputPath)

Даже когда вы обрабатываете формат, в котором схема не является частью данных, процесс преобразования довольно прост, поскольку Spark позволяет вам указать схему программным способом. Документация Spark довольно проста и содержит примеры на Scala, Java и Python . Кроме того, не слишком сложно определить схемы на других языках. Например, здесь (AppsFlyer) мы используем Clojure в качестве основного языка разработки, поэтому для этого мы разработали несколько вспомогательных функций. Пример кода ниже предоставляет детали:

Первое — это извлечь данные из любой имеющейся у нас структуры и указать схему, которая нам нравится. Приведенный ниже код берет запись события и извлекает из нее различные точки данных в вектор в форме [: значение столбца_обязательно_данные_данных_типа]. Обратите внимание, что тип данных является необязательным, поскольку по умолчанию он является строкой, если не указан.

(defn record-builder
  [event-record]
  (let [..
        raw-device-params (extract event-record "raw_device_params")
        result [...
                [:operator (get raw-device-params "operator")]
                [:model (get raw-device-params "model")]
                ...
                [:launch_counter counter DataTypes/LongType]]]
  result))

Следующим шагом является использование вышеупомянутой структуры для извлечения схемы и преобразования в строки DataFrame:

(defn extract-dataframe-schema
  [rec]
  (let [fields (reduce (fn [lst schema-line]
                         (let [k (first schema-line)
                               t (if (= (count schema-line) 3) (last schema-line) DataTypes/StringType) ]
                           (conj lst (DataTypes/createStructField (name k) t NULLABLE)))) [] rec)
        arr (ArrayList. fields)]
    (DataTypes/createStructType arr)))

(defn as-rows
  [rec]
  (let [values (object-array (reduce (fn [lst v] (conj lst v)) [] rec))]
    (RowFactory/create values)))

Наконец, мы применяем эти функции к СДР, конвертируем их во фрейм данных и сохраняем как паркет:

(let [..
     schema (trans/extract-dataframe-schema (record-builder nil))
     ..
     rdd (spark/map record-builder some-rdd-we-have)
     rows (spark/map trans/as-rows rdd)
     dataframe (spark/create-data-frame sql-context rows schema)
    ]
(spark/save-parquert dataframe output-path :overwrite))

Как упомянуто выше, для Паркет и Спарк дела идут хорошо, но дорога еще не ясна. Вот некоторые из проблем, с которыми мы столкнулись:

  • критическая ошибка в выпуске 1.4, когда состояние гонки при записи файлов паркета  приводило к огромной потере данных на заданиях (эта ошибка исправлена ​​в 1.4.1 — так что если вы используете Spark 1.4 и обновление паркета вчера!)
  • Оптимизация фильтра нажатием, которая по умолчанию отключена, поскольку Spark по-прежнему использует Parquet 1.6.0rc3, даже если 1.6.0 уже давно отсутствует (кажется, Spark 1.5 будет использовать паркет 1.7.0, поэтому проблема будет решена)
  • Parquet не «изначально» поддерживается в Spark, вместо этого Spark полагается на поддержку Hadoop для формата паркета — это само по себе не проблема, но для нас это вызвало серьезные проблемы с производительностью, когда мы пытались использовать Spark и Parquet с S3 — подробнее об этом в следующем разделе

Паркет, Искра & S3

Amazon S3 (Simple Storage Services) — это решение для хранения объектов, которое относительно дешево в использовании. У него есть несколько недостатков по сравнению с «настоящей» файловой системой; основной из них является возможная согласованность, то есть изменения, сделанные одним процессом, не сразу видны другим приложениям. (Если вы используете EMR Amazon, вы можете использовать EMRFS «согласованное представление», чтобы преодолеть это.) Однако, если вы понимаете это ограничение, S3 все еще является жизнеспособным источником ввода и вывода, по крайней мере для пакетных заданий.

Как упоминалось выше, Spark не имеет встроенной реализации S3 и использует классы Hadoop для абстрагирования доступа к данным в Parquet. Hadoop предоставляет 3 клиента файловой системы для S3:

  • Блочная файловая система S3 (схема URI вида «s3: // ..»), которая, похоже, не работает с Spark
  • Собственная файловая система S3 (URI «s3n: // ..») — загрузите дистрибутив Spark, поддерживающий Hadoop 2. * и выше, если вы хотите использовать это (tl; dr — нет)
  • S3a — замена для S3n, которая снимает некоторые ограничения и проблемы S3n. Загрузите поддержку «Spark with Hadoop 2.6 и выше», чтобы использовать ее (tl; dr — вы хотите это, но перед тем, как ее можно будет использовать, нужно немного поработать)

Когда мы использовали Spark 1.3, мы столкнулись со многими проблемами, когда пытались использовать S3, поэтому мы начали использовать s3n — который работал по большей части, то есть у нас выполнялись и выполнялись задания, но многие из них терпели неудачу с различным временем ожидания чтения и неизвестным хостом. исключения. Глядя на задачи в рабочих местах, картина была еще более мрачной с высоким процентом отказов, что подтолкнуло нас к увеличению тайм-аутов и повторений до смешных уровней. Когда мы перешли на Spark 1.4.1, мы попробовали s3a. На этот раз мы заставили его работать. Первое, что нам нужно было сделать, это установить и spark.executor.extraClassPath, и spark.executor.extraDriverPath так, чтобы они указывали на jar-файлы aws-java-sdk и hadoop-aws, поскольку, по-  видимому, оба отсутствуют в «Spark с Hadoop 2.6». строить, Естественно, мы использовали версию 2.6 для этих файлов, но затем мы столкнулись с этой маленькой проблемой . В реализации Hadoop 2.6 AWS есть ошибка, которая приводит к неожиданному разделению файлов S3 (например, 400 файловых заданий, выполненных с 18 миллионами задач), к счастью, используя   Jar Hadoop AWS до версии 2.7.0  вместо версии 2.6, решил эту проблему. Итак, при этом все префиксы s3a работают без заминок (и обеспечивают лучшую производительность, чем s3n).

Поиск подходящей библиотеки S3 Hadoop способствует стабильности наших заданий, но независимо от того, какая библиотека S3 (s3n или s3a), производительность заданий Spark, использующих файлы Parquet, все еще была ужасной. Если посмотреть на пользовательский интерфейс Spark, то фактическая работа с данными казалась вполне разумной, но Spark потратил огромное количество времени, прежде чем фактически начать работу и после того, как работа была «завершена», прежде чем она фактически прекратилась. Нам нравится называть это явление «Паркетный налог».

Очевидно, что мы не могли жить с «Паркетным налогом», поэтому мы изучили файлы журналов наших работ и обнаружили несколько проблем. Этот первый имеет отношение ко времени запуска рабочих мест Parquet. Люди, создавшие Spark, понимали, что схема со временем может эволюционировать и предоставляет замечательную функцию для DataFrames, которая называется «объединение схем».Если вы посмотрите на схему в большом озере / водохранилище данных (или как оно называется сегодня), вы определенно можете ожидать, что схема со временем будет развиваться. Однако, если вы посмотрите на каталог, который является результатом одного задания, в схеме нет разницы … Оказывается, что когда Spark инициализирует задание, он читает нижние колонтитулы всех файлов Parquet для выполнения слияния схемы. Вся эта работа выполняется драйвером, прежде чем какие-либо задачи будут переданы исполнителю, и это может занять долгие минуты, даже часы (например, у нас есть задания, которые возвращают данные об установке за полгода). Это не задокументировано, но, глядя на код Spark, вы можете переопределить это поведение, указав mergeSchema как false:

В Скала:

val file = sqx.read.option("mergeSchema", "false").parquet(path) 

и в Clojure:

(-> ^SQLContext sqtx
    (.read)
    (.format "parquet")
    (.options (java.util.HashMap. {"mergeSchema" "false" "path" path}))
    (.load))

Обратите внимание, что это не работает в Spark 1.3. В Spark 1.4 он работает как положено, а в Spark 1.4.1 он заставляет Spark только смотреть на файл _common_metadata, который не является концом света, так как это небольшой файл, и в каждом каталоге есть только один из них. Тем не менее, это подводит нас к другому аспекту «налога на паркет» — задержкам с окончанием работы.

Отключение схемы слияния и управления схемой, используемой Spark, помогло сократить время запуска задания, но, как уже упоминалось, мы по-прежнему страдали от длительных задержек в конце задания. Мы уже знали об одной проблеме, связанной с Hadoop <-> S3, при использовании текстовых файлов. Hadoop, будучи неизменным, сначала записывает файлы во временный каталог, а затем копирует их. С S3 это не проблема, но операция копирования очень и очень дорогая. С помощью текстовых файлов DataBricks создала DirectOutputCommitter  (вероятно, для своего предложения Spark SaaS). Замена выходного коммиттера для текстовых файлов довольно проста — вам просто нужно установить «spark.hadoop.mapred.output.committer.class» в конфигурации Spark, например:

(spark-conf/set "spark.hadoop.mapred.output.committer.class" "com.appsflyer.spark.DirectOutputCommitter")

A similar solution exists for Parquet and unlike the solution for text files it is even part of the Spark distribution. However, to make things complicated you have to configure it on Hadoop configuration and not on the Spark configuration. To get the Hadoop configuration you first need to create a Spark context from the Spark configuration, call hadoopConfiguration on it and then set “spark.sql.parquet.output.committer.class” as in:

(let [ctx (spark/spark-context conf)
      hadoop-conf (.hadoopConfiguration ^JavaSparkContext ctx)]
     (.set hadoop-conf "spark.sql.parquet.output.committer.class" "org.apache.spark.sql.parquet.DirectParquetOutputCommitter"))

Using the DirectParquetOutputCommitter provided a significant reduction in the “Parquet Tax” but we still found that some jobs were taking a very long time to complete. Again the problem was the file system assumptions Spark and Hadoop hold which were  the culprits. Remember the “_common_metadata” Spark looks at the onset of a job – well, Spark spends a lot of time at the end of the job creating both this file and an additional MetaData file with additional info from the files that are in the directory. Again this is all done from one place (the driver) rather than being handled by the executors. When the job results in small files (even when there are couple of thousands of those) the process takes reasonable time. However, when the job results in larger files (e.g. when we ingest a full day of application launches) this takes upward of an hour. As with mergeSchema the solution is to manage metadata manually so we set “parquet.enable.summary-metadata” to false (again on the Hadoop configuration and generate the _common_metadata file ourselves (for the large jobs).

To sum up, Parquet and especially Spark are works in progress – making cutting edge technologies work for you can be a challenge and require a lot of digging. The documentation is far from perfect at times but luckily all the relevant technologies are open source (even the Amazon SDK), so you can always dive into the bug reports, code etc. to understand how things actually work and find the solutions you need. Also, from time to time you can find articles and blog posts that explain how to overcome the common issues in technologies you are using. I hope this post clears off some of the complications  of integrating Spark, Parquet and S3, which are, at the end of the day, all great technologies with a lot of potential.

(A version of this post was originally posted in AppsFlyer’s blog. Also special thanks to Morri Feldman andMichael Spector from AppsFlyer data team that did most of the work solving the problems discussed in this article)