Статьи

Потоковая передача данных в HPCC с использованием Java

Высокопроизводительный вычислительный кластер (HPCC) — это инфраструктура распределенной обработки, похожая на Hadoop, за исключением того, что он запускает программы, написанные на его собственном доменно-специфическом языке (DSL), который называется Enterprise Control Language (ECL). ECL — это здорово, но иногда вам захочется вызвать тяжелую атлетику на других языках. Например, вы можете использовать библиотеку NLP, написанную на Java.

Кроме того, HPCC обычно работает с данными, хранящимися в файловых системах, похожих на HDFS. Как и в случае с HDFS, после перехода от обработки файлов журналов и снимков статических данных быстро возникает потребность в бэкэнде базы данных. 

На самом деле, я бы сказал, что это общая тенденция отрасли: HDFS-> HBase, S3-> Redshift и т. Д. В конце концов, вы хотите уменьшить задержку аналитики (почти до нуля). Для этого вы устанавливаете какую-то распределенную базу данных, способную поддерживать как пакетную обработку, так и потоковую передачу / микропакетирование. И вы применяете неизменный / инкрементальный подход к хранению данных, который позволяет вам сворачивать инфраструктуру и передавать данные в систему во время ее анализа. (все упрощает в процессе)

Но я отступаю, как шаг в этом направлении …

Мы можем использовать возможности интеграции Java в HPCC для поддержки пользовательских функций в Java. Точно так же мы можем использовать те же возможности, чтобы добавить дополнительные механизмы хранения данных (например, Cassandra). Более конкретно, давайте посмотрим на  возможности потоковой передачи интеграции HPCC / Java для получения данных из внешнего источника.

Давайте сначала посмотрим на ванильную интеграцию Java.

Если у вас есть настройка среды HPCC, интеграция Java начинается с пути / opt / HPCCSystems / classes. Вы можете поместить классы и файлы JAR в это место, и функции будут доступны из ECL. Следуйте  этой странице за инструкциями .

Если у вас возникнут проблемы, просмотрите руководство по устранению неполадок на этой странице. Самое сложное — заставить HPCC найти ваши классы. Для меня я столкнулся с неприятной проблемой версии jdk. По умолчанию HPCC собирал старую версию JDK на моей машине с Ubuntu. Поскольку он использовал старую версию, HPCC не удалось найти классы, скомпилированные с «новым» JDK (1.7), что привело к загадочному сообщению «Не удалось разрешить имя класса». Если вы столкнулись с этим, тянуть  патч я представил , чтобы исправить это для Ubuntu .
Как только у вас это заработает, вы сможете вызывать Java из ECL, используя следующий синтаксис:

IMPORT java;
integer add1(integer val) := IMPORT(java, 'JavaCat.add1:(I)I');
output(add1(10));

Это довольно аккуратно, и, как следует из документации, вы можете вернуть XML из метода Java, если данные сложные. Но что вы делаете, если у вас есть тонна данных, больше, чем может храниться в памяти? Ну, тогда вам нужна потоковая передача Java в HPCC. 😉 

Вместо возврата фактических данных из импортированного метода мы возвращаем java Iterator. Затем HPCC использует итератор для создания набора данных. Ниже приведен пример итератора. 

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class DataStream implements Iterator {
    private int position = 0;
    private int size = 5;

    public static Iterator stream(String foo, String bar){
        return new DataStream();
    }

    @Override
    public boolean hasNext() {
        position++;
        return (position < size);
    }

    @Override
    public Row next() {
        return new Row("row");
    }

    @Override
    public void remove() {
    }

}

Это стандартный итератор, но обратите внимание, что он возвращает объект Row, который определяется следующим образом:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class Row {
    private String value;
    public Row(String value){
       this.value = value;
    }
}

Объект является бобом Java. HPCC установит значения переменных-членов при отображении в DATASET. Чтобы увидеть, как именно это происходит, давайте посмотрим на код ECL:

IMPORT java;
rowrec := record
  string value;
end;
DATASET(rowrec) stream() := IMPORT(java, 'DataStream.stream:(Ljava/lang/String;Ljava/lang/String;)Ljava/util/Iterator;');
output(stream());

После оператора import мы определяем тип записи с именем  rowrec . В следующей строке мы импортируем UDF и набираем результат как DATASET, который содержит  rowrecs . Имена полей в  rowrec  должны совпадать с именами переменных-членов в Java-бине. HPCC будет использовать итератор и заполнит набор данных возвратом метода next (). Последняя строка ECL выводит возвращенные результаты. 

Я поместил весь приведенный выше код в  репозиторий github  с некоторыми инструкциями по его запуску. Веселиться.

Оставайтесь с нами, чтобы узнать больше …
Представьте себе объединение возможностей потоковой передачи Java, описанных здесь, с возможностью потоковой передачи данных из Кассандры, как подробно в моем предыдущем посте . Результатом является мощное средство запуска пакетной аналитики с использованием Thor для данных, хранящихся в Cassandra (с локальностью данных!) … (возможно, включение заданий ECL для данных, поступающих через потоки событий реального времени! =)