本文主要包括:
FlinkSQL
FlinkSQL JOIN
Flink 作为流式数据处理框架的领跑者,在吞吐量、时延、准确型、容错性等方面都有优异的表现。在 API 方面,它为用户提供了较底层的 DataStream API,也推出了 Table API 和 SQL 等编程接口。特别来看,SQL 以其易用、易迁移的特点,深受广大用户的欢迎。
在常见的数据分析场景中,JOIN(关联)操作是一项很有挑战性的工作,因为它涉及到左右两个表(流)的状态匹配,对内存的压力较大;而相比恒定的批数据而言,流数据更加难以预测,例如数据可能乱序、可能晚到,甚至可能丢失,因此需要缓存的状态量更加庞大,甚至会严重拖慢整体的数据处理进度。由此可见,流的 JOIN 并没有一个全能且通用的方案,我们必须在 低时延 和 高精准 等维度间进行取舍。
考虑到不同业务场景的时效性、准确型要求不同,Flink 提供了多种流式的 JOIN 操作,用户可以根据实际情况选择最适合自己的类型。下面我们对它们逐一进行介绍。
常规 JOIN(Regular JOIN)
常规 JOIN(Regular JOIN)是语法最简单的一类 JOIN,和传统数据库的 JOIN 语法完全一致。对于左表和右表的任何变动,都会触发实时计算和更新,因此它的结果是“逐步逼近”最终的精确值,也就是下游可能看到变来变去的结果。为了支持结果的更新,下游目的表需要 定义主键 (PRIMARY KEY NOT ENFORCED)。
常规 JOIN 支持 INNER、LEFT、RIGHT 等多种 JOIN 类型。其中 INNER JOIN 只会下发 Upsert 数据流(即只有更新和插入,没有删除操作),而 LEFT 和 RIGHT JOIN 则会下发更多类型的 Changelog 数据流(包含了插入、更新、删除等各种类型)。对于各类数据流的区别和转化,请参见 Flink 官方文档:动态表。
常规 JOIN 运行时需要保留左表和右表的状态,且随着时间的推移,状态会无限增长,最终可能导致作业 OOM 崩溃或异常缓慢。因此我们强烈建议用户在 Flink 参数中设置 table.exec.state.ttl 选项,它可以指定 JOIN 状态的保留时间,以便 Flink 及时清理过期的状态。
时间区间 JOIN(Interval JOIN)
时间区间 JOIN 是另一种关联策略,它与上述的常规 JOIN 不同之处在于,左右表仅在某个时间范围(给定上界和下界)内进行关联,且只支持普通 Append 数据流,不支持含 Retract 的动态表。如下图(来自 Flink 官方文档)。它的好处是由于给定了关联的区间,因此只需要保留很少的状态,内存压力较小。但是缺点是如果关联的数据晚到或者早到,导致落不到 JOIN 区间内,就可能导致结果不准确。此外,只有当区间过了以后,JOIN 结果才会输出,因此会有一定的延迟存在。
窗口 JOIN
窗口 JOIN 也是用法非常简单的一种 JOIN 类型。它以窗口为界,对窗口里面的左表、右表数据进行关联操作。由于 Flink 支持滑动(TUMBLE)、滚动(HOP 也叫做 SLIDING)、会话(SESSION)等不同窗口类型,因此可以根据业务需求进行选择。
窗口 JOIN 不强制要求左右表必须包含时间戳字段,但是如果您使用时间相关窗口的话,也需要提供相关的时间戳来划分窗口。
和上述 时间区间 JOIN 类似,窗口 JOIN 的输出也是最终值,也就是说不会出现 常规 JOIN 那样不断变动的结果。但是缺点也一样,它只能在窗口结束后输出关联结果,且对于早到或者晚到等不在窗口内的数据是无法参与计算的,因此实时性和准确性方面都相对较差。
时态表 JOIN(Temporal JOIN)
时态表 JOIN 是一类特殊的关联操作:本文前半部分介绍的各种 JOIN 类型都是基于最新的数据进行关联,而 时态表 JOIN 则可以根据左表记录中的时间戳,在右表的历史版本中进行查询和关联。例如我们的商品价格表会随时间不断变动,左表来了一条时间戳为 10:00 的订单记录,那么它会对右表在 10:00 的商品价格快照(当时的价格)进行关联并输出结果;如果随后左表来了一条 10:30 的订单记录,那么它会对右表在 10:30 时的商品价格进行后续的关联。这种特性对于统计不断变动的时序数据非常有用。
时态表 JOIN 分为 事件时间(Event Time) 和 处理时间(Processing Time) 两种类型,且只支持 INNER 和 LEFT JOIN。由于基于处理时间的时态表 JOIN 存在 Bug(参见 FLINK-19830),因此在最新的 Flink 版本中已被禁用。我们这里主要介绍基于事件时间的时态表 JOIN。
由于时态表 JOIN 需要得知不同时刻下右表的不同版本,因此它的右表必须是 Changelog 动态表(即 Upsert、Retract 数据流,而非 Append 数据流),且两侧的源表都必须定义 WATERMARK FOR。随着 Watermark 水位推进,Flink 可以逐步清理失效的数据,因此时态表 JOIN 的内存压力相对也不大。此外,还要求时态表的主键必须包含在 JOIN 等值条件中。
具体可参考[Flink SQL 双表 JOIN 介绍与原理简析][2]
多表join测试
测试大数据量下多表Join,数据是否会一直缓存着,数据具体缓存在哪边?
因为之前在测试flinksql多表join的时候,发现只要一个表数据变动,哪怕另外一个表数据没有变动,数据也会关联上。这里再次验证一下
数据量分布:
表名 | 数据量 |
---|---|
customer | 1500000 |
orders | 15000000 |
set sql-client.execution.result-mode=tableau;
set execution.checkpointing.interval=3sec;
CREATE TABLE customer (
`c_custkey` int primary key ,
`c_name` string ,
`c_address` string ,
`c_nationkey` int ,
`c_phone` string ,
`c_acctbal` decimal(15,2) ,
`c_mktsegment` string ,
`c_comment` string
) WITH (
'connector'= 'mysql-cdc',
'hostname'= '150.158.190.192',
'port'= '3306',
'username'= 'root',
'password'='Gjc123!@#',
'server-time-zone'= 'Asia/Shanghai',
'scan.startup.mode'='latest-offset',
'database-name'= 'tpch',
'table-name'= 'customer'
);
CREATE TABLE orders (
`o_orderkey` int primary key ,
`o_custkey` int ,
`o_orderstatus` string ,
`o_totalprice` decimal(15,2) ,
`o_orderdate` date ,
`o_orderpriority` string ,
`o_clerk` string,
`o_shippriority` int ,
`o_comment` string
) WITH (
'connector'= 'mysql-cdc',
'hostname'= '150.158.190.192',
'port'= '3306',
'username'= 'root',
'password'='Gjc123!@#',
'server-time-zone'= 'Asia/Shanghai',
'scan.startup.mode'='latest-offset',
'database-name'= 'tpch',
'table-name'= 'orders'
);
create table result_print(
c_custkey int,
c_name string ,
o_orderkey int,
o_custkey int ,
o_orderstatus string
) WITH (
'connector' = 'print'
);
insert into result_print
select c_custkey,c_name,o_orderkey,o_custkey,o_orderstatus
from customer a
left join orders b
on a.c_custkey = b.o_custkey
-- 在mysql里执行的sql
update orders set o_orderstatus = 'C' where o_orderkey = 1;
update customer set c_name = 'aaa' where c_custkey = 369001;
update orders set o_orderstatus = 'B';
update customer set c_name = 'aaa';
测试步骤:
- 先只修改customer表的数据,orders表不变,这时候,关联出来取自orders表的数据均为空
- 修改orders表的数据,这时候关联出来的数据就正常了
- 把orders和customer表的数据全部修改一次,为了看状态存在哪,能存多少时间?
通过这次测试,可以看出,两表join,两个流的数据是会被缓存下来,如果关联的数据在另外一个流里没有(另外一个表没有变动),这时候是关联不上的
通过观察,发现这两个流的数据会被缓存在checkpoint里
[root@V-SH-101-227 log]# hadoop fs -du -h /flink/checkpoint/sql/7465947e9b337b3b4f74beb2e6442731/
296.1 M 888.4 M /flink/checkpoint/sql/7465947e9b337b3b4f74beb2e6442731/chk-2585
128 M 768 M /flink/checkpoint/sql/7465947e9b337b3b4f74beb2e6442731/chk-2586
0 0 /flink/checkpoint/sql/7465947e9b337b3b4f74beb2e6442731/shared
0 0 /flink/checkpoint/sql/7465947e9b337b3b4f74beb2e6442731/taskowned
[root@V-SH-101-227 log]# hadoop fs -du -h /flink/checkpoint/sql/7465947e9b337b3b4f74beb2e6442731/
296.1 M 888.4 M /flink/checkpoint/sql/7465947e9b337b3b4f74beb2e6442731/chk-2586
0 384 M /flink/checkpoint/sql/7465947e9b337b3b4f74beb2e6442731/chk-2587
0 0 /flink/checkpoint/sql/7465947e9b337b3b4f74beb2e6442731/shared
0 0 /flink/checkpoint/sql/7465947e9b337b3b4f74beb2e6442731/taskowned
[root@V-SH-101-227 log]# hadoop fs -du -h /flink/checkpoint/sql/7465947e9b337b3b4f74beb2e6442731/
296.1 M 888.4 M /flink/checkpoint/sql/7465947e9b337b3b4f74beb2e6442731/chk-2586
0 384 M /flink/checkpoint/sql/7465947e9b337b3b4f74beb2e6442731/chk-2587
0 0 /flink/checkpoint/sql/7465947e9b337b3b4f74beb2e6442731/shared
0 0 /flink/checkpoint/sql/7465947e9b337b3b4f74beb2e6442731/taskowned
[root@V-SH-101-227 log]# hadoop fs -du -h /flink/checkpoint/sql/7465947e9b337b3b4f74beb2e6442731/
296.1 M 888.4 M /flink/checkpoint/sql/7465947e9b337b3b4f74beb2e6442731/chk-2586
0 384 M /flink/checkpoint/sql/7465947e9b337b3b4f74beb2e6442731/chk-2587
0 0 /flink/checkpoint/sql/7465947e9b337b3b4f74beb2e6442731/shared
0 0 /flink/checkpoint/sql/7465947e9b337b3b4f74beb2e6442731/taskowned
可以看出checkpoint很大,这就会有一个问题,双流jion,如果表特别大,checkpoint就会很大,而且,如果代码修改了,数据只能从头再来一次,局限性有点大,会在网上继续找找有什么方法规避