Учебники

Apache Spark — Развертывание

Приложение Spark, использующее spark-submit, — это команда оболочки, используемая для развертывания приложения Spark в кластере. Он использует все соответствующие менеджеры кластеров через единый интерфейс. Следовательно, вам не нужно настраивать приложение для каждого из них.

пример

Давайте возьмем тот же пример подсчета слов, который мы использовали ранее, используя команды оболочки. Здесь мы рассмотрим тот же пример в качестве искрового приложения.

Пример ввода

Следующий текст представляет собой входные данные, и файл с именем находится в .txt .

people are not as beautiful as they look, 
as they walk or as they talk. 
they are only as beautiful  as they love, 
as they care as they share.

Посмотрите на следующую программу —

SparkWordCount.scala

import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark._  

object SparkWordCount { 
   def main(args: Array[String]) { 

      val sc = new SparkContext( "local", "Word Count", "/usr/local/spark", Nil, Map(), Map()) 
		
      /* local = master URL; Word Count = application name; */  
      /* /usr/local/spark = Spark Home; Nil = jars; Map = environment */ 
      /* Map = variables to work nodes */ 
      /*creating an inputRDD to read text file (in.txt) through Spark context*/ 
      val input = sc.textFile("in.txt") 
      /* Transform the inputRDD into countRDD */ 
		
      val count = input.flatMap(line  line.split(" ")) 
      .map(word  (word, 1)) 
      .reduceByKey(_ + _) 
       
      /* saveAsTextFile method is an action that effects on the RDD */  
      count.saveAsTextFile("outfile") 
      System.out.println("OK"); 
   } 
} 

Сохраните вышеуказанную программу в файл с именем SparkWordCount.scala и поместите его в пользовательский каталог с именем spark-application .

Примечание. При преобразовании inputRDD в countRDD мы используем flatMap () для токенизации строк (из текстового файла) в слова, метод map () для подсчета частоты слова и метод reduByKey () для подсчета каждого повторения слова.

Используйте следующие шаги, чтобы отправить эту заявку. Выполните все шаги в каталоге spark-application через терминал.

Шаг 1: Загрузите Spark Ja

Для компиляции необходим jar ядра Spark, поэтому загрузите spark-core_2.10-1.3.0.jar по следующей ссылке Spar core jar и переместите файл jar из каталога загрузки в каталог spark-application .

Шаг 2: Скомпилируйте программу

Скомпилируйте вышеуказанную программу, используя команду, приведенную ниже. Эта команда должна быть выполнена из каталога spark-application. Здесь /usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar — это jar поддержки Hadoop, взятый из библиотеки Spark.

$ scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkPi.scala

Шаг 3: Создайте JAR

Создайте jar-файл приложения spark, используя следующую команду. Здесь wordcount — это имя файла для jar-файла.

jar -cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar

Шаг 4: Отправить искру

Отправьте приложение spark, используя следующую команду —

spark-submit --class SparkWordCount --master local wordcount.jar

Если он выполнен успешно, вы найдете вывод, приведенный ниже. ОК, пропускающий следующий вывод, предназначен для идентификации пользователя, и это последняя строка программы. Если вы внимательно прочитаете следующий вывод, вы найдете разные вещи, такие как —

  • успешно запущен сервис ‘sparkDriver’ на порту 42954
  • MemoryStore запущен с объемом 267,3 МБ
  • Запущен SparkUI по адресу http://192.168.1.217:4040
  • Добавлен JAR-файл: /home/hadoop/piapplication/count.jar
  • ResultStage 1 (saveAsTextFile в SparkPi.scala: 11) завершился за 0,566 с
  • Остановлен веб-интерфейс Spark на http://192.168.1.217:4040
  • MemoryStore очищен
15/07/08 13:56:04 INFO Slf4jLogger: Slf4jLogger started 
15/07/08 13:56:04 INFO Utils: Successfully started service 'sparkDriver' on port 42954. 
15/07/08 13:56:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.1.217:42954] 
15/07/08 13:56:04 INFO MemoryStore: MemoryStore started with capacity 267.3 MB 
15/07/08 13:56:05 INFO HttpServer: Starting HTTP Server 
15/07/08 13:56:05 INFO Utils: Successfully started service 'HTTP file server' on port 56707. 
15/07/08 13:56:06 INFO SparkUI: Started SparkUI at http://192.168.1.217:4040 
15/07/08 13:56:07 INFO SparkContext: Added JAR file:/home/hadoop/piapplication/count.jar at http://192.168.1.217:56707/jars/count.jar with timestamp 1436343967029 
15/07/08 13:56:11 INFO Executor: Adding file:/tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af/userFiles-df4f4c20-a368-4cdd-a2a7-39ed45eb30cf/count.jar to class loader 
15/07/08 13:56:11 INFO HadoopRDD: Input split: file:/home/hadoop/piapplication/in.txt:0+54 
15/07/08 13:56:12 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2001 bytes result sent to driver 
 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11), which is now runnable 
15/07/08 13:56:12 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11) 
15/07/08 13:56:13 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s 
15/07/08 13:56:13 INFO DAGScheduler: Job 0 finished: saveAsTextFile at SparkPi.scala:11, took 2.892996 s
OK 
15/07/08 13:56:13 INFO SparkContext: Invoking stop() from shutdown hook 
15/07/08 13:56:13 INFO SparkUI: Stopped Spark web UI at http://192.168.1.217:4040 
15/07/08 13:56:13 INFO DAGScheduler: Stopping DAGScheduler 
15/07/08 13:56:14 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 
15/07/08 13:56:14 INFO Utils: path = /tmp/spark-45a07b83-42ed-42b3-b2c2823d8d99c5af/blockmgr-ccdda9e3-24f6-491b-b509-3d15a9e05818, already present as root for deletion. 
15/07/08 13:56:14 INFO MemoryStore: MemoryStore cleared 
15/07/08 13:56:14 INFO BlockManager: BlockManager stopped 
15/07/08 13:56:14 INFO BlockManagerMaster: BlockManagerMaster stopped 
15/07/08 13:56:14 INFO SparkContext: Successfully stopped SparkContext 
15/07/08 13:56:14 INFO Utils: Shutdown hook called 
15/07/08 13:56:14 INFO Utils: Deleting directory /tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af 
15/07/08 13:56:14 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!  

Шаг 5: Проверка вывода

После успешного выполнения программы вы найдете каталог с именем outfile в каталоге spark-application.

Следующие команды используются для открытия и проверки списка файлов в каталоге outfile.

$ cd outfile 
$ ls 
Part-00000 part-00001 _SUCCESS

Команды для проверки вывода в файле part-00000 :

$ cat part-00000 
(people,1) 
(are,2) 
(not,1) 
(as,8) 
(beautiful,2) 
(they, 7) 
(look,1)

Команды для проверки вывода в файле part-00001:

$ cat part-00001 
(walk, 1) 
(or, 1) 
(talk, 1) 
(only, 1) 
(love, 1) 
(care, 1) 
(share, 1)

Просмотрите следующий раздел, чтобы узнать больше о команде «spark-submit».

Дополнительные записи пути к классу для передачи драйверу.

Обратите внимание, что фляги, добавленные с помощью —jars, автоматически включаются в путь к классам.