MapR Ecosystem Package 2.0 (MEP) поставляется с некоторыми новыми функциями, связанными с потоками MapR:
- Kafka REST Proxy для MapR Streams предоставляет интерфейс RESTful для MapR Streams и кластеров Kafka, что упрощает использование и создание сообщений, а также выполнение административных операций.
- Kafka Connect для MapR Streams — это утилита для потоковой передачи данных между MapR Streams и Apache Kafka и другими системами хранения.
Пакеты экосистем MapR (MEP) — это способ предоставления обновлений экосистемы, отделенный от основных обновлений, позволяющий вам обновлять инструментальные средства независимо от платформы конвергентных данных MapR. Вы можете узнать больше о MEP 2.0 в этой статье .
В этом блоге мы расскажем, как использовать REST-прокси Kafka для публикации и приема сообщений в / из MapR Streams. REST Proxy является отличным дополнением к платформе конвергентных данных MapR, позволяя любому языку программирования использовать потоки MapR.
Прокси Kafka REST, предоставляемый инструментами MapR Streams, может использоваться с MapR Streams (по умолчанию), а также с Apache Kafka (в гибридном режиме). В этой статье мы сосредоточимся на потоках MapR.
Предпосылки
- MapR Converged Data Platform 5.2 с MEP 2.0
- с помощью инструментов MapR Streams
- curl, wget или любой инструмент HTTP / REST Client
Создайте потоки MapR и тему
Поток — это набор тем, которыми вы можете управлять как группой:
- Настройка политик безопасности, которые применяются ко всем темам в этом потоке
- Установка количества разделов по умолчанию для каждой новой темы, созданной в потоке
- Установка времени жизни для сообщений в каждой теме в потоке
Вы можете найти больше информации о концепциях MapR Streams в документации .
На вашем MapR Cluster или Sandbox выполните следующие команды:
1
2
3
|
$ maprcli stream create -path /apps/iot-stream -produceperm p -consumeperm p -topicperm p $ maprcli stream topic create -path /apps/iot-stream -topic sensor-json -partitions 3 $ maprcli stream topic create -path /apps/iot-stream -topic sensor-binary -partitions 3 |
Start Kafka Console Производители и потребители
Откройте два окна терминала и запустите потребительские утилиты Kafka, используя следующие команды:
потребитель
- Тема сенсора-JSON
1
|
$ /opt/mapr/kafka/kafka- 0.9 . 0 /bin/kafka-console-consumer.sh -- new -consumer --bootstrap-server this .will.be.ignored: 9092 --topic /apps/iot-stream:sensor-json |
- Тема сенсорный-бинарный
1
|
$ /opt/mapr/kafka/kafka- 0.9 . 0 /bin/kafka-console-consumer.sh -- new -consumer --bootstrap-server this .will.be.ignored: 9092 --topic /apps/iot-stream:sensor-binary |
Эти два окна терминала позволят вам увидеть сообщения, размещенные на разные темы.
Использование Kafka REST Proxy
Проверить метаданные темы
Конечная точка / themes / [topic_name] позволяет получить некоторую информацию о теме. В потоках MapR темы являются частью потока, идентифицируемого путем; чтобы получить доступ к теме через REST API, необходимо ввести полный путь и закодировать его в URL; например:
- / apps / iot-stream: датчик-json будет кодироваться с помощью% 2Fapps% 2Fiot-stream% 3Asensor-json
Выполните следующую команду, чтобы получить информацию о теме sensor-json :
1
|
$ curl -X GET http: //localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json |
Примечание. Для простоты я запускаю команду с узла, где работает REST-прокси Kafka, поэтому можно использовать localhost .
Вы можете красиво напечатать JSON, добавив команду Python, такую как:
1
|
$ curl -X GET http: //localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json | python -m json.tool |
Поток по умолчанию
Как упоминалось выше, путь потока является частью имени темы, которое вы должны использовать в команде; однако возможно настроить прокси-сервер REST MapR Kafka для использования потока по умолчанию. Для этой конфигурации вы должны добавить следующее свойство в файл /opt/mapr/kafka-rest/kafka-rest-2.0.1/config/kafka-rest.properties :
- streams.default.stream = / приложения / ИТН-поток
При изменении конфигурации прокси-сервера Kafka REST необходимо перезапустить службу с помощью maprcli или MCS.
Основная причина использования свойств streams.default.stream — упрощение URL-адресов, используемых приложением. Например:
- с streams.default.stream вы можете использовать curl -X GET http: // localhost: 8082 / themes /
- без этой конфигурации или если вы хотите использовать определенный поток, вы должны указать его в URL: http: // localhost: 8082 / themes /% 2Fapps% 2Fiot-stream% 3Asensor-json
В этой статье все URL-адреса содержат закодированное имя потока, поэтому вы можете начать использовать Kafka REST-прокси без изменения конфигурации, а также использовать его с другими потоками.
Публикация сообщений
Прокси Kafka REST для MapR Streams позволяет приложениям публиковать сообщения в MapR Streams. Сообщения могут быть отправлены в виде JSON или двоичного содержимого (кодировка base64).
Чтобы отправить сообщение JSON:
- запрос должен быть HTTP POST
- Тип содержимого должен быть следующим: application / vnd.kafka.json.v1 + json
- тело:
01
02
03
04
05
06
07
08
09
10
11
12
13
|
{ "records" : [ { "value" : { "temp" : 10 , "speed" : 40 , "direction" : "NW" } } ] } |
Полный запрос:
1
2
3
|
curl -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" \ --data '{"records":[{"value": {"temp" : 10 , "speed" : 40 , "direction" : "NW"} }]}' \ http: //localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json |
Вы должны увидеть сообщение, напечатанное в окне терминала, где работает потребитель / apps / iot-stream: sensor-json .
Чтобы отправить двоичное сообщение:
- запрос должен быть HTTP POST
- Тип содержимого должен быть следующим: application / vnd.kafka.binary.v1 + json
- тело:
1
2
3
4
5
6
7
8
|
{ "records" : [ { "value" : "SGVsbG8gV29ybGQ=" } ] } |
Обратите внимание, что SGVsbG8gV29ybGQ = — это строка «Hello World», закодированная в Base64.
Полный запрос:
1
2
3
|
curl -X POST -H "Content-Type: application/vnd.kafka.binary.v1+json" \ --data '{"records":[{"value":"SGVsbG8gV29ybGQ="}]}' \ http: //localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-binary |
Вы должны увидеть сообщение, напечатанное в окне терминала, где работает потребитель / apps / iot-stream: sensor-binary .
Чтобы отправить несколько сообщений:
Поле записи тела HTTP позволяет отправлять несколько сообщений; например, вы можете отправить:
1
2
3
|
curl -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" \ --data '{"records":[{"value": {"temp" : 12 , "speed" : 42 , "direction" : "NW"} }, {"value": {"temp" : 10 , "speed" : 37 , "direction" : "N"} } ]}' \ http: //localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json |
Эта команда отправит 2 сообщения и увеличит смещение на 2. Вы можете сделать то же самое с двоичным содержимым, добавив новые элементы в массив JSON; например:
1
2
3
|
curl -X POST -H "Content-Type: application/vnd.kafka.binary.v1+json" \ --data '{"records":[{"value":"SGVsbG8gV29ybGQ="}, {"value":"Qm9uam91cg=="}]}' \ http: //localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-binary |
Как вы, вероятно, знаете, можно связать ключ с сообщением, чтобы быть уверенным, что все сообщения с одним и тем же ключом будут поступать в один и тот же раздел. Для этого добавьте в сообщение атрибут ключа следующим образом:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
|
{ "records" : [ { "key" : "K001" , "value" : { "temp" : 10 , "speed" : 40 , "direction" : "NW" } } ] } |
Теперь, когда вы знаете, как публиковать сообщения в теме MapR Streams с помощью REST Proxy, давайте посмотрим, как использовать эти сообщения.
Использование сообщений
Прокси REST также может использоваться для получения сообщений из тем; Для этого вам необходимо:
- Создайте потребительский экземпляр.
- Используйте URL-адрес, возвращенный первым вызовом, чтобы прочитать сообщение.
- Удалите экземпляр потребителя, если это необходимо.
Создание экземпляра потребителя
Следующий запрос создает экземпляр потребителя:
1
2
3
|
curl -X POST -H "Content-Type: application/vnd.kafka.v1+json" \ --data '{"name": "iot_json_consumer", "format": "json", "auto.offset.reset": "earliest"}' \ http: //localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-json |
Ответ от сервера выглядит так:
1
2
3
4
|
{ "instance_id" : "iot_json_consumer" , "base_uri" : "http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-json/instances/iot_json_consumer" } |
Обратите внимание, что мы использовали / consumer / [topic_name] для создания потребителя. Base_uri будет использоваться последующими запросами для получения сообщений из темы. Как и любой потребитель MapR Streams / Kafka, auto.offset.reset определяет его поведение. В этом примере значение установлено на самое раннее , что означает, что потребитель будет читать сообщения с самого начала. Вы можете найти больше информации о конфигурации потребителя в документации MapR Streams .
Использование сообщений
Чтобы использовать сообщения, просто добавьте тему MapR Streams в URL-адрес экземпляра потребителя.
Следующий запрос потребляет сообщения из темы:
1
2
|
curl -X GET -H "Accept: application/vnd.kafka.json.v1+json" \ http: //localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-json/instances/iot_json_consumer/topics/%2Fapps%2Fiot-stream%3Asensor-json |
Этот вызов возвращает сообщения в документе JSON:
1
2
3
4
5
|
[ { "key" : null , "value" :{ "temp" : 10 , "speed" : 40 , "direction" : "NW" }, "topic" : "/apps/iot-stream:sensor-json" , "partition" : 1 , "offset" : 1 }, { "key" : null , "value" :{ "temp" : 12 , "speed" : 42 , "direction" : "NW" }, "topic" : "/apps/iot-stream:sensor-json" , "partition" : 1 , "offset" : 2 }, { "key" : null , "value" :{ "temp" : 10 , "speed" : 37 , "direction" : "N" }, "topic" : "/apps/iot-stream:sensor-json" , "partition" : 1 , "offset" : 3 } ] |
Каждый вызов API возвращает новые опубликованные сообщения, основываясь на смещении последнего вызова.
Обратите внимание, что Потребитель будет уничтожен:
- после некоторого времени простоя, установленного consumer.instance.timeout.ms (значение по умолчанию 300000ms / 5 минут), оно уничтожается с помощью вызова API REST (см. ниже).
Использование сообщений в двоичном формате
Подход тот же, если вам нужно использовать двоичные сообщения: вам нужно изменить формат и заголовок Accept.
Вызовите этот URL, чтобы создать потребительский экземпляр для двоичной темы:
1
2
3
|
curl -X POST -H "Content-Type: application/vnd.kafka.v1+json" \ --data '{"name": "iot_binary_consumer", "format": "binary", "auto.offset.reset": "earliest"}' \ http: //localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-binary |
Затем принимать сообщения, заголовок принятия устанавливается в application / vnd.kafka.binary.v1 + json :
1
2
|
curl -X GET -H "Accept: application/vnd.kafka.binary.v1+json" \ http: //localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-binary/instances/iot_binary_consumer/topics/%2Fapps%2Fiot-stream%3Asensor-binary |
Этот вызов возвращает сообщения в документе JSON, а значение кодируется в Base64:
1
2
3
4
|
[ { "key" : null , "value" : "SGVsbG8gV29ybGQ=" , "topic" : "/apps/iot-stream:sensor-binary" , "partition" : 1 , "offset" : 1 }, { "key" : null , "value" : "Qm9uam91cg==" , "topic" : "/apps/iot-stream:sensor-binary" , "partition" : 1 , "offset" : 2 } ] |
Удалить потребительские экземпляры
Как упоминалось ранее, потребитель будет автоматически уничтожен на основе конфигурации customer.instance.timeout.ms прокси REST; также возможно уничтожить экземпляр, используя URI экземпляра-потребителя и вызов HTTP DELETE, следующим образом:
1
|
curl -X DELETE http: //localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-binary/instances/iot_binary_consumer |
Вывод
Из этой статьи вы узнали, как использовать Raf-прокси Kafka для потоков MapR, что позволяет любому приложению использовать сообщения, опубликованные в MapR Converged Data Platform.
Вы можете найти более подробную информацию о Kafka REST Proxy в документации MapR и на следующих ресурсах:
- Начало работы с потоками MapR
- «Потоковая архитектура: новые дизайны с использованием Apache Kafka и MapR Streams», книга Теда Даннинга и Эллен Фридман
Ссылка: | Начало работы с Kafka REST Proxy для MapR Streams от нашего партнера по JCG Тугдуала Граля в блоге Mapr . |