Статьи

Конфигурируемая обработка ETL с использованием Apache Storm и Kite SDK Morphlines

С первых дней работы инженером-программистом, я всегда слышу один и тот же запрос от многих сторон :

« Мы хотим, чтобы все было настраиваемо, мы хотим изменить все во время выполнения и хотим, чтобы у нас был визуальный инструмент для применения всей этой логики, чтобы люди, не являющиеся разработчиками, использовали и настраивали наше приложение. »

Мне также нравится эта общая область применения, но, как мы все знаем, программные системы не так легко адаптируются, а запросы клиентов нестабильны.

В предыдущие годы мы создавали такие настраиваемые приложения (не настраиваемые на 100%) с использованием традиционных сред / технологий (JMX, распределенный кеш, Spring или JEE и т. Д.).

В последние годы в нашу архитектуру вошла дополнительная концепция, это концепция больших данных (или 3В или 4В или любые другие слова, которые подходят лучше). Эта новая концепция не одобряет различные решения или обходные пути, знакомые нам и применяемые в старых трехуровневых приложениях.

Самое смешное, что много раз я оказывался в том же положении, что и 10 лет назад. Это правило разработки программного обеспечения, оно никогда не заканчивается, и поэтому личное совершенство и новые приключения никогда не заканчиваются 🙂

Основная проблема остается той же, как построить настраиваемое распределенное приложение ETL .

По этой причине я создал мини-адаптируемое решение, которое может быть полезным во многих случаях использования. В мире больших данных я использовал 3 распространенных инструмента: Java , Apache Storm и Kite SDK Morplines . Java как основной язык программирования, Apache Storm как механизм распределенной потоковой обработки и Kite SDK Morphlines как настраиваемый механизм ETL.

Kite SDK Morplines

Скопировано из описания: Morphlines — это платформа с открытым исходным кодом, которая сокращает время и усилия, необходимые для создания и изменения потоковых приложений Hadoop ETL, которые извлекают, преобразуют и загружают данные в Apache Solr, HBase, HDFS, корпоративные хранилища данных или аналитические онлайн-панели мониторинга. , Морфлайн — это богатый файл конфигурации, который позволяет легко определить цепочку преобразований, которая использует данные любого типа из любого источника данных, обрабатывает данные и загружает результаты в компонент Hadoop. Он заменяет программирование на Java простыми шагами по настройке и соответственно снижает затраты и усилия по интеграции, связанные с разработкой и сопровождением пользовательских проектов ETL.

Помимо встроенных команд , вы можете легко реализовать свою собственную Команду и использовать ее в своем файле конфигурации morphline.

Пример конфигурации Morphline, которая читает строку JSON, анализирует ее, а затем просто регистрирует определенный элемент JSON:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
morphlines : [{
    id : json_terminal_log
    importCommands : ["org.kitesdk.**"]
     
    commands : [
            # read the JSON blob
            { readJson: {} }
 
            # extract JSON objects into head fields
            { extractJsonPaths {
              flatten: true
              paths: {
                name: /name
                age: /age
              }
            } }
 
            # log data
            { logInfo {
                format : "name: {}, record: {}"
                args : ["@{name}", "@{}"]
            }}
    ]
}]

Storm Morphlines Bolt

Чтобы использовать Morphlines внутри Storm, я реализовал пользовательский MorphlinesBolt . Основными обязанностями этого болта являются:

  • Инициализируйте обработчик Morphlines через файл конфигурации
  • Инициализация картографических инструкций:
    а) от входа Tuple к Morphline и
    б) от выхода Morphline до нового вывода Tuple
  • Обработайте каждое входящее событие, используя уже инициализированный контекст Morplines
  • Если Bolt не является Терминалом , то, используя предоставленный Mapper (тип «b»), создайте новый Tuple, используя выходные данные выполнения Morphline

Простые конфигурируемые топологии ETL

Чтобы протестировать пользовательский MorphlinesBolt , я написал 2 простых теста. В этих тестах вы можете увидеть, как инициализируется MorphlinesBolt, а затем результат каждого выполнения. В качестве входных данных я использовал пользовательский Spout (RandomJsonTestSpout), который просто генерирует новые строки JSON каждые 100 мс (настраивается).

DummyJsonTerminalLogTopology

Простая топология, которая конфигурирует контекст Morphline через файл конфигурации и обработчик выполнения Morphline для каждого входящего кортежа. В этой топологии MorphlinesBolt настроен как клеммный болт, что означает, что для каждого входного кортежа не генерируется новый кортеж.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class DummyJsonTerminalLogTopology {
    public static void main(String[] args) throws Exception {
        Config config = new Config();
 
        RandomJsonTestSpout spout = new RandomJsonTestSpout().withComplexJson(false);
 
        String2ByteArrayTupleMapper tuppleMapper = new String2ByteArrayTupleMapper();
        tuppleMapper.configure(CmnStormCons.TUPLE_FIELD_MSG);
 
        MorphlinesBolt morphBolt = new MorphlinesBolt()
                .withTupleMapper(tuppleMapper)
                .withMorphlineId("json_terminal_log")
                .withMorphlineConfFile("target/test-classes/morphline_confs/json_terminal_log.conf");
 
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("WORD_SPOUT", spout, 1);
        builder.setBolt("MORPH_BOLT", morphBolt, 1).shuffleGrouping("WORD_SPOUT");
 
        if (args.length == 0) {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("MyDummyJsonTerminalLogTopology", config, builder.createTopology());
            Thread.sleep(10000);
            cluster.killTopology("MyDummyJsonTerminalLogTopology");
            cluster.shutdown();
            System.exit(0);
        } else if (args.length == 1) {
            StormSubmitter.submitTopology(args[0], config, builder.createTopology());
        } else {
            System.out.println("Usage: DummyJsonTerminalLogTopology <topology_name>");
        }
    }
}

DummyJson2StringTopology

Простая топология, которая конфигурирует контекст Morphline через файл конфигурации и обработчик выполнения Morphline для каждого входящего кортежа. В этой топологии MorphlinesBolt настроен как обычный болт, что означает, что для каждого входного кортежа он генерирует новый кортеж.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class DummyJson2StringTopology {
 
    public static void main(String[] args) throws Exception {
        Config config = new Config();
 
        RandomJsonTestSpout spout = new RandomJsonTestSpout().withComplexJson(false);
 
        String2ByteArrayTupleMapper tuppleMapper = new String2ByteArrayTupleMapper();
        tuppleMapper.configure(CmnStormCons.TUPLE_FIELD_MSG);
 
        MorphlinesBolt morphBolt = new MorphlinesBolt()
                .withTupleMapper(tuppleMapper)
                .withMorphlineId("json2string")
                .withMorphlineConfFile("target/test-classes/morphline_confs/json2string.conf")
                //.withOutputProcessors(Arrays.asList(resultRecordHandlers));
                .withOutputFields(CmnStormCons.TUPLE_FIELD_MSG)
                .withRecordMapper(RecordHandlerFactory.genDefaultRecordHandler(String.class, new JsonNode2StringResultMapper()));
 
        LoggingBolt printBolt = new LoggingBolt().withFields(CmnStormCons.TUPLE_FIELD_MSG);
 
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("WORD_SPOUT", spout, 1);
        builder.setBolt("MORPH_BOLT", morphBolt, 1).shuffleGrouping("WORD_SPOUT");
        builder.setBolt("PRINT_BOLT", printBolt, 1).shuffleGrouping("MORPH_BOLT");
 
        if (args.length == 0) {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("MyDummyJson2StringTopology", config, builder.createTopology());
            Thread.sleep(10000);
            cluster.killTopology("MyDummyJson2StringTopology");
            cluster.shutdown();
            System.exit(0);
        } else if (args.length == 1) {
            StormSubmitter.submitTopology(args[0], config, builder.createTopology());
        } else {
            System.out.println("Usage: DummyJson2StringTopology <topology_name>");
        }
    }
}

Последние мысли

MorphlinesBolt может использоваться как часть любого конфигурируемого «решения» ETL (как отдельный болт обработки, как болт терминала, как часть сложного конвейера и т. Д.).

morphlines_storm_topology_examples

Исходный код предоставляется в виде модуля maven ( sv-etl-storm-morphlines ) в моей коллекции примеров проектов в github.

Отличная комбинация — использовать MorphlinesBolt с Flux . Это может дать вам полностью настраиваемую топологию ETL !!!
Я еще не добавил в качестве опции, чтобы сохранить меньше зависимостей (я могу добавить с областью «test»).

Этот модуль не является окончательным, и я постараюсь улучшить его, поэтому вы найдете много ошибок в этой первой реализации.

Для каких-либо дополнительных мыслей или разъяснений, пожалуйста, напишите комментарий 🙂

Это мой первый пост в 2016 году! Надеюсь, у вас крепкое здоровье и лучшие мысли и поступки. Первыми добродетелями / ценностями всего является человек и уважение к окружающей среде, в которой мы все живем (общество, земля, животные, растения и т. Д.). Все остальные являются второстепенными приоритетами и не должны разрушать то, что подразумевается под первыми приоритетами. Всегда держите в уме ваши самые важные добродетели и учитывайте их в любом действии или мысли, которую вы делаете.