Теперь, когда мы установили и настроили PySpark в нашей системе, мы можем программировать на Python для Apache Spark. Однако, прежде чем сделать это, давайте разберемся с фундаментальной концепцией Spark — RDD.
СДР обозначает Resilient Distributed Dataset , это элементы, которые работают и работают на нескольких узлах для параллельной обработки в кластере. СДР являются неизменяемыми элементами, что означает, что после создания СДР вы не сможете их изменить. СДР также являются отказоустойчивыми, поэтому в случае любого сбоя они восстанавливаются автоматически. Вы можете применить несколько операций к этим СДР для достижения определенной задачи.
Чтобы применить операции к этим RDD, есть два способа:
- Преобразование и
- действие
Позвольте нам понять эти два способа в деталях.
Преобразование — это операции, которые применяются к СДР для создания нового СДР. Filter, groupBy и map являются примерами преобразований.
Действие — это операции, которые применяются к RDD, который инструктирует Spark выполнять вычисления и отправлять результат обратно драйверу.
Чтобы применить любую операцию в PySpark, нам нужно сначала создать PyDpark RDD . Следующий блок кода содержит подробности класса PyDpark RDD —
class pyspark.RDD ( jrdd, ctx, jrdd_deserializer = AutoBatchedSerializer(PickleSerializer()) )
Давайте посмотрим, как выполнить несколько основных операций с помощью PySpark. Следующий код в файле Python создает слова RDD, в которых хранится упомянутый набор слов.
words = sc.parallelize ( ["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark"] )
Теперь мы выполним несколько операций над словами.
кол-()
Количество элементов в СДР возвращается.
----------------------------------------count.py--------------------------------------- from pyspark import SparkContext sc = SparkContext("local", "count app") words = sc.parallelize ( ["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark"] ) counts = words.count() print "Number of elements in RDD -> %i" % (counts) ----------------------------------------count.py---------------------------------------
Команда — Команда для count () является —
$SPARK_HOME/bin/spark-submit count.py
Выходные данные — выход для вышеуказанной команды —
Number of elements in RDD → 8
собирать ()
Все элементы в СДР возвращаются.
----------------------------------------collect.py--------------------------------------- from pyspark import SparkContext sc = SparkContext("local", "Collect app") words = sc.parallelize ( ["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark"] ) coll = words.collect() print "Elements in RDD -> %s" % (coll) ----------------------------------------collect.py---------------------------------------
Command — команда для collect () —
$SPARK_HOME/bin/spark-submit collect.py
Выходные данные — выход для вышеуказанной команды —
Elements in RDD -> [ 'scala', 'java', 'hadoop', 'spark', 'akka', 'spark vs hadoop', 'pyspark', 'pyspark and spark' ]
Еогеасп (е)
Возвращает только те элементы, которые удовлетворяют условию функции внутри foreach. В следующем примере мы вызываем функцию print в foreach, которая печатает все элементы в RDD.
----------------------------------------foreach.py--------------------------------------- from pyspark import SparkContext sc = SparkContext("local", "ForEach app") words = sc.parallelize ( ["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark"] ) def f(x): print(x) fore = words.foreach(f) ----------------------------------------foreach.py---------------------------------------
Команда — команда для foreach (f) —
$SPARK_HOME/bin/spark-submit foreach.py
Выходные данные — выход для вышеуказанной команды —
scala java hadoop spark akka spark vs hadoop pyspark pyspark and spark
фильтр (F)
Возвращается новый RDD, содержащий элементы, которые удовлетворяют функции внутри фильтра. В следующем примере мы отфильтровываем строки, содержащие искру.
----------------------------------------filter.py--------------------------------------- from pyspark import SparkContext sc = SparkContext("local", "Filter app") words = sc.parallelize ( ["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark"] ) words_filter = words.filter(lambda x: 'spark' in x) filtered = words_filter.collect() print "Fitered RDD -> %s" % (filtered) ----------------------------------------filter.py----------------------------------------
Команда — Команда для фильтра (f) —
$SPARK_HOME/bin/spark-submit filter.py
Выходные данные — выход для вышеуказанной команды —
Fitered RDD -> [ 'spark', 'spark vs hadoop', 'pyspark', 'pyspark and spark' ]
карта (f, preservePartitioning = False)
Новый RDD возвращается путем применения функции к каждому элементу в RDD. В следующем примере мы формируем пару ключ-значение и сопоставляем каждую строку со значением 1.
----------------------------------------map.py--------------------------------------- from pyspark import SparkContext sc = SparkContext("local", "Map app") words = sc.parallelize ( ["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark"] ) words_map = words.map(lambda x: (x, 1)) mapping = words_map.collect() print "Key value pair -> %s" % (mapping) ----------------------------------------map.py---------------------------------------
Команда — Команда для карты (f, preservePartitioning = False) —
$SPARK_HOME/bin/spark-submit map.py
Выходные данные — выходные данные вышеупомянутой команды —
Key value pair -> [ ('scala', 1), ('java', 1), ('hadoop', 1), ('spark', 1), ('akka', 1), ('spark vs hadoop', 1), ('pyspark', 1), ('pyspark and spark', 1) ]
уменьшить (е)
После выполнения указанной коммутативной и ассоциативной двоичной операции возвращается элемент в СДР. В следующем примере мы импортируем пакет add из оператора и применяем его к ‘num’, чтобы выполнить простую операцию добавления.
----------------------------------------reduce.py--------------------------------------- from pyspark import SparkContext from operator import add sc = SparkContext("local", "Reduce app") nums = sc.parallelize([1, 2, 3, 4, 5]) adding = nums.reduce(add) print "Adding all the elements -> %i" % (adding) ----------------------------------------reduce.py---------------------------------------
Команда — Команда для уменьшения (f) —
$SPARK_HOME/bin/spark-submit reduce.py
Выходные данные — выходные данные вышеупомянутой команды —
Adding all the elements -> 15
присоединиться (другое, numPartitions = Нет)
Он возвращает RDD с парой элементов с соответствующими ключами и всеми значениями для этого конкретного ключа. В следующем примере есть две пары элементов в двух разных RDD. После объединения этих двух RDD мы получаем RDD с элементами, имеющими совпадающие ключи и их значения.
----------------------------------------join.py--------------------------------------- from pyspark import SparkContext sc = SparkContext("local", "Join app") x = sc.parallelize([("spark", 1), ("hadoop", 4)]) y = sc.parallelize([("spark", 2), ("hadoop", 5)]) joined = x.join(y) final = joined.collect() print "Join RDD -> %s" % (final) ----------------------------------------join.py---------------------------------------
Команда — Команда для объединения (другое, numPartitions = None) является —
$SPARK_HOME/bin/spark-submit join.py
Выходные данные — выход для вышеуказанной команды —
Join RDD -> [ ('spark', (1, 2)), ('hadoop', (4, 5)) ]
Кэш ()
Сохраните этот RDD с уровнем хранения по умолчанию (MEMORY_ONLY). Вы также можете проверить, кэшируется ли RDD или нет.
----------------------------------------cache.py--------------------------------------- from pyspark import SparkContext sc = SparkContext("local", "Cache app") words = sc.parallelize ( ["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark"] ) words.cache() caching = words.persist().is_cached print "Words got chached > %s" % (caching) ----------------------------------------cache.py---------------------------------------
Команда — команда для cache () —
$SPARK_HOME/bin/spark-submit cache.py
Выходные данные — выход для вышеуказанной программы —
Words got cached -> True
Это были некоторые из наиболее важных операций, выполняемых в PySpark RDD.