数据库模式

本文将深入介绍 Apache DolphinScheduler 所采用的数据库模式,此模式主要用于持久化存储工作流定义、执行状态、调度信息以及系统元数据。它具备广泛的兼容性,可支持 MySQL、PostgreSQL 和 H2 等多种数据库,其具体定义存储在 dolphinscheduler - dao/src/main/resources/sql 目录下。

DolphinScheduler 的数据库模式分为七个主要功能组:

* 组 * 目的 * 关键表 ** ---- * ---- * ---- ** 工作流管理 * 存储带有版本控制的工作流和任务定义 * t_ds_workflow_definitiont_ds_task_definitiont_ds_workflow_task_relation ** 执行状态 * 跟踪运行时实例及其状态 * t_ds_workflow_instancet_ds_task_instancet_ds_command ** 调度 * 通过 Quartz 管理基于 cron 的调度 * t_ds_schedulesQRTZ_* 表 ** 资源管理 * 数据源、文件和 UDF 元数据 * t_ds_datasourcet_ds_resourcest_ds_udfs ** 管理 * 用户、租户、项目和权限 * t_ds_usert_ds_tenantt_ds_project ** 告警 * 告警配置和历史记录 * t_ds_alertt_ds_alertgroup ** 服务注册 * 基于 JDBC 的协调(ZooKeeper 的替代方案) * t_ds_jdbc_registry_* 表 *

定义与实例分离

DolphinScheduler 严格区分定义(模板)和实例(执行)。这实现了版本控制、并发执行和审计跟踪。

关键设计原则

  • 基于代码的标识:工作流和任务都使用代码(bigint)作为跨版本的稳定标识符。
  • 复合键:定义使用(代码,版本)作为复合自然键。
  • 版本不可变性:每个版本都是不可变的;更改会创建新版本。
  • 实例引用:实例引用特定版本的定义。

工作流定义表

t_ds_workflow_definition

工作流模板的主表。

* 列 * 类型 * 描述 ** ---- * ---- * ---- ** id * int * 自动递增主键 ** code * bigint * 唯一工作流标识符(跨版本稳定) ** version * int * 版本号(默认 1) ** name * varchar(255) * 工作流名称 ** project_code * bigint * 所属项目 ** release_state * tinyint * 0 = 离线,1 = 在线 ** global_params * text * JSON 格式的全局参数 ** execution_type * tinyint * 0 = 并行,1 = 串行等待,2 = 串行丢弃,3 = 串行优先级 ** timeout * int * 超时时间(分钟) ** user_id * int * 创建者用户 ID *

索引

  • UNIQUE KEY workflow_unique (name, project_code)
  • UNIQUE KEY uniq_workflow_definition_code (code)
  • KEY idx_project_code (project_code)

t_ds_workflow_definition_log

存储工作流定义所有版本的审计日志。

镜像 t_ds_workflow_definition 的结构,额外列:operatoroperate_time,主键:(code, version)

t_ds_task_definition

可在工作流中重用的任务模板。

* 列 * 类型 * 描述 ** ---- * ---- * ---- ** code * bigint * 唯一任务标识符 ** version * int * 版本号 ** task_type * varchar(50) * Shell、SQL、Python、Spark 等 ** task_params * longtext * JSON 格式的任务配置 ** worker_group * varchar(255) * 目标工作线程组 ** fail_retry_times * int * 失败重试次数 ** fail_retry_interval * int * 重试间隔(分钟) ** timeout * int * 任务超时时间(分钟) ** cpu_quota * int * CPU 限制(-1 = 无限制) ** memory_max * int * 内存限制(MB,-1 = 无限制) *

t_ds_workflow_task_relation

通过指定任务之间的边来定义 DAG 结构。

* 列 * 类型 * 描述 ** ---- * ---- * ---- ** workflow_definition_code * bigint * 父工作流 ** workflow_definition_version * int * 工作流版本 ** pre_task_code * bigint * 前置任务(根节点为 0) ** post_task_code * bigint * 后置任务 ** condition_type * tinyint * 0 = 无,1 = 判断,2 = 延迟 ** condition_params * text * JSON 格式的条件配置 *

注意pre_task_code = 0 表示根节点(无前驱任务)。

执行状态表

t_ds_workflow_instance

工作流的运行时执行记录。

* 列 * 类型 * 描述 ** ---- * ---- * ---- ** id * int * 主键 ** workflow_definition_code * bigint * 引用定义 ** workflow_definition_version * int * 本次执行锁定的版本 ** state * tinyint * 0 = 提交,1 = 运行中,2 = 暂停准备,3 = 已暂停,4 = 停止准备,5 = 已停止,6 = 失败,7 = 成功,8 = 需要容错,9 = 已终止,10 = 等待,11 = 等待依赖 ** state_history * text * 状态转换日志 ** start_time * datetime * 执行开始时间 ** end_time * datetime * 执行结束时间 ** command_type * tinyint * 0 = 开始,1 = 从当前开始,2 = 恢复,3 = 恢复暂停,4 = 从失败处开始,5 = 补充,6 = 调度,7 = 重新运行,8 = 暂停,9 = 停止,10 = 恢复等待 ** host * varchar(135) * 执行此工作流的主服务器主机 ** executor_id * int * 触发执行的用户 ** tenant_code * varchar(64) * 用于资源隔离的租户 ** next_workflow_instance_id * int * 用于串行执行模式 *

索引

  • KEY workflow_instance_index (workflow_definition_code, id)
  • KEY start_time_index (start_time, end_time)

t_ds_task_instance

单个任务的运行时执行记录。

* 列 * 类型 * 描述 ** ---- * ---- * ---- ** id * int * 主键 ** task_code * bigint * 引用任务定义 ** task_definition_version * int * 锁定的版本 ** workflow_instance_id * int * 父工作流实例 ** state * tinyint * 与 workflow_instance 相同的状态值 ** submit_time * datetime * 提交到队列的时间 ** start_time * datetime * 实际执行开始时间 ** end_time * datetime * 执行结束时间 ** host * varchar(135) * 执行任务的工作线程主机 ** execute_path * varchar(200) * 工作线程上的工作目录 ** log_path * text * 日志文件路径 ** retry_times * int * 当前重试次数 ** var_pool * text * 供下游任务使用的变量 *

索引KEY idx_task_instance_code_version (task_code, task_definition_version)

命令模式与工作流执行

命令队列

t_ds_command 表实现了基于队列的执行模型,其中命令触发工作流实例。

t_ds_command 结构

* 列 * 类型 * 描述 ** ---- * ---- * ---- ** command_type * tinyint * 0 = 开始,1 = 从当前开始,2 = 恢复,3 = 恢复暂停,4 = 从失败处开始,5 = 补充,6 = 调度,7 = 重新运行,8 = 暂停,9 = 停止 ** workflow_definition_code * bigint * 目标工作流 ** workflow_instance_id * int * 用于恢复/重新执行操作 ** workflow_instance_priority * int * 0 = 最高,1 = 高,2 = 中,3 = 低,4 = 最低 ** command_param * text * JSON 格式的执行参数 ** worker_group * varchar(255) * 目标工作线程组 ** tenant_code * varchar(64) * 执行的租户 ** dry_run * tinyint * 0 = 正常,1 = 试运行(无实际执行) *

处理流程

  1. 通过 API、调度程序或重试逻辑将命令插入 t_ds_command
  2. 主服务器的 MasterSchedulerThread 持续扫描该表(按优先级、id 排序)。
  3. 主服务器生成 t_ds_workflow_instance 记录。
  4. 主服务器分析 DAG 并为就绪任务创建 t_ds_task_instance 记录。
  5. 成功处理的命令将被删除;失败的命令将移动到 t_ds_error_command

基于代码的版本控制模型

DolphinScheduler 使用复杂的版本控制系统,支持:

  • 不同版本的并发执行。
  • 安全更新而不影响正在运行的实例。
  • 完整的变更审计跟踪。

版本管理规则

  • 当前版本表:只有“当前”版本存在于 t_ds_workflow_definitiont_ds_task_definition 中。
  • 日志表:所有版本保存在 *_log 表中,具有 UNIQUE KEY (code, version)
  • 在线状态:每个代码只能有一个版本的 release_state = 1(在线)。
  • 实例锁定:工作流实例在创建时锁定到特定版本。
  • 版本不可变性:一旦某个版本被实例引用,其日志记录即为不可变。

Quartz 集成

DolphinScheduler 集成了 Quartz 调度程序以实现基于 cron 的调度。模式包括标准 Quartz 表以及一个映射表。

t_ds_schedules

* 列 * 类型 * 描述 ** ---- * ---- * ---- ** workflow_definition_code * bigint * 目标工作流(唯一) ** start_time * datetime * 调度活动开始时间 ** end_time * datetime * 调度活动结束时间 ** timezone_id * varchar(40) * cron 表达式的时区 ** crontab * varchar(255) * cron 表达式 ** release_state * int * 0 = 离线,1 = 在线 ** failure_strategy * int * 失败时的行为 ** workflow_instance_priority * int * 实例的默认优先级 *

Quartz 表要点

  • QRTZ_TRIGGERS.NEXT_FIRE_TIME:已索引,便于高效扫描。
  • QRTZ_CRON_TRIGGERS.CRON_EXPRESSION:解析后的 cron 定义。
  • QRTZ_SCHEDULER_STATE:跟踪 Quartz 调度程序实例。

数据源管理

t_ds_datasource

存储 SQL 任务的数据库连接配置。

* 列 * 类型 * 描述 ** ---- * ---- * ---- ** name * varchar(64) * 数据源名称 ** type * tinyint * 数据库类型(MySQL、PostgreSQL、Hive 等) ** connection_params * text * JSON 格式的连接配置(主机、端口、数据库、凭据) ** user_id * int * 所有者用户 *

约束UNIQUE KEY (name, type) - 防止数据源重复。

文件资源

t_ds_resources(已弃用)

注意:此表在模式中已标记为弃用。资源元数据正在迁移到单独的存储后端。

* 列 * 类型 * 描述 ** ---- * ---- * ---- ** full_name * varchar(128) * 包括租户的完整路径 ** type * int * 文件类型(文件/UDF) ** size * bigint * 文件大小(字节) ** is_directory * boolean * 目录标志 ** pid * int * 父目录 ID *

项目、用户和租户层次结构

关键管理表

t_ds_tenant

* 列 * 类型 * 描述 ** ---- * ---- * ---- ** tenant_code * varchar(64) * 唯一租户标识符(唯一) ** queue_id * int * 任务的默认 YARN 队列 ** description * varchar(255) * 租户描述 *

默认租户:系统创建一个默认租户,id = -1tenant_code = 'default'

t_ds_user

* 列 * 类型 * 描述 ** ---- * ---- * ---- ** user_name * varchar(64) * 登录用户名(唯一) ** user_password * varchar(64) * 哈希密码 ** user_type * tinyint * 0 = 普通用户,1 = 管理员 ** tenant_id * int * 关联的租户(默认 -1) ** email * varchar(64) * 电子邮件地址 ** state * tinyint * 0 = 禁用,1 = 启用 *

t_ds_project

* 列 * 类型 * 描述 ** ---- * ---- * ---- ** code * bigint * 唯一项目代码(唯一) ** name * varchar(255) * 项目名称(唯一) ** user_id * int * 创建者/所有者 ** description * varchar(255) * 项目描述 *

对于不使用 ZooKeeper 的部署,DolphinScheduler 提供基于 JDBC 的注册表用于服务协调。

注册表详情

t_ds_jdbc_registry_data

存储类似于 ZooKeeper 节点的注册表项。

* 列 * 类型 * 描述 ** ---- * ---- * ---- ** data_key * varchar(256) * 类似路径的键(唯一) ** data_value * text * 序列化数据 ** data_type * varchar(64) * EPHEMERAL(客户端断开连接时删除)或 PERSISTENT ** client_id * bigint * 所属客户端 *

* 列 * 类型 * 描述 ** ---- * ---- * ---- ** last_update_time * timestamp * 上次修改时间 *

t_ds_jdbc_registry_lock

实现分布式锁。

* 列 * 类型 * 描述 ** ---- * ---- * ---- ** lock_key * varchar(256) * 锁标识符(唯一) ** lock_owner * varchar(256) * 持有锁的客户端(格式:ip_processId) ** client_id * bigint * 所属客户端 *

t_ds_jdbc_registry_client_heartbeat

跟踪活动客户端以清理临时数据。

* 列 * 类型 * 描述 ** ---- * ---- * ---- ** id * bigint * 客户端 ID(主键) ** client_name * varchar(256) * 客户端标识符 ** last_heartbeat_time * bigint * 上次心跳时间戳 ** connection_config * text * 连接元数据 *

清理逻辑:当客户端的心跳过期时,其临时注册表数据和锁将自动删除。

告警表

t_ds_alert

由工作流/任务失败或完成生成的告警记录。

* 列 * 类型 * 描述 ** ---- * ---- * ---- ** title * varchar(512) * 告警标题 ** sign * char(40) * 内容的 SHA1 哈希值(用于去重) ** content * text * 告警消息正文 ** alert_status * tinyint * 0 = 等待,1 = 成功,2 = 失败 ** warning_type * tinyint * 1 = 工作流成功,2 = 工作流/任务失败 ** workflow_instance_id * int * 源工作流实例 ** alertgroup_id * int * 目标告警组 *

索引KEY idx_sign (sign) - 实现去重。

t_ds_alertgroup

告警通道组。

* 列 * 类型 * 描述 ** ---- * ---- * ---- ** group_name * varchar(255) * 唯一组名 ** alert_instance_ids * varchar(255) * 逗号分隔的插件实例 ID ** description * varchar(255) * 组描述 *

关键索引

该模式包含针对常见查询模式精心设计的索引:

  • 工作流和任务查找
- 按定义查询工作流实例:  `KEY workflow_instance_index (workflow_definition_code, id)`  - 按定义查询任务实例:  `KEY idx_task_instance_code_version (task_code, task_definition_version)`  - 用于监控的时间范围查询*:  `KEY start_time_index (start_time, end_time)`
  • 命令处理
基于优先级的命令扫描:`KEY priority_id_index (workflow_instance_priority, id)`
  • DAG 关系查询
- 正向和反向 DAG 遍历:  `KEY idx_pre_task_code_version (pre_task_code, pre_task_version)`   正向和反向 DAG 遍历:   `KEY idx_post_task_code_version (post_task_code, post_task_version)`  `KEY idx_code (project_code, workflow_definition_code)`

唯一约束

在数据库级别强制执行的关键业务规则:

* 表 * 约束 * 目的 ** ---- * ---- * ---- ** t_ds_workflow_definition * UNIQUE (name, project_code) * 项目中无重复的工作流名称 ** t_ds_workflow_definition * UNIQUE (code) * 全局工作流标识符 ** t_ds_workflow_definition_log * UNIQUE (code, version) * 每个版本一条记录 ** t_ds_datasource * UNIQUE (name, type) * 每种类型无重复的数据源名称 ** t_ds_schedules * UNIQUE (workflow_definition_code) * 每个工作流一个调度 *

DolphinScheduler 在 dolphinscheduler - dao/src/main/resources/sql/upgrade 中维护用于跨版本模式迁移的升级脚本。

近期模式变更

3.3.0 变更

  • 将表和列从“process”重命名为“workflow”。
  • 删除数据质量表(t_ds_dq_*)。
  • 添加用于替代 ZooKeeper 的 JDBC 注册表。
  • 从任务表中删除与缓存相关的列。

3.2.0 变更

  • 向工作流定义中添加 execution_type(并行/串行模式)。
  • 为串行执行链添加 next_workflow_instance_id
  • 向命令和实例表中添加 tenant_code
  • 创建 t_ds_project_parametert_ds_project_preference

服务层访问

数据库访问通过 dolphinscheduler - dao 中的 DAO 层进行抽象。关键服务类

  • ProcessService:工作流/任务定义和实例的 CRUD 操作。
  • CommandService:命令队列管理。
  • ProjectService:项目和权限管理。
  • ResourcesService:资源元数据操作。

事务管理

大多数操作使用 Spring 的 @Transactional 注解实现:

  • 原子性地创建工作流实例及其任务实例。
  • 消费命令并创建实例。
  • 版本更新与日志表同步。

连接池

系统使用 HikariCP 进行连接池,在 application.yaml 中配置:

  • 默认池大小:50 个连接。
  • 连接超时:30 秒。
  • 空闲超时:600 秒。
",
本站提供的所有下载资源均来自互联网,仅提供学习交流使用,版权归原作者所有。如需商业使用,请联系原作者获得授权。 如您发现有涉嫌侵权的内容,请联系我们 邮箱:alixiixcom@163.com