侦探大挑战
128.12M · 2026-04-22
graph LR
A[需要 UPDATE/DELETE?] -->|是| B[主键表]
A -->|否| C[数据可能重复?]
C -->|是| B
C -->|否| D[非主键表]
style B fill:#ffe1e1
style D fill:#e1f5ff
| 维度 | 主键表 | 非主键表 |
|---|---|---|
| 定义 | PRIMARY KEY (id) NOT ENFORCED | 不定义主键 |
| 操作 | INSERT, UPDATE, DELETE | 仅 INSERT |
| Split 策略 | 1 Bucket ≈ 1 Split | 1 Bucket = 多 Splits |
| 批处理并行度 | ≈ Bucket 数(10-100) | >> Bucket 数(100-1000+) |
| 读取方式 | Merge(慢) | Concat(快) |
| 吞吐量 | 150-250 MB/s | 400-600 MB/s |
| 适用场景 | CDC、维度表 | 日志、指标 |
-- 建表
CREATE TABLE my_pk_table (
id BIGINT,
data STRING,
PRIMARY KEY (id) NOT ENFORCED -- 定义主键
) PARTITIONED BY (dt STRING)
WITH (
'bucket' = '64', -- 并行度关键:Bucket 数要足够
'compaction.max-file-num' = '50' -- 频繁 Compaction
);
-- 批处理查询配置
SET 'execution.runtime-mode' = 'batch';
SET 'scan.infer-parallelism' = 'true';
SET 'scan.infer-parallelism.max' = '128'; -- 略大于 Bucket 数
SET 'scan.split-assign-mode' = 'fair'; -- 推荐 FAIR
SET 'split.target-size' = '128mb'; -- 默认即可
-- 预期:并行度 ≈ 64
-- 建表
CREATE TABLE my_append_table (
log_id STRING,
data STRING
-- 不定义主键
) PARTITIONED BY (dt STRING)
WITH (
'bucket' = '16', -- Bucket 可以少一些
'compaction.max-file-num' = '100'
);
-- 批处理查询配置
SET 'execution.runtime-mode' = 'batch';
SET 'scan.infer-parallelism' = 'true';
SET 'scan.infer-parallelism.max' = '500'; -- 可以很高
SET 'scan.split-assign-mode' = 'preemptive'; -- 两种都可以
SET 'split.target-size' = '64mb'; -- 可以更小,增加并行度
-- 预期:并行度 >> 16,取决于数据量
批处理并行度 ≈ Bucket 数量
流处理并行度 = Bucket 数量(固定)
示例:
批处理并行度 ≈ 总数据量 / split.target-size
流处理并行度 = Bucket 数量(Fixed Bucket)
流处理并行度 >> Bucket(Unaware Bucket)
示例:
| 参数 | 主键表推荐 | 非主键表推荐 |
|---|---|---|
bucket | 64-256 | 8-64 |
split.target-size | 128mb-256mb | 64mb-128mb |
scan.infer-parallelism.max | Bucket × 2 | 500-1000 |
scan.split-assign-mode | fair | preemptive |
compaction 频率 | 每天 | 每周 |
| 指标 | 主键表 | 非主键表 | 倍数 |
|---|---|---|---|
| 执行时间 | 8 分钟 | 3 分钟 | 2.7x |
| 吞吐量 | 200 MB/s | 550 MB/s | 2.8x |
| CPU 利用率 | 80% | 50% | 0.6x |
| 内存占用 | 16GB | 8GB | 0.5x |
结论:非主键表读取性能约为主键表的 2.5-3 倍
A:
解决方案:
A:
-- 方案 1:增加 Bucket 数(需要重建表)
ALTER TABLE my_table SET ('bucket' = '64');
-- 方案 2:等待 Compaction 完成(临时方案)
CALL sys.compact('database.my_table');
-- Compaction 后,如果都是高 Level 文件,可以切分
-- 方案 3:改为非主键表(如果不需要更新)
CREATE TABLE my_new_table AS SELECT * FROM my_table;
A:
-- 方案 1:在查询时去重
SELECT DISTINCT * FROM my_append_table WHERE dt = '2026-01-25';
-- 方案 2:在 INSERT 时去重
INSERT INTO target_table
SELECT DISTINCT * FROM source_table;
-- 方案 3:改为主键表(自动去重)
CREATE TABLE my_pk_table (
id BIGINT,
data STRING,
PRIMARY KEY (id) NOT ENFORCED
) AS SELECT * FROM my_append_table;
A:
主键表:
Bucket 数 = 期望的并行度
推荐:
- 小规模(< 100GB):32-64
- 中规模(100GB - 1TB):64-128
- 大规模(> 1TB):128-256
非主键表:
Bucket 数 = 期望的流式并行度(批处理不受限)
推荐:
- 小规模:8-16
- 中规模:16-32
- 大规模:32-64
原因:非主键表批处理并行度不依赖 Bucket 数
A:
-- 查看表定义
SHOW CREATE TABLE my_table;
-- 输出示例 1(主键表):
-- PRIMARY KEY (id) NOT ENFORCED
--
-- 输出示例 2(非主键表):
-- (没有 PRIMARY KEY)
-- 或者查看元数据
SELECT * FROM my_table$schemas;
-- ============ 主键表批处理模板 ============
-- 1. 建表
CREATE TABLE my_pk_table (
id BIGINT,
data STRING,
update_time TIMESTAMP,
PRIMARY KEY (id) NOT ENFORCED
) PARTITIONED BY (dt STRING)
WITH (
'bucket' = '64',
'compaction.max-file-num' = '50',
'compaction.min-file-num' = '10'
);
-- 2. 配置
SET 'execution.runtime-mode' = 'batch';
SET 'scan.infer-parallelism' = 'true';
SET 'scan.infer-parallelism.max' = '128';
SET 'scan.split-assign-mode' = 'fair';
-- 3. 查询
SELECT * FROM my_pk_table WHERE dt = '2026-01-25';
-- 4. 优化建议
-- - 增加 Bucket 数到 128(如果并行度不够)
-- - 定期执行:CALL sys.compact('db.my_pk_table');
-- ============ 非主键表批处理模板 ============
-- 1. 建表
CREATE TABLE my_append_table (
log_id STRING,
data STRING,
timestamp BIGINT
) PARTITIONED BY (dt STRING)
WITH (
'bucket' = '16',
'compaction.max-file-num' = '100'
);
-- 2. 配置
SET 'execution.runtime-mode' = 'batch';
SET 'scan.infer-parallelism' = 'true';
SET 'scan.infer-parallelism.max' = '500';
SET 'split.target-size' = '64mb';
SET 'scan.split-assign-mode' = 'preemptive';
-- 3. 查询
SELECT * FROM my_append_table WHERE dt = '2026-01-25';
-- 4. 优化建议
-- - 减小 split.target-size 到 32mb(如果需要更高并行度)
-- - 定期合并小文件:CALL sys.compact('db.my_append_table');
-- ============ 主键表流处理模板 ============
-- 1. 建表
CREATE TABLE my_pk_stream_table (
id BIGINT,
data STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'bucket' = '128', -- 流处理并行度 = Bucket 数
'changelog-producer' = 'input',
'scan.mode' = 'latest'
);
-- 2. 配置
SET 'execution.runtime-mode' = 'streaming';
SET 'scan.parallelism' = '128';
SET 'scan.snapshot-time-interval' = '10s';
SET 'execution.checkpointing.interval' = '60s';
SET 'state.backend' = 'rocksdb';
-- 3. 查询
SELECT
COUNT(DISTINCT id) as user_cnt
FROM my_pk_stream_table;
-- ============ 非主键表流处理模板 ============
-- 1. 建表
CREATE TABLE my_append_stream_table (
log_id STRING,
data STRING
) WITH (
'bucket' = '64', -- 可以比主键表少
'scan.mode' = 'latest'
);
-- 2. 配置
SET 'execution.runtime-mode' = 'streaming';
SET 'scan.parallelism' = '64';
SET 'scan.snapshot-time-interval' = '10s';
SET 'execution.checkpointing.interval' = '60s';
-- 3. 查询
SELECT
COUNT(*) as log_cnt
FROM my_append_stream_table;
场景:1 个分区,10 个 Buckets,每个 1GB
主键表(有 Level 0):
├─ Split 数量:10 个(= Bucket 数)
├─ 并行度:10
└─ 原因:Key Range 重叠,不能切分
非主键表:
├─ Split 数量:80 个(10GB / 128MB)
├─ 并行度:80
└─ 原因:可以自由切分
读取 100GB 数据:
主键表(需要 Merge):
[████████░░] 8 分钟
非主键表(顺序拼接):
[███░░░░░░░] 3 分钟
性能提升:2.7x
-- 错误配置
CREATE TABLE my_table (
id BIGINT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'bucket' = '4' -- 太少!批处理并行度只有 4
);
-- 正确配置
CREATE TABLE my_table (
id BIGINT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'bucket' = '64' -- 并行度可以达到 64
);
-- 低效配置
CREATE TABLE my_append_table (
log_id STRING
) WITH (
'bucket' = '256' -- 太多!会产生大量小文件
);
-- 推荐配置
CREATE TABLE my_append_table (
log_id STRING
) WITH (
'bucket' = '16' -- 适中,批处理并行度通过 Split 切分实现
);
-- 错误配置
SET 'split.target-size' = '32mb'; -- 对主键表无效(无法切分)
-- 正确做法
-- 主键表:增加 Bucket 数量而不是减小 Split
ALTER TABLE my_pk_table SET ('bucket' = '128');
1. 提高并行度
└─ 增加 Bucket 数量(核心手段)
2. 减少 Merge 开销
└─ 定期 Compaction
└─ 减少 Level 0 文件
3. 批处理优化
└─ 选择 Compaction 完成后的快照
└─ 使用 FAIR 分配模式
4. 流处理优化
└─ Bucket 数 = 期望并行度
1. 提高并行度
└─ 减小 split.target-size(核心手段)
└─ 提高 scan.infer-parallelism.max
2. 减少小文件
└─ 定期 Compaction
└─ 适当减少 Bucket 数
3. 批处理优化
└─ 充分利用高并行度
└─ 使用 PREEMPTIVE 分配模式
4. 流处理优化
└─ 考虑使用 Unaware Bucket(如果允许)
问自己 3 个问题:
1. 是否需要 UPDATE 或 DELETE?
└─ 是 → 主键表
└─ 否 → 继续
2. 数据是否可能重复(需要去重)?
└─ 是 → 主键表
└─ 否 → 继续
3. 读取性能是否非常关键?
└─ 是 → 非主键表
└─ 否 → 都可以,推荐非主键表(性能更好)
检查清单:
主键表:
Bucket 数 >= 32?
scan.split-assign-mode = 'fair'?
定期执行 Compaction?
scan.infer-parallelism.max >= Bucket × 2?
非主键表:
split.target-size <= 128mb?
scan.infer-parallelism.max >= 200?
Bucket 数不要太多(<= 64)?
定期合并小文件?
如果你喜欢这篇文章,请转发、点赞。扫描下方二维码关注我们,您会收到更多优质文章推送 在这里插入图片描述
关注「Java源码进阶」,获取海量java,大数据,机器学习资料!