本文主要包括:
Flink时间语义与Watermark
Event Time:事件创建的时间
Ingestion Time:数据进入Flink的时间
Processing Time:执行操作算子的本地系统时间,与机器相关
WaterMark是用来处理乱序数据的
- Watermark是一种衡量Event Time进展的机制们可以设定延迟触发
- Watermark是用于处理乱序事件的吗,而正确的处理乱序事件,通常用Watermark机制集合window来实现
- 数据流重的Watermark用于表示timesamp小于Watermark的数据,都已经到达了,因此,window的执行也是由Watermark触发的
- Watermark用来让程序自己平衡延迟和结果正确性
Watermark的特点:
- Watermark是一条特殊的数据记录
- Watermark必须单调递增,以确保任务的事件事件始终在向前推进,而不是在后退
- Watermark与数据的时间戳相关
每条数据都会计算出一个watermark,每个watermark都会和之前的watermark做比较,取更大的一个,确保watermark是向前推进的
多个分区的watermark,会取当前window下,所有分区重最小的一个,为了防止数据丢失
每个watermark都会和window的结束时间(window.getEnd())做比较,如果时间大于等于window结束时间,window就会被触发,处理的数据会小于window结束时间的,等于window结束时间的会放到下一个window
程序运行以后,会一直获取Watermark(执行getCurrentWatermark()函数),不管有没有数据进来
具体的可以看之前写的demo
WaterMarkDemo
当Flink 用来跟踪事件时间进度的水印已经超过了元素所属窗口的结束时间戳时,
即数据延迟的时候,默认情况下,数据会丢失,为了防止这种情况可以使用以下两种方式来避免:
- 可以通过设置allowedLateness来避免
当数据晚于Watermark后,为了防止数据丢失,可以通过设置allowedLateness来避免,
Allowed lateness 指定元素在被丢弃之前可以延迟多长时间,其默认值为 0。 - 获取延迟数据作为副输出(把延迟的数据放到另外一个流单独处理)
具体可以参考
Flink官网-Allowed Lateness
Flink窗口
一般真实的数据流都是无界的,但是window可以把无限的数据流切分,得到有限的数据集进行处理
window就是将无限流切割为有限流的一种方式,它会将流数据分发到有限大小的桶重进行分析
窗口化的Flink程序的一般结构如下,第一个代码段中是分组的流,而第二段是非分组的流。正如我们所见,唯一的区别是分组的stream
调用keyBy(…)
和window(…)
,而非分组的stream
中window()
换成了windowAll(…)
,这些也将贯穿都这一页的其他部分中。
Keyed Windows
stream
.keyBy(...) <- keyed versus non-keyed windows
.window(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
Non-Keyed Windows
stream
.windowAll(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
方括号[]内的命令是可选的
WindowAssigner
WindowAssigner是负责将每一个到来的元素分配给一个或者多个窗口(window),Flink 提供了一些常用的预定义窗口分配器,即:滚动窗口、滑动窗口、会话窗口和全局窗口。
你也可以通过继承WindowAssigner类来自定义自己的窗口。所有的内置窗口分配器(除了全局窗口 global window)都是通过时间来分配元素到窗口中的,这个时间要么是处理的时间,要么是事件发生的时间
window分类:
- 时间窗口(Time Window)
** 滚动时间窗口
** 滑动时间窗口
** 会话窗口 - 计数窗口(Count Window)
** 滚动计数窗口
** 滑动计数窗口
窗口是滚动还是滑动,是根据WindowAssigner
来区分的滚动窗口(Tumbling Windows)
- 将数据依据固定的窗口长度对数据进行切分
- 时间对其,窗口长度固定,没有重叠
TumblingEventTimeWindows,这个WindowAssigner必须要结合watermark使用,否则,window不会触发
TumblingProcessingTimeWindows,这个WindowAssigner不需要watermark滑动窗口(Sliding Windows)
- 滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度和滑动间隔组成
- 窗口长度固定,可以有重叠
会话窗口(Session Windows)
- 由一系列事件组合一个指定时间长度的timeout间隙组成,也就是一段时间没有接收到新数据就会生成新的窗口
- 特点:时间无对齐
window的生命周期
当第一个应该属于该窗口的元素到达时,就会创建一个窗口,当时间(事件或处理时间)超过其结束时间戳加上用户指定的时间时,该窗口将被完全删除,
Flink 确保了只清除基于时间的window,其他类型的window不清除
此外,每个窗口都有一个触发器(请参阅触发器)和一个函数(ProcessWindowFunction、ReduceFunction 或 AggregateFunction)(请参阅窗口函数)。
该函数将包含要应用于窗口内容的计算,而触发器指定窗口被认为准备好应用该函数的条件。触发策略可能类似于“当窗口中的元素数量超过 4 时”,或“当水印通过窗口末尾时”。
触发器还可以决定在创建和删除窗口之间的任何时间清除窗口的内容。在这种情况下,清除仅指窗口中的元素,而不是窗口元数据。这意味着新数据仍然可以添加到该窗口。
除了上述之外,您还可以指定一个 Evictor(参见 Evictors),它能够在触发器触发之后和之前从窗口中删除元素
Window Functions
在定义了窗口分配器之后,我们需要指定我们想要在这些窗口中的每一个上执行的计算。这是窗口函数的职责,
它用于在系统确定窗口已准备好处理时处理每个(可能是键控的)窗口的元素。窗口函数可以是ReduceFunction
、AggregateFunction
或ProcessWindowFunction
之一。
前两个可以更有效地执行(参见状态大小部分),因为 Flink 可以在每个窗口的元素到达时增量聚合它们。ProcessWindowFunction
获取包含在窗口中的所有元素的Iterable
以及有关元素所属窗口的附加元信息。
带有ProcessWindowFunction
的窗口转换不能像其他情况一样有效地执行,因为 Flink 在调用函数之前必须在内部缓冲窗口的所有元素。
这可以通过将 ProcessWindowFunction
与 ReduceFunction
或 AggregateFunction
结合来获得窗口元素的增量聚合和 ProcessWindowFunction
接收的附加窗口元数据来缓解。
ReduceFunction
ReduceFunction 指定如何组合来自输入的两个元素以生成相同类型的输出元素。 Flink 使用 ReduceFunction 来增量聚合窗口的元素。
ReduceFunction 有多种实现方式
第一种是直接窗口内聚合,不能对窗口外的数据做聚合,这个是最基本的reduce
第二种,先执行窗口内的聚合,然后,可以执行窗口外的,代码如下:
public <R> SingleOutputStreamOperator<R> reduce(
ReduceFunction<T> reduceFunction, ProcessWindowFunction<T, R, K, W> function) {
TypeInformation<R> resultType =
getProcessWindowFunctionReturnType(function, input.getType(), null);
return reduce(reduceFunction, function, resultType);
}
ReduceFunction 处理玩的结果,会当成输入写入到ProcessWindowFunction
这种适合处理,先用reduce预聚合,然后再通过ProcessWindowFunction整体聚合
AggregateFunction同样有这种功能,可能传入ProcessWindowFunction
AggregateFunction
AggregateFunction 是 ReduceFunction 的通用版本,它具有三种类型:输入类型 (IN)、累加器类型 (ACC) 和输出类型 (OUT)。
输入类型是输入流中元素的类型,AggregateFunction 有一种方法可以将一个输入元素添加到累加器中。
该接口还具有用于创建初始累加器、将两个累加器合并为一个累加器以及从累加器中提取输出(OUT 类型)的方法。 我们将在下面的示例中看到它是如何工作的。
与 ReduceFunction 相同,Flink 将在窗口的输入元素到达时增量地聚合它们。
/**
* The accumulator is used to keep a running sum and a count. The {@code getResult} method
* computes the average.
*/
private static class AverageAggregate
implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
@Override
public Tuple2<Long, Long> createAccumulator() {
return new Tuple2<>(0L, 0L);
}
@Override
public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
}
@Override
public Double getResult(Tuple2<Long, Long> accumulator) {
return ((double) accumulator.f0) / accumulator.f1;
}
@Override
public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
}
}
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.aggregate(new AverageAggregate());
上面的示例计算窗口中元素的第二个字段的平均值。
ProcessWindowFunction
ProcessWindowFunction 获得一个包含窗口所有元素的 Iterable 和一个可以访问时间和状态信息的 Context 对象,这使其能够提供比其他窗口函数更大的灵活性。
这是以性能和资源消耗为代价的,因为元素不能增量聚合,而是需要在内部缓冲,直到窗口被认为准备好进行处理。
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(t -> t.f0)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.process(new MyProcessWindowFunction());
/* ... */
public class MyProcessWindowFunction
extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {
@Override
public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
long count = 0;
for (Tuple2<String, Long> in: input) {
count++;
}
out.collect("Window: " + context.window() + "count: " + count);
}
}
该示例显示了一个 ProcessWindowFunction,它对窗口中的元素进行计数。此外,窗口函数将有关窗口的信息添加到输出中。
请注意,将 ProcessWindowFunction 用于简单的聚合(例如 count)是非常低效的
增量窗口聚合使用 AggregateFunction 或者 ReduceFunction同样可以达到相同的想过,并且,效率很高
WindowFunction (Legacy):apply
在某些可以使用 ProcessWindowFunction 的地方,您也可以使用 WindowFunction。这是 ProcessWindowFunction 的旧版本,提供较少的上下文信息并且没有一些高级功能,例如每个窗口的键控状态。
此接口将在某些时候被弃用。
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.apply(new MyWindowFunction());
在 ProcessWindowFunction 中使用每个窗口状态
除了访问键控状态之外,ProcessWindowFunction 还可以使用键控状态,该键控状态的范围限定为函数当前正在处理的窗口。 在这种情况下,了解每个窗口状态所指的窗口是什么很重要。 涉及不同的“窗口”:
这地方没看懂
flink window相关内容,具体可以参考Flink Window
Flink 1.17.2提交任务到yarn,taskmanager的metric显示异常
原因是maven项目flink-yarn与hbase的maven有冲突,把maven冲突解决了以后,就可以了。具体哪个包,没有细研究
flink-1.13.6里没有这些参数,flink-1.17.2里有,而且,flink-1.18.0里也有,也存在flink-1.17.2相同的问题,所以,肯定还是参数配置的问题
Flink 1.17.2 standalone web页面显示异常
直接访问8081端口,web页面访问不通
解决办法:把flink-conf.yml中的localhost全部改成0.0.0.0