В последнем квартале 2019 года я разработал движок приема метаданных, использующий Spark. Каркас / библиотека имеет несколько шаблонов для обслуживания нескольких комбинаций источника и назначения. Например, для загрузки плоских файлов в облачное хранилище доступны два шаблона (один для загрузки данных в AWS S3, а другой для загрузки данных в BLOB-объект Azure).
Поскольку принципы загрузки данных изменились с Extract-Transform-Load (ETL) на Extract-Load-Transform (ETL) , такая структура очень полезна, поскольку сокращает время, необходимое для настройки заданий приема.
Важным аспектом любого механизма приема пищи является знание того, сколько записей было прочитано из данного источника и записано в место назначения. Обычно подход, который приходит на ум, заключается в выполнении операции подсчета в DataFrame.это было загружено. Это даст нам количество записей, загруженных из источника. В случае записи данных в хранилище нам нужно будет загрузить данные в другой DataFrame и рассчитать его.
Но операция подсчета в DataFrame может быть дорогой. Есть ли альтернатива? Как оказалось, есть. Альтернативой является регистрация на события Spark. Это делается путем расширения нашего класса от th- SparkListener
класса и переопределения либо OnStageCompleted
метода, либо OnTaskEnd
метода (в зависимости от того, что мы хотим сделать).
Всякий раз, когда действие завершается, Spark вызывает OnStageCompleted
метод на зарегистрированном слушателе. Этот метод позволяет нам отслеживать время выполнения и время ЦП, затраченное исполнителем. Когда задача завершена, Spark вызываетOnTaskEnd
метод на слушателе Spark. Этот метод можно использовать для определения количества прочитанных и записанных записей.
Вам также может понравиться:
Понимание модели исполнения Apache Spark с использованием SparkListeners — часть 1 .
Чтобы отслеживать время выполнения, количество прочитанных записей и количество записанных записей, я представлю несколько вспомогательных классов в этой статье. Чтобы зарегистрироваться для выполнения задания, вам нужно вывести свой класс из StageCompletedEventConsumer
черты. Чтобы зарегистрироваться на счетчик чтения, вам нужно вывести свой класс из этой RecordsLoadedEventConsumer
черты.
Чтобы зарегистрироваться для подсчета записи, вам нужно вывести свой класс из RecordsWrittenEventConsumer
черты. После получения классов по заданным признакам, вам нужно добавить класс в соответствующие классы менеджера. Когда событие происходит, Spark вызывает класс менеджера, который, в свою очередь, информирует всех зарегистрированных слушателей.
Scala
1
import java.util.Properties
2
import org.apache.spark.SparkContext
3
import org.apache.spark.sql.{DataFrame, SparkSession, SQLContext}
4
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, SparkListenerStageCompleted}
5
import org.apache.spark.scheduler._
6
import scala.collection.mutable.
7
val spatkContext = sc
9
val sparkSession = spark
10
val sqlContext = sparkSession.sqlContext
11
trait StageCompletedEventConsumer {
13
def execute(executotRunTime: Long, executorCPUTime: Long)
14
}
15
class StageCompletionManager extends SparkListener
17
{
18
var consumerMap: scala.collection.mutable.Map[String, StageCompletedEventConsumer] = scala.collection.mutable.Map[String, StageCompletedEventConsumer)()
19
def addEventConsumer(SparkContext: SparkContext, id: String, consumer: StageCompletedEventConsumer)
21
{
22
consumerMap += (id -> consumer)
23
}
24
def removeEventConsumcr(id: String)
26
{
27
consumerMap -= id
28
}
29
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit =
31
{
32
for ( (k, v) <- consumerMap ) {
33
if ( v != null ) {
34
v.execute(stageCompleted.stageInfo.taskMetcics.executionRunTime, stageCompleted.stageInfo.taskMetcics.executorCpuTime)
35
}
36
}
37
}
38
}
39
trait RecordsLoadedEventConsumer {
41
def execute(recordsRead: Long)
42
}
43
class RecordsLoadedManager extends SparkListener
45
{
46
var consumerMap: scala.collection.mutable.Map[String, RecordsLoadedEventConsumer] = scala.collection.mutable.Map[String, RecordsLoadedEventConsumer)()
47
def addEventConsumer(SparkContext: SparkContext, id: String, consumer: RecordsLoadedEventConsumer)
49
{
50
consumerMap += (id -> consumer)
51
}
52
def removeEventConsumer(id: String)
54
{
55
consumerMap -= id
56
}
57
override def onTaskEnd(stageCompleted: SparkListenerTaskEnd): Unit =
59
{
60
val recordsRead = taskEnd.taskMetrics.inputMetrics.recordsRead
61
for ( (k, v) <- consumerMap ) {
62
if ( v != null ) {
63
v.execute(recordsRead)
64
}
65
}
66
}
67
}
68
trait RecordsWrittenEventConsumer {
70
def execute(recordsWritten: Long)
71
}
72
class RecordsWrittenManager extends SparkListener
74
{
75
var consumerMap: scala.collection.mutable.Map[String, RecordsWrittenEventConsumer] = scala.collection.mutable.Map[String, RecordsWrittenEventConsumer)()
76
def addEventConsumer(SparkContext: SparkContext, id: String, consumer: RecordsWrittenEventConsumer)
78
{
79
consumerMap += (id -> consumer)
80
}
81
def removeEventConsumer(id: String)
83
{
84
consumerMap -= id
85
}
86
override def onTaskEnd(stageCompleted: SparkListenerTaskEnd): Unit =
88
{
89
val recordsWritten = taskEnd.taskMetrics.outputMetrics.recordsWritten
90
for ( (k, v) <- consumerMap ) {
91
if ( v != null ) {
92
v.execute(recordsWritten)
93
}
94
}
95
}
96
}
97
class Consumer1 extends RecordsLoadedEventConsumer
99
{
100
override def execute (recordsRead: Long) {
101
println("Consumer 1: " + recordsRead.toString)
102
}
103
}
104
class Consumer2 extends RecordsLoadedEventConsumer
106
{
107
override def execute(recordsRead: Long) {
108
println("Consumer 2 : " + recordsRead.toString)
109
}
110
)
111
class Consumer3 extends StageCompletedEventConsumer
113
{
114
override def execute(executorRunTime: Long, executorRunTime: Long)
115
{
116
println ("Consumer 3: " + executorRunTime.toString + ", " + executorCPUTime.tostring)
117
}
118
}
119
val cl: Consumer1 = new Consumer1
121
val c2: Consumer2 = new Consumer2
122
val c3: Consumer3 = new Consumer3
123
val rm: RecordsLoadedManager = new RecordsLoadedManager
125
sparkContext.addSparkListener(rm)
126
rm.addEventConsumer(sparkContext, "cl", c1)
127
rm.addEventConsumer(sparkContext, "c2", c2)
128
val sm: StageCompletionManager = new StageCompletionManager
130
sparkContext.addSparkListene(sm)
131
sm.addEventConsumer(sparkContext, "c3", c3)
132
val inputPath = "stations.csv"
134
val df = spackSession.read.format("csv").option("header". "true").option("sep", ",").option("inferSchema", "true").csv(inputPath)
135
rm.removeEventConsuaer("c2")
137
val df = sparkSession.read.format("csv").option("header", "true").option(sep, ",").option("inferSchema", "true").csv(inputPath)
139
Прочитанное событие представляет интересную ситуацию. Когда Spark считывает данные, событие чтения вызывается дважды - в первый раз после чтения первой записи и во второй раз после загрузки всех записей. Другими словами, слушатель события чтения вызывается с двумя значениями. В первый раз значение прочитанных записей равно единице. Во второй раз значение прочитанных записей - это количество записей в наборе данных.