Вступление
Apache Flink — это платформа с открытым исходным кодом для распределенной потоковой и пакетной обработки данных. Flink — это механизм потоковой передачи данных с несколькими API для создания приложения, ориентированного на потоки данных.
Приложения Flink очень часто используют Apache Kafka для ввода и вывода данных.
Эта статья предоставит вам простые шаги по использованию Apache Flink с потоками MapR . MapR Streams — это распределенная система обмена сообщениями для потоковой передачи данных о событиях в масштабе, и она интегрирована в платформу конвергентных данных MapR на основе API Apache Kafka (0.9.0)
Предпосылки
- MapR 5.2
- Вы можете использовать MapR Converged Data Platform Sandbox
- MapR Client установлен на вашем хосте разработки
- Гит
- Maven 3.x или позже
Создайте свой потоковый проект Flink
Первым шагом является создание приложения Java. Самый простой способ сделать это — использовать архетип flink-quickstart-java, который содержит основные зависимости и задачи упаковки. Эта статья похожа на пример быстрого запуска Apache Flink с четким фокусом на ввод и вывод данных с помощью MapR Streams.
В этом приложении мы создадим две вакансии:
-
WriteToKafka
: генерирует случайные строки иWriteToKafka
их в теме потоков MapR с помощью Kafka Flink Connector и его API-интерфейса производителя. -
ReadFromKafka
: он читает ту же тему и печатает сообщения в стандартном выводе, используя Kafka Flink Connector и его получателя. API.
Полный проект доступен на GitHub:
Давайте создадим проект с использованием Apache Maven:
1
2
3
4
5
6
7
8
|
mvn archetype:generate \ -DarchetypeGroupId=org.apache.flink\ -DarchetypeArtifactId=flink-quickstart-java \ -DarchetypeVersion= 1.1 . 0 \ -DgroupId=com.mapr.demos \ -DartifactId=mapr-streams-flink-demo \ -Dversion= 1.0 -SNAPSHOT \ -DinteractiveMode= false |
Maven создаст следующую структуру:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
|
$ tree mapr-streams-flink-demo/ mapr-streams-flink-demo/ ├── pom.xml └── src └── main ├── java │ └── com │ └── mapr │ └── demos │ ├── BatchJob.java │ ├── SocketTextStreamWordCount.java │ ├── StreamingJob.java │ └── WordCount.java └── resources └── log4j.properties |
Этот проект настроен на создание файла Jar, который содержит код проекта Flink, а также включает все зависимости, необходимые для его запуска.
Проект содержит несколько других примеров работ. Они нам не нужны для этой статьи, поэтому вы можете оставить их в образовательных целях или просто удалить их из проекта.
Добавить зависимости потоков Kafka и MapR
Откройте pom.xml
и добавьте в свой проект следующие зависимости:
1- Добавить репозиторий MapR Maven
в элемент <repositories>
добавьте:
1
2
3
4
5
6
|
< repository > < id >mapr-releases</ id > < snapshots >< enabled >false</ enabled ></ snapshots > < releases >< enabled >true</ enabled ></ releases > </ repository > |
2- Добавить библиотеки MapR Streams
в <dependencies>
:
01
02
03
04
05
06
07
08
09
10
|
< dependency > < groupId >com.mapr.streams</ groupId > < artifactId >mapr-streams</ artifactId > < version >5.2.0-mapr</ version > </ dependency > < dependency > < groupId >org.apache.kafka</ groupId > < artifactId >kafka-clients</ artifactId > < version >0.9.0.0-mapr-1602</ version > </ dependency > |
3- Добавить библиотеки Flink Kafka Connector
В качестве первого шага мы должны добавить соединитель Flink Kafka в качестве зависимости, чтобы мы могли использовать приемник Kafka. Добавьте это в файл pom.xml в разделе зависимостей:
Теперь вы должны добавить зависимость Flink Kafka Connector, чтобы использовать приемник Kafka. Добавьте следующую запись в элемент <dependencies>
:
1
2
3
4
5
|
< dependency > < groupId >org.apache.flink</ groupId > < artifactId >flink-connector-kafka-0.9_2.10</ artifactId > < version >${flink.version}</ version > </ dependency > |
4 — Исключить клиент Kafka, чтобы разрешить использование клиента MapR Streams
Как вы, возможно, знаете, MapR Streams использует API Kafka 0.9.0 для создания и потребления сообщений. Поэтому нам нужно удалить (исключить) клиентский API Apache Kafka, чтобы быть уверенным, что Flink может использовать потоки MapR.
В зависимости Flink Kafka Connector добавьте следующее исключение:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
|
< dependency > < groupId >org.apache.flink</ groupId > < artifactId >flink-connector-kafka-0.9_2.10</ artifactId > < version >${flink.version}</ version > < exclusions > < exclusion > < groupId >org.apache.kafka</ groupId > < artifactId >kafka-clients</ artifactId > </ exclusion > < exclusion > < groupId >org.apache.kafka</ groupId > < artifactId >kafka_2.10</ artifactId > </ exclusion > </ exclusions > </ dependency > |
Теперь проект Flink готов использовать DataStream с использованием Kafka Connector, чтобы вы могли отправлять и получать сообщения из потоков MapR.
Давайте теперь создадим Stream в MapR и напишем простой код Flink для его использования.
Создайте потоки MapR и тему
Поток — это набор тем, которыми вы можете управлять как группой:
- Настройка политик безопасности, которые применяются ко всем темам в этом потоке
- Установка количества разделов по умолчанию для каждой новой темы, созданной в потоке
- Установите время жизни для сообщений в каждой теме в потоке
Вы можете найти больше информации о концепциях MapR Streams в документации .
В вашем Mapr Cluster или Sandbox выполните следующие команды:
1
2
|
$ maprcli stream create -path /apps/application-stream -produceperm p -consumeperm p -topicperm p $ maprcli stream topic create -path /apps/application-stream -topic flink-demo |
Установите и используйте утилиты MapR Kafka
Установите пакет mapr-kafka
в вашем кластере:
1
|
yum install mapr-kafka |
Откройте два окна терминала и запустите утилиты Kafka производителя и потребителя, используя следующие команды:
Режиссер
1
|
/opt/mapr/kafka/kafka- 0.9 . 0 /bin/kafka-console-producer.sh --broker-list this .will.be.ignored: 9092 --topic /apps/application-stream:flink-demo= |
потребитель
1
|
/opt/mapr/kafka/kafka- 0.9 . 0 /bin/kafka-console-consumer.sh -- new -consumer --bootstrap-server this .will.be.ignored: 9092 --topic /apps/application-stream:flink-demo |
В окне производителя вы можете опубликовать некоторые сообщения и просмотреть их в пользовательских окнах. Мы будем использовать эти инструменты для отслеживания взаимодействия между потоками MapR и Flink.
Напишите ваше заявление Flink
Теперь давайте используем Flink Kafka Connector для отправки сообщений в потоки MapR и их использования.
Режиссер
Производитель генерирует сообщения, используя SimpleStringGenerator()
и отправляет строку в /apps/application-stream:flink-demo
.
01
02
03
04
05
06
07
08
09
10
11
12
|
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); Properties properties = new Properties(); // properties.setProperty("bootstrap.servers", "<kafka-broker>:9092"); // not used by MapR Streams properties.setProperty( "streams.buffer.max.time.ms" , "200" ); DataStream<String> stream = env.addSource( new SimpleStringGenerator()); stream.addSink( new FlinkKafkaProducer09<>( "/apps/application-stream:flink-demo" , new SimpleStringSchema(), properties)); env.execute(); } |
Код SimpleStringGenerator()
доступен здесь .
Основные шаги:
- создать новую
StreamExecutionEnvironment
на основе любого приложения Flink - создать новый
DataStream
в среде приложения, классSimpleStringGenerator
реализует[SourceFunction](https://ci.apache.org/projects/flink/flink-docs-release-1.1/api/java/)
базовый интерфейс для всех потоковые источники данных в Flink. - добавьте приемник
FlinkKafkaProducer09
в потоки; Поскольку MapR Streams основан на Kafka API 0.9, можно использовать класс FlinkKafkaProducer09; с 2 небольшими отличиями:- список посредников (первый параметр) не используется, поскольку потоки MapR используют местоположение кластера, определенное в классе
/opt/mapr/conf/mapr-clusters.conf
. - имя темы включает путь и имя потока MapR Stream, в котором расположена тема, например
/apps/application-stream:flink-demo
- список посредников (первый параметр) не используется, поскольку потоки MapR используют местоположение кластера, определенное в классе
потребитель
Потребитель просто читает сообщения из раздела / apps / application-stream: flink-demo и печатает их в консоли.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
|
public static void main(String[] args) throws Exception { // create execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); Properties properties = new Properties(); // properties.setProperty("bootstrap.servers", "<kafka-broker>:9092"); // not used by MapR Streams properties.setProperty( "group.id" , "flink_consumer" ); DataStream<String> stream = env.addSource( new FlinkKafkaConsumer09<>( "/apps/application-stream:flink-demo" , new SimpleStringSchema(), properties) ); stream.rebalance().map( new MapFunction<String, String>() { private static final long serialVersionUID = -6867736771747690202L; @Override public String map(String value) throws Exception { return "Stream Value: " + value; } }).print(); env.execute(); } |
Основные шаги:
- Создайте новую
StreamExecutionEnvironment
, которая является основой любого приложения Flink. - Создайте набор свойств с информацией о потребителе. В этом приложении мы можем установить только потребительский
group.id
. Обратите внимание, что свойствоbootstrap.servers
не используется MapR Streams, поэтому нет необходимости устанавливать его. - Используйте
FlinkKafkaConsumer09
чтобы получить сообщение из темы MapR Streams./apps/application-stream:flink-demo
Создайте и запустите приложение
Давайте запустим приложение прямо из Maven (или из вашей любимой IDE).
1- Создайте проект:
1
|
$ mvn clean package |
2- Запустите работу Flink Producer.
1
|
$ mvn exec:java -Dexec.mainClass=com.mapr.demos.WriteToKafka |
3- Выполнить работу потребителя Flink
1
|
$ mvn exec:java -Dexec.mainClass=com.mapr.demos.ReadFromKafka |
В терминале вы должны увидеть сообщения, сгенерированные от производителя
Теперь вы можете развернуть и выполнить это задание на своем кластере Flink.
Вывод
Из этой статьи вы узнали, как использовать Flink с потоками MapR для записи и чтения потоков данных. Ключевым элементом является настройка зависимостей Maven для настройки проекта для использования библиотек MapR Streams вместо библиотек Kafka.
Ссылка: | Начало работы с Apache Flink на платформе конвергентных данных MapR от нашего партнера по JCG Тугдуала Граля в блоге Mapr . |