DBLOG
» WTF
Toggle navigation
DBLOG
主页
OCM
1、概览
2、数据导入导出
3、GaussDB(DWS)数据库管理
4、数据库调优与开发实践
5、湖仓一体
6、开发应用
7、集群管理
8、巡检和维运维
About Me
归档
标签
3、 基于Flink构建实时数仓
无
2025-05-22 15:08:58
1
0
0
admin
[TOC] # Flink简介 ## 1. 概念 Flink是一个分布式的流批一体化的开源处理引擎,用于在无边界和有边界数据流上进行有状态的计算。 - **无界数据流**:数据源源不断地产生,例如来自Kafka的消息流,这类数据流没有结束的概念。 - **有界数据流**:有开始和结束的数据流,可以对所有数据进行处理。 - **有状态计算**:在流处理过程中,将中间状态保存下来,以供后续处理使用。 ## 2. 组件 | 组件 | 描述 | | ------------ | ------------------------------------------------------------ | | JobManager | 负责协调任务执行,分配任务,协调检查点,并处理失败。 | | TaskManager | 在Flink集群中并行执行任务,管理任务的状态和缓冲区。 | | Client | 用于提交Flink作业,并与JobManager通信。 | ## 3. 任务 Flink任务由多个算子组成,且每个算子可以设置各自的并行度。一般任务由source、transformation以及sink算子组成: - **source**:数据流的起点,负责从外部系统读取数据并将其转换为Flink可以处理的内部数据结构。 - **transformation**:用于对数据流应用一系列操作,以转换、聚合、连接或分割数据。 - **sink**:数据流的终点,负责将处理后的数据写入外部系统。 ## 4. API和库 | API | 描述 | | ------------- | -------------------------------------------------------- | | DataStream API| 用于构建流处理应用的核心API,提供了丰富的转换操作符。 | | Table API & SQL| 提供声明式的API,允许以类似SQL的方式查询流和批处理数据。 | | DataSet API | 用于批处理,但在最新的Flink版本中,推荐使用统一的DataStream API。 |  # dws-flink-connector介绍与使用 ## 1. 概述 dws-flink-connector是Flink与GaussDB(DWS)之间的桥梁,通过Flink SQL可以从GaussDB(DWS)中读写数据(包括增量读)。 - 支持实时读取增量数据,满足增量计算需求。 - 支持异步维流join,提升维流join性能。 - 支持Flink Catalog,简化Flink SQL的编写。 - 支持攒批以及并发入库,提升入库性能。 - 支持多种入库方式、主键冲突策略,满足各种入库需求。 ## 2. 批量读 GaussDB(DWS)中的表可以作为数据源供Flink进行批读,语法如下: ```sql -- 创建源表 CREATE TABLE batch_source_test ( a INT, b INT, c INT, PRIMARY KEY (a) NOT ENFORCED ) WITH ( 'connector' = 'dws', 'url' = 'jdbc:gaussdb://ip:port/database', 'tableName' = 'batch_source_test', 'username' = '***', 'password' = '***' ); ``` ### 说明 - Flink SQL中表字段必须在GaussDB(DWS)中有对应字段。 - `WITH`参数中的`connector`需要指定为`dws`,`tableName`指定为GaussDB(DWS)对应的表名。 - 后续在Flink SQL中可以对`batch_source_test`表进行读取,例如: ```sql SELECT * FROM batch_source_test WHERE a > 100; ``` ## 3. 维流join GaussDB(DWS)中的表可以作为维表用于Flink的维流join(即用实时流和维表join),语法如下: ```sql -- 创建主表以及维度表 CREATE TABLE users ( id INT, name STRING, proctime AS PROCTIME() ) WITH ( 'connector' = 'datagen', 'fields.id.kind' = 'sequence', 'fields.id.start' = '1', 'fields.id.end' = '1000' ); CREATE TABLE batch_source_test ( a INT, b INT, c INT, PRIMARY KEY (a) NOT ENFORCED ) WITH ( 'connector' = 'dws', 'url' = 'jdbc:gaussdb://ip:port/database', 'tableName' = 'batch_source_test', 'lookupAsync' = 'true', 'username' = '***', 'password' = '***' ); ``` ### 说明 - Flink SQL中表字段必须在GaussDB(DWS)中有对应字段。 - 维表`WITH`参数中的`lookupAsync`表示是否异步读取。 #### 维表join语法 ```sql SELECT * FROM users LEFT JOIN batch_source_test FOR SYSTEM_TIME AS OF users.proctime ON users.id = batch_source_test.a; ``` ## 4. 攒批写 在GaussDB(DWS)中,可以将表作为结果表供Flink写入数据。这种写入可以按照一定的时间或一定的数据量进行批量处理。 ### 语法示例 以下是创建目标表的SQL语句: ```sql -- 创建目标表 CREATE TABLE batch_sink_test ( a INT, b INT, c INT, PRIMARY KEY (a) NOT ENFORCED ) WITH ( 'connector' = 'dws', 'url' = 'jdbc:gaussdb://ip:port/database', 'tableName' = 'batch_sink_test', 'username' = '***', 'password' = '***' ); ``` ### 说明 - 在Flink SQL中,表的字段必须在GaussDB(DWS)中有对应的字段。 - `WITH`参数中的`connector`需要指定为`dws`,而`tableName`则指定为GaussDB(DWS)中的对应表名。 后续在Flink SQL中可以使用如下语句对`batch_sink_test`表进行写入: ```sql INSERT INTO batch_sink_test SELECT * FROM batch_source_test; ``` 确保在插入数据之前,`batch_source_test`表中有相应的数据和字段结构,以便能够正确写入到`batch_sink_test`表中。 ## 5. 丰富的写入方式 针对不同入库场景,可以配置不同的入库方式。例如,针对大批量数据实时入库,可以将`writeMode`设置为`COPY_MERGE`或者`COPY_UPSERT`,再配合设置写入的并发度(`sink.parallelism`),可以极大提升入库的效率。 后续也会提供直连DN的入库能力,可以去掉CN对数据分发的步骤,减少CN资源压力的同时,又可以更进一步提升入库的速度。 ## 6. 攒批写入典型参数 具体参数配置可以参考:数据仓库服务GaussDB(DWS) ### 数据写入模式说明 | 参数 | 描述 | | --------------- | --------------------------------------------------------------------------------------------- | | `writeMode` | - `auto`:系统自动选择(默认)。<br>- `copy_merge`:将数据复制到临时表,然后合并。<br>- `copy_upsert`:使用复制和更新。<br>- `upsert`:使用upsert SQL入库。<br>- `UPDATE`:使用update语法更新数据。<br>- `COPY_UPDATE`:通过临时表加速更新。<br>- `UPDATE_AUTO`:根据批量大小决定使用UPDATE或COPY_UPDATE。 | | `autoFlushBatchSize` | 自动刷库的批大小(攒批大小)。 | | `autoFlushMaxInterval` | 自动刷库的最大间隔时间(攒批时长)。 | | `copyWriteBatchSize` | 在`writeMode == auto`下,使用copy的批大小。 | ## 7. Flink Catalog 通过Flink Catalog的能力,打通Flink和GaussDB(DWS)表的相互映射,语法如下: ```sql -- 创建对应的catalog CREATE CATALOG dws WITH ( 'type' = 'dws', 'base_url' = 'jdbc:gaussdb://ip:port/', 'database' = 'postgres', 'password' = '***', 'username' = '***' ); -- 使用catalog USE CATALOG dws; -- 直接查询数据库中的表,无需在Flink侧建立映射表 SELECT * FROM batch_source_test WHERE a > 10; ``` ### 说明 - `WITH`参数中的`type`需要指定为`dws`,且`base_url`中无需携带数据库名称。 - 使用`USE CATALOG dws;`语法来指定使用新建的catalog。 - 可无需新建映射表便可以直接查询数据库中的表信息。 - 使用`SHOW CATALOGS;`来查询所有的catalog。 # 实时增量读取 ## 1. 实时增量读取的作用原理 实时增量读取是指只对发生变化的数据进行读取,而不是重新读取整个数据集。这种读取方式可以显著提高处理效率,减少资源消耗。 GaussDB(DWS)通过Binlog来实现增量读取,当对一张表进行DML操作时,进行双写,会将对应的DML操作记录到一张辅助表中,那么就可以通过读取该辅助表来获取增量的数据,用于数据同步或者增量计算。 ## 2. Binlog数据格式 | 字段名称 | 字段类型 | 含义 | | ---------------------------- | -------- | --------------------------------------------------- | | `gs_binlog_sync_point` | BIGINT | Binlog系统字段,表示该记录的同步点值。 | | `gs_binlog_event_sequence` | BIGINT | Binlog的系统字段,用于表示同一事务类操作的先后顺序。 | | `gs_binlog_event_type` | CHAR | Binlog的系统字段,表示当前记录的操作类型。 | | `user_column_1` | 用户列 | 用户自定义的数据列。 | | ... | ... | ... | | `user_column_n` | 用户列 | 用户自定义的数据列。 | ### `gs_binlog_event_type`可能的取值: 1. `'I'`:表示当前Binlog是插入一条新记录。 2. `'d'`:表示当前Binlog是删除一条记录。 3. `'B'`:表示当前Binlog是更新前的记录。 4. `'U'`:表示当前Binlog是更新后的记录。 ## 3. 实时读取Binlog GaussDB(DWS)中的Binlog表可以作为源表供Flink实时读取,语法如下: ```sql -- 创建源表 CREATE TABLE binlog_source_test ( a INT, b INT, c INT, PRIMARY KEY (a) NOT ENFORCED ) WITH ( 'connector' = 'dws', 'url' = 'jdbc:gaussdb://ip:port/gaussdb', 'binlog' = 'true', 'tableName' = 'binlog_source_test', 'binlogSlotName' = 'slot', 'username' = '***', 'password' = '***' ); ``` ### 说明 - Flink SQL中表字段必须在GaussDB(DWS)中有对应字段。 - `WITH`参数中`binlog`属性需要设置为`true`,`binlogSlotName`需要设置为自定义的槽位名。 ### 数据同步示例 ```sql INSERT INTO sink_test SELECT * FROM binlog_source_test; ``` ### 增量计算示例 ```sql INSERT INTO sum_test SELECT SUM(a) FROM binlog_source_test; ``` ## 4. 实时读取Binlog注意事项 - 目前GaussDB(DWS)只有Hstore以及Hstore-opt表支持Binlog功能,表需要有主键且设置`enable_binlog=on`。 - 如果多个任务消费同一张表的Binlog数据,需要保证每个任务的`binlogSlotName`唯一。 - 为了达到最高的读取速度,建议将Flink的任务并行度和GaussDB(DWS)集群中的DN数设置一致。 - 使用dws-flink-connector的Sink能力来写入读取到的Binlog数据,需要注意以下几点: - 如果要保证DN内的数据写入顺序则需要设置`connectionSize=1`。 - 如果源端有更新主键操作或者Flink有聚合计算的话,需要将`ignoreUpdateBefore`设置为`false`(默认为`true`)。 ## 5. 实时读取Binlog参数 | 参数 | 数据类型 | 默认值 | 描述 | | ---------------------------- | -------- | ---------- | ---------------------------------------------- | | `binlog` | Boolean | false | 是否读取Binlog信息。 | | `binlogSlotName` | String | Flink映射表的表名 | 槽位信息,可以理解为一个标识。 | | `binlogBatchReadSize` | Integer | 5000 | 增量读取binlog的数据行数。 | | `fullSyncBinlogBatchReadSize`| Integer | 50000 | 全量读取binlog的数据行数。 | | `binlogReadTimeout` | Long | 600000 | 增量消费Binlog数据时超时时间,单位毫秒。 | | `fullSyncBinlogReadTimeout` | Long | 1800000 | 全量消费Binlog数据时超时时间,单位毫秒。 | | `binlogSleepTime` | Long | 500 | 实时消费不到Binlog数据时休眠时间,单位毫秒。 | | `binlogMaxSleepTime` | Long | 10000 | 实时消费不到Binlog数据时最大休眠时间,单位毫秒。| | `connectionPoolSize` | Integer | 5 | JDBC连接池连接大小。 | | `needRedistribution` | Boolean | true | 是否兼容扩充重分布。 | | `checkNodeChangeInterval` | Long | 10000 | 检测节点变化的间隔,只有`needRedistribution=true`时才生效。 | # 实时数仓构建 ## 1. 整体架构 利用Flink强大的实时处理能力和GaussDB(DWS)的Binlog能力,可以快速构建实时数仓,且无需维护其他组件(如Kafka),整体架构分层清晰,数据可以高效流动。  ## 2. 架构优势 - 分层清晰,和离线数仓分层类似(包括ODS、DWD、DWS、ADS等),便于业务人员理解和使用。 - 每一层的数据都存放在GaussDB(DWS)中,支持高效查询与修改;且可以对外单独提供服务,实现数据的高效复用。 - 模型统一,极简架构,无需引入额外组件,便于维护。 - 整体任务链路可以通过Flink SQL来驱动,简化上手难度。 ## 3. 端到端示例(同步场景) 建立源表: ```sql CREATE TABLE binlog_source_test ( a INT, b INT, c INT, PRIMARY KEY (a) NOT ENFORCED ) WITH ( 'connector' = 'dws', 'url' = 'jdbc:gaussdb://ip:port/gaussdb', 'binlog' = 'true', 'tableName' = 'binlog_source_test', 'binlogSlotName' = 'slot', 'username' = '***', 'password' = '***' ); ``` 建立目标表: ```sql CREATE TABLE binlog_sink_test ( a INT, b INT, c INT, PRIMARY KEY (a) NOT ENFORCED ) WITH ( 'connector' = 'dws', 'url' = 'jdbc:gaussdb://ip:port/gaussdb', 'tableName' = 'binlog_sink_test', 'ignoreUpdateBefore' = 'false', 'username' = '***', 'password' = '***' ); ``` 构建实时同步任务: ```sql INSERT INTO binlog_sink_test SELECT * FROM binlog_source_test; ``` ## 4. 端到端示例(数据打宽) 建立源表: ```sql CREATE TABLE user_info( id INT, name STRING, age INT, proc_time AS PROCTIME(), PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'dws', 'url' = 'jdbc:gaussdb://ip:port/gaussdb', 'binlog' = 'true', 'tableName' = 'user_info', 'binlogSlotName' = 'slot', 'username' = '***', 'password' = '***' ); ``` 建立维表: ```sql CREATE TABLE phone_info( user_id INT, phoneNumber STRING, PRIMARY KEY (user_id) NOT ENFORCED ) WITH ( 'connector' = 'dws', 'url' = 'jdbc:gaussdb://ip:port/gaussdb', 'tableName' = 'phone_info', 'lookupAsync' = 'true', 'username' = '***', 'password' = '***' ); ``` 建立目标表: ```sql CREATE TABLE result_info( id INT, name STRING, age INT, phoneNumber STRING, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'dws', 'url' = 'jdbc:gaussdb://ip:port/gaussdb', 'tableName' = 'result_info', 'ignoreUpdateBefore' = 'false', 'username = '***', 'password' = '***' ); ``` ### 构建实时数据打宽任务 在实时数据处理中,可以通过实时流和维表进行连接,构建如下任务: ```sql INSERT INTO result_info SELECT user.id, user.name, user.age, phone.phoneNumber FROM user_info AS user JOIN phone_info FOR SYSTEM_TIME AS OF user.proc_time AS phone ON user.id = phone.user_id; ``` ### 说明 - 在此示例中,`user_info`表作为源表提供用户信息,而`phone_info`表作为维表提供用户的电话号码。 - 通过`FOR SYSTEM_TIME AS OF`语法,可以确保查询的是在特定时间点的维度数据,从而实现实时打宽的功能。
上一篇:
2、湖格式——Hudi
下一篇:
3、 运维监控
0
赞
1 人读过
新浪微博
微信
腾讯微博
QQ空间
人人网