В моем предыдущем посте я поделился примерами того, как определить и использовать TumblingWindow в его поведении по умолчанию. В этой статье я приведу примеры переопределения этих значений по умолчанию и обработки поздних значений.
Вам также может понравиться: Глубокое погружение в TumblingWindow Apache Flink — часть 1
Настройка смещения времени окна Tumbling
Давайте возьмем пример, показанный ниже (продолжая предыдущую статью, в которой был создан объект среды выполнения потока Flink, а нашим источником является простой целочисленный генератор).
Джава
xxxxxxxxxx
1
intStream
2
.windowAll( TumblingProcessingTimeWindows.of( Time.seconds( 5 ), Time.seconds( 2 ) ) )
3
.process( new ProcessAllWindowFunction<Integer, Integer ,TimeWindow>()
4
{
5
6
public void process( Context arg0, Iterable<Integer> input, Collector<Integer> output ) throws Exception
7
{
8
logger.info( "Computing sum for {}", input );
9
int sum = 0;
10
for(int i : input) {
11
sum += i;
12
}
13
output.collect( sum );
14
}
15
})
16
.print();
Единственное изменение здесь — это строка № 2, которая теперь вместо использования "timeWindowAll( Time.seconds( 5 ) )"
теперь использует более подробные"windowAll( TumblingProcessingTimeWindows.of( Time.seconds( 5 ),
Time.seconds( 2 ) ) )"
TimeWindowAll () — это метод-оболочка, по умолчанию используется windowAll (TumblingProcessingTimeWindows.of (size)), т. Е. Окно фиксированного размера по времени (это время системного времени, когда выполняется задание Flink).
Примечание. У Флинка есть несколько понятий времени, о которых я расскажу позже в этом посте.
Как я уже говорил в предыдущем посте, по умолчанию Flink запускает окно на границе часов, но используя второй параметр, windowAll()
мы можем настроить границу часов.
Ниже показаны примеры запуска кода выше.
Строка # 1 — # 5 = Flink запускает окно, собирает целые числа. Однако в 19:26:37 это закрытие окна срабатывает, и сумма [1,2,3,4] печатается в строке № 6
Примечание. Если бы смещение не было предоставлено, Flink закрыл бы окно в «19:26:35». Но так как смещение составило 2 секунды, окно заканчивалось на дополнительные 2 секунды за пределами часов.
TumblingWindow с EventTime
До сих пор в нашем обсуждении мы использовали «время» как системное время по умолчанию, в котором Flink выполняет задание. Однако во многих случаях мы хотим использовать фактическое время события, т.е. когда событие было создано в источнике события. Для обработки таких сценариев Flink поддерживает 3 способа обработки «времени». Давайте посмотрим на время события и как его можно использовать во Flink.
Во время события элементы группируются в окна на основе метки времени самого элемента, а не системных часов. Давайте посмотрим на пример.
Сначала я определил простой класс POJO с именем «Элемент» следующим образом. Я использовал lombok для создания геттеров для меня с помощью аннотаций.
Джава
xxxxxxxxxx
1
2
public class Element
3
{
4
Integer value;
5
Long timestamp;
6
7
public Element( int counter, long currTime )
8
{
9
this.value = counter;
10
this.timestamp = currTime;
11
}
12
13
14
public String toString()
15
{
16
return "" + value;
17
}
18
}
Затем я определяю простой класс Source под названием «ElementGeneratorSource», который будет создавать объекты типа Element и назначать случайную возрастающую метку времени. Это сделано для того, чтобы я не создал элемент с соответствующим системным временем. На практике у вас будет временная метка как часть самого события.
Джава
xxxxxxxxxx
1
class ElementGeneratorSource implements SourceFunction<Element>
2
{
3
volatile boolean isRunning = true;
4
final Logger logger = LoggerFactory.getLogger(ElementGeneratorSource.class);
5
6
7
public void run( SourceContext<Element> ctx ) throws Exception
8
{
9
int counter = 1;
10
11
// 20 seconds behind flink program's start time
12
long eventStartTime = System.currentTimeMillis() - 20000;
13
14
// create first event using above timestamp
15
Element element = new Element(counter++, eventStartTime);
16
17
while( isRunning )
18
{
19
logger.info("Produced Element with value {} and timestamp {}", element.getValue(), printTime(element.getTimestamp()));
20
21
ctx.collect( element );
22
23
// create elements and assign timestamp with randomness so that they are not same as current system clock time
24
element = new Element(counter++, element.getTimestamp() + ThreadLocalRandom.current().nextLong( 1000, 6000 ));
25
26
Thread.sleep( 1000 );
27
}
28
}
29
30
31
public void cancel()
32
{
33
isRunning = false;
34
}
35
36
// helper function to print epoch time in readable format
37
String printTime(long longValue)
38
{
39
return LocalDateTime.ofInstant(Instant.ofEpochMilli(longValue), ZoneId.systemDefault()).toString();
40
}
41
}
Теперь давайте определим конвейер для обработки этих элементов с помощью TumblingEventTime Windows. (Я удалил строки объявления класса и метода, чтобы сосредоточиться на важных блоках кода.)
Джава
1
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
2
3
// set to EventTime else it defaults to ProcessTime
4
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
5
6
DataStreamSource<Element> elementStream = env.addSource( new ElementGeneratorSource() );
7
8
elementStream
9
.assignTimestampsAndWatermarks( new AscendingTimestampExtractor<Element>()
10
{
11
12
public long extractAscendingTimestamp( Element element )
13
{
14
return element.getTimestamp();
15
}
16
})
17
.windowAll( TumblingEventTimeWindows.of( Time.seconds( 10 ) ) )
18
.process( new ProcessAllWindowFunction<Element, Integer ,TimeWindow>()
19
{
20
21
public void process( Context arg0, Iterable<Element> input, Collector<Integer> output ) throws Exception
22
{
23
logger.info( "Computing sum for {}", input );
24
25
int sum = 0;
26
for(Element e : input) {
27
sum += e.getValue();
28
}
29
output.collect( sum );
30
}
31
})
32
.print();
33
34
env.execute();
Строка № 1 — Определите среду выполнения потоковой передачи, чтобы начать с потоковой передачи Flink .
Строка № 4 — необходимо установить EventTime, иначе Flink будет игнорировать метку времени внутри элементов и использовать системные часы по умолчанию.
Строка № 6 — создайте DataStream, используя ElementGenerator в качестве источника (обсуждалось ранее в этой статье)
Строка № 9. Перед определением окна мне нужно сообщить Flink, как получить отметку времени и водяной знак для каждого элемента, который он получает.
В этом примере я использую очень удобный класс « AscendingTimestampExtractor », который согласно документу Flink: «Назначитель меток времени и генератор водяных знаков для потоков, где метки времени монотонно растут. В этом случае локальные водяные знаки для потоков легко генерировать». потому что они строго следуют меткам времени. » Еще одно преимущество использования этого предоставленного Flink API заключается в том, что он создаст для меня водяной знак. Водяной знак — это способ для Flink узнать, когда закрывать текущее окно (последний элемент, принадлежащий окну, прибыл).
Короче говоря, assignTimestampsAndWatermarks()
позволит Flink узнать, как читать метку времени из события / элемента, поступающего во Flink, и, что наиболее важно, как вычислять водяные знаки.
Строка № 17 — Определить окно типа TumblingEventTimeWindows
с размером 10 секунд.
Остальной код похож на предыдущий, где мы суммируем значения и печатаем его.
Пример вывода приведенного выше примера
Три элемента создаются в строках № 1, № 2, № 3 с отметкой времени, отличной от отметки системных часов. (время системных часов печатается первым до уровня журнала).
Когда третий элемент создается в «2020-02-22T22: 22: 02.495», он вызывает текущее закрытие окна, потому что водяной знак был нарушен. С 10-секундным временным окном конечное время здесь будет «2020-02-22T22: 21: 59.000». Таким образом, текущее окно собирает только первые два значения.
При следующем запуске окно закроется в «2020-02-22T22: 22: 09.000», что означает, что значение 3 и значение 4 будут собраны в новом окне, поскольку в строке # 7 есть элемент с меткой времени> = текущий водяной знак.
Заключение
В этой статье мы обсудили переопределение границы часов по умолчанию и как работать с TumblingEventTimeWindow. Мы также видели пример назначения метки времени элементам.
В следующей части я поделюсь и обсудим обработку поздних элементов, которые поступают после создания водяного знака для текущего окна.
Дальнейшее чтение
Пример базового преобразования Apache Flink
Анализ журнала 101 с помощью Apache Flink, Spring Boot и ActiveMQ