Статьи

Простая кластерная система распределения задач

В этой статье будут представлены основные концепции JGroups, а затем реализована система распределения задач (поверх JGroups), где задачи можно размещать в кластере и выполнять рабочими узлами.

Я покажу, что рабочие узлы могут быть добавлены во время выполнения, чтобы увеличить вычислительную мощность, или отключены, когда у нас мало нагрузки. Кроме того, задачи, назначенные работникам, которые впоследствии терпят крах, автоматически переназначаются на работающие узлы.

Мы могли бы реализовать это с помощью очередей JMS. Однако, когда у нас большая нагрузка, JMS-сервер становится узким местом. В нашем децентрализованном решении каждый узел в кластере может быть как ведущим (который представляет задачи), так и ведомым (который выполняет задачи и возвращает результаты).

обзор

JGroups — это кластерная библиотека. Приложения могут использовать JGroups для присоединения к кластеру, отправки сообщений на узлы кластера, получения уведомлений о присоединении или выходе других узлов (включая сбои) и выхода из кластера. Его задача — надежная отправка сообщений внутри кластера. Его объем намного меньше, чем JMS; JGroups не знает об очередях, темах и транзакциях, но только об отправке сообщений.

Главной особенностью JGroups является стек протоколов и полученная гибкость конфигурации.
Приложения могут выбирать свойства, которые они хотели бы в кластере, просто редактируя файл XML.
Например, приложение может добавить сжатие, просто добавив в
конфигурацию протокол COMPRESS .

Или он может удалить фрагментацию, потому что его сообщения всегда будут меньше чем 65 КБ (по UDP), или потому что он использует TCP в качестве транспорта.
Другое приложение может добавить шифрование и аутентификацию, поэтому сообщения шифруются, и только
узлы, которые представляют действительный сертификат X.509, могут присоединиться к кластеру.
Приложения могут свободно писать свои собственные протоколы (или расширять существующие) и добавлять их в
конфигурацию. Это может быть полезно, например, для добавления протокола, который отслеживает все сообщения, отправленные
и полученные по кластеру, для целей аудита или статистики. Архитектура JGroups показана на рис. 1.

Основным API для клиентов является канал (см. Ниже), который используется для отправки и получения сообщений. Когда
сообщение отправляется, оно передается по стеку протоколов. Стек представляет собой список протоколов, и каждый протокол
получает возможность что-то сделать с сообщением.
Например, протокол фрагментации может проверять размер сообщения. Если сообщение больше
настроенного размера, оно может разбить его на несколько небольших сообщений и отправить их в
стек.

[Img_assist | NID = 5314 | название = | убывание = | ссылка = нет | ALIGN = не определено | ширина = 600 | высота = 555]

На стороне приема протокол фрагментации будет помещать фрагменты в очередь, пока все они не будут
получены, затем собрать их в исходное сообщение и передать его.
Протоколы, поставляемые с JGroups, можно разделить на следующие категории:

● Транспорт: отправка и получение сообщений. UDP использует IP-рассылку и / или UDP-
дейтаграммы. TCP использует TCP-соединения.
● Обнаружение: первоначальное обнаружение узлов.
● Объединение: после восстановления сетевого раздела это объединяет подкластеры в один.
● Обнаружение сбоев: мониторинг узлов кластера и уведомления о возможных сбоях или зависаниях.
● Надежная доставка: гарантирует, что сообщение не будет. потерял, получил только один раз, и получил в
порядок, в котором отправитель отправил его. Это делается путем присвоения порядковых номеров каждому
сообщению и повторной передачи в случае пропущенного сообщения.
● Стабильность: узлы должны буферизовать все сообщения (для возможной повторной передачи). Протокол стабильности
гарантирует, что периодически (или на основе накопленного размера) сообщения, полученные
всеми узлами кластера, удаляются, чтобы их можно было собирать мусором.
● Членство в группе: отслеживает узлы в кластере и уведомляет приложение о
присоединении и выходе узлов (включая сбои).
● Управление потоком: гарантирует, что отправитель не может отправлять сообщения быстрее, чем получатели могут
обрабатывать их, в течение более длительного времени. , Это необходимо для предотвращения ситуаций нехватки памяти. поток
контроль — это противоположность стабильности.
● Фрагментация: фрагментирует большие сообщения на более мелкие и повторно собирает их в
получателях.
● Передача состояния: обеспечивает
правильную передачу общего состояния кластера (например, всех сеансов HTTP) на новый узел.
● Сжатие: сжимает сообщения и распаковывает их. их на приемниках
● Шифрование: шифрует сообщения
● Аутентификация: предотвращает присоединение неавторизованного узла к кластеру

API

Основным API является org.jgroups.JChannel:

public class JChannel extends Channel {
public JChannel(String properties) throws ChannelException;
public void setReceiver(Receiver r);
public void connect(String cluster_name) throws ChannelException;
public void send(Message msg) throws ChannelException;
public View getView();
public Address getLocalAddress();
public void disconnect();
public void close();
}

Мы присоединяемся к кластеру, создав канал и вызвав connect ():

Channel ch=new JChannel(“/home/bela/udp.xml”);
ch.setReceiver(new ReceiverAdapter() {
public void receive(Message msg) {}
});
ch.connect(“demo-cluster”);

Это создает канал со стеком протоколов, определенным в /home/bela/udp.xml. Если приложению требуются
другие свойства, оно изменит файл udp.xml и передаст измененный XML-файл конструктору.
Затем мы устанавливаем Receiver, который имеет обратные вызовы, которые вызываются при получении сообщений. Наконец, мы
присоединяемся к кластеру «demo-cluster». Все каналы с одинаковой конфигурацией и одинаковым именем кластера
(аргумент connect ()) присоединятся к одному кластеру.

Узел может отправлять и получать сообщения, как только он присоединился к кластеру. Интерфейс Receiver имеет 2 метода, которые нас интересуют:

void receive(Message msg);
void viewAccepted(View new_view);

Обратный вызов receive () вызывается всякий раз, когда сообщение принимается (обратите внимание, что receive () может быть вызван одновременно, когда мы получаем сообщения от разных отправителей, поэтому он должен быть повторно входящим) . Его аргумент — org.jgroups.Message:

public class Message implements Streamable {
protected Address dest_addr=null;
protected Address src_addr=null;
private byte[] buf=null;
public byte[] getBuffer();
public void setBuffer(byte[] b);
}

Сообщение имеет адрес получателя (dest_addr), отправителя (src_addr) и полезную нагрузку (buf).
Адрес является непрозрачным класс идентификации узла однозначно в cluster2. Нулевой адрес назначения
означает, что сообщение должно быть отправлено на все узлы кластера (многоадресная
передача ), а ненулевой адрес назначения означает отправку сообщения одному получателю (одноадресная передача).
При получении сообщения приложение может вызвать getBuffer (), чтобы извлечь буфер byte [], а затем
распаковать его в данные, значимые для приложения.

Обратный вызов viewAccepted () вызывается, когда узел присоединяется или уходит. Его единственный параметр — это View,
который по сути является списком адресов. Представления принимаются всеми узлами кластера в одинаковом
порядке, поэтому, когда у нас есть кластер {A, B, C}, узлы имеют следующие представления:
● A: {A, B, C}
● B: {A , B, C}
● C: {A, B, C}
Если бы у нас был новый узел D, присоединяющийся, то представление стало бы {A, B, C, D}. В случае сбоя B все
установят {A, C, D}. Как мы видим, узлы в представлении упорядочены в соответствии со временем соединения.
Самый старый узел всегда первый.
Текущий вид также можно получить из канала, вызвав Channel.getView ().


Давайте теперь обсудим остальные методы JChannel.

Метод send () принимает сообщение и отправляет его всем узлам кластера, если адресат сообщения имеет значение NULL,
или одному узлу, если пункт назначения не NULL. Приложению необходимо упорядочить свои данные в
буфере byte [] и установить его в сообщение с помощью Message.setBuffer ().

Чтобы получить текущее представление, мы можем использовать Channel.getView (), а для извлечения локального адреса узла мы
вызываем Channel.getLocalAddress (). Метод disconnect () покидает кластер, а close () уничтожает канал. Закрытый канал не может быть открыт снова. Вызов close () также отключает канал, если он еще не отключен.

Вызов disnect () установит новое представление во всех узлах кластера, а viewAccepted () будет вызвано во
всех получателях.

Как мы увидим при построении нашей системы распределения задач, использование JGroups позволяет нам сосредоточиться на имеющейся
системе и не беспокоиться о проблемах кластеризации, поскольку JGroups делает тяжелую работу. В
основные функции , используемые в нашей системе управления членством (мы должны знать , кто узлы кластера
находятся, и когда узел присоединяется или листья) и надежный обмен сообщениями (для отправки задания). Кроме того, изменяя
конфигурацию протокола, мы можем адаптировать транспорт к нашим потребностям.

Система распределения задач

Идея очень проста: у нас есть кластер узлов, и каждый узел может отправлять задачи для выполнения
другим узлом в кластере. Таким образом, каждый узел является равноправным, в том смысле, что он может отправлять и обрабатывать
задачи. В реальном приложении клиенты подключаются к любому из узлов, например, через TCP или RMI, и
отправляют задачи этому узлу, который затем распространяет его на какой-то другой узел (или обрабатывает сам).
При отправке задачи мы выбираем случайное целое число, которое затем сопоставляется с рангом узла в
кластере (int mod N, где N — размер кластера). Ранг — это позиция узла в представлении, и поскольку
представление одинаково во всех узлах, ранг уникально идентифицирует узел.

Затем задача является многоадресной (EXECUTE) по всему кластеру. Каждый узел добавляет задачу в хэш-карту,
состоящую из задач и адресов их отправителей (JGroups).

Каждый узел теперь сравнивает ранг, поставленный с задачей, с его собственным рангом. Если это не соответствует, ничего не
сделано. Если это соответствует, узел должен обработать задачу. Он делает это и возвращает результат отправителю.
Когда отправитель получает ответ (RESULT), он отправляет сообщение REMOVE по всему
кластеру. После получения REMOVE (T) каждый узел удаляет T из своей карты хешей

Если узел X дает сбой (или уходит изящно), мы знаем, какие задачи были ему назначены, просматривая
задачи в хэш-карте с ключом X. Все задачи, которые все еще присутствуют в хэш-карте, еще не были
обработаны и нуждаются в этом. для повторного выполнения, на этот раз другим узлом. Это делается путем сравнения ранга,
поставляемого с задачей, с рангом узла и выполнения его, если ему соответствует собственный ранг узла.
Если мастер M дает сбой после того, как отправил несколько задач, но еще не получил результаты,
ведомые удаляют все задачи, представленные M, потому что M больше не будет нуждаться в результатах3.
На рис. 2 показано, как выглядит распределение задач в кластере.

Кластер состоит из узлов A, B, C и D. Клиенты могут получить доступ к любому из них. Задача, представленная для
Пример для B клиент может назначить 23 для задачи. Затем B многоадресно передает сообщение EXECUTE (23, TASK)
всем узлам кластера, и каждый узел добавляет задачу № 23 в свой кэш.
Тем не менее, единственная задача обработки узла № 23 — это A (которой 23 сопоставляется с), которая затем отправляет
результат в виде RESULT (23, OBJ) на B. B возвращает результат OBJ клиенту и выполняет многоадресную передачу
REMOVE ( 23) сообщение для кластера, в результате которого все узлы удаляют задачу № 23 из своих кэшей.
Если бы при обработке задачи № 23 произошел сбой, какой-то другой узел принял бы управление, обработал бы
результат и отправил его обратно в B.

[Img_assist | NID = 5316 | название = | убывание = | ссылка = нет | ALIGN = не определено | ширина = 500 | Высота = 599]

Код

Давайте посмотрим, как мы реализуем это4. Мы напишем класс Server, который имеет цикл main () и принимает
запросы, которые должны быть распределены по кластеру. Это имитирует реальных клиентов, отправляющих запросы на любой
узел кластера. Затем мы можем запустить несколько серверных процессов в кластере или на одном хосте (для
демонстрации).

Во-первых, нам нужен идентификатор (ClusterID), который уникален для всего кластера и используется для определения
, принимает ли узел задачу. Экземпляр ClusterID создается для каждой задачи. Класс выглядит следующим
образом:

public class ClusterID implements Streamable {
private Address creator;
private int id;
private static int next_id=1;
public static synchronized ClusterID create(Address addr) {
return new ClusterID(addr, next_id++);
}
}

Реализация Streamable позволяет JGroups более эффективно маршалировать и демаршировать объекты.

ClusterID имеет адрес узла, который его создал, и идентификатор, который увеличивается на каждый
вызов create (). Если бы мы использовали только идентификаторы, потому что каждый узел мог потенциально отправлять задачи, мы могли бы в конечном итоге получить
задачу № 23 для отправки A и задачу № 23 для узла C, и это привело бы к проблемам с
перезаписью записей задач в хэш-карте кэша. Префикс идентификатор с его создателем даст
A :: 23 и C :: 23, которые являются 2 различными задачами.

Затем мы определяем интерфейсы Master и Slaves:

public interface Master {
Object submit(Task task, long timeout) throws Exception;
}
public interface Slave {
Object handle(Task task);
}

Эти интерфейсы реализованы нашим классом Server, который выполняет основную часть работы.

Метод submit () принимает задачу (см. Ниже) и время ожидания. Может выдать исключение или вернуть
результат. Обратите внимание, что и подклассы Task, и результат должны быть сериализуемыми или Streamable, потому что
они потенциально отправляются по сети для выполнения.

Метод handle (Task t) вызывается на ведомом устройстве, которое является рабочим узлом, который решает, что он будет
обрабатывать задачу. Обычно он использует данные, поставляемые с задачей (подкласс), и возвращает объект, который
должен быть сериализуемым, поскольку в большинстве случаев он будет отправляться обратно отправителю через сеть.
Далее мы определим задачу:

public interface Task extends Serializable {
public abstract Object execute();
}

Задача содержит все необходимые данные, которые отправляются на ведомое устройство. Затем метод execute () использует эти
данные для выполнения обработки и возвращает результат, который отправляется обратно мастеру, который отправил эту
задачу.

Теперь, когда мы определили все вспомогательные классы и интерфейсы, давайте начнем писать Сервер:

public class Server extends ReceiverAdapter implements Master, Slave {
private String props="udp.xml";
private Channel ch;
private final ConcurrentMap<ClusterID,Entry> tasks;
private final ExecutorService thread_pool=Executors.newCachedThreadPool();
private View view;
private int rank=-1;
private int cluster_size=-1;
public void start() throws Exception {
ch=new JChannel(props);
ch.setReceiver(this);
ch.connect("dzone-demo");
}
public void stop() {
thread_pool.shutdown();
ch.close();
}

Сервер реализует интерфейсы Master и Slave, что означает, что Сервер может действовать как клиент
(Master) и как сервер (Slave). Таким образом, название «Сервер» на самом деле является неправильным, так как это
явно больше, чем сервер!

Далее нам нужно несколько переменных экземпляра. Например, нам нужен канал JGroups (ch), который должен
быть настроен со свойствами (props), определяющими конфигурацию стека протоколов.
Нам также нужен пул потоков (thread_pool) для выполнения задач, которые мы получаем для обработки. Здесь мы выбрали
простой пул, который создает новые потоки при необходимости и удаляет потоки, которые простаивали
более 60 секунд.

Хэш-карта ‘tasks’ — это кэш для полученных задач. Он имеет ключ ClusterId и имеет значения Entry
экземпляры (см. ниже).

Переменные view, rank и cluster_size необходимы для определения того, обрабатывать или нет полученную
задачу. Подробнее об этом позже.

В start () мы создаем JChannel на основе свойств, переданных на сервер, и подключаем его, что приводит к его
присоединению к кластеру. Кроме того, мы устанавливаем Receiver, что означает, что мы будем получать
обратные вызовы receive (Message) и viewAccepted (View) всякий раз, когда получено сообщение или изменение представления.
В stop () мы закрываем пул потоков и закрываем канал, в результате чего этот узел
грациозно покидает кластер . Все остальные, подключенные к этому кластеру, получат изменение представления (обратный вызов viewAccepted ()),
уведомляющее их о завершении этого узла.

Класс Entry (внутренний класс Server) показан ниже:

private static class Entry {
private final Task task;
private final Address submitter;
private final Promise<Object> promise=new Promise<Object>();
}

Это оболочка для задачи, адрес отправителя и обещание (аналогично будущему), которое используется для
блокировки до получения результата. Адрес отправителя задания необходим для отправки
результата обратно. Это необходимо, когда узел, отличный от первоначально назначенного, вступает во владение и
обрабатывает задачу.

Другой внутренний класс Сервера — это Запрос, который используется для отправки запросов и ответов между отправителями
(мастерами) и ведомыми:

public static class Request implements Streamable {
static enum Type {EXECUTE, RESULT, REMOVE};
private Type type;
private Task task;
private ClusterID id;
private Object result;
}

Запрос также реализует Streamable (реализация не показана), который обеспечивает более эффективную
сортировку. Мы отправляем 3 типа запросов:

1. EXECUTE: многоадресная передача отправителем всем узлам кластера. Он содержит задачу и
идентификатор ClusterID, сгенерированный отправителем и используемый подчиненным для определения,
принимать или не принимать задачу. Обратите внимание, что только один ведомый во всем кластере будет выполнять данную задачу.

2. РЕЗУЛЬТАТ: содержит ClusterID и объект (может быть нулевым, если ничего не возвращается, например, вызывая
метод void). Это одноадресная передача от подчиненного к хозяину, который отправил задачу.

3. УДАЛИТЬ: содержит только ClusterID и является многоадресной передачей отправителем задачи T после
Результат для T был получен. Каждый удаляет задачу из своего кэша при получении
этого сообщения.

Теперь, когда у нас есть все кусочки, пришло время взглянуть на метод submit ():

public Object submit(Task task, long timeout) throws Exception {
ClusterID id=ClusterID.create(ch.getLocalAddress());
try {
Request req=new Request(Request.Type.EXECUTE, task, id, null);
byte[] buf=Util.streamableToByteBuffer(req);
Entry entry=new Entry(task, ch.getLocalAddress());
tasks.put(id, entry);
ch.send(new Message(null, null, buf));
return entry.promise.getResultWithTimeout(timeout);
}
catch(Exception ex) {
tasks.remove(id); // remove it again
throw ex;
}
}

Это реализация интерфейса Master. Он генерирует ClusterId и создает
экземпляр запроса типа EXECUTE, содержащий задачу и идентификатор ClusterID.
Util.streamableToByteBuffer () — это вспомогательный метод, предоставляемый JGroups, который принимает
объект Streamable и размещает его в буфере byte []. Помните, что JGroups может отправлять только байтовые [] буферы по
проводам.

Затем мы добавляем задачу в наш локальный кеш, создаем сообщение с адресом null (= multicast) и
маршализованным запросом.

Наконец, мы блокируем обещание вступления, пока не будет получен результат или не получим исключение (например,
тайм-аут)

Ведомая часть для обработки полученных задач проста:

public Object handle(Task task) {
return task.execute();
}

Мы просто берем задачу и вызываем execute ().

В методе Server.start () мы создали JChannel и установили Receiver в качестве самого сервера. Мы
реализуем 2 метода: receive (Message) и viewAccepted (View). Метод receive () показан
ниже:

public void receive(Message msg) {
Request req=(Request)Util.streamableFromByteBuffer(Request.class, msg.getBuffer());
switch(req.type) {
case EXECUTE:
handleExecute(req.id, msg.getSrc(), req.task);
break;
case RESULT:
Entry entry=tasks.get(req.id);
entry.promise.setResult(req.result);
multicastRemoveRequest(req.id);
break;
case REMOVE:
tasks.remove(req.id);
break;
}
}

Метод receive () обрабатывает все запросы и ответы. После получения сообщения нам нужно захватить
его буфер byte [], разархивировать его в запрос и затем обработать запрос. Для этого мы используем вспомогательный
метод JGroups Util.streamableFromByteBuffer ().

При получении EXECUTE мы вызываем handleExecute (), передавая ему ClusterID, отправителя и задачу
.

При получении РЕЗУЛЬТАТА (отправленного ведомым) мы устанавливаем результат в обещании, освобождая заблокированного отправителя
задачи. Затем мы передаем многоадресный запрос REMOVE.

При получении REMOVE мы просто удаляем задачу из нашего кэша.
Метод handleExecute () проверяет, должен ли узел принять задачу, и, если да, передает ее в
пул потоков для выполнения:

private void handleExecute(ClusterID id, Address sender, Task task) {
tasks.putIfAbsent(id, new Entry(task, sender));
int index=id.getId() % cluster_size;
if(index != rank)
return;
thread_pool.execute(new Handler(id, sender, task);
}

Во-первых, мы добавляем задачу в наш кэш задач с ключом ClusterID5. Затем мы берем идентификатор ClusterID, по модулю количество узлов в кластере. Это ранг узла, который должен выполнить задачу.
Если он соответствует нашему собственному рангу, мы создаем обработчик и передаем его в пул потоков для выполнения в отдельном потоке, в противном случае мы возвращаемся из handleExecute (). Класс Handler показан ниже:

private class Handler implements Runnable {
final ClusterID id;
final Address sender;
final Task task;
public void run() {
Object result=null;
try {
result=handle(task);
}
catch(Throwable t) {
result=t;
}
Request response=new Request(Request.Type.RESULT, null, id, result);
byte[] buf=Util.streamableToByteBuffer(response);
ch.send(new Message(sender, null, buf));
}
}

Он выполняет задачу с интерфейсом Slave (метод handle ()) и сохраняет результат. Если есть
исключение, то исключение (которое по умолчанию сериализуемо) сохраняется как результат.

Затем из результата создается объект Response. Util.streamableToByteBuffer () вызывается для генерации буфера byte [] из ответа, который затем помещается в сообщение и отправляется по каналу
исходному отправителю задачи.

Наш код теперь почти завершен. Единственное, чего не хватает, так это обработки изменений членства.
Помните, что нам нужно повторно отправлять задачи из сбойных узлов или из узлов, которые вышли изящно, в другие
узлы. Это делается в viewAccepted (View):

public void viewAccepted(View view) {
List<Address> left_members=Util.leftMembers(this.view, view);
this.view=view;
Address local_addr=ch.getLocalAddress();
cluster_size=view.size();
Vector<Address> mbrs=view.getMembers();
for(int i=0; i < mbrs.size(); i++) {
Address tmp=mbrs.get(i);
if(tmp.equals(local_addr)) {
rank=i;
break;
}
}
if(left_members != null && !left_members.isEmpty()) {
for(Address mbr: left_members)
handleLeftMember(mbr);
}
}

 

Сначала мы определяем, какие члены остались между новым и предыдущим представлениями. Это делается с помощью Util.leftMembers (), которая возвращает список адресов узлов, оставленных между двумя представлениями. Затем мы устанавливаем локальный адрес (Channel.getLocalAddress ()), cluster_size и view.

Затем ранг вычисляется путем итерации нового членства и сравнения каждого элемента с локальным адресом. На совпадении наш ранг — счетчик итераций. Например, если у нас есть членство {A, B, C, D, E} и мы C, тогда наш ранг будет равен 2.

Наконец, нам нужно определить, остались ли какие-либо узлы с момента предыдущего просмотра, и Есть ли какие-либо задачи, чтобы взять на себя от них. Это делается путем перебора всех левых элементов (если они есть) и вызова handleLeftMember (), как показано ниже:

private void handleLeftMember(Address mbr) {
for(Map.Entry<ClusterID,Entry> entry: tasks.entrySet()) {
ClusterID id=entry.getKey();
int index=id.getId() % cluster_size;
if(index != rank)
return;
Entry val=entry.getValue();
if(mbr.equals(val.submitter)) {
continue;
}
execute(id, val.submitter, val.task);
}
}

Этот метод перебирает все записи в кэше и сравнивает идентификатор (размер кластера по модулю) с нашим собственным
рангом. Если он совпадает, мы выполняем задачу (если только сам отправитель не ушел, в этом случае мы отбрасываем
задачу).

Обратите внимание, что и rank, и cluster_size могут измениться при получении нового представления. Оба назначения должны
произойти до вызова handleLeftMember (), так как этот метод использует 2 переменные. Например, если у нас
есть кластер {A, B, C, D, E}, то C имеет ранг = 2, а D имеет ранг = 3. Если C падает, ранги D и E меняются:
ранг D теперь равен 2. Это означает, что D будет обрабатывать все задачи, которые C обрабатывал и которые
не были выполнены к моменту сбоя C (в противном случае C удалил бы их).

Приведенный выше код имеет недостаток: если предположить, что у нас есть кластер {A, B, C, D} и A, то
ранги B, C и D меняются: B с 1 до 0, C с 2 до 1 и D от 3 до 2.

Это означает, что B, C и D теперь будут выполнять задачи, над которыми уже работали другие
узлы. Например, C будет повторно выполнять задачи D, а B будет повторно выполнять задачи C. Это не неверно, так как
отправитель задачи удалит ее после завершения. Таким образом, при получении результата R от
подчиненного для задачи, которая уже выполнена и, следовательно, удалена, отправитель просто отбрасывает R.
Это не является ошибкой, но приводит к ложной и ненужной обработке. Лучший способ определить ранг
будет использовать согласованное хеширование ([2]), которое минимизирует изменения ранга и, следовательно, повторное
выполнение задач, над которыми уже работают другие узлы.

Наш код теперь завершен. Последнее, что нужно сделать, это написать код драйвера, который мы также добавляем на сервер:

public static void main(String[] args) throws Exception {
Server server=new Server(props);
server.start();
loop(server);
}
private static void loop(Server server) {
boolean looping=true;
while(looping) {
int key=Util.keyPress("[1] Submit [2] Submit long running task [q] Quit");
switch(key) {
case '1':
Task task=new Task() {
public Object execute() {
return new Date();
}
};
log("<== result = " + server.submit(task, 30000));
break;
case '2':
task=new Task() {
public Object execute() {
Util.sleep(15000);
return new Date();
}
};
log("<== result = " + server.submit(task, 30000));
break;
}
}
server.stop();
}

 

Метод main () создает сервер и запускает его. Метод loop () ожидает нажатия клавиши, а затем
отправляет кратковременную (для «1») или длительную (для «2») задачу. Задача просто возвращает новую дату с
текущим временем. Длительное задание спит 15 секунд перед возвратом даты. Когда
нажимается «q» , мы корректно останавливаем сервер и возвращаемся.

демонстрация

Посмотрим, работает ли эта штука! Демо JAR можно скачать здесь. Давайте начнем несколько экземпляров и представим несколько задач. Чтобы запустить экземпляр, мы запускаем:


[linux] / home / bela / JGroupsArticles / dist $ java -Djgroups.bind_addr = 192.168.1.5 -jar demo.jar

———————— ——————————-

GMS: адрес 192.168.1.5:33795

———- ———————————————

просмотр: [192.168. 1.5: 33795 | 0] [192.168.1.5:33795]

Мой ранг 0

[1] Отправить [2] Отправить долгосрочное задание [3] Информация [q] Выйти

Замените IP-адрес, установленный на -Djgroups.bind_addr, на адрес действительного сетевого адаптера. Если вы не установите это свойство, JGroups выбирает случайный сетевой адаптер.

Мы видим, что мы являемся первым узлом в кластере, наш локальный адрес — 192.168.1.5:33795, и наш ранг
равен 0. Когда мы отправляем задачу, мы видим, что она выполняется нами самостоятельно, так как мы являемся единственный узел в кластере:


[1] Отправить [2] Отправить долгосрочное задание [3] Информация [q] Выйти

1

==> отправить 192.168.1.5:33795::1

выполнить 192.168.1.5:33795::1

<== результат = вт 09 сентября 12 : 38: 55 CEST 2008

Давайте начнем второй экземпляр:


[linux] / home / bela / JGroupsArticles / dist $ java -Djgroups.bind_addr = 192.168.1.5 -jar demo.jar

———————— ——————————-

GMS: адрес 192.168.1.5:41232

———- ———————————————

просмотр: [192.168. 1.5: 33795 | 1] [192.168.1.5:33795, 192.168.1.5:41232]

мой ранг 1

[1] Отправить [2] Отправить долгосрочное задание [3] Информация [q] Выйти

Мы можем видеть, что представление теперь имеет 2 члена: 192.168.1.5:33795 с рангом = 0 и 192.168.1.5:41232
(запущен второй экземпляр) с рангом = 1. Обратите внимание, что для этой демонстрации мы запускаем все экземпляры как отдельные
процессы на одном и том же хосте, но, конечно, мы размещаем эти процессы на разных хостах в реальной жизни.

Если мы теперь вернемся к первому экземпляру и отправим 2 задачи, мы увидим, что они назначены на оба
экземпляра:


1

==> отправка 192.168.1.5:33795::2

выполнение 192.168.1.5:33795::2

<== result = вторник, сентябрь 12:43:48 CEST 2008

[1] Submit [2] Submit длительное выполнение задачи [3 ] Информация [q] Выйти

1

==> отправка 192.168.1.5:33795::3

<== результат = вторник, сентябрь 09 12:43:49 CEST 2008

Задача № 2 была выполнена нами самостоятельно, но задача № 3 была выполнена вторым экземпляром (это можно проверить
, посмотрев на выходные данные второго экземпляра).

Давайте теперь начнем третий экземпляр:


[linux] / home / bela / JGroupsArticles / dist $ java -Djgroups.bind_addr = 192.168.1.5 -jar demo.jar

———————— ——————————-

GMS: адрес 192.168.1.5:45532

———- ———————————————

просмотр: [192.168. 1.5: 33795 | 2] [192.168.1.5:33795, 192.168.1.5:41232, 192.168.1.5:45532]

мой ранг 2

[1] Добавить [2] Отправить долгосрочное задание [3] Информация [q] Выйти

Мы видим, что в кластере теперь есть 3 узла, а ранг только что запущенного экземпляра равен 2.
Теперь мы передадим долгосрочную задачу T и — до ее завершения — уничтожим узел, обрабатывающий T.

Давайте отправим эту задачу в третий экземпляр:


[1] Отправить [2] Отправить долгосрочное задание [3] Информация [q] Выйти

2

==> отправить 192.168.1.5:45532::1

Поскольку второй экземпляр имеет ранг = 1, задача № 1 из 192.168.1.5:45532 выполняется в этом экземпляре.
До истечения 15 секунд давайте убьем второй экземпляр. Через несколько секунд выходные данные третьего
экземпляра показывают следующее:


ракурс: [192.168.1.5:33795|3] [192.168.1.5:33795, 192.168.1.5:45532]

мой ранг 1

**** с заданием 192.168.1.5:45532::1 из 192.168.1.5:41232 ( отправлено

192.168.1.5:45532)

выполнением 192.168.1.5:45532::1

сна в течение 15 секунд …

выполнено

<== результат = вторник, сен 09 12:55:10 CEST 2008

Это может быть несколько удивительно, но правильно. Посмотрим, что происходит.

Сначала мы получаем изменение представления, новое представление — 192.168.1.5:33795, 192.168.1.5:45532. Это означает, что
третий экземпляр теперь имеет ранг = 1, что в точности соответствует рангу убитого экземпляра. Поэтому, когда задача
№ 1 переназначается, это третий узел 192.168.1.5:41232, который выполняет # 1.

Это тот же самый узел, что и отправитель, но это нормально: поскольку в
нашем кластере осталось только 2 узла, отправитель обрабатывает свою задачу с вероятностью 50%. Если бы у нас было больше узлов в
кластере, вероятность того, что отправитель обработает свою задачу, уменьшится.

Вывод

Мы внедрили простую, высоко децентрализованную систему распределения задач в кластере,
содержащую примерно 500 строк кода и 5 классов. Система устойчива к сбоям, потому что все узлы являются узлами и нет центрального
сервера.

Все одноранговые узлы равны (каждый одноранговый узел может выступать как ведущим, так и ведомым), и задачи захватываются узлом
на основе идентификатора, назначенного отправителем (ведущим).

Сбои или постепенное прекращение работы узлов не приводит к отсутствию задач, так как система перебалансирует себя
и назначает потерянные задачи другому узлу в кластере.
Система настолько мала, потому что работает поверх JGroups. Если бы мы написали это без JGroups, нам пришлось
бы самим реализовать следующую функциональность:


  • Членство в кластере и обнаружение сбоев: нам нужно знать, когда меняется членство, и

    все узлы в кластере должны получать эти представления в абсолютно одинаковом порядке.

  • Одновременное вещание (с дейтаграммами UDP): фрагментация (если задача или результат превышает 65 КБ) и

    повторная передача (дейтаграммы UDP потеряны), а также подавление дублирующихся сообщений (мы не можем получить одну и ту же задачу несколько раз!).

  • Простое переключение между транспортами (UDP, TCP) и настройка / настройка

    транспорта: например, добавление сжатия или шифрования

Распределение текущей задачи далеко не завершено (в конце концов, это всего лишь демонстрация того, что можно сделать с помощью JGroups!); Возможные дальнейшие улучшения включают в себя:


  • Реализация java.util.concurrent.ExecutorService.
    Это расширило бы пул потоков в ВМ,

    чтобы стать кластеризованным пулом потоков, где задачи выполняются не только потоками в одной и той же

    JVM, но также потоками на разных хостах. Это позволило бы мастерам отправлять задачи (

    например, набор задач) и ждать их завершения позже. В нашем текущем решении

    поток вызывающей функции submit () блокируется до тех пор, пока не истечет время ожидания или не станет доступен результат.

  • Не все узлы хранят задачу, но только подмножество узлов.
    Когда происходит сбой узла X, мы просим

    всех о задачах, назначенных для X, и они возвращаются узлами, которые их сохранили.

  • Используйте случайные числа для создания идентификаторов ClusterID, а не монотонно растущих целых чисел.
    В настоящее время

    мы используем схему распределения идентификаторов. Хотя это довольно хорошо для
    равномерного распределения всех задач
    , в некоторых случаях может быть лучше назначить веса отдельным узлам кластера в соответствии

    с количеством ядер, памятью и т. Д. Таким образом, задачи можно назначать более оптимально, тогда как в

    текущем решении мы распределяйте все задачи равномерно, что означает, что более медленные хосты получают такое же количество

    задач, что и более быстрые хосты. Вероятно, нам следует выполнить внешнюю политику, которая создает идентификаторы и / или

    выбирает узлы, чтобы их можно было заменить.

Скачать

Полный демо-код можно скачать здесь