先发制人2026
72.36M · 2026-03-22
最近Apache Geaflow需要实现异步资源调度,经过和社区成员的沟通后,发现可以沿着doris分片调度的思路去走,故有此系列
graph TB
A[已分析的AST] --> B[Planner]
B --> C[PlanNode树生成]
C --> D[单节点逻辑计划]
D --> E[分布式计划转换]
E --> F[PlanFragment列表]
F --> G[Coordinator协调器]
G --> H[BE执行]
位置:fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java
/**
 * Abstract base of all FE query planners. A planner turns an analyzed
 * statement into a list of PlanFragments that the Coordinator dispatches
 * to backends for distributed execution.
 */
public abstract class Planner {
private static final Logger LOG = LogManager.getLogger(Planner.class);
// The query plan is decomposed into multiple fragments, each of which can
// run in parallel on different backends.
protected ArrayList<PlanFragment> fragments = Lists.newArrayList();
// Query-level options (timeouts, memory limits, ...).
protected TQueryOptions queryOptions;
// Core methods every concrete planner must implement.
// Returns all scan nodes of the plan (used for scan-range assignment).
public abstract List<ScanNode> getScanNodes();
// Builds the fragment list for the given statement.
public abstract void plan(StatementBase queryStmt,
TQueryOptions queryOptions) throws UserException;
// Returns the descriptor table describing tuples/slots of the plan.
public abstract DescriptorTable getDescTable();
// Returns runtime filters generated during planning.
public abstract List<RuntimeFilter> getRuntimeFilters();
// Lets the FE answer trivial queries directly without involving BEs.
public abstract Optional<ResultSet> handleQueryInFe(StatementBase parsedStmt);
public List<PlanFragment> getFragments() {
return fragments;
}
}
设计要点:
plan() 方法:核心规划逻辑;getScanNodes() 方法:获取所有扫描节点;getRuntimeFilters() 方法:获取运行时过滤器;handleQueryInFe() 方法:FE 端直接处理的查询
graph TB
A[Planner 抽象基类] --> B[NereidsPlanner]
A --> C[GroupCommitPlanner]
B --> D[PrepareCommandPlanner]
B --> E[FastInsertIntoValuesPlanner]
主要实现类:
NereidsPlanner:新一代优化器的 Planner
fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
GroupCommitPlanner:Group Commit 场景的 Planner
fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java
位置:fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
/**
 * Base class of every node in a query plan tree (scan, join, aggregation,
 * sort, exchange, ...). Carries the output description, local predicates,
 * children, and the cost statistics used by the planner.
 */
public abstract class PlanNode {
    protected PlanNodeId id;                      // unique id of this node
    protected PlanFragmentId fragmentId;          // id of the owning fragment
    protected long limit;                         // LIMIT pushed onto this node
    protected List<TupleId> tupleIds;             // tuple ids produced by this node
    protected List<TupleId> nullableTupleIds;     // tuple ids that may be NULL (e.g. outer-join side)
    protected List<Expr> conjuncts;               // predicates evaluated at this node (WHERE clauses)
    protected List<RuntimeFilter> runtimeFilters; // runtime filters built or applied here
    protected ArrayList<PlanNode> children;       // child plan nodes

    // Cost statistics used for plan selection.
    protected long cardinality;   // estimated number of output rows
    protected double avgRowSize;  // estimated average output row size

    /** Resolves expressions and initializes node state. */
    public abstract void init(Analyzer analyzer) throws UserException;

    /** Derives cardinality/avgRowSize from child and table statistics. */
    public abstract void computeStats(Analyzer analyzer) throws UserException;

    /** Serializes this single node into the thrift message. */
    protected abstract void toThrift(TPlanNode msg);

    /**
     * Serializes this node and its whole subtree in pre-order.
     * Fix: the original excerpt declared this method without a body, which is
     * not valid Java for a non-abstract method; the conventional recursive
     * traversal is supplied here.
     */
    public void treeToThrift(TPlanNode container) {
        toThrift(container);
        for (PlanNode child : children) {
            child.treeToThrift(container);
        }
    }
}
设计要点:
graph TB
A[PlanNode] --> B[ScanNode]
A --> C[JoinNodeBase]
A --> D[AggregationNode]
A --> E[SortNode]
A --> F[UnionNode]
A --> G[ExchangeNode]
A --> H[SelectNode]
B --> B1[OlapScanNode]
B --> B2[SchemaScanNode]
B --> B3[FileScanNode]
B --> B4[JdbcScanNode]
C --> C1[HashJoinNode]
C --> C2[NestLoopJoinNode]
C --> C3[CrossJoinNode]
位置:fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
/**
 * Scan node over a Doris OLAP (internal) table. During init it performs
 * partition pruning, rollup/materialized-view selection, tablet pruning,
 * and decides whether storage-layer pre-aggregation can be used.
 */
public class OlapScanNode extends ScanNode {
    private OlapTable olapTable;
    // Result of partition pruning: ids of partitions that must be scanned.
    private Collection<Long> selectedPartitionIds;
    // Tablet distribution info (bucket sequence -> location count).
    private Map<Long, Integer> bucketSeq2locations;
    // Fix: initialize eagerly — computeTabletInfo() appended to a null list.
    private List<Long> scanTabletIds = Lists.newArrayList();
    // Whether storage-layer pre-aggregation is possible, and why not if not.
    private boolean isPreAggregation = true;
    private String reasonOfPreAggregation = "";
    // Id of the rollup / materialized index chosen for this scan.
    private long selectedIndexId;

    @Override
    public void init(Analyzer analyzer) throws UserException {
        super.init(analyzer);
        // 1. Partition pruning.
        selectedPartitionIds = partitionPrune(analyzer);
        // 2. Pick the best rollup or materialized view.
        selectedIndexId = selectBestRollupByRollupSelector(analyzer);
        // 3. Tablet pruning and assignment.
        computeTabletInfo();
        // 4. Decide whether pre-aggregation can be used.
        canPreAggregation();
    }

    /** Collects the tablets of the selected index in every surviving partition. */
    private void computeTabletInfo() {
        for (Long partitionId : selectedPartitionIds) {
            Partition partition = olapTable.getPartition(partitionId);
            MaterializedIndex index = partition.getIndex(selectedIndexId);
            for (Tablet tablet : index.getTablets()) {
                scanTabletIds.add(tablet.getId());
                // Locate the replicas of this tablet.
                List<Replica> replicas = tablet.getReplicas();
                for (Replica replica : replicas) {
                    Backend backend = Catalog.getCurrentSystemInfo()
                            .getBackend(replica.getBackendId());
                    // Fix: getBackend may return null for a dropped BE;
                    // guard before dereferencing.
                    if (backend != null && backend.isAlive()) {
                        // Record the backend location for this scan range.
                    }
                }
            }
        }
    }
}
关键功能:
分区裁剪(Partition Pruning):
索引选择(Rollup Selection):
Tablet 分配:
预聚合优化:
位置:fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
/**
 * Hash join node. Holds the equi-join conjuncts used to build/probe the hash
 * table, residual non-equi conjuncts, and the distribution strategy the
 * planner picked (broadcast / shuffle / colocate / bucket shuffle).
 */
public class HashJoinNode extends JoinNodeBase {
    // Equi-join conditions: a = b or a <=> b.
    private List<BinaryPredicate> eqJoinConjuncts = Lists.newArrayList();
    // Non-equi join conditions.
    private List<Expr> otherJoinConjuncts;
    // Distributed execution mode of this join.
    private DistributionMode distrMode;
    // Colocate-join flag and, when colocation is impossible, the reason why.
    private boolean isColocate = false;
    private String colocateReason = "";

    public enum DistributionMode {
        NONE,           // local join — data already on the same node
        BROADCAST,      // broadcast the small side
        PARTITIONED,    // shuffle both sides
        BUCKET_SHUFFLE, // shuffle one side to match the other's bucketing
        COLOCATE        // data already distributed by the same rule
    }

    public HashJoinNode(PlanNodeId id, PlanNode outer, PlanNode inner,
                        TableRefInfo innerRef, List<Expr> eqJoinConjuncts,
                        List<Expr> otherJoinConjuncts) {
        super(id, "HASH JOIN", StatisticalType.HASH_JOIN_NODE,
                outer, inner, innerRef);
        // Normalize equi-join conditions.
        for (Expr eqJoinPredicate : eqJoinConjuncts) {
            BinaryPredicate eqJoin = (BinaryPredicate) eqJoinPredicate;
            // EQ_FOR_NULL (the <=> operator) degenerates to plain EQ when at
            // least one side can never be NULL.
            // Fix: compare enum constants with == instead of equals().
            if (eqJoin.getOp() == BinaryPredicate.Operator.EQ_FOR_NULL) {
                if (!eqJoin.getChild(0).isNullable()
                        || !eqJoin.getChild(1).isNullable()) {
                    eqJoin.setOp(BinaryPredicate.Operator.EQ);
                }
            }
            this.eqJoinConjuncts.add(eqJoin);
        }
        this.otherJoinConjuncts = otherJoinConjuncts;
        this.distrMode = DistributionMode.NONE;
    }

    @Override
    protected void toThrift(TPlanNode msg) {
        msg.node_type = TPlanNodeType.HASH_JOIN_NODE;
        msg.hash_join_node = new THashJoinNode();
        msg.hash_join_node.join_op = joinOp.toThrift();
        // Tell the BE whether this is a broadcast join.
        msg.hash_join_node.setIsBroadcastJoin(
                distrMode == DistributionMode.BROADCAST);
        // Serialize equi-join conditions.
        for (BinaryPredicate eqJoinPredicate : eqJoinConjuncts) {
            TEqJoinCondition eqJoinCondition = new TEqJoinCondition(
                    eqJoinPredicate.getChild(0).treeToThrift(),
                    eqJoinPredicate.getChild(1).treeToThrift());
            eqJoinCondition.setOpcode(eqJoinPredicate.getOp().getOpcode());
            msg.hash_join_node.addToEqJoinConjuncts(eqJoinCondition);
        }
        // Serialize residual non-equi conditions.
        // Fix: the constructor stores otherJoinConjuncts without a null check,
        // so guard the loop against a null list passed by the caller.
        if (otherJoinConjuncts != null) {
            for (Expr e : otherJoinConjuncts) {
                msg.hash_join_node.addToOtherJoinConjuncts(e.treeToThrift());
            }
        }
    }
}
Join 分布式策略选择:
BROADCAST(广播):
PARTITIONED(分区/Shuffle):
COLOCATE(数据本地化):
BUCKET_SHUFFLE(桶 Shuffle):
位置:fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
/**
 * Aggregation plan node. In a distributed plan an aggregation is usually
 * split into an "update serialize" stage (partial agg on scan nodes) and a
 * "merge finalize" stage (combining intermediate states and emitting the
 * final values); needsFinalize distinguishes the two.
 */
public class AggregationNode extends PlanNode {
    private final AggregateInfo aggInfo;
    // Whether this node produces final results (last step of distributed agg).
    private boolean needsFinalize;
    // Whether to use streaming pre-aggregation.
    private boolean useStreamingPreagg;
    // Colocate aggregation flag.
    private boolean isColocate = false;
    // Sort info when rows arrive ordered by the grouping keys.
    private SortInfo sortByGroupKey;

    public AggregationNode(PlanNodeId id, PlanNode input,
                           AggregateInfo aggInfo) {
        super(id, aggInfo.getOutputTupleId().asList(),
                "AGGREGATE", StatisticalType.AGG_NODE);
        this.aggInfo = aggInfo;
        this.children.add(input);
        this.needsFinalize = true;
        updatePlanNodeName();
    }

    /**
     * Builds the display name, e.g. "VAGGREGATE (update serialize)".
     * Fix: renamed from updateplanNodeName to follow lowerCamelCase.
     */
    private void updatePlanNodeName() {
        StringBuilder sb = new StringBuilder();
        sb.append("VAGGREGATE").append(" (");
        // merge = combine intermediate states; update = accumulate raw rows.
        sb.append(aggInfo.isMerge() ? "merge" : "update");
        // finalize = emit final values; serialize = emit intermediate states.
        sb.append(needsFinalize ? " finalize" : " serialize");
        sb.append(")");
        setPlanNodeName(sb.toString());
    }

    @Override
    protected void toThrift(TPlanNode msg) {
        msg.node_type = TPlanNodeType.AGGREGATION_NODE;
        // Serialize the materialized aggregate functions.
        List<TExpr> aggregateFunctions = Lists.newArrayList();
        for (FunctionCallExpr e : aggInfo.getMaterializedAggregateExprs()) {
            aggregateFunctions.add(e.treeToThrift());
        }
        // NOTE(review): the output tuple id is passed for both tuple slots of
        // TAggregationNode; upstream Doris passes the intermediate tuple id
        // first — confirm against the thrift definition.
        msg.agg_node = new TAggregationNode(
                aggregateFunctions,
                aggInfo.getOutputTupleId().asInt(),
                aggInfo.getOutputTupleId().asInt(),
                needsFinalize);
        msg.agg_node.setUseStreamingPreaggregation(useStreamingPreagg);
        msg.agg_node.setIsColocate(isColocate);
        // Serialize grouping expressions, if any.
        List<Expr> groupingExprs = aggInfo.getGroupingExprs();
        if (groupingExprs != null) {
            msg.agg_node.setGroupingExprs(Expr.treesToThrift(groupingExprs));
        }
    }
}
聚合分阶段执行:
单阶段聚合(Single-Phase Aggregation):
SELECT COUNT(*) FROM table
两阶段聚合(Two-Phase Aggregation):
三阶段聚合(Three-Phase Aggregation):
流式预聚合(Streaming Preagg):
问题:一个复杂查询如何在分布式环境中执行?
解决方案:将查询计划分解为多个 PlanFragment(片段),每个 Fragment 可以在不同节点并行执行。
示例查询:
SELECT t1.id, SUM(t2.amount)
FROM table1 t1
JOIN table2 t2 ON t1.id = t2.id
GROUP BY t1.id;
Fragment 分解:
Fragment 2 (Final Aggregation & Output)
└── VAGGREGATE (merge finalize)
└── EXCHANGE (接收 Fragment 1 的数据)
Fragment 1 (Join & Pre-Aggregation)
└── VAGGREGATE (update serialize)
└── HASH JOIN
├── OLAP_SCAN (table1) - 左表
└── EXCHANGE (接收 Fragment 0 的数据) - 右表
Fragment 0 (Right Table Scan)
└── OLAP_SCAN (table2)
位置:fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
/**
 * A fragment of the distributed plan: a subtree of PlanNodes that executes
 * as a unit on one or more backend instances, reading from exchanges and
 * writing through its DataSink.
 */
public class PlanFragment {
    private final PlanFragmentId fragmentId;
    private PlanNode planRoot;               // root of this fragment's plan subtree
    private DataPartition dataPartition;     // how this fragment's input data is partitioned
    private DataPartition outputPartition;   // how this fragment's output is partitioned
    // Where this fragment sends its output (stream / result / table sink).
    private DataSink sink;
    // Parallelism: number of execution instances.
    private int numInstances;
    // Runtime filters this fragment applies (target) or produces (builder).
    private List<RuntimeFilter> targetRuntimeFilters;
    private List<RuntimeFilter> builderRuntimeFilters;

    public PlanFragment(PlanFragmentId id, PlanNode root,
                        DataPartition partition) {
        this.fragmentId = id;
        this.planRoot = root;
        this.dataPartition = partition;
        setFragmentInPlanTree(planRoot);
    }

    /**
     * Recursively stamps this fragment's id on every node of the subtree.
     * An ExchangeNode marks a fragment boundary: the exchange itself belongs
     * to this fragment (so its id IS set), but its subtree belongs to a child
     * fragment, so recursion stops there.
     * Fix: the original skipped ExchangeNode children entirely, leaving their
     * fragmentId unset; now the id is set first and only the descent stops.
     */
    private void setFragmentInPlanTree(PlanNode node) {
        if (node == null) {
            return;
        }
        node.setFragmentId(this.fragmentId);
        if (node instanceof ExchangeNode) {
            return;
        }
        for (PlanNode child : node.getChildren()) {
            setFragmentInPlanTree(child);
        }
    }
}
Fragment 关键属性:
planRoot:Fragment 的执行计划树根节点
dataPartition:数据分区方式(决定如何并行执行)
UNPARTITIONED:单节点执行
RANDOM:随机分区(Round-Robin)
HASH_PARTITIONED:按 Hash 分区
RANGE_PARTITIONED:按范围分区
sink:数据输出目标
DataStreamSink:发送给其他 Fragment(ExchangeNode)
ResultSink:返回给客户端
OlapTableSink:写入 OLAP 表(INSERT)
numInstances:Fragment 的并行实例数
作用:连接不同 Fragment,接收其他 Fragment 的输出数据。
/**
 * Receives the output of another fragment; the boundary node that stitches
 * fragments together in the distributed plan.
 */
public class ExchangeNode extends PlanNode {
private PlanFragmentId inputFragmentId; // fragment whose output this node receives
// How the incoming streams are combined / repartitioned.
private enum ExchangeType {
MERGING, // merge-sorts ordered inputs (preserves order)
HASH, // hash repartitioning
BROADCAST, // every instance receives the full input
PASSTHROUGH // forwards rows without repartitioning
}
}
Fragment 连接示例:
Fragment 1: [OLAP_SCAN] --DataStreamSink-->
Fragment 2: [ExchangeNode] --> [HASH_JOIN]
graph TB
A[单节点逻辑计划] --> B[createPlanFragments]
B --> C[识别Fragment边界]
C --> D[为每个Fragment分配并行度]
D --> E[计算Fragment执行位置]
E --> F[生成FragmentExecParams]
F --> G[分配ScanRange到BE]
G --> H[PlanFragment列表]
位置:fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
/**
 * Drives distributed execution: for every PlanFragment it decides where the
 * fragment's instances run and which scan ranges (tablets) each instance
 * reads, then dispatches the instances to the backends.
 */
public class Coordinator {
private List<PlanFragment> fragments;
private List<ScanNode> scanNodes;
// Per-fragment execution parameters (instances, scan ranges, destinations).
private Map<PlanFragmentId, FragmentExecParams> fragmentExecParamsMap;
// Execution state of each backend participating in this query.
private Map<TNetworkAddress, BackendExecState> backendExecStates;
/**
 * Computes, for every fragment, where it executes and with what parallelism.
 */
private void computeFragmentExecParams() throws Exception {
for (PlanFragment fragment : fragments) {
FragmentExecParams params = new FragmentExecParams(fragment);
// Fragments containing a ScanNode are placed next to their data;
// all other fragments derive parallelism from their input fragment.
if (containsScanNode(fragment)) {
computeScanRangeAssignment(fragment, params);
} else {
computeInstancesForNonScanFragment(fragment, params);
}
fragmentExecParamsMap.put(fragment.getFragmentId(), params);
}
}
/**
 * Assigns tablets to backends for a fragment that contains ScanNodes.
 */
private void computeScanRangeAssignment(
PlanFragment fragment, FragmentExecParams params) {
for (ScanNode scanNode : fragment.getScanNodes()) {
if (scanNode instanceof OlapScanNode) {
OlapScanNode olapScan = (OlapScanNode) scanNode;
// All tablets this scan must read (after partition/tablet pruning).
List<Long> tabletIds = olapScan.getScanTabletIds();
for (Long tabletId : tabletIds) {
// Replica locations of this tablet.
List<Replica> replicas = getTabletReplicas(tabletId);
// Pick the best replica (load balancing considered).
Replica selectedReplica = selectBestReplica(replicas);
Backend backend = getBackend(
selectedReplica.getBackendId());
// Route this scan range to the chosen backend.
params.addScanRange(backend, tabletId, scanNode);
}
}
}
}
/**
 * Derives the instance count for a fragment without ScanNodes
 * (e.g. a pure aggregation fragment or the build side of a join).
 */
private void computeInstancesForNonScanFragment(
PlanFragment fragment, FragmentExecParams params) {
// Parallelism follows the input fragment feeding this fragment's exchange.
PlanNode planRoot = fragment.getPlanRoot();
if (planRoot instanceof ExchangeNode) {
ExchangeNode exchangeNode = (ExchangeNode) planRoot;
PlanFragmentId inputFragmentId =
exchangeNode.getInputFragmentId();
FragmentExecParams inputParams =
fragmentExecParamsMap.get(inputFragmentId);
if (fragment.getDataPartition().isPartitioned()) {
// Shuffle required: one instance per receiving node.
params.setInstanceNum(inputParams.getInstanceNum());
} else {
// No shuffle: a single instance gathers all input.
params.setInstanceNum(1);
}
}
}
}
/**
 * Execution parameters for one PlanFragment: the fragment itself plus the
 * parameters of each of its parallel instances.
 */
public class FragmentExecParams {
public PlanFragment fragment;
// One entry per parallel execution instance of the fragment.
public List<FInstanceExecParam> instanceExecParams;
/**
 * Execution parameters of a single fragment instance.
 */
public static class FInstanceExecParam {
public TUniqueId instanceId; // unique id of this instance
public TNetworkAddress host; // backend that runs this instance
// Scan ranges per ScanNode id for this instance.
public Map<Integer, List<TScanRangeParams>> perNodeScanRanges;
// Receivers of this instance's output (used by DataStreamSink).
public List<TNetworkAddress> destinations;
}
}
执行实例分配示例:
假设有 3 个 BE 节点(BE1、BE2、BE3),扫描的表有 6 个 Tablet:
Fragment 0 (OLAP_SCAN)
├── Instance 0 @ BE1: Scan Tablet[0, 3]
├── Instance 1 @ BE2: Scan Tablet[1, 4]
└── Instance 2 @ BE3: Scan Tablet[2, 5]
Fragment 1 (AGGREGATE)
├── Instance 0 @ BE1: Receive from Fragment 0
├── Instance 1 @ BE2: Receive from Fragment 0
└── Instance 2 @ BE3: Receive from Fragment 0
问题:Join 操作中,如果能提前知道右表的过滤条件,可以在扫描左表时就过滤掉不匹配的数据。
解决方案:在右表构建 Hash Table 时生成 Bloom Filter 或 MinMax Filter,下推到左表扫描。
示例:
SELECT * FROM large_table l
JOIN small_table s ON l.id = s.id
WHERE s.category = 'electronics';
执行流程:
Build 阶段:扫描 small_table(右表)
应用过滤条件 category = 'electronics',并基于 s.id 构建 Bloom Filter
Probe 阶段:扫描 large_table(左表),只保留满足 l.id IN (Bloom Filter) 的数据
Join 执行:
位置:fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
/**
 * A runtime filter built on the join build side and pushed to the probe
 * side's scan, so non-matching rows are dropped before reaching the join.
 */
public class RuntimeFilter {
private RuntimeFilterId id;
// Kind of filter structure transmitted to the probe side.
public enum RuntimeFilterType {
IN_FILTER, // explicit IN list of build-side values
BLOOM_FILTER, // Bloom filter over build-side values
MIN_MAX_FILTER, // min/max range of build-side values
IN_OR_BLOOM_FILTER // IN list, falling back to Bloom filter when large
}
// Build side: the node that produces the filter (usually a hash join).
private PlanNodeId builderNodeId;
// Target side: the nodes (scans) that apply the filter.
private List<PlanNodeId> targetNodeIds;
// Join condition expressions.
private Expr srcExpr; // build-side expression (e.g. s.id)
private List<Expr> targetExprs; // target-side expressions (e.g. l.id)
// Size of the filter structure and its estimated selectivity.
private long filterSizeBytes;
private double selectivity;
}
Runtime Filter 生成时机:
在 HashJoinNode 初始化时,Planner 会分析:
如果满足条件,生成 Runtime Filter 并:
注册为 builderRuntimeFilter(Build 端)和 targetRuntimeFilter(Target 端)
下推到存储层:
Runtime Filter 可以下推到 BE 的存储引擎:
示例:
OLAP_SCAN (large_table)
├── conjuncts: [l.category = 'active']
└── runtime_filters: [l.id IN BloomFilter(s.id)]
↓ (下推到 BE)
Segment 读取时应用 Bloom Filter
位置:fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
/**
 * New-generation (Nereids) planner built on the Cascades optimization
 * framework. Keeps the plan at every stage (parsed, analyzed, rewritten,
 * optimized, physical) for EXPLAIN and debugging.
 */
public class NereidsPlanner extends Planner {
    protected Plan parsedPlan;      // logical plan right after parsing
    protected Plan analyzedPlan;    // after analysis (types, name resolution)
    protected Plan rewrittenPlan;   // after rule-based rewrite
    protected Plan optimizedPlan;   // after cost-based optimization
    protected PhysicalPlan physicalPlan; // chosen physical plan
    private CascadesContext cascadesContext; // Cascades optimization context

    @Override
    public void plan(StatementBase queryStmt, TQueryOptions queryOptions)
            throws UserException {
        // Fix: the original assigned to an undeclared local `plan` while a
        // shadowing local `parsedPlan` went unused; thread the parsed plan
        // through preprocessing explicitly.
        LogicalPlan parsedPlan = logicalPlanAdapter.getLogicalPlan();
        // 1. Preprocess (hint handling etc.).
        Plan plan = preprocess(parsedPlan);
        this.parsedPlan = plan;
        // NOTE(review): the following stages operate on cascadesContext state
        // rather than on `plan` directly — confirm against upstream Doris.
        // 2. Analyze (type inference, name resolution).
        analyze(showAnalyzeProcess);
        // 3. Rewrite (rule-based optimization).
        rewrite(showRewriteProcess);
        // 4. Cost-based optimization.
        optimize(showPlanProcess);
        // 5. Pick the best physical plan from the memo.
        PhysicalPlan physicalPlan = chooseNthPlan(
                getRoot(), requireProperties, nth);
        // 6. Post-processing (runtime filter generation etc.).
        physicalPlan = postProcess(physicalPlan);
        // 7. Generate the distributed plan (fragments).
        distribute(physicalPlan, explainLevel);
    }

    /** Runs the Cascades optimizer over the memo. */
    private void optimize(boolean showPlanProcess) {
        Optimizer optimizer = new Optimizer(cascadesContext);
        optimizer.optimize();
    }
}
| 对比维度 | 传统 Planner | NereidsPlanner |
|---|---|---|
| 优化框架 | 基于规则的优化(RBO) | Cascades 框架(CBO) |
| 代价模型 | 简单启发式 | 完善的代价模型 + 统计信息 |
| 逻辑计划表示 | AST(StatementBase) | Plan 树(LogicalPlan) |
| 物理计划生成 | 直接转换 | 通过 Memo 搜索最优计划 |
| Join 顺序优化 | 左深树 + 简单调整 | 动态规划 + Bushy Tree |
| 表达式优化 | 有限的常量折叠 | 完整的表达式重写框架 |
| 子查询处理 | 部分 Unnesting | 完整的 Decorrelation |
| 优化规则 | 约 50 个硬编码规则 | 300+ 模式匹配规则 |
| 扩展性 | 新增规则需修改多处代码 | 声明式规则,易于扩展 |
graph TB
A[LogicalPlan] --> B[Analyze分析]
B --> C[Rewrite重写]
C --> D[Optimize优化]
D --> E[PhysicalPlan]
B --> B1[类型推导]
B --> B2[名称解析]
B --> B3[子查询Unnesting]
C --> C1[谓词下推]
C --> C2[列裁剪]
C --> C3[常量折叠]
C --> C4[公共表达式消除]
D --> D1[Join顺序优化]
D --> D2[聚合下推]
D --> D3[物理算子选择]
D --> D4[Runtime Filter生成]
Cascades 框架核心概念:
Memo:
Search Space:
Cost-Based Selection:
示例:Join 顺序优化
传统 Planner:
SELECT * FROM A, B, C WHERE A.id = B.id AND B.id = C.id;
-- 固定生成: (A JOIN B) JOIN C
NereidsPlanner:
-- 搜索空间包含:
-- Option 1: (A JOIN B) JOIN C
-- Option 2: (A JOIN C) JOIN B
-- Option 3: (B JOIN C) JOIN A
-- 根据统计信息和代价模型选择最优
SELECT
o.order_date,
c.customer_name,
SUM(o.amount) as total_amount
FROM orders o
JOIN customers c ON o.customer_id = c.id
WHERE o.order_date >= '2024-01-01'
AND c.region = 'ASIA'
GROUP BY o.order_date, c.customer_name
HAVING SUM(o.amount) > 1000
ORDER BY total_amount DESC
LIMIT 100;
┌─ RESULT SINK
│
└─ TOP-N [LIMIT 100, ORDER BY total_amount DESC]
│
└─ VAGGREGATE (merge finalize)
│ output: SUM(sum_amount)
│ group by: order_date, customer_name
│ having: SUM(sum_amount) > 1000
│
└─ EXCHANGE [HASH_PARTITIONED: order_date, customer_name]
│
└─ VAGGREGATE (update serialize)
│ output: SUM(amount) as sum_amount
│ group by: order_date, customer_name
│
└─ HASH JOIN [INNER JOIN]
│ join condition: o.customer_id = c.id
│ distribution: BROADCAST
│
├─ OLAP_SCAN (orders)
│ predicates: order_date >= '2024-01-01'
│ runtime_filters: customer_id IN BloomFilter
│
└─ EXCHANGE [BROADCAST]
│
└─ OLAP_SCAN (customers)
predicates: region = 'ASIA'
Fragment 0:扫描 customers 表(右表)
PlanRoot: OLAP_SCAN (customers)
predicates: region = 'ASIA'
DataSink: DataStreamSink (BROADCAST to Fragment 2)
Instances:
- Instance 0 @ BE1: Scan Tablet[0,3,6]
- Instance 1 @ BE2: Scan Tablet[1,4,7]
- Instance 2 @ BE3: Scan Tablet[2,5,8]
Fragment 1:Result 输出
PlanRoot: TOP-N [LIMIT 100]
├─ VAGGREGATE (merge finalize)
└─ EXCHANGE (from Fragment 2)
DataSink: ResultSink (返回给客户端)
Instances:
- Instance 0 @ FE Coordinator (单实例汇总)
Fragment 2:扫描 orders 表 + Join + 预聚合
PlanRoot: VAGGREGATE (update serialize)
└─ HASH JOIN
├─ OLAP_SCAN (orders)
│ predicates: order_date >= '2024-01-01'
│ runtime_filters: customer_id IN BloomFilter
└─ EXCHANGE (from Fragment 0, BROADCAST)
DataSink: DataStreamSink (HASH_PARTITIONED to Fragment 1)
Instances:
- Instance 0 @ BE1: Scan orders Tablet[0,3], Join, Agg
- Instance 1 @ BE2: Scan orders Tablet[1,4], Join, Agg
- Instance 2 @ BE3: Scan orders Tablet[2,5], Join, Agg
sequenceDiagram
participant FE as FE Coordinator
participant BE1 as BE1
participant BE2 as BE2
participant BE3 as BE3
FE->>BE1: Execute Fragment 0 Instance 0
FE->>BE2: Execute Fragment 0 Instance 1
FE->>BE3: Execute Fragment 0 Instance 2
Note over BE1,BE3: 扫描 customers 表
BE1-->>BE1: Scan customers, filter region='ASIA'
BE2-->>BE2: Scan customers, filter region='ASIA'
BE3-->>BE3: Scan customers, filter region='ASIA'
BE1->>BE1: Build Bloom Filter on customer_id
BE2->>BE2: Build Bloom Filter on customer_id
BE3->>BE3: Build Bloom Filter on customer_id
BE1->>FE: Publish Runtime Filter
FE->>BE1: Execute Fragment 2 Instance 0 (with RF)
FE->>BE2: Execute Fragment 2 Instance 1 (with RF)
FE->>BE3: Execute Fragment 2 Instance 2 (with RF)
Note over BE1,BE3: 扫描 orders 表 (应用 Runtime Filter)
BE1-->>BE1: Scan orders + Apply Bloom Filter
BE2-->>BE2: Scan orders + Apply Bloom Filter
BE3-->>BE3: Scan orders + Apply Bloom Filter
Note over BE1,BE3: Broadcast Join (接收 customers 数据)
BE1->>BE1: Receive customers from all BEs
BE2->>BE2: Receive customers from all BEs
BE3->>BE3: Receive customers from all BEs
BE1-->>BE1: Hash Join + Pre-Agg
BE2-->>BE2: Hash Join + Pre-Agg
BE3-->>BE3: Hash Join + Pre-Agg
FE->>FE: Execute Fragment 1 Instance 0
BE1->>FE: Send aggregated data (Hash Partitioned)
BE2->>FE: Send aggregated data (Hash Partitioned)
BE3->>FE: Send aggregated data (Hash Partitioned)
FE-->>FE: Merge Aggregation + TOP-N + LIMIT
FE-->>Client: Return Result
Runtime Filter:
Broadcast Join:
两阶段聚合:
谓词下推:
order_date >= '2024-01-01' 下推到存储层region = 'ASIA' 下推到存储层列裁剪:
Planner 架构:
PlanNode 树:
PlanFragment:
分布式策略:
NereidsPlanner:
选择合适的 Join 分布式策略:
-- 大表 JOIN 小表:使用 Broadcast
SELECT /*+ SET_VAR(broadcast_row_limit=10000000) */ *
FROM large_table l
JOIN small_table s ON l.id = s.id;
-- 大表 JOIN 大表:使用 Shuffle
SELECT * FROM table1 t1
JOIN table2 t2 ON t1.key = t2.key;
-- Colocate Join:建表时设置
CREATE TABLE table1 (...)
DISTRIBUTED BY HASH(key) BUCKETS 32
PROPERTIES("colocate_with" = "group1");
CREATE TABLE table2 (...)
DISTRIBUTED BY HASH(key) BUCKETS 32
PROPERTIES("colocate_with" = "group1");
利用流式预聚合:
-- 适合流式预聚合(GROUP BY 基数适中)
SELECT region, COUNT(*)
FROM orders
GROUP BY region; -- region 只有 10 个值
-- 不适合流式预聚合(GROUP BY 基数极高)
SELECT user_id, COUNT(*)
FROM events
GROUP BY user_id; -- user_id 有 1 亿个值
手动设置:
SET enable_streaming_preaggregation = false; -- 禁用流式预聚合
-- 增加 Runtime Filter 等待时间(适用于大表 Join)
SET runtime_filter_wait_time_ms = 10000;
-- 调整 Bloom Filter 大小
SET runtime_bloom_filter_size = 16777216; -- 16MB
-- 设置 Runtime Filter 类型
SET runtime_filter_type = "BLOOM_FILTER,MIN_MAX";
-- 创建分区表
CREATE TABLE orders (
order_id BIGINT,
order_date DATE,
amount DECIMAL(10,2)
)
PARTITION BY RANGE(order_date) (
PARTITION p202401 VALUES LESS THAN ("2024-02-01"),
PARTITION p202402 VALUES LESS THAN ("2024-03-01"),
...
);
-- 查询时利用分区裁剪
SELECT * FROM orders
WHERE order_date >= '2024-02-01'
AND order_date < '2024-03-01';
-- 只扫描 p202402 分区