Статьи

Apache Mesos: создание собственной распределенной среды

В предыдущем  посте мы увидели, что такое мезо, как оно полезно и с чего начать. В этом посте мы увидим, как написать собственный фреймворк на мезо.
(В mesos под фреймворком
понимается любое работающее на нем приложение.) В  этом посте рассказывается о фреймворке под названием «mesos-pinspider», который извлекает информацию о профиле пользователя и информацию о пользовательской панели страницы интереса пользователя.

Mesos Framework

В целом, платформа Mesos состоит из трех основных компонентов. 
—  Драйвер,  который отправляет задачи в платформу
—  Планировщик,  который регистрируется на главном сервере для предлагаемых ресурсов, принимает задачи и запускает их на исполнителе
—   Процесс исполнителя, который запускается на подчиненных узлах для выполнения задач платформы

Пример Pinspider Framework

Вы можете проверить код здесь на  GitHub . Давайте разберем его с PinDriver, PinScheduler и PinUserProfileExecutor.

Водитель

Компонент драйвера структуры является PinDriver. 

  • Создать информацию об исполнителе

Опишите информацию об исполнителе, используя шаблон Builder, а мезо — используйте Google Protocol Buffers для обмена данными. Здесь нам нужно установить executorID, команду, которая в основном представляет собой команду оболочки, выполняемую через: ‘/ bin / sh -c value’. Любые указанные URI выбираются перед выполнением команды. Имя задается с помощью setName (). Источник устанавливается setSource (), строкой стиля идентификатора, используемой фреймворками для отслеживания источника исполнителя. Это полезно, когда возможно, чтобы разные идентификаторы исполнителей были связаны семантически.

Protos.ExecutorInfo userProfileExecutorInfo = Protos.ExecutorInfo.newBuilder().setExecutorId(Protos.ExecutorID.newBuilder().setValue("PinUserProfileExecutor")).setCommand(commandInfoUserProfile).setName("PinUserProfileExecutor Java").setSource("java").build();
  • Создать каркасную информацию

Опишите основы информации. Поле пользователя используется для определения пользователя Unix, для которого должен быть запущен исполнитель / задача. Если поле пользователя установлено в пустую строку, Mesos автоматически установит его для текущего пользователя. Период времени, в течение которого мастер будет ожидать отработки отказа планировщика, прежде чем удалять структуру, задается setFailoverTimeout (). Имя фреймворка задается с помощью setName ()

Protos.FrameworkInfo.Builder frameworkBuilder = Protos.FrameworkInfo.newBuilder().setFailoverTimeout(120000).setUser("").setName("Pinspider Framework"); 
  • Создание планировщика

Вам нужно создать экземпляр планировщика с количеством задач, которые необходимо отправить для запуска исполнителю.

Scheduler scheduler = args.length == 1 ?new PinScheduler(userProfileExecutorInfo,userBoardExecutorInfo) : new PinScheduler(userProfileExecutorInfo, userBoardExecutorInfo, Integer.parseInt(args[1]), args[2]);

Примечание: обратите внимание, что используются два ExecutorInfo, т.е. один для получения информации о профиле пользователя и другой для информации о плате пользователя для демонстрации. Это объяснение включает только одну executorinfo — userProfileExecutorInfo

  • Запуск драйвера планировщика мезос.

MesosSchedulerDriver является реализацией SchedulerDriver, который является абстрактным интерфейсом для подключения планировщика к мезо. Это осуществляется путем управления жизненным циклом планировщика (запуск, остановка и ожидание завершения задач), а также для взаимодействия с Mesos (запуск задач, уничтожение задач и т. Д.). 

MesosSchedulerDriver schedulerDriver = new MesosSchedulerDriver(scheduler,frameworkBuilder.build(), args[0]);
int status = schedulerDriver.run() == Protos.Status.DRIVER_STOPPED ? 0 : 1;schedulerDriver.stop();
System.exit(status);

Реализация исполнителя

Компонент Executor этой структуры — PinUserProfileExecutor.
Executor — это интерфейс обратного вызова, который реализуется исполнителями фреймворков. В нашей реализации давайте сосредоточимся на launchTask ()

@Override public void launchTask(final ExecutorDriver executorDriver, final Protos.TaskInfo taskInfo) { 
}
  •  Установите статус задачи, установив идентификатор и состояние с помощью шаблона построителя.
Protos.TaskStatus taskStatus = Protos.TaskStatus.newBuilder().setTaskId(taskInfo.getTaskId()).setState(Protos.TaskState.TASK_RUNNING).build();
  •  Отправьте обновление состояния в планировщик инфраструктуры, повторяя при необходимости, пока не будет получено подтверждение или исполнитель не будет завершен, и в этом случае будет отправлено обновление состояния TASK_LOST.
executorDriver.sendStatusUpdate(taskStatus);
  • Получить данные из задач и запустить свою логику.
try {   message = ("userprofile :" + getUserProfileInfo(url)).getBytes();  } catch (IOException e) {   LOGGER.error("Error parsing the Pinterest URL :" + e.getMessage());  }
  •  Отправьте рамке сообщение.
executorDriver.sendFrameworkMessage(message);
  • Отметьте состояние задачи как выполненное и отправьте обновление статуса планировщику платформы.
taskStatus = Protos.TaskStatus.newBuilder().setTaskId(taskInfo.getTaskId()) .setState(Protos.TaskState.TASK_FINISHED).build();executorDriver.sendStatusUpdate(taskStatus);
  •  Метод main () для создания экземпляра MesosExecutorDriver и запуска 
mesosExecutorDriver.run() == Protos.Status.DRIVER_STOPPED ? 0 : 1

Реализация планировщика

Компонент Планировщика структуры является PinScheduler.

Планировщик — это интерфейс обратного вызова, который реализуется планировщиками фреймворков. В нашей реализации давайте сконцентрируемся на resourceOffers (), statusUpdate () и frameworkMessage ()

  • Конструктор: построить с информацией об исполнителе и количество задач запуска.
 public PinScheduler(Protos.ExecutorInfo pinUserProfileExecutor , Protos.ExecutorInfo pinUserBoardExecutor ) {
  this(pinUserProfileExecutor,pinUserBoardExecutor, 5, "http://www.pinterest.com/techcrunch");
 } 
public PinScheduler(Protos.ExecutorInfo pinUserProfileExecutor,Protos.ExecutorInfo pinUserBoardExecutor,  int totalTasks, String url) {  this.pinUserProfileExecutor = pinUserProfileExecutor;  this.pinUserBoardExecutor = pinUserBoardExecutor;  this.totalTasks = totalTasks;  this.crawlQueue = Collections.synchronizedList(new ArrayList<String>());  this.crawlQueue.add(url); }
  •  Ресурсные предложения
    •  Предложение ресурсов может быть ресурсами, такими как процессор, память и т. Д. Из списка предложений получите скалярное значение ресурсов. Мы должны предоставить наши требования к ресурсам для задач при настройке информации о задачах.
for (Protos.Offer offer : list) {   List<Protos.TaskInfo> taskInfoList = new ArrayList<Protos.TaskInfo>();   double offerCpus = 0;   double offerMem = 0;   for (Protos.Resource resource : offer.getResourcesList()) {    if (resource.getName().equals("cpus")) {     offerCpus += resource.getScalar().getValue();    } else if (resource.getName().equals("mem")) {     offerMem += resource.getScalar().getValue();    }   }   LOGGER.info("Received Offer : " + offer.getId().getValue() + " with cpus = " + offerCpus + " and mem ="     + offerMem);
  • Создать идентификатор задачи.
Protos.TaskID taskID = Protos.TaskID.newBuilder().setValue(Integer.toString(launchedTasks++)).build();
  • Создать информацию о задаче, установив идентификатор задачи, добавив ресурсы, установив данные и установив исполнителя.
Protos.TaskInfo pinUserProfileTaskInfo = Protos.TaskInfo.newBuilder().setName("task " + taskID.getValue()).setTaskId(taskID).setSlaveId(offer.getSlaveId()).addResources(Protos.Resource.newBuilder().setName("cpus").setType(Protos.Value.Type.SCALAR).setScalar(Protos.Value.Scalar.newBuilder().setValue(CPUS_PER_TASK))).addResources(Protos.Resource.newBuilder().setName("mem").setType(Protos.Value.Type.SCALAR).setScalar(Protos.Value.Scalar.newBuilder().setValue(MEM_PER_TASK))).setData(ByteString.copyFromUtf8(crawlQueue.get(0))).setExecutor(Protos.ExecutorInfo.newBuilder(pinUserProfileExecutor)).build();
  • Запустите задачи через SchedulerDriver.
...  taskInfoList.add(pinUserProfileTaskInfo);taskInfoList.add(pinUserBoardTaskInfo);}schedulerDriver.launchTasks(offer.getId(), taskInfoList);
  •  Обновление статуса

Это вызывается, когда статус задачи изменился, т. Е. Ведомое устройство потеряно, и поэтому задание потеряно, задача завершается, и исполнитель отправляет обновление состояния, сообщая об этом.

@Override public void statusUpdate(SchedulerDriver schedulerDriver, Protos.TaskStatus taskStatus) {... }
  • Остановите SchedulerDriver, если задачи завершены
  if (taskStatus.getState() == Protos.TaskState.TASK_FINISHED) {   finishedTasks++;   LOGGER.info("Finished tasks : " + finishedTasks);   if (finishedTasks == totalTasks) {    schedulerDriver.stop();   }  }
  • Прервать SchedulerDriver, если задачи убиты, потеряны или не выполнены
 if (taskStatus.getState() == Protos.TaskState.TASK_FAILED
    || taskStatus.getState() == Protos.TaskState.TASK_KILLED
    || taskStatus.getState() == Protos.TaskState.TASK_LOST) {
   LOGGER.error("Aborting because the task " + taskStatus.getTaskId().getValue() + " is in unexpected state : "
     + taskStatus.getState().getValueDescriptor().getName() + "with reason : " + taskStatus.getReason()
                            .getValueDescriptor()
                            .getName()
     + " from source : " + taskStatus.getSource().getValueDescriptor().getName() + " with message : "
     + taskStatus.getMessage());
   schedulerDriver.abort();
  •  Рамочное сообщение

Это вызывается, когда исполнитель отправляет сообщение.

  • Обработайте ваше сообщение
@Override public void frameworkMessage(SchedulerDriver schedulerDriver, Protos.ExecutorID executorID,   Protos.SlaveID slaveID, byte[] bytes) {  String data = new String(bytes);  System.out.println(data);  LOGGER.info("User Profile Information : " + data); }

Полный код доступен 
здесь  с инструкциями для запуска и примера вывода.

Счастливого обучения! 🙂