Статьи

Создание приложения YARN с использованием Scala

Недавно я играл с Apache Amaterasu , это удивительный проект, который помогает развертывать конвейеры данных. Он все еще насиживается и над ним работает супер-дружная команда инженеров. Некоторые интересные функции выстроены в очередь. Не верь мне на слово. Пожалуйста, проверьте это сами .

Amaterasu запускает контейнеры (на YARN / Mesos) самостоятельно для каждого из этапов вашего конвейера данных. Все, что вам нужно, — это ваш репозиторий и конфигурация на основе YAML.

Мне было просто интересно узнать о запуске контейнеров на YARN и о том, как работает API, и подумал, что я должен попробовать сам. Это действительно интуитивно понятно, если мы понимаем небольшой набор конструкций.

Этот пост является попыткой минимального примера приложения YARN в Scala.

Примечание: полный код доступен на github

В этом приложении есть три основных класса:
1. SampleYarnClient
2. ApplicationMaster
3. DummyApplication (бизнес-логика)

1. SampleYarnClient

Это точка входа в программу. Делает следующее:

  1. YarnClient .
  2. Согласовывает ресурсы для контейнера ApplicationMaster с помощью YarnClient . Для этого нужно инициировать ApplicationSubmissionContext который является просто оболочкой для Resource , Priority и ContainerLaunchContext (среди прочих). Давайте быстро рассмотрим код и подробно рассмотрим эти три компонента SubmissionContext.
01
02
03
04
05
06
07
08
09
10
11
val yarnClient = YarnClient.createYarnClient() 
...
val application = yarnClient.createApplication() 
...
val context = application.getApplicationSubmissionContext 
context.setAMContainerSpec(amContainer) 
context.setApplicationName("Scala Yarn App"
context.setResource(resource) 
context.setPriority(priority)
 
yarnClient.submitApplication(context)

а. Ресурс

Ресурсы — это простая оболочка вокруг процессора и памяти

1
val resource = Resource.newInstance(1024, 2) //1 GB memory and 2 cores

б. приоритет

Приоритет — это просто целое число — чем выше число, тем выше приоритет

1
2
val priority = Records.newRecord(classOf[Priority]) 
priority.setPriority(1)

с. ContainerLaunchContext

В этом простом примере ContainerLaunchRequest имеет три основных параметра:

  1. Команды ( List[String] ): команда начальной загрузки (в идеале java <MainClass> )
  2. LocalResources ( Map[String,LocalResource] ): Map[String,LocalResource] и другие артефакты (свойства, библиотеки и т. Д.) Map[String,LocalResource] необходимы для запуска вашей команды.
  3. Environment ( Map[String,String] ): переменные среды, необходимые для программы

(другой важный — токены безопасности, которые здесь не используются, потому что мой локальный кластер не керберизован)

1
2
3
4
5
6
7
def createContainerContext(commands: List[String], resources: Map[String, LocalResource], environment: Map[String, String]): ContainerLaunchContext = {
    val launchContext = Records.newRecord(classOf[ContainerLaunchContext])
    launchContext.setCommands(commands.asJava)
    launchContext.setLocalResources(resources.asJava)
    launchContext.setEnvironment(environment.asJava)
    launchContext
  }

команды

Как я уже сказал, команды — это просто последовательность инструкций, которые вы хотели бы выполнить для запуска ApplicationMaster из оболочки.

1
2
3
4
5
6
7
val commands = List( 
      "$JAVA_HOME/bin/java " +
        " -Xmx256m " +
        s" com.arunma.ApplicationMaster " +
        " 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
        " 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"
    )

LocalResources

Что касается этой программы, вам не нужны никакие свойства или файлы конфигурации. Все, что вам нужно, это только двоичный файл jar.

Примечание: способ сделать любой двоичный файл или ресурс доступным для YARN — это поместить двоичные файлы в расположение HDFS. Этот конкретный шаг, связанный с настройкой локальных ресурсов, просто означает, что мы сообщаем YARN загружать двоичные файлы из расположения HDFS и помещать их в локальный путь контейнера при запуске.

1
2
3
val localResources = Map( 
      "SampleYarnApp-assembly-0.1.jar" -> setUpLocalResourceFromPath(yarnPath)
    )

Среда

Это пользовательские переменные среды или просто путь к классу, который необходимо установить для запуска вашего пакета.

2. ApplicationMaster

Теперь, когда мы обсудили SampleYarnClient , давайте обсудим второй. Этот класс, как видно из названия, является вашим ApplicationMaster (Дух!). Он отвечает за запуск контейнеров, которые, как ожидается, будут выполнять вашу «бизнес-логику». Шаги:

  1. AppMaster использует клиент ResourceManager ( AMRMClient ) для создания запроса на контейнер — ContainerRequest . В этом примере используется асинхронная версия клиента — AMRMClientAsync которая реализует серию обратных вызовов — onContainersAllocated , onContainersCompleted , onError т. Д.
  2. Когда RM выделяет контейнер для приложения, onContainersAllocated обратный вызов onContainersAllocated .
  3. Внутри onContainersAllocated (теперь, когда у нас есть дескриптор контейнера), AppMaster затем использует NMClientAsync для запуска «делового» контейнера ( DummyApplication ). Это достигается путем создания другого ContainerLaunchContext (тот, который оборачивает команды, локальные ресурсы и переменные среды).
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
override def onContainersAllocated(containers: util.List[Container]): Unit = {
 
    val commands = List(
      "$JAVA_HOME/bin/java " +
        " -Xmx256m " +
        s" com.arunma.DummyApplication " +
        " 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
        " 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"
    )
 
    val localResources = Map(
      "SampleYarnApp-assembly-0.1.jar" -> setUpLocalResourceFromPath(FileSystem.get(conf).makeQualified(new Path(sys.env("ARUN_JAR_PATH"))))
    )
 
    val containerLaunchContext = createContainerContext(commands, localResources, buildEnvironment(Map()))
 
    containers.asScala.foreach { container =>
      nmClient.startContainerAsync(container, containerLaunchContext)
    }
}

3. DummyApplication

Это «бизнес-логика». Не Scala в чистом смысле, но это помогает нам видеть бревна. Обратите внимание, что, поскольку это бесконечный цикл, нам придется принудительно убивать приложение.

1
2
3
4
5
6
object DummyApplication extends App { 
     while(true) {
       println("Niceeeeeeeeee !!! This is the core application that is running within the container that got negotiated by from Application Master !!!")
       Thread.sleep(1000)
     }
   }

Использование:

1
$ hadoop jar /Users/arun/IdeaProjects/SampleYarnApp/target/scala-2.11/SampleYarnApp-assembly-0.1.jar com.arunma.SampleYarnClient /Users/arun/IdeaProjects/SampleYarnApp/target/scala-2.11/SampleYarnApp-assembly-0.1.jar

В качестве альтернативы, вы можете просто сделать

1
sbt assembly

и запустите SampleYarnClient из вашей IDE, SampleYarnClient в качестве первого аргумента абсолютный путь к файлу сборки.

Опубликовано на Java Code Geeks с разрешения Аруна Маниваннана, партнера нашей программы JCG. Смотрите оригинальную статью здесь: Создание приложения YARN с использованием Scala

Мнения, высказанные участниками Java Code Geeks, являются их собственными.