Учебники

Apache Flink — Создание приложения Flink

В этой главе мы узнаем, как создать приложение Flink.

Откройте Eclipse IDE, нажмите «Новый проект» и выберите «Java Project».

Создать приложение Flink

Дайте имя проекта и нажмите «Готово».

Создать приложение Flink2

Теперь нажмите Finish, как показано на следующем скриншоте.

Создать приложение Flink3

Теперь щелкните правой кнопкой мыши на src и перейдите в New >> Class.

Создать приложение Flink4

Дайте имя классу и нажмите «Готово».

Создать приложение Flink5

Скопируйте и вставьте приведенный ниже код в редактор.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.util.Collector;
public class WordCount {

   // *************************************************************************
   // PROGRAM
   // *************************************************************************
   public static void main(String[] args) throws Exception {
      final ParameterTool params = ParameterTool.fromArgs(args);
      // set up the execution environment
      final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
      // make parameters available in the web interface
      env.getConfig().setGlobalJobParameters(params);
      // get input data
      DataSet<String> text = env.readTextFile(params.get("input"));
      DataSet<Tuple2<String, Integer>> counts =
      // split up the lines in pairs (2-tuples) containing: (word,1)
      text.flatMap(new Tokenizer())
      // group by the tuple field "0" and sum up tuple field "1"
      .groupBy(0)
      .sum(1);
      // emit result
      if (params.has("output")) {
         counts.writeAsCsv(params.get("output"), "\n", " ");
         // execute program
         env.execute("WordCount Example");
      } else {
         System.out.println("Printing result to stdout. Use --output to specify output path.");
         counts.print();
      }
   }
   
   // *************************************************************************
   // USER FUNCTIONS
   // *************************************************************************
   public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
      public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
         // normalize and split the line
         String[] tokens = value.toLowerCase().split("\\W+");
         // emit the pairs
         for (String token : tokens) {
            if (token.length() > 0) {
               out.collect(new Tuple2<>(token, 1));
            }
         }
      }
   }
}

Вы получите много ошибок в редакторе, потому что библиотеки Flink необходимо добавить в этот проект.

Добавлены библиотеки Flink

Щелкните правой кнопкой мыши по проекту >> Build Path >> Configure Build Path.

Щелкните правой кнопкой мыши Project

Выберите вкладку «Библиотеки» и нажмите «Добавить внешние файлы JAR».

Выберите библиотеки

Перейдите в каталог lib Flink, выберите все 4 библиотеки и нажмите OK.

Flinks lib каталог

Перейдите на вкладку «Порядок и экспорт», выберите все библиотеки и нажмите «ОК».

Вкладка «Заказ и экспорт»

Вы увидите, что ошибок больше нет.

Теперь давайте экспортируем это приложение. Щелкните правой кнопкой мыши по проекту и выберите «Экспорт».

Экспортировать это приложение

Выберите файл JAR и нажмите «Далее»

Выберите файл JAR

Укажите путь к месту назначения и нажмите «Далее».

путь назначения

Нажмите на Далее>

Нажмите кнопку "Далее

Нажмите «Обзор», выберите основной класс (WordCount) и нажмите «Готово».

Нажмите Готово

Примечание. Нажмите «ОК», если вы получили предупреждение.

Запустите приведенную ниже команду. В дальнейшем будет запущено приложение Flink, которое вы только что создали.