Обрабатывать опоздания
На практике элементы или события не поступают в той последовательности, в которой они были созданы. Это может быть связано с внешними факторами, такими как задержка сети или пользователь вышел из сети. Это распространенная проблема в потоковых вычислениях и должна решаться либо потоковой средой, либо самим приложением.
** Мое использование слов «элементы», «события» или «сообщения» в этом посте представляет собой единицу неизменяемых данных, генерируемых в источнике и доступных в качестве вывода из потока
Итак, что такое задержка здесь , согласно Флинку: «Поздние элементы — это элементы, которые поступают после того, как часы времени системы (как сигнализируются водяными знаками) уже прошли время отметки времени последнего элемента». Проще говоря, если окно заканчивается в 15:00:00, то любой элемент, поступающий в систему Flink после этой отметки времени, запаздывает. По умолчанию поздние элементы отбрасываются Flink .
Чтобы позволить пользователям обрабатывать поздние элементы, Flink предоставляет метод, называемый « allowedLateness
». Вы можете указать, как оконное преобразование должно обрабатывать поздние элементы и сколько допускается задержек. Элементы, поступающие в пределах допустимого времени ожидания, все еще помещаются в окна и учитываются при вычислении результатов окна. Если элементы прибывают после допустимого опоздания, они будут отброшены.
Примечание. Установка допустимого времени задержки действительна только для окон времени события .
Е Xample 1
Я использую класс «ElementGeneratorSource» (из предыдущего поста ) в качестве моего источника, я настроил его, чтобы вручную генерировать некоторые поздние элементы следующим образом:
Предполагая текущее системное время как 2020-03-16 13:42:40
Элемент | Отметка | Окно 5 секунд |
1 | 2020-03-16T13: 42: 20 | |
2 | 2020-03-16T13: 42: 21 | |
3 | 2020-03-16T13: 42: 26 | Триггеры 1-й водяной знак |
4 | 2020-03-16T13: 42: 23 | 1-й поздний элемент |
5 | 2020-03-16T13: 42: 24 | 2-й поздний элемент |
6 | 2020-03-16T13: 42: 28 | |
7 | 2020-03-16T13: 42: 39 | Триггеры 2-го водяного знака |
Теперь код Flink для обработки поздних элементов
Джава
1
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
2
DataStreamSource<Element> elementStream = env.addSource( new ElementGeneratorSource() );
3
elementStream
5
.assignTimestampsAndWatermarks( new AscendingTimestampExtractor<Element>()
6
{
7
8
public long extractAscendingTimestamp( Element element )
9
{
10
return element.getTimestamp();
11
}
12
})
13
.windowAll( TumblingEventTimeWindows.of( Time.seconds( 5 ) ) )
14
.allowedLateness( Time.seconds( 3 ) )
15
.process( new ProcessAllWindowFunction<Element, Integer ,TimeWindow>()
16
{
17
18
public void process( Context arg0, Iterable<Element> input, Collector<Integer> output ) throws Exception
19
{
20
logger.info( "Computing sum for {}", input );
21
int sum = 0;
22
for(Element e : input) {
23
sum += e.getValue();
24
}
25
output.collect( sum );
26
}
27
})
28
.print();
29
env.execute();
Аналогичен примеру EventTimestamp из предыдущего примера за исключением .allowedLateness( Time.seconds( 3 ) )
Я опоздал на 3 секунды, чтобы взять эти два поздних элемента из таблицы выше.
Пробный прогон дает следующий результат
Немного сюрприз здесь - я ожидал, что Flink создаст одно окно для [1, 2, 4, 5] и другое для [3, 6], но вместо этого я получил несколько перекрывающихся окон (вроде скользящего окна, имеющего элементы).
Поиск этого вопроса привел меня к этому сообщению , посвященному стеку , которое помогло мне ... "Из-за allowLateness любые поздние события (в пределах периода разрешенного опоздания) будут вызывать поздние (или, другими словами, дополнительные) срабатывания соответствующих окон. При использовании EventTimeTrigger по умолчанию (который, по-видимому, вы используете), каждое позднее событие вызывает дополнительный запуск окна, и обновленный результат окна будет выдан ».
Проще говоря, «allowLateness () со стандартным EventTimeTrigger создаст более одного окна - сначала с элементами внутри обычного окна, а затем с параметром first_window + late_element». Поскольку я использую EventTimeTrigger по умолчанию (не предоставил никакого пользовательского триггера), приведенное выше поведение является ожидаемым. Другой вариант - определить пользовательский триггер, который не будет запускать окно до тех пор, пока getCurrentWatermark <window_time + allow_lateness
Пример 2
Другой рекомендуемый подход для обработки опозданий - использование sideOutputLateData
метода. Есть два преимущества использования подхода «OutputTag» :
1. Вы можете разделить обычный поток и поздний поток
2. Вы можете поместить любые данные, которые не были правильно обработаны в обычном потоке, в другой поток для целей отладки и сортировки (вроде очереди недоставленных сообщений)
Давайте посмотрим пример здесь
Джава
xxxxxxxxxx
1
final OutputTag<Element> outputLateTag = new OutputTag<Element>("side-output-late") {};
2
final OutputTag<Element> outputErrTag = new OutputTag<Element>("side-output-error") {};
4
SingleOutputStreamOperator<Integer> stream =
6
elementStream
7
.assignTimestampsAndWatermarks( new AscendingTimestampExtractor<Element>()
8
{
9
10
public long extractAscendingTimestamp( Element element )
11
{
12
return element.getTimestamp();
13
}
14
})
15
.windowAll( TumblingEventTimeWindows.of( Time.seconds( 5 ) ) )
16
.sideOutputLateData( outputLateTag )
17
.process( new ProcessAllWindowFunction<Element, Integer ,TimeWindow>()
18
{
19
20
public void process( Context ctx, Iterable<Element> input, Collector<Integer> output ) throws Exception
21
{
22
logger.info( "Computing sum for {}", input);
23
int sum = 0;
24
for(Element e : input)
25
{
26
sum += e.getValue();
27
28
// send to a side stream
29
ctx.output(outputErrTag, e));
30
}
31
output.collect( sum );
32
}
33
});
34
// get late and error streams as side output
36
DataStream<Element> lateStream = stream.getSideOutput( outputLateTag );
37
DataStream<Element> errStream = stream.getSideOutput( outputErrTag );
38
// print to console
40
stream.print();
41
lateStream.print();
42
errStream.print();
Строка # 1, # 3 = определить два боковых выходных тега, один для обработки поздних элементов, а другой для обработки любых ошибок
Строка # 16 = объявить, что вы хотите, чтобы каждый поздний элемент в боковом выводе помечался как «outputLateTag»
Строка # 29 = Ради этого примера я пишу каждый элемент в "outputErrTag"
Строка № 36, № 37 = получить доступ к побочным потокам
Строка # 40 - # 42 = Вывести результат на консоль
Заключение
В этой статье мы обсудили использование allowLateness для обработки поздних элементов, которые поступают после создания водяного знака для текущего окна. Кроме того, мы увидели, как использовать подход «sideOuput» для перенаправления поздних элементов в совершенно новый поток.