Для параллельной обработки Apache Spark использует общие переменные. Копия общей переменной отправляется на каждый узел кластера, когда драйвер отправляет задачу исполнителю в кластере, чтобы ее можно было использовать для выполнения задач.
Apache Spark поддерживает два типа общих переменных:
- Broadcast
- аккумуляторный
Позвольте нам понять их в деталях.
Broadcast
Широковещательные переменные используются для сохранения копии данных на всех узлах. Эта переменная кэшируется на всех машинах и не отправляется на машины с задачами. Следующий блок кода содержит подробные данные о классе Broadcast для PySpark.
class pyspark.Broadcast ( sc = None, value = None, pickle_registry = None, path = None )
В следующем примере показано, как использовать переменную Broadcast. Переменная Broadcast имеет атрибут value, который хранит данные и используется для возврата широковещательного значения.
----------------------------------------broadcast.py-------------------------------------- from pyspark import SparkContext sc = SparkContext("local", "Broadcast app") words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"]) data = words_new.value print "Stored data -> %s" % (data) elem = words_new.value[2] print "Printing a particular element in RDD -> %s" % (elem) ----------------------------------------broadcast.py--------------------------------------
Команда — Команда для широковещательной переменной выглядит следующим образом —
$SPARK_HOME/bin/spark-submit broadcast.py
Выход — Выход для следующей команды приведен ниже.
Stored data -> [ 'scala', 'java', 'hadoop', 'spark', 'akka' ] Printing a particular element in RDD -> hadoop
аккумуляторный
Аккумуляторные переменные используются для агрегирования информации посредством ассоциативных и коммутативных операций. Например, вы можете использовать аккумулятор для операции с суммой или счетчиков (в MapReduce). В следующем блоке кода содержатся сведения о классе Accumulator для PySpark.
class pyspark.Accumulator(aid, value, accum_param)
В следующем примере показано, как использовать переменную Accumulator. У переменной Accumulator есть атрибут с именем value, аналогичный тому, который имеет широковещательная переменная. Он хранит данные и используется для возврата значения аккумулятора, но может использоваться только в программе драйвера.
В этом примере переменная аккумулятора используется несколькими рабочими и возвращает накопленное значение.
----------------------------------------accumulator.py------------------------------------ from pyspark import SparkContext sc = SparkContext("local", "Accumulator app") num = sc.accumulator(10) def f(x): global num num+=x rdd = sc.parallelize([20,30,40,50]) rdd.foreach(f) final = num.value print "Accumulated value is -> %i" % (final) ----------------------------------------accumulator.py------------------------------------
Команда — Команда для переменной аккумулятора выглядит следующим образом:
$SPARK_HOME/bin/spark-submit accumulator.py
Выходные данные — выходные данные для вышеуказанной команды приведены ниже.