Статьи

Начало работы с Apache Flink на платформе конвергентных данных MapR

Вступление

Apache Flink — это платформа с открытым исходным кодом для распределенной потоковой и пакетной обработки данных. Flink — это механизм потоковой передачи данных с несколькими API для создания приложения, ориентированного на потоки данных.

Приложения Flink очень часто используют Apache Kafka для ввода и вывода данных.

Эта статья предоставит вам простые шаги по использованию Apache Flink с потоками MapR . MapR Streams — это распределенная система обмена сообщениями для потоковой передачи данных о событиях в масштабе, и она интегрирована в платформу конвергентных данных MapR на основе API Apache Kafka (0.9.0)

Предпосылки

Создайте свой потоковый проект Flink

Первым шагом является создание приложения Java. Самый простой способ сделать это — использовать архетип flink-quickstart-java, который содержит основные зависимости и задачи упаковки. Эта статья похожа на пример быстрого запуска Apache Flink с четким фокусом на ввод и вывод данных с помощью MapR Streams.

В этом приложении мы создадим две вакансии:

  • WriteToKafka : генерирует случайные строки и WriteToKafka их в теме потоков MapR с помощью Kafka Flink Connector и его API-интерфейса производителя.
  • ReadFromKafka : он читает ту же тему и печатает сообщения в стандартном выводе, используя Kafka Flink Connector и его получателя. API.

Полный проект доступен на GitHub:

Давайте создадим проект с использованием Apache Maven:

1
2
3
4
5
6
7
8
mvn archetype:generate \
      -DarchetypeGroupId=org.apache.flink\
      -DarchetypeArtifactId=flink-quickstart-java \
      -DarchetypeVersion=1.1.0 \
      -DgroupId=com.mapr.demos \
      -DartifactId=mapr-streams-flink-demo \
      -Dversion=1.0-SNAPSHOT \
      -DinteractiveMode=false

Maven создаст следующую структуру:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
$ tree mapr-streams-flink-demo/
mapr-streams-flink-demo/
├── pom.xml
└── src
    └── main
        ├── java
        │   └── com
        │       └── mapr
        │           └── demos
        │               ├── BatchJob.java
        │               ├── SocketTextStreamWordCount.java
        │               ├── StreamingJob.java
        │               └── WordCount.java
        └── resources
            └── log4j.properties

Этот проект настроен на создание файла Jar, который содержит код проекта Flink, а также включает все зависимости, необходимые для его запуска.

Проект содержит несколько других примеров работ. Они нам не нужны для этой статьи, поэтому вы можете оставить их в образовательных целях или просто удалить их из проекта.

Добавить зависимости потоков Kafka и MapR

Откройте pom.xml и добавьте в свой проект следующие зависимости:

1- Добавить репозиторий MapR Maven

в элемент <repositories> добавьте:

1
2
3
4
5
6
<repository>
     <id>mapr-releases</id>
     <snapshots><enabled>false</enabled></snapshots>
     <releases><enabled>true</enabled></releases>
   </repository>

2- Добавить библиотеки MapR Streams

в <dependencies> :

01
02
03
04
05
06
07
08
09
10
<dependency>
   <groupId>com.mapr.streams</groupId>
   <artifactId>mapr-streams</artifactId>
   <version>5.2.0-mapr</version>
 </dependency>
 <dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-clients</artifactId>
   <version>0.9.0.0-mapr-1602</version>
 </dependency>

3- Добавить библиотеки Flink Kafka Connector

В качестве первого шага мы должны добавить соединитель Flink Kafka в качестве зависимости, чтобы мы могли использовать приемник Kafka. Добавьте это в файл pom.xml в разделе зависимостей:

Теперь вы должны добавить зависимость Flink Kafka Connector, чтобы использовать приемник Kafka. Добавьте следующую запись в элемент <dependencies> :

1
2
3
4
5
<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
      <version>${flink.version}</version>
 </dependency>

4 — Исключить клиент Kafka, чтобы разрешить использование клиента MapR Streams

Как вы, возможно, знаете, MapR Streams использует API Kafka 0.9.0 для создания и потребления сообщений. Поэтому нам нужно удалить (исключить) клиентский API Apache Kafka, чтобы быть уверенным, что Flink может использовать потоки MapR.

В зависимости Flink Kafka Connector добавьте следующее исключение:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
    <version>${flink.version}</version>
      <exclusions>
        <exclusion>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka-clients</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka_2.10</artifactId>
        </exclusion>
      </exclusions>
  </dependency>

Теперь проект Flink готов использовать DataStream с использованием Kafka Connector, чтобы вы могли отправлять и получать сообщения из потоков MapR.

Давайте теперь создадим Stream в MapR и напишем простой код Flink для его использования.

Создайте потоки MapR и тему

Поток — это набор тем, которыми вы можете управлять как группой:

  1. Настройка политик безопасности, которые применяются ко всем темам в этом потоке
  2. Установка количества разделов по умолчанию для каждой новой темы, созданной в потоке
  3. Установите время жизни для сообщений в каждой теме в потоке

Вы можете найти больше информации о концепциях MapR Streams в документации .

В вашем Mapr Cluster или Sandbox выполните следующие команды:

1
2
$ maprcli stream create -path /apps/application-stream -produceperm p -consumeperm p -topicperm p
$ maprcli stream topic create -path /apps/application-stream -topic flink-demo

Установите и используйте утилиты MapR Kafka

Установите пакет mapr-kafka в вашем кластере:

1
yum install mapr-kafka

Откройте два окна терминала и запустите утилиты Kafka производителя и потребителя, используя следующие команды:

Режиссер

1
/opt/mapr/kafka/kafka-0.9.0/bin/kafka-console-producer.sh --broker-list this.will.be.ignored:9092 --topic /apps/application-stream:flink-demo=

потребитель

1
/opt/mapr/kafka/kafka-0.9.0/bin/kafka-console-consumer.sh --new-consumer --bootstrap-server this.will.be.ignored:9092 --topic /apps/application-stream:flink-demo

В окне производителя вы можете опубликовать некоторые сообщения и просмотреть их в пользовательских окнах. Мы будем использовать эти инструменты для отслеживания взаимодействия между потоками MapR и Flink.

Напишите ваше заявление Flink

Теперь давайте используем Flink Kafka Connector для отправки сообщений в потоки MapR и их использования.

Режиссер

Производитель генерирует сообщения, используя SimpleStringGenerator() и отправляет строку в /apps/application-stream:flink-demo .

01
02
03
04
05
06
07
08
09
10
11
12
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
    Properties properties = new Properties();
    // properties.setProperty("bootstrap.servers", "<kafka-broker>:9092"); // not used by MapR Streams
    properties.setProperty("streams.buffer.max.time.ms", "200");
 
    DataStream<String> stream = env.addSource(new SimpleStringGenerator());
    stream.addSink(new FlinkKafkaProducer09<>("/apps/application-stream:flink-demo", new SimpleStringSchema(), properties));
 
    env.execute();
  }

Код SimpleStringGenerator() доступен здесь .

Основные шаги:

  • создать новую StreamExecutionEnvironment на основе любого приложения Flink
  • создать новый DataStream в среде приложения, класс SimpleStringGenerator реализует [SourceFunction](https://ci.apache.org/projects/flink/flink-docs-release-1.1/api/java/) базовый интерфейс для всех потоковые источники данных в Flink.
  • добавьте приемник FlinkKafkaProducer09 в потоки; Поскольку MapR Streams основан на Kafka API 0.9, можно использовать класс FlinkKafkaProducer09; с 2 небольшими отличиями:
    • список посредников (первый параметр) не используется, поскольку потоки MapR используют местоположение кластера, определенное в классе /opt/mapr/conf/mapr-clusters.conf .
    • имя темы включает путь и имя потока MapR Stream, в котором расположена тема, например /apps/application-stream:flink-demo

потребитель

Потребитель просто читает сообщения из раздела / apps / application-stream: flink-demo и печатает их в консоли.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
public static void main(String[] args) throws Exception {
    // create execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
    Properties properties = new Properties();
    // properties.setProperty("bootstrap.servers", "<kafka-broker>:9092"); // not used by MapR Streams
    properties.setProperty("group.id", "flink_consumer");
 
    DataStream<String> stream = env.addSource(new FlinkKafkaConsumer09<>(
        "/apps/application-stream:flink-demo", new SimpleStringSchema(), properties) );
 
    stream.rebalance().map(new MapFunction<String, String>() {
      private static final long serialVersionUID = -6867736771747690202L;
 
      @Override
      public String map(String value) throws Exception {
        return "Stream Value: " + value;
      }
    }).print();
 
    env.execute();
  }

Основные шаги:

  • Создайте новую StreamExecutionEnvironment , которая является основой любого приложения Flink.
  • Создайте набор свойств с информацией о потребителе. В этом приложении мы можем установить только потребительский group.id . Обратите внимание, что свойство bootstrap.servers не используется MapR Streams, поэтому нет необходимости устанавливать его.
  • Используйте FlinkKafkaConsumer09 чтобы получить сообщение из темы MapR Streams. /apps/application-stream:flink-demo

Создайте и запустите приложение

Давайте запустим приложение прямо из Maven (или из вашей любимой IDE).

1- Создайте проект:

1
$ mvn clean package

2- Запустите работу Flink Producer.

1
$ mvn exec:java -Dexec.mainClass=com.mapr.demos.WriteToKafka

3- Выполнить работу потребителя Flink

1
$ mvn exec:java -Dexec.mainClass=com.mapr.demos.ReadFromKafka

В терминале вы должны увидеть сообщения, сгенерированные от производителя

Теперь вы можете развернуть и выполнить это задание на своем кластере Flink.

Вывод

Из этой статьи вы узнали, как использовать Flink с потоками MapR для записи и чтения потоков данных. Ключевым элементом является настройка зависимостей Maven для настройки проекта для использования библиотек MapR Streams вместо библиотек Kafka.