城市繁荣合并建造
94.53M · 2026-03-29

本文将深入介绍 Apache DolphinScheduler 所采用的数据库模式,此模式主要用于持久化存储工作流定义、执行状态、调度信息以及系统元数据。它具备广泛的兼容性,可支持 MySQL、PostgreSQL 和 H2 等多种数据库,其具体定义存储在 dolphinscheduler - dao/src/main/resources/sql 目录下。
DolphinScheduler 的数据库模式分为七个主要功能组:
* 组 * 目的 * 关键表 ** ---- * ---- * ---- ** 工作流管理 * 存储带有版本控制的工作流和任务定义 * t_ds_workflow_definition、t_ds_task_definition、t_ds_workflow_task_relation ** 执行状态 * 跟踪运行时实例及其状态 * t_ds_workflow_instance、t_ds_task_instance、t_ds_command ** 调度 * 通过 Quartz 管理基于 cron 的调度 * t_ds_schedules、QRTZ_* 表 ** 资源管理 * 数据源、文件和 UDF 元数据 * t_ds_datasource、t_ds_resources、t_ds_udfs ** 管理 * 用户、租户、项目和权限 * t_ds_user、t_ds_tenant、t_ds_project ** 告警 * 告警配置和历史记录 * t_ds_alert、t_ds_alertgroup ** 服务注册 * 基于 JDBC 的协调(ZooKeeper 的替代方案) * t_ds_jdbc_registry_* 表 *
DolphinScheduler 严格区分定义(模板)和实例(执行)。这实现了版本控制、并发执行和审计跟踪。

关键设计原则:
t_ds_workflow_definition工作流模板的主表。
* 列 * 类型 * 描述 ** ---- * ---- * ---- ** id * int * 自动递增主键 ** code * bigint * 唯一工作流标识符(跨版本稳定) ** version * int * 版本号(默认 1) ** name * varchar(255) * 工作流名称 ** project_code * bigint * 所属项目 ** release_state * tinyint * 0 = 离线,1 = 在线 ** global_params * text * JSON 格式的全局参数 ** execution_type * tinyint * 0 = 并行,1 = 串行等待,2 = 串行丢弃,3 = 串行优先级 ** timeout * int * 超时时间(分钟) ** user_id * int * 创建者用户 ID *
索引:
UNIQUE KEY workflow_unique (name, project_code)UNIQUE KEY uniq_workflow_definition_code (code)KEY idx_project_code (project_code)t_ds_workflow_definition_log存储工作流定义所有版本的审计日志。
镜像 t_ds_workflow_definition 的结构,额外列:operator、operate_time,主键:(code, version)。
t_ds_task_definition可在工作流中重用的任务模板。
* 列 * 类型 * 描述 ** ---- * ---- * ---- ** code * bigint * 唯一任务标识符 ** version * int * 版本号 ** task_type * varchar(50) * Shell、SQL、Python、Spark 等 ** task_params * longtext * JSON 格式的任务配置 ** worker_group * varchar(255) * 目标工作线程组 ** fail_retry_times * int * 失败重试次数 ** fail_retry_interval * int * 重试间隔(分钟) ** timeout * int * 任务超时时间(分钟) ** cpu_quota * int * CPU 限制(-1 = 无限制) ** memory_max * int * 内存限制(MB,-1 = 无限制) *
t_ds_workflow_task_relation通过指定任务之间的边来定义 DAG 结构。
* 列 * 类型 * 描述 ** ---- * ---- * ---- ** workflow_definition_code * bigint * 父工作流 ** workflow_definition_version * int * 工作流版本 ** pre_task_code * bigint * 前置任务(根节点为 0) ** post_task_code * bigint * 后置任务 ** condition_type * tinyint * 0 = 无,1 = 判断,2 = 延迟 ** condition_params * text * JSON 格式的条件配置 *
注意:pre_task_code = 0 表示根节点(无前驱任务)。
t_ds_workflow_instance工作流的运行时执行记录。
* 列 * 类型 * 描述 ** ---- * ---- * ---- ** id * int * 主键 ** workflow_definition_code * bigint * 引用定义 ** workflow_definition_version * int * 本次执行锁定的版本 ** state * tinyint * 0 = 提交,1 = 运行中,2 = 暂停准备,3 = 已暂停,4 = 停止准备,5 = 已停止,6 = 失败,7 = 成功,8 = 需要容错,9 = 已终止,10 = 等待,11 = 等待依赖 ** state_history * text * 状态转换日志 ** start_time * datetime * 执行开始时间 ** end_time * datetime * 执行结束时间 ** command_type * tinyint * 0 = 开始,1 = 从当前开始,2 = 恢复,3 = 恢复暂停,4 = 从失败处开始,5 = 补充,6 = 调度,7 = 重新运行,8 = 暂停,9 = 停止,10 = 恢复等待 ** host * varchar(135) * 执行此工作流的主服务器主机 ** executor_id * int * 触发执行的用户 ** tenant_code * varchar(64) * 用于资源隔离的租户 ** next_workflow_instance_id * int * 用于串行执行模式 *
索引:
KEY workflow_instance_index (workflow_definition_code, id)KEY start_time_index (start_time, end_time)t_ds_task_instance单个任务的运行时执行记录。
* 列 * 类型 * 描述 ** ---- * ---- * ---- ** id * int * 主键 ** task_code * bigint * 引用任务定义 ** task_definition_version * int * 锁定的版本 ** workflow_instance_id * int * 父工作流实例 ** state * tinyint * 与 workflow_instance 相同的状态值 ** submit_time * datetime * 提交到队列的时间 ** start_time * datetime * 实际执行开始时间 ** end_time * datetime * 执行结束时间 ** host * varchar(135) * 执行任务的工作线程主机 ** execute_path * varchar(200) * 工作线程上的工作目录 ** log_path * text * 日志文件路径 ** retry_times * int * 当前重试次数 ** var_pool * text * 供下游任务使用的变量 *
索引:KEY idx_task_instance_code_version (task_code, task_definition_version)
t_ds_command 表实现了基于队列的执行模型,其中命令触发工作流实例。
t_ds_command 结构* 列 * 类型 * 描述 ** ---- * ---- * ---- ** command_type * tinyint * 0 = 开始,1 = 从当前开始,2 = 恢复,3 = 恢复暂停,4 = 从失败处开始,5 = 补充,6 = 调度,7 = 重新运行,8 = 暂停,9 = 停止 ** workflow_definition_code * bigint * 目标工作流 ** workflow_instance_id * int * 用于恢复/重新执行操作 ** workflow_instance_priority * int * 0 = 最高,1 = 高,2 = 中,3 = 低,4 = 最低 ** command_param * text * JSON 格式的执行参数 ** worker_group * varchar(255) * 目标工作线程组 ** tenant_code * varchar(64) * 执行的租户 ** dry_run * tinyint * 0 = 正常,1 = 试运行(无实际执行) *
处理流程:
t_ds_command。MasterSchedulerThread 持续扫描该表(按优先级、id 排序)。t_ds_workflow_instance 记录。t_ds_task_instance 记录。t_ds_error_command。DolphinScheduler 使用复杂的版本控制系统,支持:

t_ds_workflow_definition 和 t_ds_task_definition 中。*_log 表中,具有 UNIQUE KEY (code, version)。release_state = 1(在线)。DolphinScheduler 集成了 Quartz 调度程序以实现基于 cron 的调度。模式包括标准 Quartz 表以及一个映射表。

t_ds_schedules* 列 * 类型 * 描述 ** ---- * ---- * ---- ** workflow_definition_code * bigint * 目标工作流(唯一) ** start_time * datetime * 调度活动开始时间 ** end_time * datetime * 调度活动结束时间 ** timezone_id * varchar(40) * cron 表达式的时区 ** crontab * varchar(255) * cron 表达式 ** release_state * int * 0 = 离线,1 = 在线 ** failure_strategy * int * 失败时的行为 ** workflow_instance_priority * int * 实例的默认优先级 *
Quartz 表要点:
QRTZ_TRIGGERS.NEXT_FIRE_TIME:已索引,便于高效扫描。QRTZ_CRON_TRIGGERS.CRON_EXPRESSION:解析后的 cron 定义。QRTZ_SCHEDULER_STATE:跟踪 Quartz 调度程序实例。t_ds_datasource存储 SQL 任务的数据库连接配置。
* 列 * 类型 * 描述 ** ---- * ---- * ---- ** name * varchar(64) * 数据源名称 ** type * tinyint * 数据库类型(MySQL、PostgreSQL、Hive 等) ** connection_params * text * JSON 格式的连接配置(主机、端口、数据库、凭据) ** user_id * int * 所有者用户 *
约束:UNIQUE KEY (name, type) - 防止数据源重复。
t_ds_resources(已弃用)注意:此表在模式中已标记为弃用。资源元数据正在迁移到单独的存储后端。
* 列 * 类型 * 描述 ** ---- * ---- * ---- ** full_name * varchar(128) * 包括租户的完整路径 ** type * int * 文件类型(文件/UDF) ** size * bigint * 文件大小(字节) ** is_directory * boolean * 目录标志 ** pid * int * 父目录 ID *

t_ds_tenant* 列 * 类型 * 描述 ** ---- * ---- * ---- ** tenant_code * varchar(64) * 唯一租户标识符(唯一) ** queue_id * int * 任务的默认 YARN 队列 ** description * varchar(255) * 租户描述 *
默认租户:系统创建一个默认租户,id = -1,tenant_code = 'default'。
t_ds_user* 列 * 类型 * 描述 ** ---- * ---- * ---- ** user_name * varchar(64) * 登录用户名(唯一) ** user_password * varchar(64) * 哈希密码 ** user_type * tinyint * 0 = 普通用户,1 = 管理员 ** tenant_id * int * 关联的租户(默认 -1) ** email * varchar(64) * 电子邮件地址 ** state * tinyint * 0 = 禁用,1 = 启用 *
t_ds_project* 列 * 类型 * 描述 ** ---- * ---- * ---- ** code * bigint * 唯一项目代码(唯一) ** name * varchar(255) * 项目名称(唯一) ** user_id * int * 创建者/所有者 ** description * varchar(255) * 项目描述 *
对于不使用 ZooKeeper 的部署,DolphinScheduler 提供基于 JDBC 的注册表用于服务协调。

t_ds_jdbc_registry_data存储类似于 ZooKeeper 节点的注册表项。
* 列 * 类型 * 描述 ** ---- * ---- * ---- ** data_key * varchar(256) * 类似路径的键(唯一) ** data_value * text * 序列化数据 ** data_type * varchar(64) * EPHEMERAL(客户端断开连接时删除)或 PERSISTENT ** client_id * bigint * 所属客户端 *
* 列 * 类型 * 描述 ** ---- * ---- * ---- ** last_update_time * timestamp * 上次修改时间 *
t_ds_jdbc_registry_lock实现分布式锁。
* 列 * 类型 * 描述 ** ---- * ---- * ---- ** lock_key * varchar(256) * 锁标识符(唯一) ** lock_owner * varchar(256) * 持有锁的客户端(格式:ip_processId) ** client_id * bigint * 所属客户端 *
t_ds_jdbc_registry_client_heartbeat跟踪活动客户端以清理临时数据。
* 列 * 类型 * 描述 ** ---- * ---- * ---- ** id * bigint * 客户端 ID(主键) ** client_name * varchar(256) * 客户端标识符 ** last_heartbeat_time * bigint * 上次心跳时间戳 ** connection_config * text * 连接元数据 *
清理逻辑:当客户端的心跳过期时,其临时注册表数据和锁将自动删除。

t_ds_alert由工作流/任务失败或完成生成的告警记录。
* 列 * 类型 * 描述 ** ---- * ---- * ---- ** title * varchar(512) * 告警标题 ** sign * char(40) * 内容的 SHA1 哈希值(用于去重) ** content * text * 告警消息正文 ** alert_status * tinyint * 0 = 等待,1 = 成功,2 = 失败 ** warning_type * tinyint * 1 = 工作流成功,2 = 工作流/任务失败 ** workflow_instance_id * int * 源工作流实例 ** alertgroup_id * int * 目标告警组 *
索引:KEY idx_sign (sign) - 实现去重。
t_ds_alertgroup告警通道组。
* 列 * 类型 * 描述 ** ---- * ---- * ---- ** group_name * varchar(255) * 唯一组名 ** alert_instance_ids * varchar(255) * 逗号分隔的插件实例 ID ** description * varchar(255) * 组描述 *
该模式包含针对常见查询模式精心设计的索引:
- 按定义查询工作流实例: `KEY workflow_instance_index (workflow_definition_code, id)` - 按定义查询任务实例: `KEY idx_task_instance_code_version (task_code, task_definition_version)` - 用于监控的时间范围查询*: `KEY start_time_index (start_time, end_time)`基于优先级的命令扫描:`KEY priority_id_index (workflow_instance_priority, id)`- 正向和反向 DAG 遍历: `KEY idx_pre_task_code_version (pre_task_code, pre_task_version)` 正向和反向 DAG 遍历: `KEY idx_post_task_code_version (post_task_code, post_task_version)` `KEY idx_code (project_code, workflow_definition_code)`在数据库级别强制执行的关键业务规则:
* 表 * 约束 * 目的 ** ---- * ---- * ---- ** t_ds_workflow_definition * UNIQUE (name, project_code) * 项目中无重复的工作流名称 ** t_ds_workflow_definition * UNIQUE (code) * 全局工作流标识符 ** t_ds_workflow_definition_log * UNIQUE (code, version) * 每个版本一条记录 ** t_ds_datasource * UNIQUE (name, type) * 每种类型无重复的数据源名称 ** t_ds_schedules * UNIQUE (workflow_definition_code) * 每个工作流一个调度 *
DolphinScheduler 在 dolphinscheduler - dao/src/main/resources/sql/upgrade 中维护用于跨版本模式迁移的升级脚本。
t_ds_dq_*)。execution_type(并行/串行模式)。next_workflow_instance_id。tenant_code。t_ds_project_parameter 和 t_ds_project_preference。数据库访问通过 dolphinscheduler - dao 中的 DAO 层进行抽象。关键服务类:
ProcessService:工作流/任务定义和实例的 CRUD 操作。CommandService:命令队列管理。ProjectService:项目和权限管理。ResourcesService:资源元数据操作。大多数操作使用 Spring 的 @Transactional 注解实现:
系统使用 HikariCP 进行连接池,在 application.yaml 中配置: