Учебники

PySpark — RDD

Теперь, когда мы установили и настроили PySpark в нашей системе, мы можем программировать на Python для Apache Spark. Однако, прежде чем сделать это, давайте разберемся с фундаментальной концепцией Spark — RDD.

СДР обозначает Resilient Distributed Dataset , это элементы, которые работают и работают на нескольких узлах для параллельной обработки в кластере. СДР являются неизменяемыми элементами, что означает, что после создания СДР вы не сможете их изменить. СДР также являются отказоустойчивыми, поэтому в случае любого сбоя они восстанавливаются автоматически. Вы можете применить несколько операций к этим СДР для достижения определенной задачи.

Чтобы применить операции к этим RDD, есть два способа:

  • Преобразование и
  • действие

Позвольте нам понять эти два способа в деталях.

Преобразование — это операции, которые применяются к СДР для создания нового СДР. Filter, groupBy и map являются примерами преобразований.

Действие — это операции, которые применяются к RDD, который инструктирует Spark выполнять вычисления и отправлять результат обратно драйверу.

Чтобы применить любую операцию в PySpark, нам нужно сначала создать PyDpark RDD . Следующий блок кода содержит подробности класса PyDpark RDD —

class pyspark.RDD (
   jrdd, 
   ctx, 
   jrdd_deserializer = AutoBatchedSerializer(PickleSerializer())
)

Давайте посмотрим, как выполнить несколько основных операций с помощью PySpark. Следующий код в файле Python создает слова RDD, в которых хранится упомянутый набор слов.

words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)

Теперь мы выполним несколько операций над словами.

кол-()

Количество элементов в СДР возвращается.

----------------------------------------count.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "count app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
counts = words.count()
print "Number of elements in RDD -> %i" % (counts)
----------------------------------------count.py---------------------------------------

Команда — Команда для count () является —

$SPARK_HOME/bin/spark-submit count.py

Выходные данные — выход для вышеуказанной команды —

Number of elements in RDD → 8

собирать ()

Все элементы в СДР возвращаются.

----------------------------------------collect.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Collect app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
coll = words.collect()
print "Elements in RDD -> %s" % (coll)
----------------------------------------collect.py---------------------------------------

Command — команда для collect () —

$SPARK_HOME/bin/spark-submit collect.py

Выходные данные — выход для вышеуказанной команды —

Elements in RDD -> [
   'scala', 
   'java', 
   'hadoop', 
   'spark', 
   'akka', 
   'spark vs hadoop', 
   'pyspark', 
   'pyspark and spark'
]

Еогеасп (е)

Возвращает только те элементы, которые удовлетворяют условию функции внутри foreach. В следующем примере мы вызываем функцию print в foreach, которая печатает все элементы в RDD.

----------------------------------------foreach.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "ForEach app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
def f(x): print(x)
fore = words.foreach(f) 
----------------------------------------foreach.py---------------------------------------

Команда — команда для foreach (f) —

$SPARK_HOME/bin/spark-submit foreach.py

Выходные данные — выход для вышеуказанной команды —

scala
java
hadoop
spark
akka
spark vs hadoop
pyspark
pyspark and spark

фильтр (F)

Возвращается новый RDD, содержащий элементы, которые удовлетворяют функции внутри фильтра. В следующем примере мы отфильтровываем строки, содержащие искру.

----------------------------------------filter.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Filter app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
words_filter = words.filter(lambda x: 'spark' in x)
filtered = words_filter.collect()
print "Fitered RDD -> %s" % (filtered)
----------------------------------------filter.py----------------------------------------

Команда — Команда для фильтра (f) —

$SPARK_HOME/bin/spark-submit filter.py

Выходные данные — выход для вышеуказанной команды —

Fitered RDD -> [
   'spark', 
   'spark vs hadoop', 
   'pyspark', 
   'pyspark and spark'
]

карта (f, preservePartitioning = False)

Новый RDD возвращается путем применения функции к каждому элементу в RDD. В следующем примере мы формируем пару ключ-значение и сопоставляем каждую строку со значением 1.

----------------------------------------map.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Map app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
words_map = words.map(lambda x: (x, 1))
mapping = words_map.collect()
print "Key value pair -> %s" % (mapping)
----------------------------------------map.py---------------------------------------

Команда — Команда для карты (f, preservePartitioning = False) —

$SPARK_HOME/bin/spark-submit map.py

Выходные данные — выходные данные вышеупомянутой команды —

Key value pair -> [
   ('scala', 1), 
   ('java', 1), 
   ('hadoop', 1), 
   ('spark', 1), 
   ('akka', 1), 
   ('spark vs hadoop', 1), 
   ('pyspark', 1), 
   ('pyspark and spark', 1)
]

уменьшить (е)

После выполнения указанной коммутативной и ассоциативной двоичной операции возвращается элемент в СДР. В следующем примере мы импортируем пакет add из оператора и применяем его к ‘num’, чтобы выполнить простую операцию добавления.

----------------------------------------reduce.py---------------------------------------
from pyspark import SparkContext
from operator import add
sc = SparkContext("local", "Reduce app")
nums = sc.parallelize([1, 2, 3, 4, 5])
adding = nums.reduce(add)
print "Adding all the elements -> %i" % (adding)
----------------------------------------reduce.py---------------------------------------

Команда — Команда для уменьшения (f) —

$SPARK_HOME/bin/spark-submit reduce.py

Выходные данные — выходные данные вышеупомянутой команды —

Adding all the elements -> 15

присоединиться (другое, numPartitions = Нет)

Он возвращает RDD с парой элементов с соответствующими ключами и всеми значениями для этого конкретного ключа. В следующем примере есть две пары элементов в двух разных RDD. После объединения этих двух RDD мы получаем RDD с элементами, имеющими совпадающие ключи и их значения.

----------------------------------------join.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Join app")
x = sc.parallelize([("spark", 1), ("hadoop", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
joined = x.join(y)
final = joined.collect()
print "Join RDD -> %s" % (final)
----------------------------------------join.py---------------------------------------

Команда — Команда для объединения (другое, numPartitions = None) является —

$SPARK_HOME/bin/spark-submit join.py

Выходные данные — выход для вышеуказанной команды —

Join RDD -> [
   ('spark', (1, 2)),  
   ('hadoop', (4, 5))
]

Кэш ()

Сохраните этот RDD с уровнем хранения по умолчанию (MEMORY_ONLY). Вы также можете проверить, кэшируется ли RDD или нет.

----------------------------------------cache.py---------------------------------------
from pyspark import SparkContext 
sc = SparkContext("local", "Cache app") 
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
) 
words.cache() 
caching = words.persist().is_cached 
print "Words got chached > %s" % (caching)
----------------------------------------cache.py---------------------------------------

Команда — команда для cache () —

$SPARK_HOME/bin/spark-submit cache.py

Выходные данные — выход для вышеуказанной программы —

Words got cached -> True

Это были некоторые из наиболее важных операций, выполняемых в PySpark RDD.