Последние две недели я пробовал использовать Apache Beam API. Я прочитал эту превосходную документацию, предоставленную Beam, и она помогла мне понять основы. Я рекомендую читателям пройти через это, прежде чем мы продолжим.
Вступление
Сегодня мы собираемся построить простой конвейер данных WordCount, используя Apache Kafka для неограниченных источников. Мы могли бы использовать любого брокера сообщений для этого приложения, например, Google Pub / Sub и так далее. В Beam имеется множество встроенных разъемов ввода-вывода для обмена сообщениями. В конце нашего конвейера мы выведем результат в текстовый файл.
Вам также может понравиться: сделать интенсивную обработку данных эффективной и переносимой с помощью Apache Beam
Прежде чем мы перейдем к коду, мы должны знать некоторые концепции потоковой передачи, такие как управление окнами, триггеры, время обработки и время события. Я рекомендую прочитать эту статью Streamin 101 и Streaming 102 от Tyler Akidau .
Хорошо, давайте продолжим и сделаем настройку!
Настроить
- Настройте среду Java. Мы собираемся использовать Java API Beam.
- Установите Zookeeper и Apache Kafka. Если вам лень это делать, перейдите сюда (не беспокойтесь о Yarn, просто раскрутите Zookeeper и Kafka, используя команды «bin / grid start all» и «bin / grid stop all» .)
- Добавьте переменную $ KAFKA_HOME в файл .bashrc / .zshrc и перезапустите сеанс терминала.
-
Оболочка
xxxxxxxxxx
1
1
export $KAFKA_HOME=<install folder>/hello-samza/deploy/kafka
- Клонируйте этот репозиторий Git здесь
- Установить Python
После того, как все связано. Мы создадим тему Кафки, чтобы отправить сообщение. Используйте команду ниже:
Оболочка
xxxxxxxxxx
1
# create topic by name "messenger"
2
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic messenger
3
# to list the existing topic
4
./bin/kafka-topics.sh --list --zookeeper localhost:2181
В этом примере мы будем использовать прямой запуск Beam в качестве движка исполнения. Перейдите в домашний каталог проекта и выполните эту команду, чтобы запустить конвейер.
Оболочка
xxxxxxxxxx
1
mvn compile exec:java -Dexec.mainClass=com.sunil.WindowedWordCount -Pdirect-runner -Dexec.args="--output=<output folder>"
Это оно! Конвейерное задание слушает тему Кафки и готово к обработке данных.
Выполните эту команду, чтобы отправить событие Кафке в тему «Мессенджер».
Оболочка
xxxxxxxxxx
1
cd scripts
2
python3 GenMessage.py <name> <message<optional>> <epoch time in ms<optional>>
3
# example: python3 GenMessage.py sunil
4
# {"name":"sunil","message": "This message is from sunil","ets": 1580054229156}
Давайте поговорим о коде сейчас!
В этом примере мы будем считать нет. слов для данного размера окна (скажем, 1-часовое окно). Окна в Beam основаны на времени события, т.е. времени, полученном из метки времени сообщения, а не системной метки времени (метка времени обработки). В зависимости от времени события луч поместит сообщение в соответствующее окно.
Когда окно достигает установленного водяного знака (на основе эвристики), т.е. ожидается, что все данные поступили в систему для определенного окна, окно закрывается. Триггеры предоставляют гибкие способы запуска вычислений (подробнее об этом здесь ) для накопленных событий (внутри окна). В нашем случае мы устанавливаем запуск после закрытия окна.
Оставшийся набор операций, таких как GroupBy и CountPerKey (map-Reduce), может быть выполнен с этим результатом. Выходные данные операции преобразования карты сохраняются в файл, разделенный временной меткой окна.
Рисунок 1. Обработка неограниченного потока во время события и операции окон
Наш основной класс выглядит примерно так:
Джава
xxxxxxxxxx
1
public class WindowedWordCount {
2
static void runWithOptions(WindowedWordCountOptions options) {
4
Pipeline pipeline = Pipeline.create(options);
5
Duration WINDOW_TIME = Duration.standardMinutes(1);
6
Duration ALLOWED_LATENESS = Duration.standardMinutes(1);
7
CoderRegistry cr = pipeline.getCoderRegistry();
9
cr.registerCoderForClass(Record.class, new RecordSerializableCoder());
10
pipeline.apply(
13
KafkaIO.<Long, Record>read()
14
.withBootstrapServers(options.getBootstrap())
15
.withTopic(options.getInputTopic())
16
.withKeyDeserializer(LongDeserializer.class)
17
.withValueDeserializer(RecordDeserializer.class)
18
.withTimestampPolicyFactory((tp, previousWaterMark) -> new CustomFieldTimePolicy(previousWaterMark))
19
.withoutMetadata()
20
)
21
.apply(Values.<Record>create())
22
.apply("append event time for PCollection records", WithTimestamps.of((Record rec) -> new Instant(rec.getEts())))
23
.apply("extract message string", MapElements
24
.into(TypeDescriptors.strings())
25
.via(Record::getMessage))
26
.apply("apply window", Window.<String>into(FixedWindows.of(WINDOW_TIME))
27
.withAllowedLateness(ALLOWED_LATENESS)
28
.triggering(AfterWatermark.pastEndOfWindow())
29
.accumulatingFiredPanes()
30
)
31
.apply("count words", new CountWords())
32
.apply("format result to String",MapElements
33
.into(TypeDescriptors.strings())
34
.via((KV<String, Long> rec) -> rec.getKey() + ":" + rec.getValue()))
35
.apply("Write it to a text file", new WriteOneFilePerWindow(options.getOutput()));
36
pipeline.run();
39
}
40
public static void main(String[] args) {
42
WindowedWordCountOptions options = PipelineOptionsFactory.fromArgs(args).as(WindowedWordCountOptions.class);
43
options.setStreaming(true);
44
runWithOptions(options);
45
}
46
}
Время события и водяной знак
Водяной знак - это системное понятие, указывающее, когда можно ожидать поступления всех данных в определенном окне в систему. Как правило, водяные знаки являются производными от самой исходной системы, то есть от потребителя Kafka в нашем случае. Мы должны настроить соединитель Kafka на использование времени события сообщения для создания водяного знака вместо времени обработки, которое является значением по умолчанию.
В разъеме IO для Kafka луча мы можем установить эту конфигурацию с помощью метода withTimestampPolicyFactory . Здесь мы предоставляем настраиваемую политику для переопределения поведения по умолчанию. Неправильная конфигурация не приведет к выводу результатов из конвейера, который иногда трудно отладить.
Джава
xxxxxxxxxx
1
/**
2
* Custom TimestampPolicy for Kafka source to manage timestamp and watermark when it pulls data from broker
3
*/
4
public class CustomFieldTimePolicy extends TimestampPolicy<Long, Record> {
5
protected Instant currentWatermark;
8
public CustomFieldTimePolicy(Optional<Instant> previousWatermark) {
10
currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
11
}
12
15
public Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<Long, Record> record) {
16
currentWatermark = new Instant(record.getKV().getValue().getTimestamp());
17
return currentWatermark;
18
}
19
21
public Instant getWatermark(PartitionContext ctx) {
22
return currentWatermark;
23
}
24
}
Другой важной частью при обработке событий во времени события является установка временной метки события для каждого сообщения в PCollection, как показано ниже:
Джава
xxxxxxxxxx
1
... ...
2
.apply("append event time for PCollection records", WithTimestamps.of((Record rec) -> new Instant(rec.getEts())))
3
... ...
Окно использует эту метку времени для перемещения событий в соответствующие окна.
Скорее всего, вы столкнетесь с этим сообщением об ошибке, если проигнорируете установку политики withTimestampPolicyFactory, потому что мы установили окна для использования времени события, а соединитель Kafka использует время обработки (по умолчанию), которое всегда самое позднее, а время события на миллисекунду старше, чем время обработки.
Заключение
Я надеюсь, что вы узнали, как обрабатывать неограниченный поток во время события, используя Beam и Kafka. Здесь я пропустил многие части конвейерного кода, такие как кодеры / декодеры, serde для сообщений Kafka, окно записи в файл и другие PTransforms, которые описаны в Beam Javadocs. Пожалуйста, изучите git-код на досуге. Удачного кодирования!
Дальнейшее чтение
Осмысление потоковой обработки
Потоковая обработка в реальном времени с помощью Apache Kafka. Часть первая