当前位置:
首页
文章
后端
详情

7.一文搞懂Flink中窗口的概念

1.前言

在上一篇文章当中说了,如果需要进行双流join操作,可以选择在窗口的范围内进行,join操作会以窗口范围内的所有数据做inner join,然后将匹配到的所有数据交给计算函数进行处理,这就是窗口join的执行方式,但是这里也有一个之前没有提到过的概念,那就是“窗口”。

窗口在数据计算的过程中很常见,它要做的实际上就是在没有尽头的数据流中切割出一段一段的范围区间,然后对这个区间的数据进行相应的计算工作。接下来,我们就本着这个理念出发,去聊聊Flink中窗口到底是什么。

2.时间语义

看到这里你可能会有个疑问,为什么我要聊窗口,但是这里却写的是时间语义呢?其实这不难理解,大家想一下,窗口本身就是在流式数据中切割出来的一个一个的小部分,这里会有一个逻辑切割的动作,那这个动作依赖的是什么呢?那就是时间概念啦,不过要注意一下,Flink是以事件为驱动的,这里说的时间二字,大家可以理解为Flink对流动数据的一个判断依据,也可以说是数据自身的顺序标识!如果不去使用时间语义,那在流动中的数据就没有先后之分,数据开窗的结果就会产生误差。

在Flink中一共有三个时间语义,它们分别是souce读取数据时的摄入时间、数据进入到程序中的处理时间、数据本身自带的时间戳的事件时间。在这三种语义中,摄入时间出现的频率极低,我本身并没有使用过,所以不做讲解。而处理时间和事件时间就能够满足极大多数场景的要求了,这里也就围绕事件事件和处理时间来说。

2.1 处理时间

处理时间是个好东西,因为它比事件时间要简单很多。因为它是数据进入到Flink程序的时间,所以数据肯定是秉承着先后顺序的,也就是满足时间上的单调递增的,实际上就是顺序流。顺序流理所当然的不会发生数据乱序的情况嘛。在开窗的时候,不需要各个节点的协调,所以逻辑上更加清晰明显。

2.2 事件时间

事件时间就要复杂一些了,可以想一下。事件时间本身就是数据被创建时候的时间戳信息,虽然从埋点出发的时候顺序保持单调。但是Flink毕竟是分布式的计算,必然会出现因为传输的问题和不同服务器之间性能上的差异而产生的数据乱序,让后产生的数据比先产生的数据先进入到程序,这也就导致了无法用其本身自带的时间戳进行数据现后的判断了,因此如果选择了事件时间语义,就要通过一个名叫水位线的机制来标明当前数据流的时间进展。

3.水位线

既然上面已经提到了,事件时间通过水位线机制来判断数据流的时间进展,那就要先谈谈水位线之后才能去聊窗口了。看到这里大家不妨想一下,为什么事件时间要引入一个水位线的机制才能使用窗口计算呢?而处理时间就不用这么麻烦呢?这是因为窗口计算是一种在流式数据中进行批计算的方式,它需要凑够所有属于当前开窗范围内的数据都来了,才能出发计算。处理时间中没有乱序情况,所以当选用处理时间语义的时候,属于当前窗口的最后一条数据来了之后窗口即可触发计算。但是事件时间可能会出现属于当前窗口的数据在能够触发窗口计算的数据后面到来的情况发生,这就会导致数据来了窗口已经不见了,不仅造成了数据丢失还影响了计算结果。

这就引入了水位线的作用,为了避免上述情况的发生,我们可以通过在数据流中加入一个标记来表明当前数据流的时间进展,这个标记就是水位线机制。而且水位线机制还能以广播的形式发送给下游所有任务,也就是不会发生在下游多并行度的情况下,一个水位线标识只进入到了一个并行度而导致其他并行度中的窗口无法触发计算的情况发生。

3.1 顺序流中的水位线

那既然水位线机制是为了指明乱序流中的时间进展而指定的,那顺序流中是不是就不要水位线的概念了呢?实际上不然,水位线本身就是用来标记时间进展的,并不是单纯的为了事件时间而服务。只不过在顺序流生成水位线的时候可以指定水位线周期生成,因为如果每来一条数据就生成一条水位线,就会导致出现大量一致的水位线,这不仅会浪费性能,还能增加无用的集群压力。

3.2 乱序流中的水位线

乱序流中的水位线有个问题需要考虑,因为水位线是从时间戳中提取出来的,如果正好赶上从迟到数据中生成水位线,就会造成“时光倒流”的情况发生,所以就需要在生成水位线之后对时间戳做一个判断,判断当前水位线是否大于之前的那个,只有大于才会插入。而且还会有个问题,那就是当触发窗口计算的数据来了之后依然有迟到的数据没有过来呢,可是窗口却触发了计算。那这条数据就会丢失,所以在指明乱序流事件语义的时候,还要加入一个延迟时间的概念,让窗口即使读到了能够触发计算的数据,却能不触发计算,继续等待一段时间。只有当延迟的时间到达了,才会触发计算,销毁窗口。如果还有迟到的数据,只能放到侧输出流中做保存操作了。

3.3 生成水位线的策略

在flink中,有一个专门的方法用来生成水位线,这个方法是assignTimestampsAndWatermarks。它用来为流中的数据分配时间戳,并生成水位线来指示事件时间。如果想周期生成水位线,可以在环境变量的配置参数中通过调用setAutoWatermarkInterval方法来完成。

在Flink内部有两个能够开箱即用的水位线生成器,分别对应单调和乱序,其实在底层这两种方法实现方式是类似的,如果对于乱序流中传入的延迟时间参数为0的话,那这两种水位线生成器就没有什么差别了。

3.4 水位线的传递

水位线的传递是一件比较头疼的事情,大家试想一下如果上游任务有5条并行度,下游任务有3条并行度。当上游向下游传递数据的时候,水位线也是向下游传递的。3个并行度接收5个并行度发送过来的数据,那就证明肯定会有一个到两个的下游并行子任务(3并行度)需要接收一个以上的上游并行子任务(5并行度),那这个时候流淌在水位线里面的水位线怎么传递?怎么搞?

其实这种情况是这个样子的,在多个上游任务向一个下游任务发送数据的时候,下游任务会给每一个向它发送数据的上游任务设置一个“分区水位线”,而自己要接收的水位线就是当前分区水位线中最小的那个。上游的任务每做一次水位线更新,下游子任务就会从更新后的水位线中拿到最小的那个作为自己的水位线。

4.窗口

前面说了这么多,终于要到最重要的窗口这一块啦。之前也说了,窗口计算是在流式数据上切割下来一块一块的范围数据做批处理的,单纯的普通切割没办法满足所有的场景,所以需要从多个方面去考虑窗口的开窗类型。

4.1 按时间开窗 与 按数据数量开窗

4.1.1时间窗口

按照时间开窗的方法是通过窗口的开始时间和结束时间进行窗口的切割的,整体采用的是左闭右开的取数范围,窗口的数据范围是开窗时间到关窗时间-1毫秒,这个区间,即:[开窗时间,关窗时间-1)

按照时间的窗口可以分为滚动、滑动、会话、全局。
滚动:固定范围开窗,窗口不存在重叠部分。

处理时间 -window(TumblingProcessingTimeWindows.of(Time.seconds(5),Time.hours(-8))) //定义为中国时区
事件时间 -window(TumblingEventTimeWindows.of(Time.seconds(5)))

滑动:按照固定范围和滑动步长进行开窗,窗口存在重叠部分。

.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    
window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))

会话:按照固定时间没有来数据而关闭窗口。

//处理时间会话窗口
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))

window(EventTimeSessionWindows.withGap(Time.seconds(10)))

全局:它是将所有的数据汇聚到一个窗口,并且需要自定义触发逻辑,否则永远都不会触发计算。

4.1.2计数窗口

计数窗口就是按照元素的格式进行数据截取的,当数据达到个数的时候就触发窗口的计算。在底层代码实现上,是通过全局窗口来实现的。
滚动计数窗口:

stream.keyBy(...)
.countWindow(10)

滑动计数窗口

stream.keyBy(...)
.countWindow(10,3)

事件时间会话窗口

stream.keyBy(...)
.window(GlobalWindows.create()); 
//需要自定义触发计算

4.2 窗口计算

在介绍完了窗口如何分配之后,就可以进行下一个步骤了。那就是窗口是如何进行计算的,不能处理数据的时候只进行划分不进行计算吧。
计算逻辑需要根据 当前数据是否被key by方法处理过来划分,是按键分区还是非按键分区。只有这样判断之后,才好继续。

4.2.1 按键分区状态

按键分区状态通过调用window方法完成计算。如果数据被key by处理过后,那么数据就会按照key进行逻辑分区,在keyed流上做开窗计算的时候,每一个key都会有一个自己的计算窗口。

4.2.2 非按键分区状态

非按键分区状态通过调用windowall方法完成计算,如果没有被key by处理过后的数据进行开窗,那么所有的数据都会进入到当前的这个窗口,也就会让窗口的并行度变成了 1 .

5.窗口函数

如果要对数据流做窗口计算,就先要生成其水位线,然后指定窗口内数据的分配原则,在这一切都做完之后,就需要进行具体窗口的计算逻辑编写了。

窗口函数可以按照计算方式的不同划分为增量聚合窗口函数和全窗口函数。

5.1 增量聚合函数

增量聚合函数是对窗口数据的计算方式,具体可以分为两大类:归约函数和聚合函数

5.1.1 归约函数

归约函数中保存的是窗口内数据的聚合值,每当有新的数据进入到窗口之后,就会触发归约计算,让新进入的数据归约到已经聚合的数据结果中,但是要求进来的数据类型要和归约的数据数据类型一致。

5.1.2 聚合函数

聚合函数与归约函数类型,当窗口选择使用聚合函数进行计算的时候,如果选用聚合计算,就能够让中间聚合的值的数据类型与进入数据流中的数据类型不一致

5.2 全窗口函数

全窗口函数和增量聚合函数的区别在于,增量聚合函数是来一条数据进行一次计算,而全窗口函数是等到数据都来了之后再进行计算。虽然与增量聚合函数相比,全窗口函数在进行计算的时候对集群造成的压力比较高,但是选用这种计算规则能够获取运行时的上下文信息。

在函数种类方面,全窗口函数可以分为 窗口函数 和处理窗口函数。但是窗口函数的功能被处理窗口函数给全覆盖了,所以可以直接使用处理窗口函数。通过这个函数,能够获得窗口内数据的迭代器信息。

5.3 窗口函数联合使用

上面说了,增量聚合窗口和全窗口函数都有各自的计算逻辑,前者压力小但是无法获得上下文信息,后者压力大但是可以获得上下文信息。如果想要同时拥有二者的能力就可以将这两种函数联合在一起使用。前者进行当前窗口的数据计算,后者负责接收前面窗口的计算结果,虽然这会让全窗口函数中的迭代器中只有一个数据,但是却能够获得运行时上下文信息。

6.怎么保证窗口计算的时候数据不丢失

Flink中为了保证窗口计算的时候,数据不会丢失给数据设置了三重保障。
1.水位线延迟时间
在使用乱序流时间语义的时候,可以指定水位线的延迟时间,让窗口能够在延迟的区间内等待迟到的数据。
2.允许迟到数据
在窗口计算的时候,开启允许迟到数据。这样窗口就在被水位线触发计算之后,继续等待一段时间,当有新的数据进入到窗口时,会触发二次计算。
3.开启迟到数据进入到侧输出流中,虽然有了上面两种保障的支持,数据丢失的可能性已经很低了。但是万一有漏网之鱼就不太好了,所以可以通过这个功能,让迟到的数据进入到侧输出流中进行保存。

7.其他API

1.触发器
用来自定义窗口何时触发计算的。
2.移除器
在窗口函数执行之前,移除某些数据
3.允许延迟
让数据在水位线延迟之后到达的数据依然能够进入到窗口中。
4.侧输出流
让在允许延迟之后到达的数据进入通过这个方法,输入到对应的topic中。

8.结语
今天的这篇文章字数有点多,但是却基本上涵盖了所有与窗口相关的内容,所以花点时间看也是应该的。截至到目前为止,已经将Flink中很多的特性都进行了讲解,除了基础计算算子之外。窗口、状态、检查点、多留转换都说完了。那接下来就讲讲Flink中的一些理论知识。当理论知识讲完了,也就没什么好说的了,搞定这些内容的话,Flink一定程度上的使用也没有问题了。到时我会继续讲讲与FlinkSQL相关的内容的,如果我写的东西对大家有帮助,还希望能够点赞、投币、转发、收藏哦!

免责申明:本站发布的内容(图片、视频和文字)以转载和分享为主,文章观点不代表本站立场,如涉及侵权请联系站长邮箱:xbc-online@qq.com进行反馈,一经查实,将立刻删除涉嫌侵权内容。

同类热门文章

深入了解C++中的new操作符:使用具体实例学习

C++中的new操作符是动态分配内存的主要手段之一。在程序运行时,我们可能需要动态地创建和销毁对象,而new就是为此提供了便利。但是,使用new也常常会引发一些问题,如内存泄漏、空指针等等。因此,本文将通过具体的示例,深入介绍C++中的new操作符,帮助读者更好地掌握其使用。


深入了解C++中的new操作符:使用具体实例学习

怎么用Java反射获取包下所有类? 详细代码实例操作

Java的反射机制就是在运行状态下,对于任何一个类,它能知道这个类的所有属性和方法;对于任何一个对象,都能调用这个对象的任意一个方法。本篇文章将通过具体的代码示例,展示如何通过Java反射来获取包下的所有类。


怎么用Java反射获取包下所有类? 详细代码实例操作

员工线上学习考试系统

有点播,直播,在线支付,三级分销等功能,可以对学员学习情况的监督监控,有源码,可二次开发。支持外网和局域网私有化部署,经过测试源码完整可用!1、视频点播:视频播放,图文资料,课件下载,章节试学,限时免

员工线上学习考试系统

了解Java中的volati关键字的作用 以及具体使用方法

本篇文章将和大家分享一下Java当中的volatile关键字,下面将为各位小伙伴讲述volatile关键字的作用以及它的具体使用方法。


了解Java中的volati关键字的作用 以及具体使用方法

Java Map 所有的值转为String类型

可以使用 Java 8 中的 Map.replaceAll() 方法将所有的值转为 String 类型: 上面的代码会将 map 中所有的值都转为 String 类型。 HashMap 是 Java

Java Map 所有的值转为String类型