Статьи

Потоковая передача данных в HPCC с использованием Java

Высокопроизводительный вычислительный кластер (HPCC) — это среда распределенной обработки, похожая на Hadoop, за исключением того, что он запускает программы, написанные на его собственном доменно-специфическом языке (DSL), который называется Enterprise Control Language (ECL). ECL — это здорово, но иногда вам захочется вызвать тяжелую атлетику на других языках. Например, вы можете использовать библиотеку NLP, написанную на Java.

Кроме того, HPCC обычно работает с данными, хранящимися в файловых системах, похожих на HDFS. Как и в случае с HDFS, после перехода от обработки файлов журналов и снимков статических данных быстро возникает потребность в бэкэнде базы данных.

На самом деле, я бы сказал, что это общая тенденция отрасли: HDFS-> HBase, S3-> Redshift и т. Д. В конце концов, вы хотите уменьшить задержку аналитики (почти до нуля). Для этого вы устанавливаете какую-то распределенную базу данных, способную поддерживать как пакетную обработку, так и потоковую передачу / микропакетирование. И вы применяете неизменный / инкрементальный подход к хранению данных, который позволяет вам сворачивать инфраструктуру и передавать данные в систему во время ее анализа (упрощая все в процессе)

Но я отступаю, как шаг в этом направлении …

Мы можем использовать возможности интеграции Java в HPCC для поддержки пользовательских функций в Java. Точно так же мы можем использовать те же возможности, чтобы добавить дополнительные механизмы хранения данных (например, Cassandra). Более конкретно, давайте посмотрим на возможности потоковой передачи интеграции HPCC / Java для получения данных из внешнего источника.

Давайте сначала посмотрим на ванильную интеграцию Java.

Если у вас есть настройка среды HPCC, интеграция Java начинается с пути / opt / HPCCSystems / classes. Вы можете поместить классы и файлы JAR в это место, и функции будут доступны из ECL. Следуйте этой странице за инструкциями .

Если у вас возникнут проблемы, просмотрите руководство по устранению неполадок на этой странице. Самое сложное — заставить HPCC найти ваши классы. Для меня я столкнулся с неприятной проблемой версии jdk. По умолчанию HPCC собирал старую версию JDK на моей машине с Ubuntu. Поскольку он использовал старую версию, HPCC не смогла найти классы, скомпилированные с «новым» JDK (1.7), что привело к загадочному сообщению «Не удалось разрешить имя класса». Если вы столкнетесь с этим, извлеките представленный мной патч, чтобы исправить это для Ubuntu .

Как только у вас это заработает, вы сможете вызывать Java из ECL, используя следующий синтаксис:

1
2
3
IMPORT java;
integer add1(integer val) := IMPORT(java, 'JavaCat.add1:(I)I');
output(add1(10));

Это довольно аккуратно, и, как следует из документации, вы можете вернуть XML из метода Java, если данные сложные. Но что вы делаете, если у вас есть тонна данных, больше, чем может храниться в памяти? Ну, тогда вам нужна потоковая передача Java в HPCC. 😉

Вместо того, чтобы возвращать фактические данные из импортированного метода, мы возвращаем java Iterator. Затем HPCC использует итератор для создания набора данных. Ниже приведен пример итератора.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
 
public class DataStream implements Iterator {
    private int position = 0;
    private int size = 5;
 
    public static Iterator stream(String foo, String bar){
        return new DataStream();
    }
 
    @Override
    public boolean hasNext() {
        position++;
        return (position < size);
    }
 
    @Override
    public Row next() {
        return new Row("row");
    }
 
    @Override
    public void remove() {
    }
 
}

Это стандартный итератор, но обратите внимание, что он возвращает объект Row, который определяется следующим образом:

01
02
03
04
05
06
07
08
09
10
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
 
public class Row {
    private String value;
    public Row(String value){
       this.value = value;
    }
}

Объект является бобом Java. HPCC установит значения переменных-членов при отображении в DATASET. Чтобы увидеть, как именно это происходит, давайте посмотрим на код ECL:

1
2
3
4
5
6
7
8
9
IMPORT java;
 
rowrec := record
  string value;
end;
 
DATASET(rowrec) stream() := IMPORT(java, 'DataStream.stream:(Ljava/lang/String;Ljava/lang/String;)Ljava/util/Iterator;');
 
output(stream());

После оператора import мы определяем тип записи с именем rowrec . В следующей строке мы импортируем UDF и набираем результат как DATASET, который содержит rowrecs . Имена полей в rowrec должны совпадать с именами переменных-членов в Java-бине. HPCC будет использовать итератор и заполнит набор данных возвратом метода next (). Последняя строка ECL выводит возвращенные результаты.

Я поместил весь приведенный выше код в репозиторий github с некоторыми инструкциями по его запуску. Повеселись.

Оставайтесь с нами, чтобы узнать больше …

Представьте себе объединение возможностей потоковой передачи Java, описанных здесь, с возможностью потоковой передачи данных из Кассандры, как подробно описано в моем предыдущем посте . Результатом является мощное средство запуска пакетной аналитики с использованием Thor для данных, хранящихся в Cassandra (с локальностью данных!)… (Возможно, включение заданий ECL для данных, поступающих через потоки событий реального времени! =)