Статьи

Как войти в Apache Spark

Важной частью любого приложения является основная система журналов, которую мы включаем в нее. Журналы предназначены не только для отладки и отслеживания, но и для бизнес-аналитики. Построение надежной системы ведения журналов в наших приложениях может послужить отличным примером бизнес-проблем, которые мы решаем.

Log4j в Apache Spark

Spark использует log4j в качестве стандартной библиотеки для собственной регистрации. Все, что происходит внутри Spark, регистрируется в консоли оболочки и в настроенном хранилище. Spark также предоставляет шаблон для разработчиков приложений, чтобы мы могли использовать те же библиотеки log4j для добавления любых сообщений, которые мы хотим, к существующей и действующей реализации ведения журнала в Spark.

Настройка Log4j

В папке SPARK_HOME / conf находится файл log4j.properties.template, который служит отправной точкой для нашей собственной системы ведения журналов .

На основе этого файла мы создали файл log4j.properties и поместили его в тот же каталог.

log4j.properties выглядит следующим образом:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
log4j.appender.myConsoleAppender=org.apache.log4j.ConsoleAppender
log4j.appender.myConsoleAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.myConsoleAppender.layout.ConversionPattern=%d [%t] %-5p %c - %m%n
 
log4j.appender.RollingAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.RollingAppender.File=/var/log/spark.log
log4j.appender.RollingAppender.DatePattern='.'yyyy-MM-dd
log4j.appender.RollingAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.RollingAppender.layout.ConversionPattern=[%p] %d %c %M - %m%n
 
log4j.appender.RollingAppenderU=org.apache.log4j.DailyRollingFileAppender
log4j.appender.RollingAppenderU.File=/var/log/sparkU.log
log4j.appender.RollingAppenderU.DatePattern='.'yyyy-MM-dd
log4j.appender.RollingAppenderU.layout=org.apache.log4j.PatternLayout
log4j.appender.RollingAppenderU.layout.ConversionPattern=[%p] %d %c %M - %m%n
 
 
# By default, everything goes to console and file
log4j.rootLogger=INFO, RollingAppender, myConsoleAppender
 
# My custom logging goes to another file
log4j.logger.myLogger=INFO, RollingAppenderU
 
# The noisier spark logs go to file only
log4j.logger.spark.storage=INFO, RollingAppender
log4j.additivity.spark.storage=false
log4j.logger.spark.scheduler=INFO, RollingAppender
log4j.additivity.spark.scheduler=false
log4j.logger.spark.CacheTracker=INFO, RollingAppender
log4j.additivity.spark.CacheTracker=false
log4j.logger.spark.CacheTrackerActor=INFO, RollingAppender
log4j.additivity.spark.CacheTrackerActor=false
log4j.logger.spark.MapOutputTrackerActor=INFO, RollingAppender
log4j.additivity.spark.MapOutputTrackerActor=false
log4j.logger.spark.MapOutputTracker=INFO, RollingAppender
log4j.additivty.spark.MapOutputTracker=false

По сути, мы хотим скрыть все журналы, которые генерирует Spark, чтобы нам не приходилось иметь дело с ними в оболочке. Мы перенаправляем их для входа в файловую систему. С другой стороны, мы хотим, чтобы наши собственные журналы регистрировались в оболочке и в отдельном файле, чтобы они не смешивались с журналами Spark. Отсюда мы укажем Splunk на файлы, в которых находятся наши собственные журналы, которые в данном конкретном случае являются /var/log/sparkU.log.

Этот файл ( log4j.properties ) выбирается Spark при запуске приложения, поэтому нам не нужно ничего делать, кроме размещения его в указанном месте.

Написание наших собственных журналов

Теперь, когда мы настроили компоненты, необходимые Spark для управления нашими журналами, нам просто нужно начать писать журналы в наших приложениях.

Чтобы показать, как это делается, напишем небольшое приложение, которое поможет нам в демонстрации.

Наше приложение:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
object app {
 def main(args: Array[String]) {
   val log = LogManager.getRootLogger
   log.setLevel(Level.WARN)
 
   val conf = new SparkConf().setAppName("demo-app")
   val sc = new SparkContext(conf)
 
   log.warn("Hello demo")
 
   val data = sc.parallelize(1 to 100000)
 
   log.warn("I am done")
 }
}

Запуск этого приложения Spark продемонстрирует, что наша система журналов работает. Мы сможем увидеть, как на демо Hello и я закончили запись сообщений в оболочке и в файловой системе, в то время как журналы Spark будут идти только в файловую систему.

Пока что все кажется простым, но есть проблема, о которой мы не упоминали.

Класс org.apache.log4j.Logger не сериализуем, что означает, что мы не можем использовать его внутри замыкания при выполнении операций с некоторыми частями Spark API.

Например, если мы делаем в нашем приложении:

1
2
3
4
5
6
7
val log = LogManager.getRootLogger
val data = sc.parallelize(1 to 100000)
 
data.map { value =>
   log.info(value)
   value.toString
}

Это не удастся при работе на Spark. Spark жалуется, что объект журнала не Serializable, поэтому он не может быть отправлен по сети работникам Spark.

Эту проблему на самом деле легко решить. Давайте создадим класс, который будет что-то делать с нашим набором данных, в то же время делая много записей.

1
2
3
4
5
6
7
8
9
class Mapper(n: Int) extends Serializable{
 @transient lazy val log = org.apache.log4j.LogManager.getLogger("myLogger")
 
 def doSomeMappingOnDataSetAndLogIt(rdd: RDD[Int]): RDD[String] =
   rdd.map{ i =>
     log.warn("mapping: " + i)
     (i + n).toString
   }
}

Mapper получает RDD [Int] и возвращает RDD [String], а также регистрирует, какое значение сопоставляется. В этом случае отметили, как объект журнала был помечен как @transient, что позволяет системе сериализации игнорировать объект журнала . Теперь Mapper сериализуется и отправляется каждому работнику, но объект журнала разрешается, когда он необходим в работнике, решая нашу проблему.

Другое решение состоит в том, чтобы обернуть объект журнала в конструкцию объекта и использовать его повсеместно. У нас скорее есть журнал внутри класса, который мы собираемся использовать, но альтернатива также действительна.

На данный момент все наше приложение выглядит следующим образом:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import org.apache.log4j.{Level, LogManager, PropertyConfigurator}
import org.apache.spark._
import org.apache.spark.rdd.RDD
 
class Mapper(n: Int) extends Serializable{
 @transient lazy val log = org.apache.log4j.LogManager.getLogger("myLogger")
 def doSomeMappingOnDataSetAndLogIt(rdd: RDD[Int]): RDD[String] =
   rdd.map{ i =>
     log.warn("mapping: " + i)
     (i + n).toString
   }
}
object Mapper {
 def apply(n: Int): Mapper = new Mapper(n)
}
object app {
 def main(args: Array[String]) {
   val log = LogManager.getRootLogger
   log.setLevel(Level.WARN)
   val conf = new SparkConf().setAppName("demo-app")
   val sc = new SparkContext(conf)
 
   log.warn("Hello demo")
 
   val data = sc.parallelize(1 to 100000)
   val mapper = Mapper(1)
   val other = mapper.doSomeMappingOnDataSetAndLogIt(data)
   other.collect()
 
   log.warn("I am done")
 }
}

Выводы

Наши журналы теперь отображаются в оболочке, а также хранятся в своих собственных файлах. Журналы Spark скрываются от оболочки и регистрируются в своем собственном файле. Мы также решили проблему сериализации, которая появляется при попытке войти в систему разных работников.

Теперь мы можем создавать более надежные системы BI на основе наших собственных журналов Spark, как мы это делаем с другими нераспределенными системами и приложениями, которые мы имеем сегодня. Бизнес-аналитика для нас очень важна, и всегда полезно иметь правильные идеи.

Ссылка: Как войти в Apache Spark от нашего партнера JCG Николаса А Переса в блоге Mapr .