Статьи

Функциональный подход к ведению журнала в Apache Spark

Вход в Apache Spark очень прост, так как Spark предлагает доступ к логобъекту из коробки; необходимо выполнить только некоторые настройки конфигурации. В предыдущем посте мы рассмотрели, как это сделать, и выявили некоторые проблемы, которые могут возникнуть. Однако представленное решение может вызвать некоторые проблемы, когда вы будете готовы собирать журналы, поскольку они распределены по всему кластеру. Даже если вы используете возможности агрегирования журналов YARN, могут возникнуть некоторые споры, которые могут повлиять на производительность, или вы можете получить чередования журналов, которые повреждают природу самого журнала.

В этом посте я покажу, как решить эти проблемы, используя другой, более функциональный подход.

Писатель Монады

Я не собираюсь вдаваться в подробности о монадах или Writer Monad, поэтому, если вы хотите узнать больше, пожалуйста, прочитайте « Функторы, аппликативы и монады в картинках », которые очень информативны по этой теме.

Просто чтобы поместить вещи в контекст, допустим, что средство записи монад ( writer ) — это контейнер, который содержит текущее значение вычисления в дополнение к истории (log) значения (набора преобразования значения).

Поскольку писатель обладает монадическими свойствами, он позволяет нам выполнять функциональные преобразования, и мы скоро увидим, как все слипается.

Упрощенный журнал

Следующий код демонстрирует упрощенный журнал.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
object app {
  def main(args: Array[String]) {
    val log = LogManager.getRootLogger
    log.setLevel(Level.WARN)
 
    val conf = new SparkConf().setAppName("demo-app")
    val sc = new SparkContext(conf)
 
    log.warn("Hello demo")
 
    val data = sc.parallelize(1 to 100000)
 
    log.warn("I am done")
  }
}

Единственное, на что следует обратить внимание, так это то, что регистрация на самом деле происходит в драйвере Spark, поэтому у нас нет проблем с синхронизацией или конфликтами. Однако все начинает усложняться, как только мы начинаем распространять наши вычисления.

Следующий код не будет работать (прочитайте предыдущий пост, чтобы узнать, почему)

1
2
3
4
5
6
7
val log = LogManager.getRootLogger
val data = sc.parallelize(1 to 100000)
 
data.map { value =>
    log.info(value)
    value.toString
}

Решение этой проблемы также было представлено в предыдущем посте , но для управления журналами требуется дополнительная работа.

Как только мы начинаем регистрироваться на каждом узле кластера, нам нужно перейти на каждый узел и собрать каждый файл журнала, чтобы понять, что находится в журналах. Надеемся, что вы используете какой-то инструмент для решения этой задачи, такой как Splunk, Datalog и т. Д. Однако вам все равно нужно знать, как получить эти журналы в вашей системе.

Наш набор данных

Наш набор данных представляет собой набор класса «Person», который будет преобразован при сохранении единого журнала операций над нашим набором данных.

Предположим, мы хотим, чтобы наш набор данных был загружен, отфильтровал каждого человека, которому меньше 20 лет, и, наконец, извлек его / ее имя. Это очень глупый пример, но он продемонстрирует, как создаются журналы. Вы можете заменить эти вычисления, но идея создания единого журнала останется.

Получение писателя

Чтобы использовать библиотеку TypeLevel / Cats для импорта модуля записи монад, мы добавляем следующую строку в наш файл build.sbt.

libraryDependencies += "org.typelevel" %% "cats" % "0.6.1"

Играя с нашими данными

Теперь давайте определим преобразования, которые мы собираемся использовать. Сначала давайте загрузим данные.

1
2
3
def loadPeopleFrom(path: String)(implicit sc: SparkContext) =
  s"loading people from $path" ~> sc.textFile(path)
                                    .map(x => User(x.split(",")(0), x.split(",")(1).toInt))

Здесь операция ~> определяется через неявные преобразования следующим образом:

1
2
3
implicit class toW(s: String) {
  def ~>[A](rdd: RDD[A]): Writer[List[String], RDD[A]] = Writer(s :: Nil, rdd)
}

Если вы внимательно посмотрите, наша операция загрузки не возвращает RDD; на самом деле, он возвращает средство записи монад, которое отслеживает журналы.

Давайте определим фильтр, который мы хотим применить к коллекции пользователей.

1
def filter(rdd: RDD[User])(f: User => Boolean) = "filtering users" ~> rdd.filter(f)

Опять же, мы применяем ту же функцию (~>) для отслеживания этого преобразования.

Наконец, мы определяем отображение, которое следует тому же шаблону, который мы только что видели.

1
2
def mapUsers(rdd: RDDUser])(prefix: String): Writer[List[String], RDD[String]] =
  "mapping users" ~> rdd.map(p => prefix + p.name)

Положить его вместе

До сих пор мы только определили наши преобразования, но нам нужно соединить их вместе. Scala for — это очень удобный способ работы с монадическими структурами. Посмотрим как.

1
2
3
4
5
6
7
val result = for {
  person          <- loadPeopleFrom("~/users_dataset/")(sc)
  filtered        <- filter(person)(_.age < 20)
  namesWithPrefix <- mapUsers(filtered)("hello")
} yield namesWithPrefix
 
val (log, rdd) = result.run

Обратите внимание, что результат имеет тип: Writer[List[String], RDD[String]] .

alling result.run даст нам log: List[String] а окончательное вычисление будет выражено как rdd: RDD[String] .

На этом этапе мы могли бы использовать Spark logger для записи журнала, сгенерированного цепочкой преобразований. Обратите внимание, что эта операция будет выполнена на ведущем устройстве Spark, что означает, что будет создан один файл журнала, содержащий всю информацию журнала. Мы также устраняем потенциальные проблемы конфликта во время записи журнала. Кроме того, мы не блокируем файл журнала, что позволяет избежать проблем с производительностью путем последовательного создания и записи в файл.

Вывод

В этой записи блога я показал вам, как улучшить вход в Apache Spark с помощью Monad Writer. Этот функциональный подход позволяет вам распределять создание журналов вместе с вашими вычислениями, что очень хорошо делает Spark. Однако вместо записи журналов на каждом рабочем узле вы собираете их обратно в мастер, чтобы записать их.

Этот механизм имеет определенные преимущества перед предыдущей реализацией. Теперь вы можете точно контролировать, как и когда будут записываться ваши журналы, вы можете повысить производительность, удалив операции ввода-вывода на рабочих узлах, вы можете устранить проблемы синхронизации, записав журналы в последовательном порядке, и вы можете избежать опасности рыболовных журналов по всему вашему кластеру. Если у вас есть какие-либо вопросы об этом функциональном подходе к регистрации в Apache Spark, пожалуйста, задавайте их в разделе комментариев ниже.

Ссылка: Функциональный подход к регистрации в Apache Spark от нашего партнера JCG Николаса А. Переса в блоге Mapr .