В предыдущем посте мы увидели, что такое мезо, как оно полезно и с чего начать. В этом посте мы увидим, как написать собственный фреймворк на мезо.
(В mesos под фреймворком
понимается любое работающее на нем приложение.) В этом посте рассказывается о фреймворке под названием «mesos-pinspider», который извлекает информацию о профиле пользователя и информацию о пользовательской панели страницы интереса пользователя.
Mesos Framework
В целом, платформа Mesos состоит из трех основных компонентов.
— Драйвер, который отправляет задачи в платформу
— Планировщик, который регистрируется на главном сервере для предлагаемых ресурсов, принимает задачи и запускает их на исполнителе
— Процесс исполнителя, который запускается на подчиненных узлах для выполнения задач платформы
Пример Pinspider Framework
Вы можете проверить код здесь на GitHub . Давайте разберем его с PinDriver, PinScheduler и PinUserProfileExecutor.
Водитель
Компонент драйвера структуры является PinDriver.
- Создать информацию об исполнителе
Опишите информацию об исполнителе, используя шаблон Builder, а мезо — используйте Google Protocol Buffers для обмена данными. Здесь нам нужно установить executorID, команду, которая в основном представляет собой команду оболочки, выполняемую через: ‘/ bin / sh -c value’. Любые указанные URI выбираются перед выполнением команды. Имя задается с помощью setName (). Источник устанавливается setSource (), строкой стиля идентификатора, используемой фреймворками для отслеживания источника исполнителя. Это полезно, когда возможно, чтобы разные идентификаторы исполнителей были связаны семантически.
Protos.ExecutorInfo userProfileExecutorInfo = Protos.ExecutorInfo.newBuilder().setExecutorId(Protos.ExecutorID.newBuilder().setValue("PinUserProfileExecutor")).setCommand(commandInfoUserProfile).setName("PinUserProfileExecutor Java").setSource("java").build();
- Создать каркасную информацию
Опишите основы информации. Поле пользователя используется для определения пользователя Unix, для которого должен быть запущен исполнитель / задача. Если поле пользователя установлено в пустую строку, Mesos автоматически установит его для текущего пользователя. Период времени, в течение которого мастер будет ожидать отработки отказа планировщика, прежде чем удалять структуру, задается setFailoverTimeout (). Имя фреймворка задается с помощью setName ()
Protos.FrameworkInfo.Builder frameworkBuilder = Protos.FrameworkInfo.newBuilder().setFailoverTimeout(120000).setUser("").setName("Pinspider Framework");
- Создание планировщика
Вам нужно создать экземпляр планировщика с количеством задач, которые необходимо отправить для запуска исполнителю.
Scheduler scheduler = args.length == 1 ?new PinScheduler(userProfileExecutorInfo,userBoardExecutorInfo) : new PinScheduler(userProfileExecutorInfo, userBoardExecutorInfo, Integer.parseInt(args[1]), args[2]);
Примечание: обратите внимание, что используются два ExecutorInfo, т.е. один для получения информации о профиле пользователя и другой для информации о плате пользователя для демонстрации. Это объяснение включает только одну executorinfo — userProfileExecutorInfo
- Запуск драйвера планировщика мезос.
MesosSchedulerDriver является реализацией SchedulerDriver, который является абстрактным интерфейсом для подключения планировщика к мезо. Это осуществляется путем управления жизненным циклом планировщика (запуск, остановка и ожидание завершения задач), а также для взаимодействия с Mesos (запуск задач, уничтожение задач и т. Д.).
MesosSchedulerDriver schedulerDriver = new MesosSchedulerDriver(scheduler,frameworkBuilder.build(), args[0]); int status = schedulerDriver.run() == Protos.Status.DRIVER_STOPPED ? 0 : 1;schedulerDriver.stop(); System.exit(status);
Реализация исполнителя
Компонент Executor этой структуры — PinUserProfileExecutor.
Executor — это интерфейс обратного вызова, который реализуется исполнителями фреймворков. В нашей реализации давайте сосредоточимся на launchTask ()
@Override public void launchTask(final ExecutorDriver executorDriver, final Protos.TaskInfo taskInfo) { }
- Установите статус задачи, установив идентификатор и состояние с помощью шаблона построителя.
Protos.TaskStatus taskStatus = Protos.TaskStatus.newBuilder().setTaskId(taskInfo.getTaskId()).setState(Protos.TaskState.TASK_RUNNING).build();
- Отправьте обновление состояния в планировщик инфраструктуры, повторяя при необходимости, пока не будет получено подтверждение или исполнитель не будет завершен, и в этом случае будет отправлено обновление состояния TASK_LOST.
executorDriver.sendStatusUpdate(taskStatus);
- Получить данные из задач и запустить свою логику.
try { message = ("userprofile :" + getUserProfileInfo(url)).getBytes(); } catch (IOException e) { LOGGER.error("Error parsing the Pinterest URL :" + e.getMessage()); }
- Отправьте рамке сообщение.
executorDriver.sendFrameworkMessage(message);
- Отметьте состояние задачи как выполненное и отправьте обновление статуса планировщику платформы.
taskStatus = Protos.TaskStatus.newBuilder().setTaskId(taskInfo.getTaskId()) .setState(Protos.TaskState.TASK_FINISHED).build();executorDriver.sendStatusUpdate(taskStatus);
- Метод main () для создания экземпляра MesosExecutorDriver и запуска
mesosExecutorDriver.run() == Protos.Status.DRIVER_STOPPED ? 0 : 1
Реализация планировщика
Компонент Планировщика структуры является PinScheduler.
Планировщик — это интерфейс обратного вызова, который реализуется планировщиками фреймворков. В нашей реализации давайте сконцентрируемся на resourceOffers (), statusUpdate () и frameworkMessage ()
- Конструктор: построить с информацией об исполнителе и количество задач запуска.
public PinScheduler(Protos.ExecutorInfo pinUserProfileExecutor , Protos.ExecutorInfo pinUserBoardExecutor ) { this(pinUserProfileExecutor,pinUserBoardExecutor, 5, "http://www.pinterest.com/techcrunch"); } public PinScheduler(Protos.ExecutorInfo pinUserProfileExecutor,Protos.ExecutorInfo pinUserBoardExecutor, int totalTasks, String url) { this.pinUserProfileExecutor = pinUserProfileExecutor; this.pinUserBoardExecutor = pinUserBoardExecutor; this.totalTasks = totalTasks; this.crawlQueue = Collections.synchronizedList(new ArrayList<String>()); this.crawlQueue.add(url); }
- Ресурсные предложения
- Предложение ресурсов может быть ресурсами, такими как процессор, память и т. Д. Из списка предложений получите скалярное значение ресурсов. Мы должны предоставить наши требования к ресурсам для задач при настройке информации о задачах.
for (Protos.Offer offer : list) { List<Protos.TaskInfo> taskInfoList = new ArrayList<Protos.TaskInfo>(); double offerCpus = 0; double offerMem = 0; for (Protos.Resource resource : offer.getResourcesList()) { if (resource.getName().equals("cpus")) { offerCpus += resource.getScalar().getValue(); } else if (resource.getName().equals("mem")) { offerMem += resource.getScalar().getValue(); } } LOGGER.info("Received Offer : " + offer.getId().getValue() + " with cpus = " + offerCpus + " and mem =" + offerMem);
- Создать идентификатор задачи.
Protos.TaskID taskID = Protos.TaskID.newBuilder().setValue(Integer.toString(launchedTasks++)).build();
- Создать информацию о задаче, установив идентификатор задачи, добавив ресурсы, установив данные и установив исполнителя.
Protos.TaskInfo pinUserProfileTaskInfo = Protos.TaskInfo.newBuilder().setName("task " + taskID.getValue()).setTaskId(taskID).setSlaveId(offer.getSlaveId()).addResources(Protos.Resource.newBuilder().setName("cpus").setType(Protos.Value.Type.SCALAR).setScalar(Protos.Value.Scalar.newBuilder().setValue(CPUS_PER_TASK))).addResources(Protos.Resource.newBuilder().setName("mem").setType(Protos.Value.Type.SCALAR).setScalar(Protos.Value.Scalar.newBuilder().setValue(MEM_PER_TASK))).setData(ByteString.copyFromUtf8(crawlQueue.get(0))).setExecutor(Protos.ExecutorInfo.newBuilder(pinUserProfileExecutor)).build();
- Запустите задачи через SchedulerDriver.
... taskInfoList.add(pinUserProfileTaskInfo);taskInfoList.add(pinUserBoardTaskInfo);}schedulerDriver.launchTasks(offer.getId(), taskInfoList);
- Обновление статуса
Это вызывается, когда статус задачи изменился, т. Е. Ведомое устройство потеряно, и поэтому задание потеряно, задача завершается, и исполнитель отправляет обновление состояния, сообщая об этом.
@Override public void statusUpdate(SchedulerDriver schedulerDriver, Protos.TaskStatus taskStatus) {... }
- Остановите SchedulerDriver, если задачи завершены
if (taskStatus.getState() == Protos.TaskState.TASK_FINISHED) { finishedTasks++; LOGGER.info("Finished tasks : " + finishedTasks); if (finishedTasks == totalTasks) { schedulerDriver.stop(); } }
- Прервать SchedulerDriver, если задачи убиты, потеряны или не выполнены
if (taskStatus.getState() == Protos.TaskState.TASK_FAILED || taskStatus.getState() == Protos.TaskState.TASK_KILLED || taskStatus.getState() == Protos.TaskState.TASK_LOST) { LOGGER.error("Aborting because the task " + taskStatus.getTaskId().getValue() + " is in unexpected state : " + taskStatus.getState().getValueDescriptor().getName() + "with reason : " + taskStatus.getReason() .getValueDescriptor() .getName() + " from source : " + taskStatus.getSource().getValueDescriptor().getName() + " with message : " + taskStatus.getMessage()); schedulerDriver.abort();
- Рамочное сообщение
Это вызывается, когда исполнитель отправляет сообщение.
- Обработайте ваше сообщение
@Override public void frameworkMessage(SchedulerDriver schedulerDriver, Protos.ExecutorID executorID, Protos.SlaveID slaveID, byte[] bytes) { String data = new String(bytes); System.out.println(data); LOGGER.info("User Profile Information : " + data); }