В предыдущем  посте мы увидели, что такое мезо, как оно полезно и с чего начать. В этом посте мы увидим, как написать собственный фреймворк на мезо. 
(В mesos под фреймворком 
понимается любое работающее на нем приложение.) В  этом посте рассказывается о фреймворке под названием «mesos-pinspider», который извлекает информацию о профиле пользователя и информацию о пользовательской панели страницы интереса пользователя.
Mesos Framework
В целом, платформа Mesos состоит из трех основных компонентов. 
—  Драйвер,  который отправляет задачи в платформу 
—  Планировщик,  который регистрируется на главном сервере для предлагаемых ресурсов, принимает задачи и запускает их на исполнителе 
—   Процесс исполнителя, который запускается на подчиненных узлах для выполнения задач платформы
Пример Pinspider Framework
Вы можете проверить код здесь на GitHub . Давайте разберем его с PinDriver, PinScheduler и PinUserProfileExecutor.
Водитель
Компонент драйвера структуры является PinDriver.
- Создать информацию об исполнителе
 
Опишите информацию об исполнителе, используя шаблон Builder, а мезо — используйте Google Protocol Buffers для обмена данными. Здесь нам нужно установить executorID, команду, которая в основном представляет собой команду оболочки, выполняемую через: ‘/ bin / sh -c value’. Любые указанные URI выбираются перед выполнением команды. Имя задается с помощью setName (). Источник устанавливается setSource (), строкой стиля идентификатора, используемой фреймворками для отслеживания источника исполнителя. Это полезно, когда возможно, чтобы разные идентификаторы исполнителей были связаны семантически.
Protos.ExecutorInfo userProfileExecutorInfo = Protos.ExecutorInfo.newBuilder().setExecutorId(Protos.ExecutorID.newBuilder().setValue("PinUserProfileExecutor")).setCommand(commandInfoUserProfile).setName("PinUserProfileExecutor Java").setSource("java").build();
- Создать каркасную информацию
 
Опишите основы информации. Поле пользователя используется для определения пользователя Unix, для которого должен быть запущен исполнитель / задача. Если поле пользователя установлено в пустую строку, Mesos автоматически установит его для текущего пользователя. Период времени, в течение которого мастер будет ожидать отработки отказа планировщика, прежде чем удалять структуру, задается setFailoverTimeout (). Имя фреймворка задается с помощью setName ()
Protos.FrameworkInfo.Builder frameworkBuilder = Protos.FrameworkInfo.newBuilder().setFailoverTimeout(120000).setUser("").setName("Pinspider Framework"); 
- Создание планировщика
 
Вам нужно создать экземпляр планировщика с количеством задач, которые необходимо отправить для запуска исполнителю.
Scheduler scheduler = args.length == 1 ?new PinScheduler(userProfileExecutorInfo,userBoardExecutorInfo) : new PinScheduler(userProfileExecutorInfo, userBoardExecutorInfo, Integer.parseInt(args[1]), args[2]);
Примечание: обратите внимание, что используются два ExecutorInfo, т.е. один для получения информации о профиле пользователя и другой для информации о плате пользователя для демонстрации. Это объяснение включает только одну executorinfo — userProfileExecutorInfo
- Запуск драйвера планировщика мезос.
 
MesosSchedulerDriver является реализацией SchedulerDriver, который является абстрактным интерфейсом для подключения планировщика к мезо. Это осуществляется путем управления жизненным циклом планировщика (запуск, остановка и ожидание завершения задач), а также для взаимодействия с Mesos (запуск задач, уничтожение задач и т. Д.).
MesosSchedulerDriver schedulerDriver = new MesosSchedulerDriver(scheduler,frameworkBuilder.build(), args[0]); int status = schedulerDriver.run() == Protos.Status.DRIVER_STOPPED ? 0 : 1;schedulerDriver.stop(); System.exit(status);
Реализация исполнителя
Компонент Executor этой структуры — PinUserProfileExecutor. 
Executor — это интерфейс обратного вызова, который реализуется исполнителями фреймворков. В нашей реализации давайте сосредоточимся на launchTask ()
@Override public void launchTask(final ExecutorDriver executorDriver, final Protos.TaskInfo taskInfo) { 
}
- Установите статус задачи, установив идентификатор и состояние с помощью шаблона построителя.
 
Protos.TaskStatus taskStatus = Protos.TaskStatus.newBuilder().setTaskId(taskInfo.getTaskId()).setState(Protos.TaskState.TASK_RUNNING).build();
- Отправьте обновление состояния в планировщик инфраструктуры, повторяя при необходимости, пока не будет получено подтверждение или исполнитель не будет завершен, и в этом случае будет отправлено обновление состояния TASK_LOST.
 
executorDriver.sendStatusUpdate(taskStatus);
- Получить данные из задач и запустить свою логику.
 
try {   message = ("userprofile :" + getUserProfileInfo(url)).getBytes();  } catch (IOException e) {   LOGGER.error("Error parsing the Pinterest URL :" + e.getMessage());  }
- Отправьте рамке сообщение.
 
executorDriver.sendFrameworkMessage(message);
- Отметьте состояние задачи как выполненное и отправьте обновление статуса планировщику платформы.
 
taskStatus = Protos.TaskStatus.newBuilder().setTaskId(taskInfo.getTaskId()) .setState(Protos.TaskState.TASK_FINISHED).build();executorDriver.sendStatusUpdate(taskStatus);
- Метод main () для создания экземпляра MesosExecutorDriver и запуска
 
mesosExecutorDriver.run() == Protos.Status.DRIVER_STOPPED ? 0 : 1
Реализация планировщика
Компонент Планировщика структуры является PinScheduler.
 
Планировщик — это интерфейс обратного вызова, который реализуется планировщиками фреймворков. В нашей реализации давайте сконцентрируемся на resourceOffers (), statusUpdate () и frameworkMessage ()
 
- Конструктор: построить с информацией об исполнителе и количество задач запуска.
 
 public PinScheduler(Protos.ExecutorInfo pinUserProfileExecutor , Protos.ExecutorInfo pinUserBoardExecutor ) {
  this(pinUserProfileExecutor,pinUserBoardExecutor, 5, "http://www.pinterest.com/techcrunch");
 } 
public PinScheduler(Protos.ExecutorInfo pinUserProfileExecutor,Protos.ExecutorInfo pinUserBoardExecutor,  int totalTasks, String url) {  this.pinUserProfileExecutor = pinUserProfileExecutor;  this.pinUserBoardExecutor = pinUserBoardExecutor;  this.totalTasks = totalTasks;  this.crawlQueue = Collections.synchronizedList(new ArrayList<String>());  this.crawlQueue.add(url); }
- Ресурсные предложения
 - Предложение ресурсов может быть ресурсами, такими как процессор, память и т. Д. Из списка предложений получите скалярное значение ресурсов. Мы должны предоставить наши требования к ресурсам для задач при настройке информации о задачах.
 
for (Protos.Offer offer : list) {   List<Protos.TaskInfo> taskInfoList = new ArrayList<Protos.TaskInfo>();   double offerCpus = 0;   double offerMem = 0;   for (Protos.Resource resource : offer.getResourcesList()) {    if (resource.getName().equals("cpus")) {     offerCpus += resource.getScalar().getValue();    } else if (resource.getName().equals("mem")) {     offerMem += resource.getScalar().getValue();    }   }   LOGGER.info("Received Offer : " + offer.getId().getValue() + " with cpus = " + offerCpus + " and mem ="     + offerMem);
- Создать идентификатор задачи.
 
Protos.TaskID taskID = Protos.TaskID.newBuilder().setValue(Integer.toString(launchedTasks++)).build();
- Создать информацию о задаче, установив идентификатор задачи, добавив ресурсы, установив данные и установив исполнителя.
 
Protos.TaskInfo pinUserProfileTaskInfo = Protos.TaskInfo.newBuilder().setName("task " + taskID.getValue()).setTaskId(taskID).setSlaveId(offer.getSlaveId()).addResources(Protos.Resource.newBuilder().setName("cpus").setType(Protos.Value.Type.SCALAR).setScalar(Protos.Value.Scalar.newBuilder().setValue(CPUS_PER_TASK))).addResources(Protos.Resource.newBuilder().setName("mem").setType(Protos.Value.Type.SCALAR).setScalar(Protos.Value.Scalar.newBuilder().setValue(MEM_PER_TASK))).setData(ByteString.copyFromUtf8(crawlQueue.get(0))).setExecutor(Protos.ExecutorInfo.newBuilder(pinUserProfileExecutor)).build();
- Запустите задачи через SchedulerDriver.
 
... taskInfoList.add(pinUserProfileTaskInfo);taskInfoList.add(pinUserBoardTaskInfo);}schedulerDriver.launchTasks(offer.getId(), taskInfoList);
- Обновление статуса
 
Это вызывается, когда статус задачи изменился, т. Е. Ведомое устройство потеряно, и поэтому задание потеряно, задача завершается, и исполнитель отправляет обновление состояния, сообщая об этом.
 
@Override public void statusUpdate(SchedulerDriver schedulerDriver, Protos.TaskStatus taskStatus) {... }
- Остановите SchedulerDriver, если задачи завершены
 
  if (taskStatus.getState() == Protos.TaskState.TASK_FINISHED) {   finishedTasks++;   LOGGER.info("Finished tasks : " + finishedTasks);   if (finishedTasks == totalTasks) {    schedulerDriver.stop();   }  }
- Прервать SchedulerDriver, если задачи убиты, потеряны или не выполнены
 
 if (taskStatus.getState() == Protos.TaskState.TASK_FAILED
    || taskStatus.getState() == Protos.TaskState.TASK_KILLED
    || taskStatus.getState() == Protos.TaskState.TASK_LOST) {
   LOGGER.error("Aborting because the task " + taskStatus.getTaskId().getValue() + " is in unexpected state : "
     + taskStatus.getState().getValueDescriptor().getName() + "with reason : " + taskStatus.getReason()
                            .getValueDescriptor()
                            .getName()
     + " from source : " + taskStatus.getSource().getValueDescriptor().getName() + " with message : "
     + taskStatus.getMessage());
   schedulerDriver.abort();
- Рамочное сообщение
 
Это вызывается, когда исполнитель отправляет сообщение.
 
- Обработайте ваше сообщение
 
@Override public void frameworkMessage(SchedulerDriver schedulerDriver, Protos.ExecutorID executorID,   Protos.SlaveID slaveID, byte[] bytes) {  String data = new String(bytes);  System.out.println(data);  LOGGER.info("User Profile Information : " + data); }