Статьи

Регулирование данных MQTT

Вступление

Большинство брокеров MQTT, доступных в настоящее время на рынке, предоставляют встроенную поддержку WebSockets, что позволяет любой библиотеке JavaScript MQTT устанавливать связь путем инкапсуляции сообщений MQTT в фреймы WebSocket: это называется MQTT Over WebSocket.

Большим преимуществом этого подхода является то, что все современные браузеры, в том числе работающие на смартфонах, могут отправлять и получать сообщения MQTT. Это приводит к растягиванию протокола в сети, что делает его все более и более привлекательным для сценариев M2H (машина-человек).

Однако непредсказуемость Интернета с точки зрения потери пакетов и доступной пропускной способности, особенно в мобильных сетях, делает такой подход «тупой канал» совершенно ненадежным. Например, отправка всех больших данных в реальном времени, создаваемых датчиками IoT, в приложения на основе браузера и в мобильные приложения может быть как невозможной, так и бесполезной из-за риска перегрузки сети, браузера и пользователя … В этих случаях динамическое регулирование является ключом. Данные, которые проходят через WebSockets, должны регулироваться для адаптации к доступной пропускной способности с повторной выборкой. Таким образом, разные клиенты, подписанные на одни и те же высокочастотные темы MQTT в разных сетях, будут видеть обновленные данные, даже при крошечных соединениях с пропускной способностью.

MQTT.Cool за пределами WebSockets

MQTT.Cool, являясь оптимизированным шлюзом между веб-клиентами и брокерами, способен повысить эффективность обмена данными в режиме реального времени. Кроме того, если ваша инфраструктура не поддерживает WebSockets (например, из-за прозрачных прокси-серверов или строгих корпоративных брандмауэров, которые их блокируют), MQTT.Cool автоматически выбирает лучший транспорт для каждого клиента, возможно, возвращаясь к HTTP Streaming и HTTP Long Polling: с MQTT.Cool, WebSocket больше не является единственным способом подключения к брокеру MQTT из Интернета.

Применяя регулирование данных с помощью специальных методов, таких как организация очередей , повторная выборка и сопоставление, MQTT.Cool динамически оптимизирует поток сообщений независимо от того, какой транспорт используется.

Данные можно регулировать двумя различными способами:

  1. Адаптация регулирования, с помощью которой MQTT.Cool автоматически повторяет выборку данных на лету, применяя сопоставление для адаптации к любой перегрузке сети.
  2. Контролируемое клиентом регулирование, с помощью которого каждый клиент может явно настроить максимальную полосу пропускания для своего нисходящего канала, а также максимальную частоту обновления для каждой подписки разветвления .

Вы можете найти больше о подписках разветвления в Руководстве по началу работы ; но ради полноты

  • Совместно используемое соединение – это соединение одного брокера от MQTT.Cool с брокером MQTT, поверх которого туннелируются и мультиплексируются различные клиенты MQTT.Cool.
  • Точно так же подписка разветвления – это отдельная подписка MQTT, созданная MQTT.Cool в брокере для управления всеми сообщениями, опубликованными по этой теме и подписанными несколькими удаленными клиентами MQTT.Cool.

Общие подключения и подписки разветвления – это два основных механизма, которые позволяют любому брокеру MQTT добиться массового распространения в сети.

MQTT.Cool предлагает большую гибкость и высокий уровень оптимизации, поскольку он основан на Lightstreamer , высокопроизводительном сервере, который используется большой базой топ-клиентов по всему миру для эффективной и надежной доставки данных в реальном времени через Интернет.

Витрина для дросселирования MQTT

Чтобы лучше проиллюстрировать, как MQTT.Cool управляет регулированием, мы создали демонстрацию регулирования MQTT, которая фокусируется на регулировании с помощью клиента. Демонстрация была построена в сотрудничестве с нашими друзьями из Gambit Communications , которые помогли нам с их мощным симулятором MIMIC MQTT, как будет описано позже.

Демонстрация показывает, как легко создать веб-клиент, который управляет телеметрией в реальном времени, и, что очень важно, как можно далее управлять входящим потоком сообщений с точки зрения пропускной способности и частоты.

MQTT Data

Клиент визуализирует обновления в режиме реального времени, поступающие от десяти различных датчиков IoT, которые постоянно обнаруживают и публикуют в MQTT-посреднике расстояние между собой и движущимися объектами.

Чтобы сделать четкой разницу между дросселированными и нерегулируемыми данными, для каждого датчика отображаются два графика: с красными точками для дросселированных данных и с оранжевой линией для нерегулируемых данных.

Более того, клиент позволяет вам взаимодействовать с пользовательским интерфейсом, чтобы точно контролировать влияние регулирования потока данных как на глобальном, так и на индивидуальном уровне, как показано ниже.

Контролировать частоту

Для каждого кадра вы можете использовать ползунок для динамического изменения максимальной частоты обновления, с которой могут поступать входящие сообщения, связанные с датчиком. Это влияет на частоту, с которой красные точки отображаются на кадре, как вы можете видеть из следующей анимации, где селектор частоты был перемещен с «неограниченного» на «1 обновление / сек», а затем на «3 обновления / сек».

MQTT Data

Контролировать пропускную способность

Таким же образом, в верхней части страницы селектор позволяет изменить полосу пропускания, используемую всеми подписками, влияя на общую частоту всех красных точек, отображаемых во всех кадрах. В приведенной ниже анимации селектор пропускной способности был изменен с «ulimited kpbs»
»До« 5 кбит / с », а затем до« 15 кбит / с »:

MQTT Data

Взаимодействие с пользовательским интерфейсом помогает понять, как ограничения полосы пропускания и частоты устанавливают верхнюю границу, динамически управляемую MQTT.Cool, на разных уровнях: ограничение полосы пропускания применяется глобально к соединению, тогда как ограничение частоты применяется к каждой подписке MQTT индивидуально. ,

Обратите внимание, что обновления в реальном времени не буферизируются и не задерживаются, а пересматриваются и сопоставляются. Другими словами, когда подписка может быть обновлена ​​(на основе алгоритма циклического перебора), она получит самое последнее доступное сообщение, а не старое.

Архитектура Демо

Следующая диаграмма показывает общую архитектуру, используемую для создания демонстрации:

MQTT Data

Подписчик

Используя API веб-клиента , клиент JavaScript отправляет подписку MQTT для каждого датчика / темы, чтобы получать данные в реальном времени и отображать их на относительной диаграмме.

Использование API веб-клиента необходимо, так как MQTT.Cool не «говорит» на чистом MQTT с веб-клиентами: он прозрачно переназначает протокол MQTT поверх протокола Lightstreamer, чтобы использовать возможности Lightstreamer для перемещения больших объемов данных через веб. Это означает, что у вас есть собственные клиенты MQTT, напрямую подключенные к брокеру MQTT с одной стороны, и веб-клиенты, подключенные к MQTT.Cool через API веб-клиента, с другой стороны.

Издатель

Как уже ожидалось, мы использовали гибкость и мощь, предлагаемые MIMIC MQTT Simulator , благодаря которым чрезвычайно легко создавать неограниченный диапазон моделируемых сценариев, поскольку он способен генерировать произвольную, настраиваемую, масштабируемую и предсказуемую телеметрию.

Ребята из Gambit развернули смоделированные IoT-датчики в своей лаборатории MQTT, чтобы публиковать изменения расстояния, генерируемые в виде синусоидальных волн, каждый из которых имеет разную частоту, чтобы отображать разные схемы трафика.

Брокер

Брокер MQTT размещен в нашей облачной инфраструктуре и слушает по адресу tcp://broker.mqtt.cool:1883 . Говоря об этом, будучи публично доступным брокером, не стесняйтесь использовать его для любых целей тестирования!

Шлюз

Выступая в качестве посредника между Подписчиком и Брокером, сервер MQTT.Cool выполняет роль реального веб-шлюза. Живая версия демо-версии подключается к https://cloud.mqtt.cool , который является адресом нашего онлайн-экземпляра MQTT.Cool. Как описано далее, вы можете заменить его своим собственным экземпляром в локальной копии демо.

Копать код

Демонстрация основана на jQuery и, как указывалось ранее, была разработана с использованием API веб-клиента MQTT.Cool с поддержкой еще двух небольших библиотек:

  1. Flotr2 для рендеринга графиков.
  2. rangelider.js для визуализации и манипулирования слайдером.

Давайте начнем с изучения некоторых интересных фрагментов кода, взятых из файла js/app.js , где определен весь код приложения.

Адреса хостов

В верхней части файла вы можете найти адреса сервера MQTT.Cool и брокера MQTT:

1
2
3
4
5
6
$(function() {
  // Define urls for MQTT.Cool and the external MQTT broker.
  const MQTT_COOL_URL = 'http://localhost:8080';
  const BROKER_URL = 'tcp://broker.mqtt.cool:1883';
  ...
})

Если вы хотите клонировать проект на своем ноутбуке и использовать другой сервер MQTT .Cool, замените константу MQTT_COOL_URL соответствии с вашей средой.

Обратите внимание, что то же самое не относится к брокеру MQTT, поскольку MIMIC Simulator в настоящее время публикует данные для нашего облачного брокера.

датчиков

После определения других переменных моделируемый удаленный датчик IoT абстрагируется функцией конструктора Sensor , которая идентифицируется по идентификатору датчика и идентификатору кадра :

  • Идентификатор датчика используется для формирования темы, по которой MIMIC Simulator будет публиковать данные телеметрии для этого имитируемого датчика. Тема используется для отправки подписки MQTT.
  • Идентификатор фрейма используется для поиска области на странице HTML, посвященной отображению анимированного графика.
1
2
3
4
5
6
function Sensor (sensorId, frameId) {
  this.sensorId = sensorId;
  this.frameId= frameId;
  this.topic = '/gambit/' + this.sensorId + '/telemetry';
  ...
}

В следующем массиве перечислены все датчики, поставляемые Gambitt:

01
02
03
04
05
06
07
08
09
10
11
12
const SENSORS = [
    new Sensor('20:19:AB:F4:0D:0D', 'sensor1'),
    new Sensor('20:19:AB:F4:0D:0E', 'sensor2'),
    new Sensor('20:19:AB:F4:0D:0F', 'sensor3'),
    new Sensor('20:19:AB:F4:0D:10', 'sensor4'),
    new Sensor('20:19:AB:F4:0D:11', 'sensor5'),
    new Sensor('20:19:AB:F4:0D:12', 'sensor6'),
    new Sensor('20:19:AB:F4:0D:13', 'sensor7'),
    new Sensor('20:19:AB:F4:0D:14', 'sensor8'),
    new Sensor('20:19:AB:F4:0D:15', 'sensor9'),
    new Sensor('20:19:AB:F4:0D:16', 'sensor10')
  ];

Обработка сообщений

MessageHandler обрабатывает сообщения, полученные из подписки MQTT, и пересылает их нужному локальному объекту Sensor:

1
2
3
4
5
const MessageHandler = function (sensorType) {
    return function (message) {
      ...
    };
  }

После создания эта функция конструктора вернет обратный вызов, который будет вызван при получении экземпляра Message (параметр message ).

Чтобы определить, какой смоделированный удаленный датчик создал сообщение, мы просто перебираем все элементы SENSORS , пока единственная возможная тема (как определено ранее) не совпадает с Message.destinationName полученного сообщения:

1
2
3
const sourceSensor = SENSORS.find(function (s) {
  return s.topic == message.destinationName
});

Поскольку MIMIC Simulator был настроен на публикацию полезных данных в виде строк JSON, мы анализируем поле Message.payloadString чтобы получить обратно объект JSON:

1
const payload = JSON.parse(message.payloadString);

Наконец, объект Sensor запускается, чтобы обновить диаграмму, sensorType ( « sensorType » или «необработано» в соответствии с клиентом, с которым связан MessageHandler ), передавая значение, переносимое полезной нагрузкой:

1
sourceSensor.update(sensorType, payload);

Вот как выглядит MessageHanlder в целом:

1
2
3
4
5
6
7
8
9
const MessageHandler = function (sensorType) {
  return function (message) {
    const sourceSensor = SENSORS.find(function (s) {
      return s.topic == message.destinationName;
    });
    const payload = JSON.parse(message.payloadString);
    sourceSensor.update(sensorType, payload);
  };
}

Теперь давайте пройдемся по подключению и подпискам.

Подключение и подписки

Прежде всего, мы должны открыть новый сеанс против шлюза:

1
2
3
4
5
6
7
8
9
mqttcool.openSession(MQTT_COOL_URL, 'demouser', '', {
 
 onConnectionSuccess: function (mqttCoolSession) {
   ...
 },
 onLsClient: function (lsClient) {
   ...
 }
}

Здесь demouser – это имя пользователя, требуемое нашим онлайн-сервером для всех развернутых демонстрационных версий (в этом случае необходимо demouser пустой пароль).

Последний аргумент – это реализация интерфейса MQTTCoolListener , который уведомляется о событиях, связанных с созданием сеанса.

Обязательный onConnectionSuccess вызов onConnectionSuccess запускается после успешного подключения к MQTT.Cool. Это место, где обычно происходит соединение MQTT: действительно, предоставленный параметр mqttCoolSession предоставляет точку входа для создания экземпляра MqttClient настроенного для подключения к брокеру по указанному адресу:

1
throttledClient = mqttCoolSession.createClient(BROKER_URL);

Затем клиент связывается с экземпляром MessageHandler для отправки сообщений, которые можно регулировать. Для этого мы устанавливаем MqttClient.onMessageArrived вызов MqttClient.onMessageArrived :

1
throttledClient.onMessageArrived = new MessageHandler('throttled');

Наконец, мы можем подключиться к брокеру и, после подключения, подписаться на тему каждого датчика:

1
2
3
4
5
6
7
throttledClient.connect({
  onSuccess: function () {
    for (var i = 0; i < SENSORS.length; i++) {
      throttledClient.subscribe(SENSORS[i].topic);
    }
  }
});

Ниже приводится полная версия onConnectionSuccess :

01
02
03
04
05
06
07
08
09
10
11
12
onConnectionSuccess: function (mqttCoolSession) {
  throttledClient = mqttCoolSession.createClient(BROKER_URL);
 
  throttledClient.onMessageArrived = new MessageHandler('throttled');
 
  throttledClient.connect({
    onSuccess: function () {
      for (var i = 0; i < SENSORS.length; i++) {
        throttledClient.subscribe(SENSORS[i].topic);
      }
    }
}

Подробное обсуждение создания соединений с MQTT.Cool см. В разделе «Шаблон соединения MQTT.Cool» Руководства по началу работы .

Базовый API

Поскольку мы также хотим управлять глобальной пропускной способностью, мы должны использовать объект LighststreamerClient , точку входа API веб-клиента Lightstreamer , поверх которой построен API веб-клиента MQTT.Cool.

Этот объект управляет всеми связями между клиентами и сервером MQTT.Cool, и для каждого сеанса MQTT.Cool существует соответствующий объект LightstreamerClient .

Посредством этого объекта мы можем явно изменить пропускную способность по нашему усмотрению; но сначала мы должны кешировать ссылку для последующего использования. Вот где приходит onLsClient помочь нам:

1
2
3
onLsClient: function(lsClient) {
  lightstreamerClientReference = lsClient
}

Управление частотой

Теперь давайте рассмотрим, как приложение позволяет изменять максимальную частоту обновления для данного датчика IoT.

Функция конструктора Sensor мы видели ранее, также определяет внутреннюю функцию ( initFrequencySelector ), которая устанавливает начальное значение связанного селектора. Что еще более важно, функция принимает обратный вызов, который будет вызываться каждый раз, когда вы перемещаете ползунок, чтобы выбрать новое значение:

01
02
03
04
05
06
07
08
09
10
11
12
function Sensor(sensorId, frameId) {
  ...
  // Initialize the Frequency Selector of this IoT Sensor
  function initFrequencySelector(callback) {
    ...
  }
 
  // Trigger the Frequency Selector initialization passing the callback
  initFrequencySelector(function(subOptions) {
    throttledClient.subscribe(self.topic, subOptions);
  });
}

Обратный вызов просто повторно отправляет подписку на ту же тему, но предоставляет другой объект SubscribeOptions (параметр subOpts
), который содержит обновленное значение maxFrequency предоставленное селектором.

1
2
3
4
5
6
7
8
onSlideEnd: function (position, value) {
  ...
  const subOptions = {};
  if (value !== 'Unlimited') {
    subOptions['maxFrequency'] = value;
  }
  onSlideEndCallback(subOptions);
}

После повторной подписки сообщения начинают передаваться с указанной скоростью.

Управление пропускной способностью

Логика обновления полосы пропускания аналогична, но на этот раз вам придется поиграться с кэшированным объектом LightstramerClient , как и ожидалось выше.

Функция initBandwidhtSelector (определенная во внешней области) инициализирует состояние глобального переключателя пропускной способности и требует обратного вызова, который будет активирован при обработке ползунка:

1
2
3
initBandwidthSelector(function (value) {
  lightstreamerClientRef.connectionOptions.setMaxBandwidth(value);
});

Максимально допустимая пропускная способность, доступная для всех подписок, изменяется мгновенно, вызывая метод LightstreamerClient.connectOptions.setMaxBandiwidth
; вскоре вы можете увидеть, как это влияет на все графики, на которых дросселированные синусоидальные волны начинают рендеринг через переменные интервалы на основе полосы, выбранной с помощью селектора.

Приложение «Доступ к клиентскому API Lightstreamer» Руководства по началу работы рассматривает взаимосвязь между двумя клиентскими API-интерфейсами и показывает, как извлечь из этого другие выгоды.

Нерегулируемые данные

Манипуляции по частоте и полосе пропускания влияют только на входящие сообщения, отображаемые в виде красных точек и, следовательно, связанные с подписками MQTT, запрашиваемыми от дросселированного клиента, как было показано ранее.

Но визуальное сравнение данных с регулированием и необработанными данными (то есть с данными без регулирования) помогает нам лучше понять, как пропускная способность и частота действительно влияют на потоки сообщений.

Вот почему мы использовали в демоверсии другой тип соединения MQTT.Cool, выделенное соединение , чьи подписки MQTT получают данные по мере их поступления от брокера MQTT без каких-либо дополнительных манипуляций, которые могут запускаться клиентом или сервером MQTT.Cool. , Сообщения, проходящие через это соединение, отображаются в виде непрерывных оранжевых линий в кадре.

Следующий фрагмент показывает соединение и последующие подписки, сделанные для необработанного клиента :

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
mqttcool.openSession(MQTT_COOL_URL, 'demouser', '', {
 
  onConnectionSuccess: function (mqttCoolSession) {
    const clientId = 'client-' + new Date().getTime().toString();
    rawClient = mqttCoolSession.createClient(BROKER_URL, clientId);
 
    rawClient.onMessageArrived = new MessageHandler('raw');
 
    rawClient.connect({
      onSuccess: function () {
        for (var i = 0; i < SENSORS.length; i++) {
          rawClient.subscribe(SENSORS[i].topic);
        }
      }
    });
});

Используемая схема является обычной. Фактически, фрагмент кода похож на случай с дросселем, но с двумя существенными отличиями, которые мы выделили:

  • Строки 4-5: идентификатор клиента (хотя и не претендующий на то, чтобы быть уникальным для всех возможных клиентов) провоцирует сеанс вернуть MQTT-клиент, который будет запускать выделенное соединение с брокером на стороне сервера.
  • Строка 7: экземпляр MessageHanlder теперь создается с необработанным типом датчика, таким образом обращаясь к соответствующему типу диаграммы, который является оранжевой линией.

Вывод

Демонстрация демонстрирует основные методы, которые можно использовать для улучшения управления потоками MQTT с хорошим контролем частоты и полосы пропускания потока сообщений. Это стало возможным благодаря базовой технологии Lightstreamer, которая позволяет выделять как желаемую частоту для каждой подписки, так и общую пропускную способность для всего соединения, также гарантируя, что никогда не превысит требуемые значения.

В ближайшем будущем новые расширения API (не нарушая Paho-подобный дизайн) сделают управляемое клиентом регулирование еще проще благодаря следующим улучшениям:

  • Специальные дополнения к SubscribeOptions для контроля частоты, поэтому повторная подписка больше не требуется.
  • Прямой контроль над пропускной способностью через явные операции, предоставляемые MQTTCoolSession .

Наслаждайтесь сообщениями MQTT с MQTT.Cool и следите за обновлениями!

Опубликовано на Java Code Geeks с разрешения Джанлуки Финоккиаро, партнера нашей программы JCG . См. Оригинальную статью здесь: Регулирование данных MQTT.

Мнения, высказанные участниками Java Code Geeks, являются их собственными.