Статьи

Начало работы с Kafka REST Proxy для потоков MapR

MapR Ecosystem Package 2.0 (MEP) поставляется с некоторыми новыми функциями, связанными с потоками MapR:

  • Kafka REST Proxy для MapR Streams предоставляет интерфейс RESTful для MapR Streams и кластеров Kafka, что упрощает использование и создание сообщений, а также выполнение административных операций.
  • Kafka Connect для MapR Streams — это утилита для потоковой передачи данных между MapR Streams и Apache Kafka и другими системами хранения.

Пакеты экосистем MapR (MEP) — это способ предоставления обновлений экосистемы, отделенный от основных обновлений, позволяющий вам обновлять инструментальные средства независимо от платформы конвергентных данных MapR. Вы можете узнать больше о MEP 2.0 в этой статье .

В этом блоге мы расскажем, как использовать REST-прокси Kafka для публикации и приема сообщений в / из MapR Streams. REST Proxy является отличным дополнением к платформе конвергентных данных MapR, позволяя любому языку программирования использовать потоки MapR.

Прокси Kafka REST, предоставляемый инструментами MapR Streams, может использоваться с MapR Streams (по умолчанию), а также с Apache Kafka (в гибридном режиме). В этой статье мы сосредоточимся на потоках MapR.

Предпосылки

  • MapR Converged Data Platform 5.2 с MEP 2.0
    • с помощью инструментов MapR Streams
  • curl, wget или любой инструмент HTTP / REST Client

Создайте потоки MapR и тему

Поток — это набор тем, которыми вы можете управлять как группой:

  1. Настройка политик безопасности, которые применяются ко всем темам в этом потоке
  2. Установка количества разделов по умолчанию для каждой новой темы, созданной в потоке
  3. Установка времени жизни для сообщений в каждой теме в потоке

Вы можете найти больше информации о концепциях MapR Streams в документации .

На вашем MapR Cluster или Sandbox выполните следующие команды:

1
2
3
$ maprcli stream create -path /apps/iot-stream -produceperm p -consumeperm p -topicperm p
$ maprcli stream topic create -path /apps/iot-stream -topic sensor-json -partitions 3
$ maprcli stream topic create -path /apps/iot-stream -topic sensor-binary -partitions 3

Start Kafka Console Производители и потребители

Откройте два окна терминала и запустите потребительские утилиты Kafka, используя следующие команды:

потребитель

  • Тема сенсора-JSON
1
$ /opt/mapr/kafka/kafka-0.9.0/bin/kafka-console-consumer.sh --new-consumer --bootstrap-server this.will.be.ignored:9092 --topic /apps/iot-stream:sensor-json
  • Тема сенсорный-бинарный
1
$ /opt/mapr/kafka/kafka-0.9.0/bin/kafka-console-consumer.sh --new-consumer --bootstrap-server this.will.be.ignored:9092 --topic /apps/iot-stream:sensor-binary

Эти два окна терминала позволят вам увидеть сообщения, размещенные на разные темы.

Использование Kafka REST Proxy

Проверить метаданные темы

Конечная точка / themes / [topic_name] позволяет получить некоторую информацию о теме. В потоках MapR темы являются частью потока, идентифицируемого путем; чтобы получить доступ к теме через REST API, необходимо ввести полный путь и закодировать его в URL; например:

  • / apps / iot-stream: датчик-json будет кодироваться с помощью% 2Fapps% 2Fiot-stream% 3Asensor-json

Выполните следующую команду, чтобы получить информацию о теме sensor-json :

1
$ curl -X GET  http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json

Примечание. Для простоты я запускаю команду с узла, где работает REST-прокси Kafka, поэтому можно использовать localhost .

Вы можете красиво напечатать JSON, добавив команду Python, такую ​​как:

1
$ curl -X GET  http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json | python -m json.tool

Поток по умолчанию

Как упоминалось выше, путь потока является частью имени темы, которое вы должны использовать в команде; однако возможно настроить прокси-сервер REST MapR Kafka для использования потока по умолчанию. Для этой конфигурации вы должны добавить следующее свойство в файл /opt/mapr/kafka-rest/kafka-rest-2.0.1/config/kafka-rest.properties :

  • streams.default.stream = / приложения / ИТН-поток

При изменении конфигурации прокси-сервера Kafka REST необходимо перезапустить службу с помощью maprcli или MCS.

Основная причина использования свойств streams.default.stream — упрощение URL-адресов, используемых приложением. Например:

В этой статье все URL-адреса содержат закодированное имя потока, поэтому вы можете начать использовать Kafka REST-прокси без изменения конфигурации, а также использовать его с другими потоками.

Публикация сообщений

Прокси Kafka REST для MapR Streams позволяет приложениям публиковать сообщения в MapR Streams. Сообщения могут быть отправлены в виде JSON или двоичного содержимого (кодировка base64).

Чтобы отправить сообщение JSON:

  • запрос должен быть HTTP POST
  • Тип содержимого должен быть следующим: application / vnd.kafka.json.v1 + json
  • тело:
01
02
03
04
05
06
07
08
09
10
11
12
13
{
  "records":
  [
    {
      "value":
      {
        "temp" : 10 ,
        "speed" : 40 ,
        "direction" : "NW"
        
      }
  ]
}

Полный запрос:

1
2
3
curl -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" \
  --data '{"records":[{"value": {"temp" : 10 , "speed" : 40 , "direction" : "NW"}  }]}' \
  http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json

Вы должны увидеть сообщение, напечатанное в окне терминала, где работает потребитель / apps / iot-stream: sensor-json .

Чтобы отправить двоичное сообщение:

  • запрос должен быть HTTP POST
  • Тип содержимого должен быть следующим: application / vnd.kafka.binary.v1 + json
  • тело:
1
2
3
4
5
6
7
8
{
  "records":
  [
    {
      "value":"SGVsbG8gV29ybGQ="
    }
  ]
}

Обратите внимание, что SGVsbG8gV29ybGQ = — это строка «Hello World», закодированная в Base64.

Полный запрос:

1
2
3
curl -X POST -H "Content-Type: application/vnd.kafka.binary.v1+json" \
  --data '{"records":[{"value":"SGVsbG8gV29ybGQ="}]}' \
  http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-binary

Вы должны увидеть сообщение, напечатанное в окне терминала, где работает потребитель / apps / iot-stream: sensor-binary .

Чтобы отправить несколько сообщений:

Поле записи тела HTTP позволяет отправлять несколько сообщений; например, вы можете отправить:

1
2
3
curl -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" \
  --data '{"records":[{"value": {"temp" : 12 , "speed" : 42 , "direction" : "NW"}  }, {"value": {"temp" : 10 , "speed" : 37 , "direction" : "N"}  } ]}' \
  http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json

Эта команда отправит 2 сообщения и увеличит смещение на 2. Вы можете сделать то же самое с двоичным содержимым, добавив новые элементы в массив JSON; например:

1
2
3
curl -X POST -H "Content-Type: application/vnd.kafka.binary.v1+json" \
  --data '{"records":[{"value":"SGVsbG8gV29ybGQ="}, {"value":"Qm9uam91cg=="}]}' \
  http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-binary

Как вы, вероятно, знаете, можно связать ключ с сообщением, чтобы быть уверенным, что все сообщения с одним и тем же ключом будут поступать в один и тот же раздел. Для этого добавьте в сообщение атрибут ключа следующим образом:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
{
  "records":
  [
    {
      "key": "K001",
      "value":
      {
        "temp" : 10 ,
        "speed" : 40 ,
        "direction" : "NW"
        
      }
  ]
}

Теперь, когда вы знаете, как публиковать сообщения в теме MapR Streams с помощью REST Proxy, давайте посмотрим, как использовать эти сообщения.

Использование сообщений

Прокси REST также может использоваться для получения сообщений из тем; Для этого вам необходимо:

  1. Создайте потребительский экземпляр.
  2. Используйте URL-адрес, возвращенный первым вызовом, чтобы прочитать сообщение.
  3. Удалите экземпляр потребителя, если это необходимо.

Создание экземпляра потребителя

Следующий запрос создает экземпляр потребителя:

1
2
3
curl -X POST -H "Content-Type: application/vnd.kafka.v1+json" \
      --data '{"name": "iot_json_consumer", "format": "json", "auto.offset.reset": "earliest"}' \
      http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-json

Ответ от сервера выглядит так:

1
2
3
4

Обратите внимание, что мы использовали / consumer / [topic_name] для создания потребителя. Base_uri будет использоваться последующими запросами для получения сообщений из темы. Как и любой потребитель MapR Streams / Kafka, auto.offset.reset определяет его поведение. В этом примере значение установлено на самое раннее , что означает, что потребитель будет читать сообщения с самого начала. Вы можете найти больше информации о конфигурации потребителя в документации MapR Streams .

Использование сообщений

Чтобы использовать сообщения, просто добавьте тему MapR Streams в URL-адрес экземпляра потребителя.

Следующий запрос потребляет сообщения из темы:

1
2
curl -X GET -H "Accept: application/vnd.kafka.json.v1+json" \
http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-json/instances/iot_json_consumer/topics/%2Fapps%2Fiot-stream%3Asensor-json

Этот вызов возвращает сообщения в документе JSON:

1
2
3
4
5
[
  {"key":null,"value":{"temp":10,"speed":40,"direction":"NW"},"topic":"/apps/iot-stream:sensor-json","partition":1,"offset":1},
  {"key":null,"value":{"temp":12,"speed":42,"direction":"NW"},"topic":"/apps/iot-stream:sensor-json","partition":1,"offset":2},
  {"key":null,"value":{"temp":10,"speed":37,"direction":"N"},"topic":"/apps/iot-stream:sensor-json","partition":1,"offset":3}
]

Каждый вызов API возвращает новые опубликованные сообщения, основываясь на смещении последнего вызова.

Обратите внимание, что Потребитель будет уничтожен:

  • после некоторого времени простоя, установленного consumer.instance.timeout.ms (значение по умолчанию 300000ms / 5 минут), оно уничтожается с помощью вызова API REST (см. ниже).

Использование сообщений в двоичном формате

Подход тот же, если вам нужно использовать двоичные сообщения: вам нужно изменить формат и заголовок Accept.

Вызовите этот URL, чтобы создать потребительский экземпляр для двоичной темы:

1
2
3
curl -X POST -H "Content-Type: application/vnd.kafka.v1+json" \
      --data '{"name": "iot_binary_consumer", "format": "binary", "auto.offset.reset": "earliest"}' \
      http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-binary

Затем принимать сообщения, заголовок принятия устанавливается в application / vnd.kafka.binary.v1 + json :

1
2
curl -X GET -H "Accept: application/vnd.kafka.binary.v1+json" \
http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-binary/instances/iot_binary_consumer/topics/%2Fapps%2Fiot-stream%3Asensor-binary

Этот вызов возвращает сообщения в документе JSON, а значение кодируется в Base64:

1
2
3
4
[
  {"key":null,"value":"SGVsbG8gV29ybGQ=","topic":"/apps/iot-stream:sensor-binary","partition":1,"offset":1},
  {"key":null,"value":"Qm9uam91cg==","topic":"/apps/iot-stream:sensor-binary","partition":1,"offset":2}
]

Удалить потребительские экземпляры

Как упоминалось ранее, потребитель будет автоматически уничтожен на основе конфигурации customer.instance.timeout.ms прокси REST; также возможно уничтожить экземпляр, используя URI экземпляра-потребителя и вызов HTTP DELETE, следующим образом:

1
curl -X DELETE http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-binary/instances/iot_binary_consumer

Вывод

Из этой статьи вы узнали, как использовать Raf-прокси Kafka для потоков MapR, что позволяет любому приложению использовать сообщения, опубликованные в MapR Converged Data Platform.

Вы можете найти более подробную информацию о Kafka REST Proxy в документации MapR и на следующих ресурсах: