火柴人塔防对决
118.22MB · 2026-02-04
| 实现类 | 线程安全 | 锁机制 | 并发瓶颈 | 适用场景 |
|---|---|---|---|---|
| HashMap | 无 | 多线程下数据错乱/死循环(Java 7扩容) | 单线程环境 | |
| Hashtable | 全表synchronized | 每次操作锁整个Map | 遗留系统 | |
| Collections.synchronizedMap | 外部包装锁整个Map | 同Hashtable | 简单同步需求 | |
| ConcurrentHashMap (Java 8+) | CAS + synchronized(桶) | 锁单个链表/树头节点 | 高并发核心场景 |
// 错误示范:HashMap在并发下可能死循环(Java 7)或数据丢失
Map<String, Integer> unsafeMap = new HashMap<>();
// 正确选择:高并发场景首选
ConcurrentHashMap<String, Integer> safeMap = new ConcurrentHashMap<>();
| 特性 | Java 7 (Segment分段锁) | Java 8+ (CAS + synchronized) |
|---|---|---|
| 核心结构 | Segment数组 + HashEntry数组 | Node数组 + 链表/红黑树 |
| 锁粒度 | 段级锁(默认16段) | 桶级锁(锁单个链表头/树根) |
| 扩容机制 | 单线程扩容 | 多线程协同扩容 |
| 数据结构 | 仅链表 | 链表→红黑树(优化长链查询) |
| 计数优化 | 无 | CounterCell分散计数压力 |
ConcurrentHashMap的桶(table数组元素)可存储5种核心节点类型,每种承担特定职责:
| 节点类型 | hash值 | 使用场景 | 核心作用 | 关键特性 |
|---|---|---|---|---|
| Node | ≥0 | 普通链表节点 | 存储键值对(链表结构) | volatile next保证可见性;默认结构 |
| TreeNode | ≥0 | 红黑树节点 | 树节点数据载体 | 不直接存于桶中,由TreeBin管理;继承Node |
| TreeBin | -2 | 桶头节点(树根代理) | 管理红黑树 + 读写锁 | 桶中实际存储的是TreeBin;内部含root/first等指针;写操作加锁 |
| ForwardingNode | -1 | 扩容迁移中 | 占位 + 指向新表 | nextTable指向新数组;触发helpTransfer协助扩容 |
| ReservationNode | -3 | computeIfAbsent执行中 | 临时占位防并发插入 | 仅用于computeIfAbsent;计算完成后替换为正常节点 |
桶位置 (table[i])
│
├─ 情况1: null → CAS插入Node(链表头)
│
├─ 情况2: Node(链表)
│ ├─ 长度 < 8 → 继续链表插入
│ └─ 长度 ≥ 8 且 table.length ≥ 64 → treeifyBin()
│ └─ 桶替换为 TreeBin(内部构建TreeNode红黑树)
│
├─ 情况3: TreeBin(hash=-2)
│ └─ 所有树操作通过TreeBin代理(含读写锁控制)
│
├─ 情况4: ForwardingNode(hash=-1)
│ └─ 扩容中:put/get线程调用helpTransfer()协助迁移
│
└─ 情况5: ReservationNode(hash=-3)
└─ computeIfAbsent执行中:其他线程等待或重试
TreeBin(代理节点),TreeBin内部维护TreeNode组成的红黑树ReentrantLock,写操作加锁,读操作通过waiter和lockState实现无锁读(类似读写锁)computeIfAbsent计算value期间存在,计算完成后立即替换为Node/TreeBin| 项目 | 值/规则 | 说明 |
|---|---|---|
| 初始容量 | 构造参数(默认16) | 实际初始化时调整为≥指定值的最小2次幂 |
| 负载因子 | 固定0.75 | 不可修改(与HashMap不同) |
| 扩容阈值 | sizeCtl = (n << 1) - (n >>> 1) | 即 n * 0.75(n为当前容量) |
| 扩容倍数 | 2倍 | 新容量 = 旧容量 << 1 |
| 最小树化容量 | 64 | 链表转树需同时满足:长度≥8 且 table.length≥64 |
| 最小迁移步长 | 16 | 单线程至少负责16个桶的迁移 |
| Hash Key计算 & 桶位置计算 | int h = key.hashCode(); int hash = spread(h); int i = (n - 1) & hash; // n = table.length | STEP 1: 获取原始哈希值 STEP 2: 扰动 + 符号位处理(核心!) STEP 3: 计算桶索引(位运算取模) |
static final int HASH_BITS = 0x7fffffff; // 二进制: 01111111 11111111 11111111 11111111
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
addCount更新元素总数
↓
是否超过sizeCtl阈值? → 否 → 结束
↓ 是
是否有线程正在扩容? → 是 → 尝试协助扩容(helpTransfer)
↓ 否
CAS将sizeCtl设为 -(1 + rs) 【标记扩容开始】
↓
transfer() 扩容主流程:
1. 创建新数组 nextTable(容量×2)
2. 设置transferIndex = table.length(从后向前分配任务)
3. 每个线程通过CAS领取迁移区间 [bound, i)
4. 遍历负责的桶:
- 空桶 → CAS设为ForwardingNode
- 非空桶 → synchronized锁住头节点 → 迁移数据 → 设为ForwardingNode
5. 所有桶迁移完成 → CAS替换table = nextTable → 重置sizeCtl
// 构造时指定初始容量100
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>(100);
// 实际初始化容量 = 128(≥100的最小2次幂)
// 扩容阈值 = 128 * 0.75 = 96
// 当元素数 > 96 时触发扩容 → 新容量 = 256
table (Node数组)
│
├─ 桶0: null
├─ 桶1: Node → Node → Node (链表,长度<8)
├─ 桶2: TreeBin (红黑树根) → TreeNode...
├─ 桶3: ForwardingNode (hash=-1, 扩容中占位)
└─ 桶4: ReservationNode (computeIfAbsent占位)
// 懒初始化的主数组
transient volatile Node<K,V>[] table;
// 扩容时的新数组
private transient volatile Node<K,V>[] nextTable;
// 神奇的控制字段(一值多义!)
private transient volatile int sizeCtl;
// 计数优化:baseCount + CounterCell数组
private transient volatile long baseCount;
private transient volatile CounterCell[] counterCells;
>0:初始化容量 或 扩容阈值(如12 = 16*0.75)-1:正在初始化-(1 + n):n个线程正在扩容(如-3表示2个线程扩容中)final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode()); // 二次哈希:(h ^ (h>>>16)) & 0x7fffffff
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 1. 懒初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 2. 桶为空:CAS无锁插入(关键优化!)
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null, new Node<>(hash, key, value, null)))
break;
}
// 3. 遇到ForwardingNode:协助扩容(多线程协作精髓)
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
// 4. 桶非空:synchronized锁住头节点(仅锁当前桶!)
else {
V oldVal = null;
synchronized (f) { // 锁粒度:单个桶
if (tabAt(tab, i) == f) { // 双重检查防ABA
if (fh >= 0) { // 链表
// ...遍历插入/更新
binCount = 链表长度;
}
else if (f instanceof TreeBin) { // 红黑树
// ...树插入
binCount = 2;
}
}
}
// 5. 链表转树判断(需同时满足:长度≥8 且 数组长度≥64)
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null) return oldVal;
break;
}
}
addCount(1L, binCount); // 计数+可能触发扩容
return null;
}
设计精髓:
开始
│
├─ 校验key/value非空 → 抛NPE
├─ spread()二次哈希 → (h ^ (h>>>16)) & 0x7fffffff
│
├─【循环】for(;;)
│ ├─ table未初始化? → initTable()(CAS设置sizeCtl=-1)
│ │
│ ├─ 计算桶索引 i = (n-1) & hash
│ │
│ ├─ 桶为空? → CAS插入新Node → break
│ │
│ ├─ 桶头hash == MOVED(-1)? → helpTransfer()协助扩容 → continue
│ │
│ └─ 桶非空:
│ └─ synchronized(桶头f) 【锁粒度:单桶】
│ ├─ 双重检查:tabAt(tab,i)==f?
│ ├─ f.hash >= 0? → 链表遍历插入/更新 → 记录binCount
│ ├─ f instanceof TreeBin? → 调用putTreeVal() → binCount=2
│ └─ f instanceof ReservationNode? → 抛UOE(compute操作中)
│
├─ 链表长度≥8 且 table.length≥64? → treeifyBin()转树
│
├─ addCount(1, binCount):
│ ├─ CAS更新baseCount
│ ├─ 失败 → CounterCell分散计数
│ └─ 检查是否需扩容
│
└─ 返回旧值/新值
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) { // tabAt用Unsafe.getObjectVolatile
if ((eh = e.hash) == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val; // val为volatile,保证可见性
else if (eh < 0) // 树节点或ForwardingNode
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) { // 遍历链表
if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
val和next均为volatile,利用happens-before原则保证写操作对后续读可见开始
│
├─ 计算hash → spread(key.hashCode())
├─ 检查table非空 & 长度>0
│
├─ 获取桶头节点 e = tabAt(tab, (n-1)&hash) 【volatile读】
│ ├─ e == null? → return null
│ │
│ ├─ e.hash == h 且 key匹配? → return e.val 【volatile读保证可见性】
│ │
│ ├─ e.hash < 0? (TreeBin/-2 或 ForwardingNode/-1)
│ │ └─ 调用 e.find(h, key)
│ │ ├─ TreeBin.find():遍历红黑树(无锁读)
│ │ └─ ForwardingNode.find():在nextTable中查找
│ │
│ └─ 遍历链表(e.next):
│ └─ 逐节点比对key → 匹配则返回val
│
└─ 未找到 → return null
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// 每个线程负责的桶区间(最小16)
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE;
if (nextTab == null) { // 首次触发扩容的线程创建新数组
try { nextTab = (Node<K,V>[])new Node<?,?>[n << 1]; }
catch (Throwable ex) { sizeCtl = Integer.MAX_VALUE; return; }
nextTable = nextTab;
transferIndex = n; // 从后向前分配迁移任务
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<>(nextTab); // 转发节点
boolean advance = true;
for (int i = 0, bound = 0;;) {
while (advance) {
// CAS分配迁移区间 [bound, i)
if (--i >= bound || finishing) advance = false;
else if ((bound = transferIndex) <= 0) { finishing = true; break; }
else if (U.compareAndSwapInt(this, TRANSFERINDEX, bound,
bound = (nextBound = bound - stride) < 0 ? 0 : nextBound)) {
i = bound + stride - 1;
advance = false;
}
}
if (finishing) { // 所有桶迁移完成
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1); // 新阈值 = 新容量 * 0.75
return;
}
if (i < 0 || i >= n || tabAt(tab, i) == null)
advance = casTabAt(tab, i, null, fwd); // 空桶直接标记
else {
Node<K,V> f = tabAt(tab, i);
if (f.hash == MOVED) // 已被其他线程迁移
advance = true;
else {
synchronized (f) { // 锁住当前桶迁移
if (tabAt(tab, i) == f) {
// ...将链表/树节点迁移到新表(头插法转尾插法防死循环)
setTabAt(tab, i, fwd); // 原桶设为ForwardingNode
advance = true;
}
}
}
}
}
}
扩容革命:
helpTransfer协助transfer(tab, nextTab)
│
├─ 计算步长stride = (n>>>3)/NCPU(最小16)
├─ 首次调用? → 创建nextTab(容量×2)→ nextTable = nextTab
│
├─【循环】处理每个桶
│ ├─ CAS领取迁移区间 [bound, i)
│ │ └─ transferIndex原子递减分配任务
│ │
│ ├─ i < 0? → 检查是否所有线程完成 → finishing=true
│ │
│ ├─ 桶为空? → CAS设为ForwardingNode → advance=true
│ │
│ ├─ 桶头为ForwardingNode? → advance=true(已被迁移)
│ │
│ └─ 桶有数据:
│ └─ synchronized(桶头f)
│ ├─ 双重检查
│ ├─ 链表迁移:按hash&新长度分高低位,尾插法迁移至nextTab
│ ├─ 树迁移:调用TreeBin.split()拆分树
│ └─ CAS将原桶设为ForwardingNode
│
├─ finishing=true?
│ ├─ CAS替换table = nextTab
│ ├─ nextTable = null
│ └─ sizeCtl = (n<<1) - (n>>>1) 【新阈值】
│
└─ 结束
// addCount核心逻辑简化
private final void addCount(long x, int check) {
long b, s;
// 1. 优先尝试更新baseCount(低竞争时高效)
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
// 2. 失败则使用CounterCell(类似LongAdder思想)
CounterCell a; long v; int m;
if (as != null && (m = as.length - 1) >= 0 &&
(a = as[ThreadLocalRandom.getProbe() & m]) != null) {
// CAS更新对应Cell
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x);
} else {
// 3. 首次创建CounterCell或扩容
fullAddCount(x, ...);
}
}
// 4. 检查是否需扩容...
}
sumCount() = baseCount + 所有CounterCell.value累加addCount(x, check)
│
├─ 尝试CAS更新baseCount
│ ├─ 成功 → 检查是否需扩容 → 结束
│ └─ 失败 → 进入CounterCell流程
│
├─ CounterCell数组存在?
│ ├─ 是 → 获取当前线程对应Cell(probe & (length-1))
│ │ ├─ Cell存在 → CAS更新Cell.value
│ │ └─ Cell为空 → fullAddCount初始化Cell
│ └─ 否 → fullAddCount初始化CounterCell数组
│
├─ fullAddCount核心:
│ ├─ 初始化CounterCell数组(长度2)
│ ├─ 创建新CounterCell并放入数组
│ └─ 重试更新(含扩容逻辑)
│
└─ 检查元素总数是否超过sizeCtl → 触发transfer扩容
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>(16);
// 原子操作(无需外部同步)
map.putIfAbsent("count", 0); // 不存在才插入
map.compute("count", (k, v) -> v == null ? 1 : v + 1); // 原子累加
map.merge("total", 10, Integer::sum); // 合并更新
// forEach遍历(弱一致性:可能看不到最新修改,但不会ConcurrentModificationException)
map.forEach((k, v) -> System.out.println(k + "=" + v));
public class CHMDemo {
public static void main(String[] args) throws InterruptedException {
ConcurrentHashMap<String, Long> map = new ConcurrentHashMap<>();
int threads = 10;
CountDownLatch latch = new CountDownLatch(threads);
for (int i = 0; i < threads; i++) {
new Thread(() -> {
for (int j = 0; j < 1000; j++) {
// 原子累加:内部使用synchronized+volatile保证安全
map.compute("counter", (k, v) -> v == null ? 1L : v + 1);
}
latch.countDown();
}).start();
}
latch.await();
System.out.println("Expected: " + (threads * 1000)); // 10000
System.out.println("Actual: " + map.get("counter")); // 稳定输出10000
}
}
运行结果:多次执行均精准输出10000,验证线程安全性
️ 对比实验:将ConcurrentHashMap替换为HashMap,结果通常<10000(数据丢失)
// 注:以下为简化演示,生产环境勿用反射操作内部结构
public class NodeTypesDemo {
public static void main(String[] args) throws Exception {
// 1. Node链表验证(插入7个同桶元素)
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
for (int i = 0; i < 7; i++) map.put("key" + i, i);
System.out.println("链表长度7: 桶头类型 = Node");
// 2. TreeBin验证(触发树化:需≥8节点 + 容量≥64)
map = new ConcurrentHashMap<>(128);
for (int i = 0; i < 10; i++) map.put("tree" + i, i);
System.out.println("树化后: 桶头类型 = TreeBin (hash=-2)");
// 3. ForwardingNode验证(触发扩容)
map = new ConcurrentHashMap<>(16);
for (int i = 0; i < 20; i++) map.put("expand" + i, i);
System.out.println("扩容中: 桶头类型 = ForwardingNode (hash=-1)");
// 4. ReservationNode验证(computeIfAbsent执行中)
Thread t1 = new Thread(() ->
map.computeIfAbsent("reserve", k -> {
try { Thread.sleep(50); } catch (Exception e) {}
return 1;
})
);
t1.start();
Thread.sleep(10); // 确保进入compute
System.out.println("compute中: 桶头类型 = ReservationNode (hash=-3)");
t1.join();
}
}
| 设计 | 解决的问题 | 工程价值 |
|---|---|---|
| TreeBin代理树 | 避免树操作锁整个桶 | 读操作无锁(通过lockState状态机),写操作细粒度锁 |
| ForwardingNode | 扩容期间保证线程安全访问 | 业务线程自动协助扩容,避免Stop-The-World |
| ReservationNode | computeIfAbsent原子性保障 | 防止ABA问题,避免重复计算 |
| CounterCell | 高并发计数竞争 | 性能提升10倍+(对比单baseCount) |
| 场景 | 建议 | 原因 |
|---|---|---|
| 高并发读多写少 | 首选 | 无锁get + 桶级锁写 |
| 需要null键/值 | 改用Collections.synchronizedMap | CHM强制非空 |
| 强一致性遍历 | ️ 谨慎 | 迭代器弱一致性(可能漏新元素/见旧元素) |
| 初始容量预估 | 构造时指定 | 避免频繁扩容(如预估10万元素:new ConcurrentHashMap<>(131072)) |
| 复合操作 | 用compute/merge | 避免"check-then-act"竞态条件 |
| 特性 | HashMap | ConcurrentHashMap |
|---|---|---|
| null键/值 | 支持 | 抛NullPointerException |
| 迭代器 | fail-fast | 弱一致性(无ConcurrentModificationException) |
| 扩容机制 | 单线程头插法(Java 7有环风险) | 多线程协同 + 尾插法(绝对安全) |
| 初始容量 | 16 | 16(但sizeCtl控制逻辑更复杂) |
ConcurrentHashMap是Java并发编程的教科书级设计:
掌握ConcurrentHashMap,不仅是掌握一个容器,更是理解高并发系统设计的核心范式。当你下次面对并发难题时,CHM的设计思想将为你点亮一盏明灯。
Q:为什么 ConcurrentHashMap 不允许 null 键/值?
A:在并发场景下,get(key) 返回 null 无法区分“key 不存在”还是“value 为 null",易引发歧义。而单线程 HashMap 可通过 containsKey 辅助判断,但 ConcurrentHashMap 的弱一致性迭代器无法保证 containsKey 与 get 原子性,故强制禁止。
Q:桶数=16 时插入 13 个元素会扩容吗?
A:会。阈值 = 16 × 0.75 = 12,第 13 个元素触发扩容(新桶数=32)。
Q:为什么树化要求桶数 ≥64?
A:小容量时哈希冲突主因是桶太少,扩容能更高效分散元素;树化有锁开销,小数组频繁树化/链化反而降低性能。
Q:为什么不用 Objects.hashCode(key) 而直接调用 key.hashCode()?
A:ConcurrentHashMap 要求 key 非 null(构造时校验),直接调用避免额外判空开销,符合“为性能极致优化”设计哲学。
Q:如果自定义类未重写 hashCode() 会怎样?
A:使用 Object.hashCode()(默认基于内存地址),同一对象多次插入位置相同,但不同对象位置随机 → 极易哈希冲突!务必重写 hashCode() 且保证与 equals() 一致。
Q:为什么扰动只移16位?不是8位或24位?
A:
32位 int:高16位与低16位异或,信息混合最充分
实测验证:16位扰动在常见数据集(URL、String)上冲突率最低
移位过少(8位):高位信息利用不足;移位过多(24位):低位信息丢失
Q:(n-1) & hash 在 n 非2幂时会怎样?
A:
索引计算错误(如 n=10, n-1=9(1001₂) → 仅低4位中第0、3位有效)
哈希分布严重不均(大量元素挤在少数桶)
ConcurrentHashMap 通过 tableSizeFor 强制保证 n 为2幂
参考资料
java.util.concurrent.ConcurrentHashMap