Вступление
Apache Flink — это платформа с открытым исходным кодом для распределенной потоковой и пакетной обработки данных. Flink — это механизм потоковой передачи данных с несколькими API для создания приложения, ориентированного на потоки данных.
Приложения Flink очень часто используют Apache Kafka для ввода и вывода данных.
Эта статья предоставит вам простые шаги по использованию Apache Flink с потоками MapR . MapR Streams — это распределенная система обмена сообщениями для потоковой передачи данных о событиях в масштабе, и она интегрирована в платформу конвергентных данных MapR на основе API Apache Kafka (0.9.0)
Предпосылки
- MapR 5.2
- Вы можете использовать MapR Converged Data Platform Sandbox
- MapR Client установлен на вашем хосте разработки
- Гит
- Maven 3.x или позже
Создайте свой потоковый проект Flink
Первым шагом является создание приложения Java. Самый простой способ сделать это — использовать архетип flink-quickstart-java, который содержит основные зависимости и задачи упаковки. Эта статья похожа на пример быстрого запуска Apache Flink с четким фокусом на ввод и вывод данных с помощью MapR Streams.
В этом приложении мы создадим две вакансии:
-
WriteToKafka: генерирует случайные строки иWriteToKafkaих в теме потоков MapR с помощью Kafka Flink Connector и его API-интерфейса производителя. -
ReadFromKafka: он читает ту же тему и печатает сообщения в стандартном выводе, используя Kafka Flink Connector и его получателя. API.
Полный проект доступен на GitHub:
Давайте создадим проект с использованием Apache Maven:
|
1
2
3
4
5
6
7
8
|
mvn archetype:generate \ -DarchetypeGroupId=org.apache.flink\ -DarchetypeArtifactId=flink-quickstart-java \ -DarchetypeVersion=1.1.0 \ -DgroupId=com.mapr.demos \ -DartifactId=mapr-streams-flink-demo \ -Dversion=1.0-SNAPSHOT \ -DinteractiveMode=false |
Maven создаст следующую структуру:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
|
$ tree mapr-streams-flink-demo/mapr-streams-flink-demo/├── pom.xml└── src └── main ├── java │ └── com │ └── mapr │ └── demos │ ├── BatchJob.java │ ├── SocketTextStreamWordCount.java │ ├── StreamingJob.java │ └── WordCount.java └── resources └── log4j.properties |
Этот проект настроен на создание файла Jar, который содержит код проекта Flink, а также включает все зависимости, необходимые для его запуска.
Проект содержит несколько других примеров работ. Они нам не нужны для этой статьи, поэтому вы можете оставить их в образовательных целях или просто удалить их из проекта.
Добавить зависимости потоков Kafka и MapR
Откройте pom.xml и добавьте в свой проект следующие зависимости:
1- Добавить репозиторий MapR Maven
в элемент <repositories> добавьте:
|
1
2
3
4
5
6
|
<repository> <id>mapr-releases</id> <snapshots><enabled>false</enabled></snapshots> <releases><enabled>true</enabled></releases> </repository> |
2- Добавить библиотеки MapR Streams
в <dependencies> :
|
01
02
03
04
05
06
07
08
09
10
|
<dependency> <groupId>com.mapr.streams</groupId> <artifactId>mapr-streams</artifactId> <version>5.2.0-mapr</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.9.0.0-mapr-1602</version> </dependency> |
3- Добавить библиотеки Flink Kafka Connector
В качестве первого шага мы должны добавить соединитель Flink Kafka в качестве зависимости, чтобы мы могли использовать приемник Kafka. Добавьте это в файл pom.xml в разделе зависимостей:
Теперь вы должны добавить зависимость Flink Kafka Connector, чтобы использовать приемник Kafka. Добавьте следующую запись в элемент <dependencies> :
|
1
2
3
4
5
|
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.9_2.10</artifactId> <version>${flink.version}</version> </dependency> |
4 — Исключить клиент Kafka, чтобы разрешить использование клиента MapR Streams
Как вы, возможно, знаете, MapR Streams использует API Kafka 0.9.0 для создания и потребления сообщений. Поэтому нам нужно удалить (исключить) клиентский API Apache Kafka, чтобы быть уверенным, что Flink может использовать потоки MapR.
В зависимости Flink Kafka Connector добавьте следующее исключение:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
|
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.9_2.10</artifactId> <version>${flink.version}</version> <exclusions> <exclusion> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> </exclusion> <exclusion> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> </exclusion> </exclusions> </dependency> |
Теперь проект Flink готов использовать DataStream с использованием Kafka Connector, чтобы вы могли отправлять и получать сообщения из потоков MapR.
Давайте теперь создадим Stream в MapR и напишем простой код Flink для его использования.
Создайте потоки MapR и тему
Поток — это набор тем, которыми вы можете управлять как группой:
- Настройка политик безопасности, которые применяются ко всем темам в этом потоке
- Установка количества разделов по умолчанию для каждой новой темы, созданной в потоке
- Установите время жизни для сообщений в каждой теме в потоке
Вы можете найти больше информации о концепциях MapR Streams в документации .
В вашем Mapr Cluster или Sandbox выполните следующие команды:
|
1
2
|
$ maprcli stream create -path /apps/application-stream -produceperm p -consumeperm p -topicperm p$ maprcli stream topic create -path /apps/application-stream -topic flink-demo |
Установите и используйте утилиты MapR Kafka
Установите пакет mapr-kafka в вашем кластере:
|
1
|
yum install mapr-kafka |
Откройте два окна терминала и запустите утилиты Kafka производителя и потребителя, используя следующие команды:
Режиссер
|
1
|
/opt/mapr/kafka/kafka-0.9.0/bin/kafka-console-producer.sh --broker-list this.will.be.ignored:9092 --topic /apps/application-stream:flink-demo= |
потребитель
|
1
|
/opt/mapr/kafka/kafka-0.9.0/bin/kafka-console-consumer.sh --new-consumer --bootstrap-server this.will.be.ignored:9092 --topic /apps/application-stream:flink-demo |
В окне производителя вы можете опубликовать некоторые сообщения и просмотреть их в пользовательских окнах. Мы будем использовать эти инструменты для отслеживания взаимодействия между потоками MapR и Flink.
Напишите ваше заявление Flink
Теперь давайте используем Flink Kafka Connector для отправки сообщений в потоки MapR и их использования.
Режиссер
Производитель генерирует сообщения, используя SimpleStringGenerator() и отправляет строку в /apps/application-stream:flink-demo .
|
01
02
03
04
05
06
07
08
09
10
11
12
|
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); Properties properties = new Properties(); // properties.setProperty("bootstrap.servers", "<kafka-broker>:9092"); // not used by MapR Streams properties.setProperty("streams.buffer.max.time.ms", "200"); DataStream<String> stream = env.addSource(new SimpleStringGenerator()); stream.addSink(new FlinkKafkaProducer09<>("/apps/application-stream:flink-demo", new SimpleStringSchema(), properties)); env.execute(); } |
Код SimpleStringGenerator() доступен здесь .
Основные шаги:
- создать новую
StreamExecutionEnvironmentна основе любого приложения Flink - создать новый
DataStreamв среде приложения, классSimpleStringGeneratorреализует[SourceFunction](https://ci.apache.org/projects/flink/flink-docs-release-1.1/api/java/)базовый интерфейс для всех потоковые источники данных в Flink. - добавьте приемник
FlinkKafkaProducer09в потоки; Поскольку MapR Streams основан на Kafka API 0.9, можно использовать класс FlinkKafkaProducer09; с 2 небольшими отличиями:- список посредников (первый параметр) не используется, поскольку потоки MapR используют местоположение кластера, определенное в классе
/opt/mapr/conf/mapr-clusters.conf. - имя темы включает путь и имя потока MapR Stream, в котором расположена тема, например
/apps/application-stream:flink-demo
- список посредников (первый параметр) не используется, поскольку потоки MapR используют местоположение кластера, определенное в классе
потребитель
Потребитель просто читает сообщения из раздела / apps / application-stream: flink-demo и печатает их в консоли.
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
|
public static void main(String[] args) throws Exception { // create execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); Properties properties = new Properties(); // properties.setProperty("bootstrap.servers", "<kafka-broker>:9092"); // not used by MapR Streams properties.setProperty("group.id", "flink_consumer"); DataStream<String> stream = env.addSource(new FlinkKafkaConsumer09<>( "/apps/application-stream:flink-demo", new SimpleStringSchema(), properties) ); stream.rebalance().map(new MapFunction<String, String>() { private static final long serialVersionUID = -6867736771747690202L; @Override public String map(String value) throws Exception { return "Stream Value: " + value; } }).print(); env.execute(); } |
Основные шаги:
- Создайте новую
StreamExecutionEnvironment, которая является основой любого приложения Flink. - Создайте набор свойств с информацией о потребителе. В этом приложении мы можем установить только потребительский
group.id. Обратите внимание, что свойствоbootstrap.serversне используется MapR Streams, поэтому нет необходимости устанавливать его. - Используйте
FlinkKafkaConsumer09чтобы получить сообщение из темы MapR Streams./apps/application-stream:flink-demo
Создайте и запустите приложение
Давайте запустим приложение прямо из Maven (или из вашей любимой IDE).
1- Создайте проект:
|
1
|
$ mvn clean package |
2- Запустите работу Flink Producer.
|
1
|
$ mvn exec:java -Dexec.mainClass=com.mapr.demos.WriteToKafka |
3- Выполнить работу потребителя Flink
|
1
|
$ mvn exec:java -Dexec.mainClass=com.mapr.demos.ReadFromKafka |
В терминале вы должны увидеть сообщения, сгенерированные от производителя
Теперь вы можете развернуть и выполнить это задание на своем кластере Flink.
Вывод
Из этой статьи вы узнали, как использовать Flink с потоками MapR для записи и чтения потоков данных. Ключевым элементом является настройка зависимостей Maven для настройки проекта для использования библиотек MapR Streams вместо библиотек Kafka.
| Ссылка: | Начало работы с Apache Flink на платформе конвергентных данных MapR от нашего партнера по JCG Тугдуала Граля в блоге Mapr . |