Потоковая передача данных представляет растущий интерес для многих организаций, и большинству приложений необходимо использовать модель «производитель-потребитель» для получения и обработки данных в режиме реального времени. Сегодня на рынке существует множество решений для обмена сообщениями, но немногие из них созданы для решения проблем современного развертывания, связанных с IoT, крупными веб-приложениями и связанными с ними проектами с большими данными.
Apache Kafka был создан LinkedIn для решения этих задач и используется во многих проектах. Apache Kafka — это быстрая, масштабируемая, надежная и распределенная система обмена сообщениями.
Цель этой статьи — использовать сквозной пример и пример кода, чтобы показать вам, как:
- Установить, настроить и запустить Kafka
- Создать новые темы
- Написание и запуск Java-продюсера для публикации сообщений в темах.
- Напишите и запустите потребитель Java для чтения и обработки сообщений из тем.
кредиты
Это содержание частично основано на документации, предоставленной проектом Apache Kafka.
Мы добавили короткие, реалистичные примеры программ, которые иллюстрируют, как реальные программы пишутся с использованием Kafka.
Предпосылки
Вам понадобятся базовые навыки программирования на Java, а также доступ к:
- Апач Кафка 0.9.0
- Apache Maven 3.0 или более поздняя версия
- Гит
Установка
Шаг 1: Загрузите Кафку
Загрузите релиз Apache Kafka 0.9.0 и распакуйте его.
1
2
|
$ tar -xzf kafka_2. 11 - 0.9 . 0.0 .tgz $ cd kafka_2. 11 - 0.9 . 0.0 |
Шаг 2: Запустите сервер
Запустите сервер ZooKeeper; Kafka имеет встроенную конфигурацию Zookeeper с одним узлом.
1
2
3
4
|
$ bin/zookeeper-server-start.sh config/zookeeper.properties & [ 2016 - 02 - 08 14 : 59 : 28 , 275 ] INFO Reading configuration from: config zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig) [ 2016 - 02 - 08 14 : 59 : 28 , 276 ] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager) ... |
Обратите внимание, что это запустит Zookeeper в фоновом режиме. Чтобы остановить Zookeeper, вам нужно вернуть его на передний план и использовать control-C, или вам нужно будет найти процесс и убить его. Теперь вы можете запустить сам сервер Kafka:
1
2
3
|
$ bin/kafka-server-start.sh config/server.properties [ 2016 - 02 - 08 15 : 10 : 29 , 945 ] INFO KafkaConfig values: .. .... |
Как и в Zookeeper, здесь работает брокер Kafka в фоновом режиме. Чтобы остановить Kafka, вам нужно вернуть его на передний план или найти процесс и уничтожить его явно, используя kill.
Шаг 3: Создайте темы для примеров программ
Сообщения организованы по темам, по которым Производители отправляют сообщения и из которых Потребители читают сообщения. В нашем примере приложения используются темы буксировки: быстрые сообщения и маркеры итогов. Следующие команды создают темы:
1
2
3
4
|
$ bin/kafka-topics.sh --create --zookeeper localhost: 2181 \ --replication-factor 1 --partitions 1 --topic fast-messages $ bin/kafka-topics.sh --create --zookeeper localhost: 2181 \ --replication-factor 1 --partitions 1 --topic summary-markers |
Они могут быть перечислены:
1
2
3
|
$ bin/kafka-topics.sh --list --zookeeper localhost: 2181 fast-messages summary-markers |
При запуске команд Kafka вы увидите сообщения журнала процесса Kafka. Вы можете переключиться на другое окно, если это отвлекает.
Примечание . Посредник может быть настроен на автоматическое создание новых тем, как они упоминаются клиентским приложением, но это часто считается немного опасным, поскольку неправильное написание имени темы не приводит к сбою.
Запустите ваше первое приложение Kafka
На этом этапе у вас должен быть работающий брокер Kafka, работающий на вашей машине. Следующие шаги должны скомпилировать примеры программ и поэкспериментировать с тем, как они работают.
1- Скомпилируйте и упакуйте примеры программ
Клонируйте и скомпилируйте репозиторий, используя следующие команды:
1
2
3
|
$ git clone https: //github.com/mapr-demos/kafka-sample-programs.git $ cd kafka-sample-programs/ $ mvn clean package |
Для удобства проект примеров программ настроен так, что цель пакета maven создает единственный исполняемый файл target / kafka-example, который включает в себя все примеры программ и зависимостей.
2- Запустите пример потребителя
Запустите потребителя с помощью следующей команды:
1
2
3
4
|
$ target/kafka-example consumer SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder" . SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http: //www.slf4j.org/codes.html#StaticLoggerBinder for further details. |
Теперь потребитель запущен и прослушивает все сообщения по темам быстрых сообщений и сводных маркеров.
На этом этапе ничего не должно происходить, потому что нет никаких сообщений
3- Запустите пример производителя
В новом окне терминала запустите пример производителя, используя следующую команду:
1
2
3
4
5
6
|
$ target/kafka-example producer Sent msg number 0 Sent msg number 1000 ... Sent msg number 998000 Sent msg number 999000 |
Производитель отправляет большое количество сообщений в быстрые сообщения вместе со случайными сообщениями в итоговые маркеры.
Потребитель, работающий в другом окне, получает и обрабатывает все сообщения из этих тем.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
|
$ target/kafka-example consumer SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder" . SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http: //www.slf4j.org/codes.html#StaticLoggerBinder for further details. Got 31003 records after 0 timeouts 1 messages received in period, latency(min, max, avg, 99 %) = 20352 , 20479 , 20416.0 , 20479 (ms) 1 messages received overall, latency(min, max, avg, 99 %) = 20352 , 20479 , 20416.0 , 20479 (ms) 1000 messages received in period, latency(min, max, avg, 99 %) = 19840 , 20095 , 19968.3 , 20095 (ms) 1001 messages received overall, latency(min, max, avg, 99 %) = 19840 , 20479 , 19968.7 , 20095 (ms) ... 1000 messages received in period, latency(min, max, avg, 99 %) = 12032 , 12159 , 12119.4 , 12159 (ms) 998001 messages received overall, latency(min, max, avg, 99 %) = 12032 , 20479 , 15073.9 , 19583 (ms) 1000 messages received in period, latency(min, max, avg, 99 %) = 12032 , 12095 , 12064.0 , 12095 (ms) 999001 messages received overall, latency(min, max, avg, 99 %) = 12032 , 20479 , 15070.9 , 19583 (ms) |
Если вы снова запустите производителя, вы увидите новые сообщения в окне терминала пользователя.
Быстрый просмотр кода производителя и потребителя
На этом этапе у вас запущено Kafka, простое приложение Kafka, которое отправляет и принимает сообщения. Настало время взглянуть на код и понять, как было создано приложение.
зависимости
Чтобы создать Kafka Producer или Consumer, то есть клиентское приложение Kafka, необходимо добавить следующую зависимость в свой проект Maven:
1
2
3
4
5
|
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version> 0.9 . 0.0 </version> </dependency> |
Режиссер
Пример Producer — это классическое Java-приложение с методом main (), которое должно:
- Инициализировать и настроить производителя
- Используйте продюсера для отправки сообщений
1- инициализация производителя
Создать производителя довольно просто, вам просто нужно реализовать класс org.apache.kafka.clients.producer.KafkaProducer
с набором свойств, это выглядит так:
1
|
producer = new KafkaProducer(properties); |
В этом примере конфигурация выводится в файле свойств со следующими записями:
1
2
3
4
|
bootstrap.servers=localhost: 9092 acks=all ... block.on.buffer.full= true |
Для этого вступления наиболее важным свойством является:
- В
bootstrap.servers
перечислены хост и порт сервера / кластера Kafka, которые вы запустили ранее в этом руководстве.
Другие свойства используются для управления способом отправки и сериализации сообщений. Вы можете найти информацию обо всех свойствах в главе «Конфигурации производителя» документации Kafka .
2- Публикация сообщений
Если у вас есть экземпляр продюсера, вы можете публиковать сообщения в теме, используя класс ProducerRecord . Класс ProducerRecord является парой ключ / значение, где:
- ключ тема
- значение — это сообщение
Как вы можете догадаться, отправка сообщения в тему проста:
1
2
3
|
... producer.send( new ProducerRecord( "fast-messages" , "This is a dummy message" )); ... |
Обратите внимание, что существуют другие конструкторы ProducerRecord
которые позволяют вам использовать больше параметров конструктора, таких как ключ сообщения, номер раздела, но эти параметры не используются в этом простом учебном пособии.
Производитель примера приложения публикует сообщения, используя цикл для отправки:
- 1 сообщение за каждую итерацию в тему быстрых сообщений
- 1 маркерное сообщение каждые 1000 итераций в тему быстрых сообщений
- 1 сообщение каждые 1000 итераций в итоговые маркеры
3- Конец продюсера
Как только вы закончите с продюсером, используйте метод producer.close()
который блокирует процесс до тех пор, пока все сообщения не будут отправлены на сервер. Этот вызов используется в блоке finally, чтобы гарантировать, что он вызывается. Производитель Kafka также может быть использован в конструкции с ресурсами .
1
2
3
4
5
|
... } finally { producer.close(); } ... |
4- Продюсерское исполнение
Как упоминалось ранее, производитель является простым Java-классом, в этом примере приложение-источник запускается из приложения Run следующим образом:
1
2
3
|
... Producer.main(args); ... |
Теперь, когда вы знаете, как отправлять сообщения на сервер Kafka, давайте посмотрим на потребителя.
потребитель
Класс Consumer , как и производитель, является простым Java-классом с методом main.
Этот образец потребителя использует библиотеку HdrHistogram для записи и анализа сообщений, полученных из раздела быстрых сообщений, и Джексона для анализа сообщений JSON.
Вот почему вы видите следующие зависимости в файле pom.xml:
01
02
03
04
05
06
07
08
09
10
|
< dependency > < groupId >org.hdrhistogram</ groupId > < artifactId >HdrHistogram</ artifactId > < version >2.1.8</ version > </ dependency > < dependency > < groupId >com.fasterxml.jackson.core</ groupId > < artifactId >jackson-databind</ artifactId > < version >2.5.1</ version > </ dependency > |
Давайте теперь сосредоточимся на потребительском коде.
1- Инициализация потребителя
Первое, что нужно сделать, это создать потребительский экземпляр класса org.apache.kafka.clients.consumer.KafkaConsumer
с набором свойств, который выглядит следующим образом:
1
|
consumer = new KafkaConsumer(properties); |
В этом примере свойства выводятся в файл со следующими записями:
1
2
3
4
5
|
bootstrap.servers=localhost: 9092 group.id=test enable.auto.commit= true ... max.partition.fetch.bytes= 2097152 |
Для этого вступления наиболее важными свойствами являются:
- bootstrap.servers, который является хостом и портом сервера / кластера Kafka, который вы запустили ранее в этом руководстве
- group.id — группа потребительских процессов, к которой принадлежит этот потребитель.
Другие свойства используются для управления способом использования сообщений. Вы можете найти информацию обо всех свойствах в главе Consumer Configs документации Kafka .
2- Подписка на темы
Потребитель может подписаться на одну или несколько тем, в этом примере он будет прослушивать сообщения из двух тем, используя следующий код:
1
|
consumer.subscribe(Arrays.asList( "fast-messages" , "summary-markers" )); |
3- Расход сообщений
Теперь, когда ваш потребитель подписался на темы, он теперь может опросить сообщения из тем в цикле. Цикл выглядит так :
1
2
3
4
5
6
|
... while ( true ) { ConsumerRecords records = consumer.poll( 200 ); ... } ... |
Метод poll вызывается повторно в цикле. Для каждого звонка потребитель будет читать записи из темы. Для каждого чтения он отслеживает смещение, чтобы можно было прочитать правильное сообщение при следующем вызове. Метод poll занимает тайм-аут в миллисекундах. Он будет ждать до тех пор, пока данные не будут доступны.
Возвращаемый объект метода poll представляет собой Iterable, содержащий полученные записи, поэтому вам просто нужно зациклить каждую запись для их обработки. Код для обработки сообщений в получателе выглядит так:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
|
... for (ConsumerRecord record : records) { switch (record.topic()) { case "fast-messages" : // deal with messages from fast-messages topic ... case "summary-markers" : // deal with messages from summary-markers topic ... break ; default : } } ... |
В примере приложения потребитель обрабатывает только сообщения из раздела быстрых сообщений со следующей логикой, основанной на том факте, что сообщения используются в том порядке, в котором они были отправлены производителем:
- для каждого сообщения JSON с тестом типа задержка добавляется к статистике гистограммы
- при обработке сообщения JSON с маркером типа статистика печатается и сбрасывается .
4- потребительский конец
Когда вы закончите с потребителем, используйте метод consumer.close()
для освобождения ресурсов. Это особенно важно в многопоточном приложении. Образец потребителя не вызывает этот метод, так как он останавливается с помощью Ctrl + C, который останавливает всю JVM.
5- Продюсерское исполнение
Как упоминалось ранее, Consumer — это простой класс Java, в этом примере приложение, которое потребитель запускает из Run Application, выглядит следующим образом:
1
2
3
|
... Consumer.main(args); ... |
Вывод
Из этой статьи вы узнали, как создать простое приложение Kafka 0.9.x, используя:
- производитель, который публикует сообщения JSON в нескольких темах
- потребитель, который получает сообщения JSON и вычисляет статистику по содержимому сообщения.
Это приложение очень простое, и вы можете расширить его, чтобы протестировать некоторые другие интересные функции Kafka:
- добавьте новых потребителей, используя разные группы, для обработки данных по-разному, например, для сохранения данных в базе данных NoSQL, такой как HBase или MapR-DB
- добавьте новые разделы и потребителей по темам, чтобы обеспечить высокую доступность вашего приложения.
Наконец, хотя этот пример основан на Apache Kafka, этот же код будет работать непосредственно в кластере MapR с использованием MapR-Streams , интегрированной системы обмена сообщениями, совместимой с API Kafka 0.9.0. С MapR-Streams вы упростите производственное развертывание своего приложения, поскольку оно интегрировано в платформу данных MapR, так что у вас будет один кластер для развертывания и управления. Использование MapR Streams для обмена сообщениями также обеспечивает дополнительную масштабируемость, безопасность и службы больших данных, связанные с платформой конвергентных данных MapR .
Ссылка: | Начало работы с примерами программ для Apache Kafka 0.9 от нашего партнера по JCG Тугдуала Граля в блоге Mapr . |