В этой статье я поделюсь примерами кодирования некоторых ключевых аспектов TumblingWindow во Flink. Те, кто не знаком с потоковой передачей Flink, могут получить представление здесь.
Прежде чем мы перейдем к TumblingWindow , давайте разберемся с «окном», когда дело доходит до потоковой обработки или потоковых вычислений. В потоке данных у вас есть источник, который непрерывно создает данные, что делает невозможным вычисление окончательного значения.
В большинстве случаев для получения значимой информации предпочтительны два метода.
- Вычисление выполняется для конечного набора времени (например, ошибки HTTP 401 в минуту)
- Вычисления выполняются в виде непрерывного обновления (например, табло, актуальные темы)
«Окно» определяет конечный набор элементов в неограниченном потоке, к которому мы можем применять вычисления. Этот набор может основываться на времени, количестве элементов, комбинации счетчиков и времени или некоторой пользовательской логике для назначения элементов окнам.
- количество заказов, полученных каждую минуту (фиксированное время)
- среднее время выполнения последних 100 заказов (фиксированные элементы)
Поставщики потоковой инфраструктуры реализуют более одного варианта определения «окна». У Flink есть три типа (a) Tumbling (b) Sliding и (c) Session window, из которого я остановлюсь на первом в этой статье.
Вам также может понравиться: потоковый ETL с Apache Flink
TumblingWindow
Это окно простое для понимания и с которым легко начать. Это окно фиксированного размера, где «размер» — это либо время (30 секунд, 5 минут), либо просто счет (100 элементов).
Временное окно в 5 минут будет собирать все элементы, поступившие в окно, и оценивать его через пять минут. Новое окно будет запускаться каждые пять минут. Окно подсчета 100 будет собирать 100 элементов в окне и оценивать окно, когда 100-й элемент был добавлен.
Самое главное, что не будет перекрытия окон и дублирующих элементов. Каждый элемент присваивается только одному окну. Если вы указали ключ, то Flink будет логически разделять поток и запускать параллельные оконные операции для каждого ключевого элемента.
Давайте посмотрим на пример, чтобы понять их лучше. Простой IntegerGenerator
класс » » действует как источник, производящий целое число каждую секунду (начиная с 1). Следующие строки инициализируют локальную среду Flink и создают объект DataStream.
Джава
1
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
2
DataStream<Integer> intStream = env.addSource( new IntegerGenerator() );
Временное окно
Джава
xxxxxxxxxx
1
intStream
2
.timeWindowAll( Time.seconds( 5 ) )
3
.process( new ProcessAllWindowFunction<Integer, Integer ,TimeWindow>()
4
{
5
6
public void process( Context arg0, Iterable<Integer> input, Collector<Integer> output ) throws Exception
7
{
8
logger.info( "Computing sum for {}", input );
9
int sum = 0;
10
for(int i : input) {
11
sum += i;
12
}
13
output.collect( sum );
14
}
15
})
16
.print();
17
18
env.execute();
Строка 2 — Определяет падающее окно из пяти секунд (фиксированный размер по времени)
Строка 3 — Определяет вычисления (бизнес-логика) с помощью API-интерфейса Flink ProcessAllWindowFunction. Здесь я просто вычисляю сумму всех целых чисел, собранных в данном окне.
Примечание — ProcessAllWindowFunction
Flink позволит буферизовать все элементы окна в памяти и затем передавать целые элементы для вычисления. Вот почему у вас есть Iterable<>
объект в качестве входного параметра для process()
.
Строка 13 — возвращает результат этого окна обратно Flink для следующего шага, который просто печатается на консоли.
Следующее показывает вывод образца прогона
Давайте рассмотрим вывод здесь.
- Строка # 1 — # 3 = Два целых числа были получены до закрытия текущего окна. Обратите внимание, что хотя мы сказали пять секунд, первое окно не работало в течение пяти секунд. Причина в том, что по умолчанию Flink округляется до ближайшей границы часов, которая в нашем случае произошла в 13:33:55. Это вызвало Flink TriggerWindow, чтобы закрыть текущее окно и передать его следующему шагу (оператор Флинка).
- Строка # 4 =
process()
метод был вызван со всеми элементами [1, 2] и сумма «3» была выведена на консоль - Строка # 5 — # 10 = Новое окно запускается и собирается следующий набор целых чисел. Через 5 секунд в «13:34:00» окно закрывается. Все собранные данные отправляются в процесс, который печатает полученные целые числа и вычисляет сумму чисел в этом окне = ’18’.
- Строка # 11 = текущая сумма окна выводится на консоль.
- Аналогичная логика применяется дальше от строки № 12 и далее.
Примечание . Целые числа в предыдущем окне отсутствуют в этом новом окне. Следующее окно запускается после закрытия текущего окна (без наложения и дублирования).
Окно подсчета
Джава
x
1
intStream
2
.countWindowAll( 4 )
3
.reduce( new ReduceFunction<Integer>()
4
{
5
6
public Integer reduce( Integer value1, Integer value2 ) throws Exception
7
{
8
logger.info( "Reducing {} and {}", value1, value2 );
9
return value1 + value2;
10
}
11
})
12
.print();
13
env.execute();
Используя то же самое IntegerGenerator
как источник, выше пример окна подсчета
Строка 2 = Определяет акробатическое окно из четырех элементов (фиксированный размер по количеству)
Строка 3 = Определяет вычисление, которое будет выполнено для элементов окна с использованием Flink's ReduceFunction API. Логика та же (сумма чисел)
Примечание - ReduceFunction
позволит Flink выполнять инкрементные вычисления (непрерывное обновление). В след памяти очень меньше по сравнению с ProcessFunction. Первый параметр - это вычисленное значение из предыдущего окна, а второй параметр - текущий элемент, назначенный этому окну.
Строка 4 = вывод окончательного результата этого окна на консоль.
Ниже показаны выходные данные образца:
Давайте поймем вывод здесь
- Line #1 - #3 = First two integers are collected and then Flink triggers TriggerWindow which calls the reduce() method with the first two elements. The computed value '3' is buffered in Flink
- Line #4 = Next integer value '3' is produced by source.
- Line #5 =
reduce()
method is called, please note in here the first parameter is '3' from the previous calculation and the second parameter is the current integer produced by the source. The computed value '6' is buffered in Flink. - Line #6 = Next integer value '4' is produced by source.
- Line #7 =
reduce()
method is called with first parameter as '6' and second parameter as '4'. The computed value is 10 now. At this point, Flink has collected 4 integers from source and thus our count condition has been satisfied for this window. - Line #8 = Since the current window count size has been reached, Flink prints the value 10 (1+2+3+4) of this window.
- Line #9 - #10 = A new window starts and it waits for the next two integers from the source.
- Line #11 =
reduce()
is called with a new set of numbers which right now are 5 and 6
Similar logic is applied for the next two numbers and reduce()
is called accordingly to perform incremental updates.
Line #16 = When Flink gets 4 numbers for the current window, it calls print()
and outputs 26 (5+6+7+8)
Conclusion
In this article, we observed both types of TumblingWindow (time vs count) with their default behavior. We also saw two window functions, ProcessAllFunction and ReduceFunction for accumulative and incremental computation.
In the next part, I will share and discuss how to override some of the default behavior and how to handle late arrival to data.