Учебники

Apache Spark — основное программирование

Spark Core — основа всего проекта. Он обеспечивает распределенную диспетчеризацию задач, планирование и основные функции ввода / вывода. Spark использует специализированную фундаментальную структуру данных, известную как RDD (Resilient Distributed Datasets), которая представляет собой логический набор данных, распределенных по компьютерам. СДР могут быть созданы двумя способами; один из них — ссылка на наборы данных во внешних системах хранения, а второй — применение преобразований (например, map, filter, reducer, join) к существующим RDD.

Абстракция RDD предоставляется через встроенный в язык API. Это упрощает сложность программирования, потому что способ, которым приложения манипулируют RDD, подобен манипулированию локальными коллекциями данных.

Spark Shell

Spark предоставляет интерактивную оболочку — мощный инструмент для интерактивного анализа данных. Он доступен на языке Scala или Python. Основная абстракция Spark — это распределенная коллекция элементов, называемая Resilient Distributed Dataset (RDD). СДР могут быть созданы из входных форматов Hadoop (например, файлов HDFS) или путем преобразования других СДР.

Open Spark Shell

Следующая команда используется для открытия оболочки Spark.

$ spark-shell

Создать простую СДР

Давайте создадим простой RDD из текстового файла. Используйте следующую команду для создания простого СДР.

scala> val inputfile = sc.textFile(“input.txt”)

Выход для вышеуказанной команды

inputfile: org.apache.spark.rdd.RDD[String] = input.txt MappedRDD[1] at textFile at <console>:12

Spark RDD API вводит несколько трансформаций и несколько действий для управления RDD.

Преобразования СДР

Преобразования RDD возвращают указатель на новый RDD и позволяют создавать зависимости между RDD. Каждый RDD в цепочке зависимостей (String of Dependencies) имеет функцию для вычисления своих данных и имеет указатель (зависимость) на свой родительский RDD.

Spark ленив, поэтому ничего не будет выполнено, если вы не вызовете какое-либо преобразование или действие, которое вызовет создание и выполнение задания. Посмотрите на следующий фрагмент примера подсчета слов.

Следовательно, преобразование СДР — это не набор данных, а шаг в программе (может быть, единственный шаг), в котором говорится Spark, как получать данные и что с ними делать.

Ниже приведен список преобразований СДР.

S.No Трансформации и смысл
1

карта (FUNC)

Возвращает новый распределенный набор данных, сформированный путем передачи каждого элемента источника через функцию func .

2

фильтр (FUNC)

Возвращает новый набор данных, сформированный путем выбора тех элементов источника, для которых func возвращает true.

3

flatMap (FUNC)

Аналогично map, но каждый входной элемент может быть сопоставлен с 0 или более выходными элементами (поэтому func должен возвращать Seq, а не один элемент).

4

mapPartitions (FUNC)

Аналогично map, но запускается отдельно на каждом разделе (блоке) RDD, поэтому func должен иметь тип Iterator <T> ⇒ Iterator <U> при работе на RDD типа T.

5

mapPartitionsWithIndex (FUNC)

Аналогично разделам карты, но также предоставляет func целочисленное значение, представляющее индекс раздела, поэтому func должен иметь тип (Int, Iterator <T>) ⇒ Iterator <U> при работе на RDD типа T.

6

образец (с заменой, дробью, семенами)

Выборка части данных с заменой или без нее с использованием заданного начального числа генератора случайных чисел.

7

союз (otherDataset)

Возвращает новый набор данных, который содержит объединение элементов в исходном наборе данных и аргумент.

8

Пересечение (otherDataset)

Возвращает новый RDD, содержащий пересечение элементов в наборе исходных данных и аргумента.

9

различны ([numTasks])

Возвращает новый набор данных, который содержит отдельные элементы исходного набора данных.

10

groupByKey ([numTasks])

При вызове из набора данных (K, V) пар возвращает набор данных из (K, Iterable <V>) пар.

Примечание. Если вы группируете данные для выполнения агрегации (например, суммы или среднего значения) по каждому ключу, то при использовании lowerByKey или aggregateByKey производительность будет значительно выше.

11

reduByKey (func, [numTasks])

При вызове набора данных из пар (K, V) возвращает набор данных из пар (K, V), в которых значения для каждого ключа агрегируются с использованием заданной функции функции сокращения, которая должна иметь тип (V, V) ⇒ V Как и в groupByKey, число задач сокращения настраивается через необязательный второй аргумент.

12

aggregateByKey (zeroValue) (seqOp, combOp, [numTasks])

При вызове набора данных из пар (K, V) возвращает набор данных из пар (K, U), в которых значения для каждого ключа агрегируются с использованием заданных функций объединения и нейтрального «нулевого» значения. Позволяет тип агрегированного значения, который отличается от типа входного значения, избегая ненужных распределений. Как и в groupByKey, число задач сокращения настраивается через необязательный второй аргумент.

13

sortByKey ([по возрастанию], [numTasks])

При вызове набора данных из пар (K, V), где K реализует Ordered, возвращает набор данных из пар (K, V), отсортированных по ключам в порядке возрастания или убывания, как указано в логическом аргументе возрастания.

14

присоединиться (otherDataset, [numTasks])

При вызове наборов данных типа (K, V) и (K, W) возвращает набор данных из (K, (V, W)) пар со всеми парами элементов для каждого ключа. Внешние объединения поддерживаются с помощью leftOuterJoin, rightOuterJoin и fullOuterJoin.

15

cogroup (otherDataset, [numTasks])

При вызове наборов данных типа (K, V) и (K, W) возвращает набор данных из (K, (Iterable <V>, Iterable <W>))). Эта операция также называется группой С.

16

декартовы (otherDataset)

При вызове наборов данных типов T и U возвращает набор данных из (T, U) пар (всех пар элементов).

17

труба (команда, [envVars])

Передайте каждый раздел RDD с помощью команды оболочки, например, Perl или bash. Элементы RDD записываются в stdin процесса, а строки, выводимые в его stdout, возвращаются как RDD строк.

18

коалесценции (numPartitions)

Уменьшите количество разделов в СДР до numPartitions. Полезно для более эффективного выполнения операций после фильтрации большого набора данных.

19

передел (numPartitions)

Произвольно переставьте данные в RDD, чтобы создать больше или меньше разделов и распределить их по ним. Это всегда перетасовывает все данные по сети.

20

repartitionAndSortWithinPartitions (разметки)

Разделите СДР в соответствии с указанным разделителем и в каждом полученном разделе сортируйте записи по их ключам. Это более эффективно, чем вызывать перераспределение и последующую сортировку внутри каждого раздела, потому что это может толкнуть сортировку вниз в механизм перемешивания.

карта (FUNC)

Возвращает новый распределенный набор данных, сформированный путем передачи каждого элемента источника через функцию func .

фильтр (FUNC)

Возвращает новый набор данных, сформированный путем выбора тех элементов источника, для которых func возвращает true.

flatMap (FUNC)

Аналогично map, но каждый входной элемент может быть сопоставлен с 0 или более выходными элементами (поэтому func должен возвращать Seq, а не один элемент).

mapPartitions (FUNC)

Аналогично map, но запускается отдельно на каждом разделе (блоке) RDD, поэтому func должен иметь тип Iterator <T> ⇒ Iterator <U> при работе на RDD типа T.

mapPartitionsWithIndex (FUNC)

Аналогично разделам карты, но также предоставляет func целочисленное значение, представляющее индекс раздела, поэтому func должен иметь тип (Int, Iterator <T>) ⇒ Iterator <U> при работе на RDD типа T.

образец (с заменой, дробью, семенами)

Выборка части данных с заменой или без нее с использованием заданного начального числа генератора случайных чисел.

союз (otherDataset)

Возвращает новый набор данных, который содержит объединение элементов в исходном наборе данных и аргумент.

Пересечение (otherDataset)

Возвращает новый RDD, содержащий пересечение элементов в наборе исходных данных и аргумента.

различны ([numTasks])

Возвращает новый набор данных, который содержит отдельные элементы исходного набора данных.

groupByKey ([numTasks])

При вызове из набора данных (K, V) пар возвращает набор данных из (K, Iterable <V>) пар.

Примечание. Если вы группируете данные для выполнения агрегации (например, суммы или среднего значения) по каждому ключу, то при использовании lowerByKey или aggregateByKey производительность будет значительно выше.

reduByKey (func, [numTasks])

При вызове набора данных из пар (K, V) возвращает набор данных из пар (K, V), в которых значения для каждого ключа агрегируются с использованием заданной функции функции сокращения, которая должна иметь тип (V, V) ⇒ V Как и в groupByKey, число задач сокращения настраивается через необязательный второй аргумент.

aggregateByKey (zeroValue) (seqOp, combOp, [numTasks])

При вызове набора данных из пар (K, V) возвращает набор данных из пар (K, U), в которых значения для каждого ключа агрегируются с использованием заданных функций объединения и нейтрального «нулевого» значения. Позволяет тип агрегированного значения, который отличается от типа входного значения, избегая ненужных распределений. Как и в groupByKey, число задач сокращения настраивается через необязательный второй аргумент.

sortByKey ([по возрастанию], [numTasks])

При вызове набора данных из пар (K, V), где K реализует Ordered, возвращает набор данных из пар (K, V), отсортированных по ключам в порядке возрастания или убывания, как указано в логическом аргументе возрастания.

присоединиться (otherDataset, [numTasks])

При вызове наборов данных типа (K, V) и (K, W) возвращает набор данных из (K, (V, W)) пар со всеми парами элементов для каждого ключа. Внешние объединения поддерживаются с помощью leftOuterJoin, rightOuterJoin и fullOuterJoin.

cogroup (otherDataset, [numTasks])

При вызове наборов данных типа (K, V) и (K, W) возвращает набор данных из (K, (Iterable <V>, Iterable <W>))). Эта операция также называется группой С.

декартовы (otherDataset)

При вызове наборов данных типов T и U возвращает набор данных из (T, U) пар (всех пар элементов).

труба (команда, [envVars])

Передайте каждый раздел RDD с помощью команды оболочки, например, Perl или bash. Элементы RDD записываются в stdin процесса, а строки, выводимые в его stdout, возвращаются как RDD строк.

коалесценции (numPartitions)

Уменьшите количество разделов в СДР до numPartitions. Полезно для более эффективного выполнения операций после фильтрации большого набора данных.

передел (numPartitions)

Произвольно переставьте данные в RDD, чтобы создать больше или меньше разделов и распределить их по ним. Это всегда перетасовывает все данные по сети.

repartitionAndSortWithinPartitions (разметки)

Разделите СДР в соответствии с указанным разделителем и в каждом полученном разделе сортируйте записи по их ключам. Это более эффективно, чем вызывать перераспределение и последующую сортировку внутри каждого раздела, потому что это может толкнуть сортировку вниз в механизм перемешивания.

действия

В следующей таблице приведен список действий, которые возвращают значения.

S.No Действие и смысл
1

уменьшить (FUNC)

Агрегируйте элементы набора данных с помощью функции func (которая принимает два аргумента и возвращает один). Функция должна быть коммутативной и ассоциативной, чтобы ее можно было правильно вычислить параллельно.

2

собирать ()

Возвращает все элементы набора данных в виде массива в программе драйвера. Это обычно полезно после фильтра или другой операции, которая возвращает достаточно маленькое подмножество данных.

3

кол()

Возвращает количество элементов в наборе данных.

4

первый()

Возвращает первый элемент набора данных (аналогично take (1)).

5

принять (п)

Возвращает массив с первыми n элементами набора данных.

6

takeSample (withReplacement, num, [seed])

Возвращает массив со случайной выборкой из num элементов набора данных с заменой или без нее, при желании предварительно указав начальное число генератора случайных чисел.

7

takeOrdered (n, [ordering])

Возвращает первые n элементов СДР, используя их естественный порядок или пользовательский компаратор.

8

saveAsTextFile (путь)

Записывает элементы набора данных в виде текстового файла (или набора текстовых файлов) в заданный каталог в локальной файловой системе, HDFS или любой другой файловой системе, поддерживаемой Hadoop. Spark вызывает toString для каждого элемента, чтобы преобразовать его в строку текста в файле.

9

saveAsSequenceFile (путь) (Java и Scala)

Записывает элементы набора данных в виде файла последовательности Hadoop по заданному пути в локальной файловой системе, HDFS или любой другой файловой системе, поддерживаемой Hadoop. Это доступно для RDD пар ключ-значение, которые реализуют интерфейс Hadoop с возможностью записи. В Scala он также доступен для типов, которые неявно преобразуются в Writable (Spark включает преобразования для базовых типов, таких как Int, Double, String и т. Д.).

10

saveAsObjectFile (путь) (Java и Scala)

Записывает элементы набора данных в простом формате, используя сериализацию Java, которую затем можно загрузить с помощью SparkContext.objectFile ().

11

countByKey ()

Доступно только для RDD типа (K, V). Возвращает хэш-карту (K, Int) пар с количеством каждого ключа.

12

Еогеасп (FUNC)

Запускает функцию func для каждого элемента набора данных. Обычно это делается для побочных эффектов, таких как обновление Аккумулятора или взаимодействие с внешними системами хранения.

Примечание. Изменение переменных, отличных от Accumulators, за пределами foreach () может привести к неопределенному поведению. См. Понимание замыканий для более подробной информации.

уменьшить (FUNC)

Агрегируйте элементы набора данных с помощью функции func (которая принимает два аргумента и возвращает один). Функция должна быть коммутативной и ассоциативной, чтобы ее можно было правильно вычислить параллельно.

собирать ()

Возвращает все элементы набора данных в виде массива в программе драйвера. Это обычно полезно после фильтра или другой операции, которая возвращает достаточно маленькое подмножество данных.

кол()

Возвращает количество элементов в наборе данных.

первый()

Возвращает первый элемент набора данных (аналогично take (1)).

принять (п)

Возвращает массив с первыми n элементами набора данных.

takeSample (withReplacement, num, [seed])

Возвращает массив со случайной выборкой из num элементов набора данных с заменой или без нее, при желании предварительно указав начальное число генератора случайных чисел.

takeOrdered (n, [ordering])

Возвращает первые n элементов СДР, используя их естественный порядок или пользовательский компаратор.

saveAsTextFile (путь)

Записывает элементы набора данных в виде текстового файла (или набора текстовых файлов) в заданный каталог в локальной файловой системе, HDFS или любой другой файловой системе, поддерживаемой Hadoop. Spark вызывает toString для каждого элемента, чтобы преобразовать его в строку текста в файле.

saveAsSequenceFile (путь) (Java и Scala)

Записывает элементы набора данных в виде файла последовательности Hadoop по заданному пути в локальной файловой системе, HDFS или любой другой файловой системе, поддерживаемой Hadoop. Это доступно для RDD пар ключ-значение, которые реализуют интерфейс Hadoop с возможностью записи. В Scala он также доступен для типов, которые неявно преобразуются в Writable (Spark включает преобразования для базовых типов, таких как Int, Double, String и т. Д.).

saveAsObjectFile (путь) (Java и Scala)

Записывает элементы набора данных в простом формате, используя сериализацию Java, которую затем можно загрузить с помощью SparkContext.objectFile ().

countByKey ()

Доступно только для RDD типа (K, V). Возвращает хэш-карту (K, Int) пар с количеством каждого ключа.

Еогеасп (FUNC)

Запускает функцию func для каждого элемента набора данных. Обычно это делается для побочных эффектов, таких как обновление Аккумулятора или взаимодействие с внешними системами хранения.

Примечание. Изменение переменных, отличных от Accumulators, за пределами foreach () может привести к неопределенному поведению. См. Понимание замыканий для более подробной информации.

Программирование с помощью RDD

Давайте рассмотрим реализации нескольких преобразований и действий RDD в программировании RDD на примере.

пример

Рассмотрим пример подсчета слов — он подсчитывает каждое слово, встречающееся в документе. Рассмотрим следующий текст в качестве входных данных и сохраняем как файл input.txt в домашнем каталоге.

input.txt — входной файл.

people are not as beautiful as they look, 
as they walk or as they talk.
they are only as beautiful  as they love, 
as they care as they share.

Следуйте процедуре, приведенной ниже, чтобы выполнить данный пример.

Открытый Spark-Shell

Следующая команда используется для открытия искровой оболочки. Как правило, искра строится с использованием Scala. Поэтому программа Spark работает в среде Scala.

$ spark-shell

Если оболочка Spark открывается успешно, вы найдете следующий вывод. Посмотрите на последнюю строку вывода «Контекст Spark, доступный как sc» означает, что контейнер Spark автоматически создает объект контекста spark с именем sc . Перед началом первого шага программы должен быть создан объект SparkContext.

Spark assembly has been built with Hive, including Datanucleus jars on classpath 
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop 
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop 
15/06/04 15:25:22 INFO SecurityManager: SecurityManager: authentication disabled;
   ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop) 
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server 
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292. 
Welcome to 
      ____              __ 
     / __/__  ___ _____/ /__ 
    _\ \/ _ \/ _ `/ __/  '_/ 
   /___/ .__/\_,_/_/ /_/\_\   version 1.4.0 
      /_/  
		
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71) 
Type in expressions to have them evaluated. 
Spark context available as sc 
scala>

Создать СДР

Сначала мы должны прочитать входной файл с помощью Spark-Scala API и создать RDD.

Следующая команда используется для чтения файла из заданного местоположения. Здесь создается новый RDD с именем inputfile. String, заданная в качестве аргумента в методе textFile («»), является абсолютным путем к имени входного файла. Однако если указано только имя файла, это означает, что входной файл находится в текущем местоположении.

scala> val inputfile = sc.textFile("input.txt")

Выполнить преобразование количества слов

Наша цель — посчитать слова в файле. Создайте плоскую карту для разбиения каждой строки на слова ( flatMap (строка ⇒ line.split («») ).

Затем прочитайте каждое слово как ключ со значением ‘1’ (<ключ, значение> = <слово, 1>), используя функцию карты (карта (слово ⇒ (слово, 1) ).

Наконец, уменьшите эти ключи, добавив значения похожих ключей ( reduByKey (_ + _) ).

Следующая команда используется для выполнения логики подсчета слов. После выполнения этого вы не найдете никаких выходных данных, потому что это не действие, а преобразование; указывать новый RDD или сообщать искре, что делать с данными данными)

scala> val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_);

Текущий СДР

При работе с RDD, если вы хотите узнать о текущем RDD, используйте следующую команду. Он покажет вам описание текущего RDD и его зависимостей для отладки.

scala> counts.toDebugString

Кэширование трансформаций

Вы можете пометить СДР как сохраняемый с помощью методов persist () или cache (). При первом вычислении в действии оно будет храниться в памяти узлов. Используйте следующую команду, чтобы сохранить промежуточные преобразования в памяти.

scala> counts.cache()

Применяя действие

Применение действия, такого как сохранение всех преобразований, приводит к созданию текстового файла. Аргумент String для метода saveAsTextFile («») — это абсолютный путь к выходной папке. Попробуйте следующую команду, чтобы сохранить вывод в текстовом файле. В следующем примере папка «output» находится в текущем местоположении.

scala> counts.saveAsTextFile("output")

Проверка вывода

Откройте другой терминал, чтобы перейти в домашнюю директорию (где в другом терминале выполняется искра). Используйте следующие команды для проверки выходного каталога.

[hadoop@localhost ~]$ cd output/ 
[hadoop@localhost output]$ ls -1 
 
part-00000 
part-00001 
_SUCCESS

Следующая команда используется для просмотра выходных данных из файлов Part-00000 .

[hadoop@localhost output]$ cat part-00000

Выход

(people,1) 
(are,2) 
(not,1) 
(as,8) 
(beautiful,2) 
(they, 7) 
(look,1) 

Следующая команда используется для просмотра выходных данных из файлов Part-00001 .

[hadoop@localhost output]$ cat part-00001 

Выход

(walk, 1) 
(or, 1) 
(talk, 1) 
(only, 1) 
(love, 1) 
(care, 1) 
(share, 1) 

ООН сохранит хранилище

Перед тем как UN-persisting, если вы хотите увидеть место для хранения, которое используется для этого приложения, используйте следующий URL в вашем браузере.

http://localhost:4040

Вы увидите следующий экран, который показывает объем памяти, используемый для приложения, которое выполняется в оболочке Spark.

место для хранения

Если вы хотите, чтобы UN-persist хранилище определенного RDD не использовалось, используйте следующую команду.

Scala> counts.unpersist()

Вы увидите вывод следующим образом —

15/06/27 00:57:33 INFO ShuffledRDD: Removing RDD 9 from persistence list 
15/06/27 00:57:33 INFO BlockManager: Removing RDD 9 
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_1 
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_1 of size 480 dropped from memory (free 280061810) 
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_0 
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_0 of size 296 dropped from memory (free 280062106) 
res7: cou.type = ShuffledRDD[9] at reduceByKey at <console>:14

Для проверки места в браузере используйте следующий URL.

http://localhost:4040/

Вы увидите следующий экран. Он показывает объем памяти, используемый для приложения, которое выполняется в оболочке Spark.