DBLOG
» WTF
Toggle navigation
DBLOG
主页
OCM
1、概览
2、数据导入导出
3、GaussDB(DWS)数据库管理
4、数据库调优与开发实践
5、湖仓一体
6、开发应用
7、集群管理
8、巡检和维运维
About Me
归档
标签
4、数据调度工具
无
2025-05-22 15:09:06
0
0
0
admin
# 调度功能介绍 ## Airflow 基础介绍 (1) **什么是 Airflow?** Apache Airflow 是一个开源平台,用于开发、调度和监控面向批处理的工作流程。Airflow 的创建是一个可扩展的 Python 框架,主要特点是所有工作流都用 Python 代码定义,此特点有如下几个优点: - **动态**:Airflow 管道配置为 Python 代码,允许动态管道生成。 - **可扩展**:Airflow 框架包含可连接多种技术的运算符,所有 Airflow 组件都可扩展,易于适应运行环境。 - **灵活**:利用 Jinja 模板引擎内置工作流参数化。  ## Airflow 基础介绍 (2) **分布式架构:** - **Workers**:执行分配的任务。 - **Scheduler**:负责将必要的任务添加到队列。 - **Web Server**:HTTP 服务器,提供对 DAG/任务状态等信息的访问。 - **Database**:存储有关任务、DAG、变量、连接等状态的信息。 - **Celery**: - **Broker**:存储要执行的任务。 - **Result backend**:存储已完成任务的状态。  ## Airflow 基础介绍 (3) **组件通信:** - [1] Web server -> Worker:获取执行日志。 - [2] Web server -> DAG files:解析 DAG。 - [3] Web server -> Database:获取任务状态。 - [4] Worker -> DAG files:解析 DAG,并执行任务。 - [5] Worker -> Database:获取并存储有关连接配置、变量等信息。 - [6] Worker -> Result backend:保存任务的状态。 - [7] Worker -> Queue broker:执行分配的任务。 - [8] Scheduler -> DAG files:解析 DAG,并分配任务。 - [9] Scheduler -> Database:存储 DAG 信息和执行状态。 - [10] Scheduler -> Result backend:获取已完成任务的状态信息。 - [11] Scheduler -> Queue broker:分配任务。 ## Airflow 的创建 (1)  ## Airflow 的创建 (2) - **选择需要访问的 DWS**:Airflow 和 DWS 不在同一网络,选择 DWS 后,会自动创建匹配的终端节点和终端节点服务,Airflow 通过终端节点 IP 和终端服务端口可以访问 DWS(DWS 需绑定 ELB,ELB 无需绑定外网)。 - **选择 OBS 访问凭据**:访问 OBS,需要用户的 AK/SK。在数据加密服务 -> 凭据管理 -> 凭据列表中创建凭据,凭据值分别为 access.key、secret.key,值为对应的 AK、SK。 - **选择 OBS**,配置 DAG 文件存储目录、日志文件存储目录。  ## Airflow 的创建 (3) - **自定义参数配置**(Airflow 创建后,可在 Airflow 详情页,修改自定义参数,重启 Airflow 生效): - 可以配置 Airflow 支持的参数,如: - `AIRFLOW__CELERY__WORKER_CONCURRENCY`(最大并发数) - `AIRFLOW__CELERY__SYNC_PARALLELISM`(同步任务状态进程数) - 下述参数不支持自定义配置: - `AIRFLOW__LOGGING__BASE_LOG_FOLDER`(日志目录) - `AIRFLOW__CORE__DAGS_FOLDER`(DAG 文件目录) - `AIRFLOW__CORE__EXECUTOR`(使用的执行器) - `AIRFLOW__WEBSERVER__BASE_URL`(Web 基础 URL) - `AIRFLOW__DATABASE__SQL_ALCHEMY_CONN`(元数据信息数据库连接) - `AIRFLOW__CELERY__BROKER_URL`(消息队列连接) - `AIRFLOW__CELERY__RESULT_BACKEND`(任务状态存储数据库连接) ## Airflow 的删除 - 当用户不再需要使用某个集群时,可以删除该集群。删除的集群无法恢复,同时集群中的用户数据、已执行任务的历史记录也会自动删除且无法再访问。删除集群时,不会删除集群使用的 DAG 文件。  --- # 调度最佳实践实操 ## 新建数据库连接 - 在 Airflow 集群管理界面中,单击对应集群的“登录”按钮,可访问 Airflow 的 Web UI。 - 输入创建 Airflow 集群时输入的管理员账号与密码进行登录。 ### 执行任务 - 访问 Admin -> Connections -> +,创建连接,输入 Connection ID、Connection type、Host、Database、User、Password、Port 等关键信息: - Host 为创建终端节点服务时生成的终端节点 IP。 - Port 为创建终端节点服务时输入的终端端口。 - Database、User、Password 为创建终端节点服务时选择 GaussDB(DWS) 所对应的数据库、用户名和密码。  ## 编写 DAG 文件并上传 - 按照 Airflow DAG 任务规范,编写对应任务。 - 在 Airflow 中新建连接,通过 base_hook 获取连接 ID 获取对应数仓的连接信息,避免在 DAG 文件中硬编码数仓密码。 - 上传文件至 Airflow 配置的 DAGs 目录: - Airflow 会定时扫描 DAGs 目录下的文件,文件上传后,等待 1 分钟左右,可在 DAGs 页面查看上传的文件是否被正确加载。 - 点击进入详情页,可查看对应任务的源码。   ## 执行任务 - 可通过配置 DAG 中的 `start_date` 和 `schedule` 来设置任务的执行时间和周期,也可在页面触发任务立即执行。 - 任务执行完成后,显示对应的执行结果,执行成功后,可去数据库查看数据是否插入数据库,是否符合预期。 
上一篇:
4、分区表
下一篇:
4、集群容灾管理
0
赞
1 人读过
新浪微博
微信
腾讯微博
QQ空间
人人网