Недавно я играл с Apache Amaterasu , это удивительный проект, который помогает развертывать конвейеры данных. Он все еще насиживается и над ним работает супер-дружная команда инженеров. Некоторые интересные функции выстроены в очередь. Не верь мне на слово. Пожалуйста, проверьте это сами .
Amaterasu запускает контейнеры (на YARN / Mesos) самостоятельно для каждого из этапов вашего конвейера данных. Все, что вам нужно, — это ваш репозиторий и конфигурация на основе YAML.
Мне было просто интересно узнать о запуске контейнеров на YARN и о том, как работает API, и подумал, что я должен попробовать сам. Это действительно интуитивно понятно, если мы понимаем небольшой набор конструкций.
Этот пост является попыткой минимального примера приложения YARN в Scala.
Примечание: полный код доступен на github
В этом приложении есть три основных класса:
1. SampleYarnClient
2. ApplicationMaster
3. DummyApplication (бизнес-логика)
1. SampleYarnClient
Это точка входа в программу. Делает следующее:
-
YarnClient. - Согласовывает ресурсы для контейнера ApplicationMaster с помощью
YarnClient. Для этого нужно инициироватьApplicationSubmissionContextкоторый является просто оболочкой дляResource,PriorityиContainerLaunchContext(среди прочих). Давайте быстро рассмотрим код и подробно рассмотрим эти три компонента SubmissionContext.
|
01
02
03
04
05
06
07
08
09
10
11
|
val yarnClient = YarnClient.createYarnClient() ...val application = yarnClient.createApplication() ...val context = application.getApplicationSubmissionContext context.setAMContainerSpec(amContainer) context.setApplicationName("Scala Yarn App") context.setResource(resource) context.setPriority(priority)yarnClient.submitApplication(context) |
а. Ресурс
Ресурсы — это простая оболочка вокруг процессора и памяти
|
1
|
val resource = Resource.newInstance(1024, 2) //1 GB memory and 2 cores |
б. приоритет
Приоритет — это просто целое число — чем выше число, тем выше приоритет
|
1
2
|
val priority = Records.newRecord(classOf[Priority]) priority.setPriority(1) |
с. ContainerLaunchContext
В этом простом примере ContainerLaunchRequest имеет три основных параметра:
- Команды (
List[String]): команда начальной загрузки (в идеалеjava <MainClass>) - LocalResources (
Map[String,LocalResource]):Map[String,LocalResource]и другие артефакты (свойства, библиотеки и т. Д.)Map[String,LocalResource]необходимы для запуска вашей команды. - Environment (
Map[String,String]): переменные среды, необходимые для программы
(другой важный — токены безопасности, которые здесь не используются, потому что мой локальный кластер не керберизован)
|
1
2
3
4
5
6
7
|
def createContainerContext(commands: List[String], resources: Map[String, LocalResource], environment: Map[String, String]): ContainerLaunchContext = { val launchContext = Records.newRecord(classOf[ContainerLaunchContext]) launchContext.setCommands(commands.asJava) launchContext.setLocalResources(resources.asJava) launchContext.setEnvironment(environment.asJava) launchContext } |
команды
Как я уже сказал, команды — это просто последовательность инструкций, которые вы хотели бы выполнить для запуска ApplicationMaster из оболочки.
|
1
2
3
4
5
6
7
|
val commands = List( "$JAVA_HOME/bin/java " + " -Xmx256m " + s" com.arunma.ApplicationMaster " + " 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" + " 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr" ) |
LocalResources
Что касается этой программы, вам не нужны никакие свойства или файлы конфигурации. Все, что вам нужно, это только двоичный файл jar.
Примечание: способ сделать любой двоичный файл или ресурс доступным для YARN — это поместить двоичные файлы в расположение HDFS. Этот конкретный шаг, связанный с настройкой локальных ресурсов, просто означает, что мы сообщаем YARN загружать двоичные файлы из расположения HDFS и помещать их в локальный путь контейнера при запуске.
|
1
2
3
|
val localResources = Map( "SampleYarnApp-assembly-0.1.jar" -> setUpLocalResourceFromPath(yarnPath) ) |
Среда
Это пользовательские переменные среды или просто путь к классу, который необходимо установить для запуска вашего пакета.
2. ApplicationMaster
Теперь, когда мы обсудили SampleYarnClient , давайте обсудим второй. Этот класс, как видно из названия, является вашим ApplicationMaster (Дух!). Он отвечает за запуск контейнеров, которые, как ожидается, будут выполнять вашу «бизнес-логику». Шаги:
- AppMaster использует клиент ResourceManager (
AMRMClient) для создания запроса на контейнер —ContainerRequest. В этом примере используется асинхронная версия клиента —AMRMClientAsyncкоторая реализует серию обратных вызовов —onContainersAllocated,onContainersCompleted,onErrorт. Д. - Когда RM выделяет контейнер для приложения,
onContainersAllocatedобратный вызовonContainersAllocated. - Внутри
onContainersAllocated(теперь, когда у нас есть дескриптор контейнера), AppMaster затем используетNMClientAsyncдля запуска «делового» контейнера (DummyApplication). Это достигается путем создания другогоContainerLaunchContext(тот, который оборачивает команды, локальные ресурсы и переменные среды).
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
|
override def onContainersAllocated(containers: util.List[Container]): Unit = { val commands = List( "$JAVA_HOME/bin/java " + " -Xmx256m " + s" com.arunma.DummyApplication " + " 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" + " 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr" ) val localResources = Map( "SampleYarnApp-assembly-0.1.jar" -> setUpLocalResourceFromPath(FileSystem.get(conf).makeQualified(new Path(sys.env("ARUN_JAR_PATH")))) ) val containerLaunchContext = createContainerContext(commands, localResources, buildEnvironment(Map())) containers.asScala.foreach { container => nmClient.startContainerAsync(container, containerLaunchContext) }} |
3. DummyApplication
Это «бизнес-логика». Не Scala в чистом смысле, но это помогает нам видеть бревна. Обратите внимание, что, поскольку это бесконечный цикл, нам придется принудительно убивать приложение.
|
1
2
3
4
5
6
|
object DummyApplication extends App { while(true) { println("Niceeeeeeeeee !!! This is the core application that is running within the container that got negotiated by from Application Master !!!") Thread.sleep(1000) } } |
Использование:
|
1
|
$ hadoop jar /Users/arun/IdeaProjects/SampleYarnApp/target/scala-2.11/SampleYarnApp-assembly-0.1.jar com.arunma.SampleYarnClient /Users/arun/IdeaProjects/SampleYarnApp/target/scala-2.11/SampleYarnApp-assembly-0.1.jar |
В качестве альтернативы, вы можете просто сделать
|
1
|
sbt assembly |
и запустите SampleYarnClient из вашей IDE, SampleYarnClient в качестве первого аргумента абсолютный путь к файлу сборки.
| Опубликовано на Java Code Geeks с разрешения Аруна Маниваннана, партнера нашей программы JCG. Смотрите оригинальную статью здесь: Создание приложения YARN с использованием Scala
Мнения, высказанные участниками Java Code Geeks, являются их собственными. |