Первое сетевое приложение, которое я написал в своей жизни (OMG, это было 17 лет назад), было очень интересным. Мы занимались разработкой морской навигационной системы — электронная карта + подключения к GPS, радарам и другим датчикам. В какой-то момент стало очевидно, что одного компьютера недостаточно хотя бы по причинам бронирования. Забавным фактом является то, что главной причиной с точки зрения бизнеса было даже не это, а тот факт, что на действительно большом судне (танкере или грузовом судне или что-то еще) размер моста настолько велик, что вы хотите иметь как минимум три компьютера (совместных, конечно) — один в центре моста и по одному на каждом крыле.
Одним из основных требований было отсутствие конфигурации сети. Как вы можете себе представить, типичные члены команды ничего не могут сделать с IP-адресами или портами. Таким образом, компьютеры должны были найти друг друга молча и решить, как сотрудничать.
Решение хорошо известно — каждый компьютер (узел) периодически передает по сети собственный идентификатор и сетевой адрес, где он прослушивает входящие соединения. Также каждый узел прослушивает такие передаваемые сообщения. Когда другой узел получает такое сообщение, он решает, кто должен с кем соединиться. Это может быть детерминированный алгоритм (например, сравнение идентификаторов) или кто первым установит связь или что-то еще. Как только узлы обнаружены, возможна совместная работа друг друга.
Честно говоря, в тот момент это была тяжелая задача для меня, которая заняла довольно много времени. Сегодня я хочу показать, насколько это просто.
Наш план следующий: мы разработаем два универсальных объекта — один для периодической трансляции сообщений, а другой для получения.
Каждый объект будет иметь отдельный поток и отдельный сокет. Во многих случаях использование двух сокетов не является необходимым, поскольку обычно у вас есть и широковещательный, и отправитель, но в некоторых случаях вам нужен только один из них, поэтому мы реализуем более общее решение.
Наши отправители / получатели будут очень общими и ничего не знают о природе данных. Мы хотим разделить проблемы.
Давайте начнем с общей функциональности для отправителя и получателя.
abstract class BroadcastThread extends SupervisedChannel {
InetAddress group
int port
private MulticastSocket socket
private volatile boolean stopped
abstract void loopAction ()
protected void doStartup() {
executor.execute {
try {
socket = new MulticastSocket(port);
socket.joinGroup(group);
while (!stopped)
loopAction ()
}
catch(Throwable t) {
stopped = true
crash(t)
}
socket.close()
}
}
protected void doShutdown() {
stopped = true;
}
}
SuperviseChannel — очень интересное животное. Стоит отдельная статья сама по себе, но вот краткая идея
Прежде всего, это канал сообщений (или субъект) в смысле, описанном в статье «Блокировка алгоритмов бесплатной передачи сообщений с помощью Groovy ++». Это означает, что он реагирует на входящие сообщения, и мы гарантируем, что в любой момент времени обрабатывается не более одного сообщения. , В частности, методы startup () / shutdown () являются асинхронными (просто отправка соответствующего сообщения). Методы doStartup () / doShutdown (), вызываемые при получении сообщения, определяют, что должен делать наш канал при запуске.
Во-вторых, и еще более важные контролируемые каналы, встроенные в дерево. Эта отличная идея пришла от Erlang OTP. Грубо говоря, каждый контролируемый канал ответственно за создание, запуск и остановку своих детей. Особенно важно, что он также несет ответственность за решение, что делать, если какой-то ребенок потерпел крах. Существует много возможных стратегий — перезапустить сбойный дочерний процесс, либо остановить и перезапустить все дочерние процесс, либо самому сбой и позволить собственному руководителю решить, что делать (наша политика по умолчанию)
У нас нет дочерних элементов для отправителя / получателя, но очень скоро мы объединим их в более интересный объект. То, что мы делаем, это то, что если произошла какая-то ошибка и произошел сбой, мы позволяем нашему руководителю решить, что делать
Последнее, что следует отметить, это то, что каждый контролируемый канал имеет java.util.concurrent.Executor (обычно наследуется от владельца / супервизора). Мы используем его в приведенном выше коде для запуска нового потока.
На данный момент мы готовы создать отправителя
static class Sender extends BroadcastThread {
long sleepPeriod = 1000L
byte [] dataToTransmit
void loopAction () {
socket.send ([dataToTransmit, dataToTransmit.length, group, port])
Thread.currentThread().sleep(sleepPeriod);
}
}
Разве это не очень просто? Нам просто нужно определить частоту передачи и данные для трансляции
Приемник только немного сложнее. Мы хотим иметь возможность определять преобразование полученных байтов во что-то значимое для отправки либо владельцу, либо, если у нас нет владельца для нашей собственной обработки.
static class Receiver extends BroadcastThread {
Function1<byte[],?> messageTransform
void loopAction () {
def buffer = new byte [512]
def packet = new DatagramPacket(buffer, buffer.length)
socket.receive(packet)
def msg = buffer
if (messageTransform)
msg = messageTransform(buffer)
if (msg)
(owner ? owner : this).post(msg)
}
}
Нет, мы готовы построить наш главный объект обнаружения. Это будет руководитель, объединяющий отправителя и получателя.
При запуске он создает и запускает отправителя и получателя. Оба будут остановлены автоматически, когда он остановился
Полученные байты преобразуются в значимые сообщения и отправляются любому заинтересованному слушателю (включая нас самих, что открывает возможность для создания подкласса и имеет мост между передачей сообщений и миром ООП)
class BroadcastDiscovery extends SupervisedChannel {
// our ID
UUID uid
// our IP address
InetSocketAddress address
// where to send notifications on discovery
Multiplexor<Discovery> listeners = []
// where to broadcast
InetAddress group
// broadcast port
int port
static class Discovery {
UUID uuid
SocketAddress address
}
void doStartup() {
listeners.subscribe(this)
BroadcastThread.Sender sender = [
group:group,
port:port,
dataToTransmit: createDataToTransmit()
]
startupChild (sender)
BroadcastThread.Receiver receiver = [
group:group,
port:port,
messageTransform: { byte [] buf -> listeners.post(transformReceivedData(buf)) }
]
startupChild (receiver)
}
protected void doOnMessage(Object message) {
switch(message) {
case Discovery:
Discovery dicovery = message
onDiscovery(dicovery.uuid, dicovery.address)
break
default:
super.doOnMessage(message)
}
}
protected void onDiscovery(final UUID uuid, final SocketAddress address) {
}
private byte [] createDataToTransmit() {
// ...
}
private Discovery transformReceivedData (byte [] buf ) {
// ...
}
}
Обратите внимание на две детали:
Наше преобразование сообщения всегда возвращает ноль, что означает, что получатель не будет отправлять сообщение. Вместо этого мы отправляем преобразованные данные всем подписавшимся слушателям изнутри преобразования
Метод startupChild, используемый выше, является стандартным способом одновременного создания отношения владелец / дочерний элемент и запуска дочернего элемента.
Мы почти закончили. Просто для полноты здесь приведен код для запуска нашего канала обнаружения
private void startBroadcast() {
broadcast = new Broadcast()
broadcast.group = GROUP
broadcast.port = PORT
broadcast.uid = clusterNode.id
broadcast.address = (InetSocketAddress)serverChannel.getLocalAddress()
broadcast.startup ()
}
Последнее, но важное замечание:
Обычно такой канал обнаружения не является корневым супервизором. Например, естественно представить, что «брат» канала обнаружения является каналом, ответственным за прием входящих соединений TCP / IP и т. Д.
Спасибо за уделенное время. Надеюсь, это было интересно.
До следующего раза.