В первой части мы узнали, как проверить сбор информации о происхождении данных с помощью
Сплайн от оболочки искры . То же самое можно сделать в любом приложении Scala или Java Spark. Те же зависимости для оболочки Spark должны быть зарегистрированы в выбранном вами инструменте сборки (Maven, Gradle или sbt):
|
1
2
3
4
5
6
7
8
9
|
groupId: za.co.absa.splineartifactId: spline-coreversion: 0.3.5groupId: za.co.absa.splineartifactId: spline-persistence-mongoversion:0.3.5groupId: za.co.absa.splineartifactId:spline-core-spark-adapter-2.3version:0.3.5 |
Со ссылкой на Scala и Spark 2.3.x, работа Spark выглядит так:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
// Create the Spark sessionval sparkSession = SparkSession.builder().appName("Spline Tester").getOrCreate()// Init SplineSystem.setProperty("spline.persistence.factory", "za.co.absa.spline.persistence.mongo.MongoPersistenceFactory")System.setProperty("spline.mongodb.url", args(0))System.setProperty("spline.mongodb.name", args(1))import za.co.absa.spline.core.SparkLineageInitializer._sparkSession.enableLineageTracking()//Do something with DataFramesimport sparkSession.sqlContext.implicits._val df1 = sparkSession.sparkContext.parallelize(1 to 10000, 42).toDF("FirstValue")val df2 = sparkSession.sparkContext.parallelize(1.to(100000, 17), 42).toDF("SecondValue")val output = df1.crossJoin(df2).where('FirstValue % 42 === 'SecondValue % 42)// Write results to file systemoutput.write.format("parquet").save("splinetester.parquet")// Stop the Spark SessionsparkSession.stop() |
можно отправить в кластер Spark следующим образом:
|
1
|
$SPARK_HOME/bin/spark-submit --class org.googlielmo.splinetest.SplineExample --master <url> --packages "za.co.absa.spline:spline-core:0.3.5,za.co.absa.spline:spline-persistence-mongo:0.3.5,za.co.absa.spline:spline-core-spark-adapter-2.3:0.3.5" splinetest-1.0.jar mongodb://<username>:<password>@<hostname>:<port> <dbname> |
Свойства конфигурации Spline также можно сохранить в файле свойств в пути к классам приложения. Вот полный список доступных свойств Spline:
- spline.mode : 3 возможных значения, BEST_EFFORT (по умолчанию), DISABLED , REQUIRED . Если BEST_EFFORT, Spline пытается инициализировать себя, но в случае неудачи переключается в режим DISABLED, чтобы приложение Spark могло нормально работать без отслеживания происхождения. Если ОТКЛЮЧЕНО, никакого отслеживания происхождения вообще не происходит. Если ТРЕБУЕТСЯ, должен ли Spline по какой-либо причине не выполнить инициализацию самостоятельно, приложение Spark прерывает работу с ошибкой.
- spline.persistence.factory : может быть za.co.absa.spline.persistence.mongo.MongoPersistenceFactory (для сохранения в MongoDB) или za.co.absa.spline.persistence.hdfs.HdfsPersistenceFactory (для сохранения в HDFS).
- spline.mongodb.url : строка подключения MongoDB (только для постоянства MongoDB).
- spline.mongodb.name : имя базы данных MongoDB (только для постоянства MongoDB).
- spline.persistence.composition.factories : список фабрик, разделенных запятыми, для делегирования (только в случае Composition Factories).
При первом включении Spline из задания Spark он создает 6 коллекций в целевой базе данных MongoDB:
-
- attribute_v4 : информация об атрибутах задействованных наборов данных Spark.
- dataTypes_v4 : информация о типах данных для каждого происхождения данных.
- набор данных_v4 : информация о наборах данных .
- lineages_v4 : графики линий данных для наборов данных Spark.
- operations_v4 : операции над наборами данных по всем линиям.
- transfors_v4 : преобразования DataSets в разных линиях.
Документы в этих 6 коллекциях используются веб-приложением Spline для создания визуального представления линий в пользовательском интерфейсе.
В третьей и последней части этой серии я собираюсь поделиться результатами после первых недель принятия этого проекта в предсерийной среде Spark.
| См. Оригинальную статью здесь: Изучение Spline Data Tracker и инструмента визуализации для Apache Spark (часть 2)
Мнения, высказанные участниками Java Code Geeks, являются их собственными. |