Статьи

Устранение неполадок развертывания расширенного кластера Apache Spark

В этом учебном примере по Apache Spark мы рассмотрим несколько вариантов, когда ваш код Scala Spark не разворачивается должным образом. Например, ваша программа драйвера Spark полагается на сторонний jar-файл, совместимый только с Scala 2.11, но ваш Spark Cluster основан на Scala 2.10? Может быть, ваш код опирается на более новую версию стороннего jar, также используемого Apache Spark? Или, может быть, вы хотите, чтобы ваш код использовал версию Spark определенного jar вместо jar, указанного вашим кодом.

В любом из этих случаев ваше развертывание в Spark Cluster не будет гладким. Итак, в этом посте мы рассмотрим способы решения всех трех проблем.

обзор

Мы рассмотрим три конкретных вопроса при развертывании в кластере Spark и способы их решения в этом посте:

  1. Использование Apache Spark с Scala 2.11
  2. Перекрывающие банки, используемые Spark с более новыми версиями
  3. Исключая jars из вашего кода, чтобы использовать вместо этого версию Spark

Все эти проблемы будут решаться на основе кода потоковой передачи искры, использованного в предыдущем руководстве по потоковой передаче Spark . Ссылки на скачивание исходного кода и скринкасты доступны в конце этого поста.

Испытание 1 Apache Spark со Scala 2.11

Я понятия не имел, что все может пойти не так со следующим файлом build.sbt.

Spark с Scala 2.11

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
name := "spark-streaming-example"
  
version := "1.0"
  
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)
  
scalaVersion := "2.11.8"
  
resolvers += "jitpack" at "https://jitpack.io"
  
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % "1.6.1" % "provided",
  "org.scalaj" %% "scalaj-http" % "2.3.0",
  "org.jfarcand" % "wcs" % "1.5"
)

Хотя это не выделяется, проблема будет с клиентской библиотекой WebSocket. Клиентская библиотека WebSocket не работает с кластером Apache Spark, скомпилированным для Scala 2.10.

Была ошибка

NoSuchMethodError: scala.Predef $ .ArrowAssoc

1
2
Exception in thread "Thread-28" java.lang.NoSuchMethodError: scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
 at scalaj.http.HttpConstants$.liftedTree1$1(Http.scala:637)

Такая ошибка указывает на проблему несовместимости версии Scala.

wcs скомпилирован для Scala 2.11, и я не смог найти другого клиента WebSocket для использования в этом проекте, поэтому я исследовал компиляцию Spark для совместимости с Scala 2.11. Оказывается, это не имеет большого значения.

Создайте Apache Spark с помощью Scala 2.11.

  1. Скачать источник (скринкаст ниже)
  2. Запустить скрипт для изменения 2.11
  3. Запустите скрипт make-distribution.sh

Скриншот из этих трех шагов:

Команды запускаются в Screencast:

1
2
3
./dev/change-scala-version.sh 2.11
  
./make-distribution.sh --name spark-1.6.1-scala-2.11 --tgz -Dscala-2.11 -Pyarn -Phadoop-2.4

После создания дистрибутива я запустил мастер и рабочий Spark и снова попытался выполнить оптимистическое развертывание. Вот как я столкнулся с следующим испытанием.

Испытание 2 Несовместимые банки между программой Spark и Scala — используйте свою банку

Когда я попытался развернуть сборку jar на новый пользовательский кластер Spark, созданный для Scala 2.11, я столкнулся с другой проблемой, как показано на этом скриншоте:

Как вы видите на скриншоте, в HttpClient на основе Netty были проблемы с SSL.

Задав вопрос доктору Гугсу, я решил, что Spark использует Akka Actor для RPC и обмена сообщениями, который в свою очередь использует Netty. И оказывается, что версия Spty / Akka для Netty является несовместимой версией с библиотекой scalaj-http, используемой в этом проекте.

Напомним из команды, следующее в собранном виде:

1
2
3
4
5
6
7
8
9
~/Development/spark-course/spark-streaming $ sbt assembly
[info] Loading project definition from /Users/toddmcgrath/Development/spark-course/spark-streaming/project
[info] Set current project to spark-streaming-example (in build file:/Users/toddmcgrath/Development/spark-course/spark-streaming/)
[info] Including from cache: wcs-1.5.jar
[info] Including from cache: slf4j-api-1.7.12.jar
[info] Including from cache: scalaj-http_2.11-2.3.0.jar
[info] Including from cache: async-http-client-1.9.28.jar
[info] Including from cache: netty-3.10.3.Final.jar
[info] Checking every *.class/*.jar file's SHA-1.

Мне нужен был способ настроить Spark для использования моего netty-3.10.3.Final.jar вместо старой версии, используемой в Akka.

Решение состояло в том, чтобы использовать переменную конфигурации при условии ответа. Эта переменная описывается как

1
"Whether to give user-added jars precedence over Spark's own jars when loading classes in the the driver. This feature can be used to mitigate conflicts between Spark's dependencies and user dependencies. It is currently an experimental feature. This is used in cluster mode only."

Итак, я попытался снова выполнить развертывание с этой переменной conf, как показано на следующем скриншоте:

Задача 3 Несовместимые банки между программой Spark и Scala — используйте Spark Jar

Что делать, если вы хотите использовать банку вместо версии Spark? По сути, это противоположность ранее описанной Задаче 2.

Я просмотрел вывод и вижу, что slf4j был включен в сборку. Что ж, из логов мы видим, что Spark уже использует slf4j, и теперь наша программа драйверов пытается порождать другой экземпляр. Давайте вместо этого воспользуемся уже созданным Spark’ом slf4j.

Чтобы удалить или исключить некоторые фляги от включения в фаст-фуд, подключите к плагину sbt-assembly

Обновите файл build.sbt следующим образом:

Исключая банки со сборкой

1
2
3
4
5
excludedJars in assembly <<= (fullClasspath in assembly) map { cp =>
  cp filter {
                i => i.data.getName == "slf4j-api-1.7.12.jar"
            }
}

И снова запустите, и вы будете готовы попробовать другое развертывание Spark.

Все работает, как вы можете видеть в этом скриншоте

Вывод

В этом посте представлены три проблемы и решения при устранении неполадок, возникающих при развертывании Apache Spark с Scala в кластер Spark. Мы рассмотрели три сценария:

  • Apache Spark со Scala 2.11
  • Установка приоритета Apache Spark по сравнению с вашим путем исключения из сборки
  • Предоставление jar (s) приоритета перед сопоставимой версией jar Apache Spark.

Дальнейшие ссылки

http://stackoverflow.com/questions/23330449/how-does-spark-use-netty/23333955#23333955

https://issues.apache.org/jira/browse/SPARK-4738

http://spark.apache.org/docs/latest/configuration.html

http://spark.apache.org/docs/latest/building-spark.html#building-for-scala-211