Недавно я играл с Apache Amaterasu , это удивительный проект, который помогает развертывать конвейеры данных. Он все еще насиживается и над ним работает супер-дружная команда инженеров. Некоторые интересные функции выстроены в очередь. Не верь мне на слово. Пожалуйста, проверьте это сами .
Amaterasu запускает контейнеры (на YARN / Mesos) самостоятельно для каждого из этапов вашего конвейера данных. Все, что вам нужно, — это ваш репозиторий и конфигурация на основе YAML.
Мне было просто интересно узнать о запуске контейнеров на YARN и о том, как работает API, и подумал, что я должен попробовать сам. Это действительно интуитивно понятно, если мы понимаем небольшой набор конструкций.
Этот пост является попыткой минимального примера приложения YARN в Scala.
Примечание: полный код доступен на github
В этом приложении есть три основных класса:
1. SampleYarnClient
2. ApplicationMaster
3. DummyApplication
(бизнес-логика)
1. SampleYarnClient
Это точка входа в программу. Делает следующее:
-
YarnClient
. - Согласовывает ресурсы для контейнера ApplicationMaster с помощью
YarnClient
. Для этого нужно инициироватьApplicationSubmissionContext
который является просто оболочкой дляResource
,Priority
иContainerLaunchContext
(среди прочих). Давайте быстро рассмотрим код и подробно рассмотрим эти три компонента SubmissionContext.
01
02
03
04
05
06
07
08
09
10
11
|
val yarnClient = YarnClient.createYarnClient() ... val application = yarnClient.createApplication() ... val context = application.getApplicationSubmissionContext context.setAMContainerSpec(amContainer) context.setApplicationName( "Scala Yarn App" ) context.setResource(resource) context.setPriority(priority) yarnClient.submitApplication(context) |
а. Ресурс
Ресурсы — это простая оболочка вокруг процессора и памяти
1
|
val resource = Resource.newInstance( 1024 , 2 ) //1 GB memory and 2 cores |
б. приоритет
Приоритет — это просто целое число — чем выше число, тем выше приоритет
1
2
|
val priority = Records.newRecord(classOf[Priority]) priority.setPriority( 1 ) |
с. ContainerLaunchContext
В этом простом примере ContainerLaunchRequest имеет три основных параметра:
- Команды (
List[String]
): команда начальной загрузки (в идеалеjava <MainClass>
) - LocalResources (
Map[String,LocalResource]
):Map[String,LocalResource]
и другие артефакты (свойства, библиотеки и т. Д.)Map[String,LocalResource]
необходимы для запуска вашей команды. - Environment (
Map[String,String]
): переменные среды, необходимые для программы
(другой важный — токены безопасности, которые здесь не используются, потому что мой локальный кластер не керберизован)
1
2
3
4
5
6
7
|
def createContainerContext(commands : List[String], resources : Map[String, LocalResource], environment : Map[String, String]) : ContainerLaunchContext = { val launchContext = Records.newRecord(classOf[ContainerLaunchContext]) launchContext.setCommands(commands.asJava) launchContext.setLocalResources(resources.asJava) launchContext.setEnvironment(environment.asJava) launchContext } |
команды
Как я уже сказал, команды — это просто последовательность инструкций, которые вы хотели бы выполнить для запуска ApplicationMaster из оболочки.
1
2
3
4
5
6
7
|
val commands = List( "$JAVA_HOME/bin/java " + " -Xmx256m " + s " com.arunma.ApplicationMaster " + " 1> " + ApplicationConstants.LOG _ DIR _ EXPANSION _ VAR + "/stdout" + " 2> " + ApplicationConstants.LOG _ DIR _ EXPANSION _ VAR + "/stderr" ) |
LocalResources
Что касается этой программы, вам не нужны никакие свойства или файлы конфигурации. Все, что вам нужно, это только двоичный файл jar.
Примечание: способ сделать любой двоичный файл или ресурс доступным для YARN — это поместить двоичные файлы в расположение HDFS. Этот конкретный шаг, связанный с настройкой локальных ресурсов, просто означает, что мы сообщаем YARN загружать двоичные файлы из расположения HDFS и помещать их в локальный путь контейнера при запуске.
1
2
3
|
val localResources = Map( "SampleYarnApp-assembly-0.1.jar" -> setUpLocalResourceFromPath(yarnPath) ) |
Среда
Это пользовательские переменные среды или просто путь к классу, который необходимо установить для запуска вашего пакета.
2. ApplicationMaster
Теперь, когда мы обсудили SampleYarnClient
, давайте обсудим второй. Этот класс, как видно из названия, является вашим ApplicationMaster
(Дух!). Он отвечает за запуск контейнеров, которые, как ожидается, будут выполнять вашу «бизнес-логику». Шаги:
- AppMaster использует клиент ResourceManager (
AMRMClient
) для создания запроса на контейнер —ContainerRequest
. В этом примере используется асинхронная версия клиента —AMRMClientAsync
которая реализует серию обратных вызовов —onContainersAllocated
,onContainersCompleted
,onError
т. Д. - Когда RM выделяет контейнер для приложения,
onContainersAllocated
обратный вызовonContainersAllocated
. - Внутри
onContainersAllocated
(теперь, когда у нас есть дескриптор контейнера), AppMaster затем используетNMClientAsync
для запуска «делового» контейнера (DummyApplication
). Это достигается путем создания другогоContainerLaunchContext
(тот, который оборачивает команды, локальные ресурсы и переменные среды).
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
|
override def onContainersAllocated(containers : util.List[Container]) : Unit = { val commands = List( "$JAVA_HOME/bin/java " + " -Xmx256m " + s " com.arunma.DummyApplication " + " 1> " + ApplicationConstants.LOG _ DIR _ EXPANSION _ VAR + "/stdout" + " 2> " + ApplicationConstants.LOG _ DIR _ EXPANSION _ VAR + "/stderr" ) val localResources = Map( "SampleYarnApp-assembly-0.1.jar" -> setUpLocalResourceFromPath(FileSystem.get(conf).makeQualified( new Path(sys.env( "ARUN_JAR_PATH" )))) ) val containerLaunchContext = createContainerContext(commands, localResources, buildEnvironment(Map())) containers.asScala.foreach { container = > nmClient.startContainerAsync(container, containerLaunchContext) } } |
3. DummyApplication
Это «бизнес-логика». Не Scala
в чистом смысле, но это помогает нам видеть бревна. Обратите внимание, что, поскольку это бесконечный цикл, нам придется принудительно убивать приложение.
1
2
3
4
5
6
|
object DummyApplication extends App { while ( true ) { println( "Niceeeeeeeeee !!! This is the core application that is running within the container that got negotiated by from Application Master !!!" ) Thread.sleep( 1000 ) } } |
Использование:
1
|
$ hadoop jar /Users/arun/IdeaProjects/SampleYarnApp/target/scala- 2.11 /SampleYarnApp-assembly- 0.1 .jar com.arunma.SampleYarnClient /Users/arun/IdeaProjects/SampleYarnApp/target/scala- 2.11 /SampleYarnApp-assembly- 0.1 .jar |
В качестве альтернативы, вы можете просто сделать
1
|
sbt assembly |
и запустите SampleYarnClient
из вашей IDE, SampleYarnClient
в качестве первого аргумента абсолютный путь к файлу сборки.
Опубликовано на Java Code Geeks с разрешения Аруна Маниваннана, партнера нашей программы JCG. Смотрите оригинальную статью здесь: Создание приложения YARN с использованием Scala
Мнения, высказанные участниками Java Code Geeks, являются их собственными. |