В качестве продолжения поста об анатомии apache-spark-job я поделюсь с вами тем, как вы можете использовать Spark UI для настройки работы. Я продолжу с тем же примером, который использовался в предыдущем посте, новое приложение spark будет делать ниже вещей
— Прочтите парковочный билет Нью-Йорка
— Агрегирование по «Идентификационному номеру» и подсчет дат нарушения
— Сохранить результат
DAG для этого кода выглядит следующим образом
Это многоэтапное задание, поэтому требуется некоторая перестановка данных, для этого примера записи в произвольном порядке — 564 МБ, а выходной — 461 МБ.
Давайте посмотрим, что мы можем сделать, чтобы уменьшить это?
давайте возьмем подход сверху вниз от «Stage2». Первое, что приходит на ум, это изучить сжатие.
Текущий код
1
2
3
|
aggValue.map { case (key, value) = > Array(key, value. _ 1 , value. _ 2 .mkString( "," )).mkString( "\t" ) }.saveAsTextFile(s "/data/output/${now}" ) |
Новый код
1
2
3
|
aggValue.map { case (key, value) = > Array(key, value. _ 1 , value. _ 2 .mkString( "," )).mkString( "\t" ) }.saveAsTextFile(s "/data/output/${now}" , classOf[GzipCodec]) |
Новый код позволяет только gzip при записи, давайте посмотрим, что мы видим в spark UI
При только записи энкодера запись сократилась на 70%. Теперь он 135Mb, и это ускоряет работу.
Давайте посмотрим, что еще возможно, прежде чем мы погрузимся в настройку внутреннего
Окончательный вывод выглядит примерно так, как показано ниже
1
2
3
4
5
|
1RA32 1 05 /07/2014 92062KA 2 07 /29/2013 ,07 /18/2013 GJJ1410 3 12 /07/2016 ,03 /04/2017 ,04 /25/2015 FJZ3486 3 10 /21/2013 ,01 /25/2014 FDV7798 7 03 /09/2014 ,01 /14/2014 ,07 /25/2014 ,11 /21/2015 ,12 /04/2015 ,01 /16/2015 |
Дата нарушения хранится в необработанном формате, к этому можно применить небольшую кодировку, чтобы увеличить скорость.
Java 8 добавила LocalDate, чтобы упростить манипулирование датами, и этот класс поставляется с некоторыми удобными функциями, одной из которых является toEpocDay.
Эта функция преобразовывает дату в день с 1970 года, и это означает, что в 4 байтах (Int) мы можем хранить до 5 тысяч лет, это кажется большой экономией по сравнению с текущим форматом, который занимает 10 байтов.
Фрагмент кода с epocDay
1
2
3
4
|
val issueDate = LocalDate.parse(row(aggFieldsOffset.get( "issue date" ).get), ISSUE _ DATE _ FORMAT) val issueDateValues = mutable.Set[Int]() issueDateValues.add(issueDate.toEpochDay.toInt) result = (fieldOffset.map(fieldInfo = > row(fieldInfo. _ 2 )).mkString( "," ), ( 1 , issueDateValues)) |
Пользовательский интерфейс Spark после этого изменения. Я также сделал еще одно изменение, чтобы использовать KryoSerializer
Это огромное улучшение, запись в случайном порядке изменилась с 564 МБ до 409 МБ (на 27% лучше), а объем вывода с 134 МБ до 124 МБ (на 8% лучше)
Теперь давайте перейдем к другому разделу в Spark UI, который показывает журналы со стороны исполнителя.
Журналы GC для показа выше показа
01
02
03
04
05
06
07
08
09
10
11
|
<i>2018-10-28T17:13:35.332+0800: 130.281: [GC (Allocation Failure) [PSYoungGen: 306176K->20608K(327168K)] 456383K->170815K(992768K), 0.0222440 secs] [Times: user=0.09 sys=0.00, real=0.03 secs]< /i > <i>2018-10-28T17:13:35.941+0800: 130.889: [GC (Allocation Failure) [PSYoungGen: 326784K->19408K(327168K)] 476991K->186180K(992768K), 0.0152300 secs] [Times: user=0.09 sys=0.00, real=0.02 secs]< /i > <i>2018-10-28T17:13:36.367+0800: 131.315: [GC (GCLocker Initiated GC) [PSYoungGen: 324560K->18592K(324096K)] 491332K->199904K(989696K), 0.0130390 secs] [Times: user=0.11 sys=0.00, real=0.01 secs]< /i > <i>2018-10-28T17:13:36.771+0800: 131.720: [GC (GCLocker Initiated GC) [PSYoungGen: 323744K->18304K(326656K)] 505058K->215325K(992256K), 0.0152620 secs] [Times: user=0.09 sys=0.00, real=0.02 secs]< /i > <i>2018-10-28T17:13:37.201+0800: 132.149: [GC (Allocation Failure) [PSYoungGen: 323456K->20864K(326656K)] 520481K->233017K(992256K), 0.0199460 secs] [Times: user=0.12 sys=0.00, real=0.02 secs]< /i > <i>2018-10-28T17:13:37.672+0800: 132.620: [GC (Allocation Failure) [PSYoungGen: 326016K->18864K(327168K)] 538169K->245181K(992768K), 0.0237590 secs] [Times: user=0.17 sys=0.00, real=0.03 secs]< /i > <i>2018-10-28T17:13:38.057+0800: 133.005: [GC (GCLocker Initiated GC) [PSYoungGen: 324016K->17728K(327168K)] 550336K->259147K(992768K), 0.0153710 secs] [Times: user=0.09 sys=0.00, real=0.01 secs]< /i > <i>2018-10-28T17:13:38.478+0800: 133.426: [GC (Allocation Failure) [PSYoungGen: 322880K->18656K(326144K)] 564301K->277690K(991744K), 0.0156780 secs] [Times: user=0.00 sys=0.00, real=0.01 secs]< /i > <i>2018-10-28T17:13:38.951+0800: 133.899: [GC (Allocation Failure) [PSYoungGen: 323808K->21472K(326656K)] 582842K->294338K(992256K), 0.0157690 secs] [Times: user=0.09 sys=0.00, real=0.02 secs]< /i > <i>2018-10-28T17:13:39.384+0800: 134.332: [GC (Allocation Failure) [PSYoungGen: 326624K->18912K(317440K)] 599490K->305610K(983040K), 0.0126610 secs] [Times: user=0.11 sys=0.00, real=0.02 secs]< /i > <i>2018-10-28T17:13:39.993+0800: 134.941: [GC (Allocation Failure) [PSYoungGen: 313824K->17664K(322048K)] 600522K->320486K(987648K), 0.0111380 secs] [Times: user=0.00 sys=0.00, real=0.02 secs]< /i > |
Давайте сосредоточимся на одной строке
1
|
<i>2018-10-28T17:13:39.993+0800: 134.941: [GC (Allocation Failure) [PSYoungGen: 313824K->17664K(322048K)] <b>600522K->320486K(987648K)< /b >, 0.0111380 secs] [Times: user=0.00 sys=0.00, real=0.02 secs]< /i > |
Куча до младшего GC составляла 600 МБ, а после этого — 320 МБ, а общий размер кучи — 987 МБ.
Исполнителю выделяется 2 ГБ, и это приложение Spark не использует всю память, мы можем увеличить нагрузку на исполнителя, отправив больше задач или задач больше.
Я уменьшу входной раздел с 270 до 100
100 входных разделов выглядит лучше примерно на 10 +% меньше данных для перемешивания.
Другие хитрости
Теперь я поделюсь некоторыми вещами, которые будут иметь большое значение в GC!
Код перед оптимизацией
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
|
private def mergeValues(value 1 : (Int, mutable.Set[Int]), value 2 : (Int, mutable.Set[Int])) : (Int, mutable.Set[Int]) = { val newCount = value 1 . _ 1 + value 2 . _ 1 val dates = value 1 . _ 2 dates.foreach(d = > value 2 . _ 2 .add(d)) (newCount, value 2 . _ 2 ) } private def saveData(aggValue : RDD[(String, (Int, mutable.Set[Int]))], now : String) = { aggValue .map { case (key, value) = > Array(key, value. _ 1 , value. _ 2 .mkString( "," )).mkString( "\t" ) }.coalesce( 100 ) .saveAsTextFile(s "/data/output/${now}" , classOf[GzipCodec]) } |
Код после оптимизации
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
private def mergeValues(value 1 : GroupByValue, value 2 : GroupByValue) : GroupByValue = { if (value 2 .days.size > value 1 .days.size) { value 2 .count = value 1 .count + value 2 .count value 1 .days.foreach(d = > value 2 .days.add(d)) value 2 } else { value 1 .count = value 1 .count + value 2 .count value 2 .days.foreach(d = > value 1 .days.add(d)) value 1 } } private def saveData(aggValue : RDD[(String, GroupByValue)], now : String) = { aggValue.mapPartitions(rows = > { val buffer = new StringBuffer() rows.map { case (key, value) = > buffer.setLength( 0 ) buffer .append(key).append( "\t" ) .append(value.count).append( "\t" ) .append(value.days.mkString( "," )) buffer.toString } }) .coalesce( 100 ) .saveAsTextFile(s "/data/output/${now}" , classOf[GzipCodec]) } |
Новый код выполняет оптимизированное слияние наборов, добавляет маленький набор к большому и также вводит класс Case.
Другая оптимизация заключается в функции сохранения, где она использует mapPartitions для уменьшения выделения объектов с помощью StringBuffer.
Я использовал http://gceasy.io для получения статистики GC.
Новый код производит меньше мусора, например.
Всего GC 126 ГБ против 122 ГБ (примерно на 4% лучше)
Максимальное время GC 720 мс против 520 мс (примерно на 25% лучше)
Оптимизация выглядит многообещающе.
Весь код, используемый в этом блоге, доступен на github repo sparkperformance.
Оставайтесь с нами, чтобы узнать больше об этом.
Смотрите оригинальную статью здесь: Insights от Spark UI
Мнения, высказанные участниками Java Code Geeks, являются их собственными. |