课程概述

Day 2 深入探索 Java 内存模型 (JMM),理解多线程环境下数据不一致的根源,掌握 volatile 关键字的原理与应用,并通过实战代码理解可见性、原子性和有序性问题。

学习目标

  • 深入理解 Java 内存模型的抽象结构
  • 掌握 JMM 三大特性:原子性、可见性、有序性
  • 熟练掌握 volatile 关键字的原理和使用场景
  • 理解 happens-before 原则和内存屏障机制
  • 掌握指令重排序问题和解决方案
  • 完成可见性问题的实战练习

理论基础

1. Java 内存模型 (JMM) 概述

1.1 JMM 的抽象概念

Java 内存模型 (Java Memory Model, JMM) 是一个抽象的概念,它描述了 Java 程序中各种变量(线程共享变量)的访问规则,以及在 JVM 中将变量存储到内存和从内存中读取变量这样的底层细节。

image.png

1.2 JMM 的三大特性

特性定义问题场景解决方案
原子性一个或多个操作,要么全部执行且执行的过程不会被任何因素打断,要么就都不执行i++ 操作非原子性synchronizedAtomic
可见性当一个线程修改了共享变量的值,其他线程能够立即得知这个修改线程缓存导致变量不可见volatilesynchronized
有序性程序执行的顺序按照代码的先后顺序执行指令重排序导致执行顺序改变volatilesynchronized

1.3 内存间的交互操作

JMM 定义了 8 种原子操作来完成主内存和工作内存的交互:

操作作用解释
lock (锁定)作用于主内存把一个变量标识为一条线程独占状态
unlock (解锁)作用于主内存把一个处于锁定状态的变量释放出来
read (读取)作用于主内存把一个变量的值从主内存传输到线程的工作内存
load (载入)作用于工作内存把 read 操作从主内存中得到的变量值放入工作内存的变量副本中
use (使用)作用于工作内存把工作内存中一个变量的值传递给执行引擎
assign (赋值)作用于工作内存把一个从执行引擎接收到的值赋给工作内存的变量
store (存储)作用于工作内存把工作内存中一个变量的值传送到主内存中
write (写入)作用于主内存把 store 操作从工作内存中得到的变量的值放入主内存的变量中

2. happens-before 原则

2.1 什么是 happens-before

happens-before 是 JMM 中最重要的原则之一,它定义了两个操作之间的偏序关系。如果操作 A happens-before 操作 B,那么 A 操作的结果对 B 操作是可见的。

2.2 八大 happens-before 规则

  1. 程序次序规则:在一个线程内,书写在前面的代码 happens-before 书写在后面的代码
  2. 管程锁定规则:一个 unlock 操作 happens-before 后面对同一个锁的 lock 操作
  3. volatile 变量规则:对一个 volatile 变量的写操作 happens-before 后面对这个变量的读操作
  4. 线程启动规则:Thread 对象的 start() 方法 happens-before 于此线程的每一个动作
  5. 线程终止规则:线程中的所有操作都 happens-before 对此线程的终止检测
  6. 线程中断规则:对线程 interrupt() 方法的调用 happens-before 发生于被中断线程的代码检测到中断事件
  7. 对象终结规则:一个对象的初始化完成 happens-before 于它的 finalize() 方法的开始
  8. 传递性:如果 A happens-before B,且 B happens-before C,那么 A happens-before C
public class HappensBeforeDemo {
    private int value = 0;
    private volatile boolean flag = false;
    private final Object lock = new Object();

    // 示例1:程序次序规则
    public void programOrderRule() {
        int a = 1;        // 1
        int b = 2;        // 2 - 1 happens-before 2
        int c = a + b;    // 3 - 2 happens-before 3
        System.out.println(c); // 4 - 3 happens-before 4
    }

    // 示例2:管程锁定规则
    public void monitorLockRule() {
        synchronized (lock) {
            value = 42;   // 写操作
        } // unlock happens-before 后续的 lock

        synchronized (lock) {
            System.out.println(value); // 能看到 value = 42
        }
    }

    // 示例3:volatile变量规则
    public void volatileRule() {
        flag = true;      // volatile写 - happens-before 后续的volatile读
        if (flag) {       // volatile读 - 能看到上面的写入
            System.out.println("Flag is true");
        }
    }

    // 示例4:线程启动规则
    public void threadStartRule() {
        value = 100;      // 1
        Thread thread = new Thread(() -> {
            System.out.println(value); // 2 - 能看到 value = 100
        });
        thread.start();   // start() happens-before 线程中的动作
    }

    // 示例5:传递性规则
    public void transitivityRule() {
        // A happens-before B
        synchronized (lock) {
            value = 200;  // A
        }

        // B happens-before C
        Thread thread = new Thread(() -> {
            synchronized (lock) {
                System.out.println(value); // C - 能看到 value = 200
            }
        });
        thread.start(); // 启动线程
    }
}

可见性问题实战

1. 可见性问题演示

1.1 经典的可见性问题

/**
 * 可见性问题演示:子线程可能永远看不到主线程对flag的修改
 */
public class VisibilityProblem {

    // 如果不加volatile,子线程可能永远看不到flag的变化
    private static boolean flag = false;

    public static void main(String[] args) throws InterruptedException {
        System.out.println("=== 可见性问题演示 ===");

        // 创建子线程
        Thread workerThread = new Thread(() -> {
            System.out.println("子线程启动,开始监控flag...");
            int localCount = 0;

            while (!flag) {  // 如果没有volatile,可能无限循环
                localCount++;
                // 偶尔打印,避免控制台刷屏
                if (localCount % 100_000_000 == 0) {
                    System.out.println("子线程仍在循环,计数: " + localCount);
                }
            }

            System.out.println("子线程检测到flag = true,退出循环");
            System.out.println("子线程最终计数: " + localCount);
        }, "Worker-Thread");

        workerThread.start();

        // 主线程休眠1秒后修改flag
        Thread.sleep(1000);
        System.out.println("主线程将设置 flag = true");
        flag = true;
        System.out.println("主线程已设置 flag = true");

        // 再等待2秒,看子线程是否能正常退出
        Thread.sleep(2000);

        if (workerThread.isAlive()) {
            System.out.println("警告:子线程仍在运行,可能存在可见性问题!");
            // 强制中断(仅用于演示)
            workerThread.interrupt();
        } else {
            System.out.println("子线程已正常退出");
        }
    }
}

1.2 深度分析:为什么会出现可见性问题?

/**
 * 深度分析可见性问题的根源
 */
public class VisibilityAnalysis {

    // 模拟缓存行和内存一致性问题
    private static class SharedData {
        // 不使用volatile,模拟缓存一致性协议失效
        public int counter = 0;
        public boolean ready = false;

        // 使用volatile,确保可见性
        public volatile boolean volatileReady = false;
    }

    private static final SharedData data = new SharedData();

    public static void main(String[] args) throws InterruptedException {
        System.out.println("=== 可见性问题深度分析 ===");

        // 测试1:无volatile的情况
        System.out.println("n--- 测试1:无volatile修饰的变量 ---");
        testWithoutVolatile();

        Thread.sleep(1000);

        // 测试2:有volatile的情况
        System.out.println("n--- 测试2:使用volatile修饰的变量 ---");
        testWithVolatile();
    }

    private static void testWithoutVolatile() throws InterruptedException {
        data.ready = false;
        data.counter = 0;

        Thread writerThread = new Thread(() -> {
            try {
                Thread.sleep(500); // 确保读取线程先开始
                data.counter = 100;
                data.ready = true;
                System.out.println("写入线程:ready = true, counter = 100");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Writer-Thread");

        Thread readerThread = new Thread(() -> {
            int localCount = 0;
            while (!data.ready) {
                localCount++;
                // 空循环,依赖CPU缓存中的ready值
            }
            System.out.println("读取线程:检测到ready = true");
            System.out.println("读取线程:counter = " + data.counter +
                             " (可能看不到更新后的值)");
        }, "Reader-Thread");

        readerThread.start();
        writerThread.start();

        // 设置超时,避免无限等待
        readerThread.join(3000);
        if (readerThread.isAlive()) {
            System.out.println("读取线程超时,可能存在可见性问题");
            readerThread.interrupt();
        }
    }

    private static void testWithVolatile() throws InterruptedException {
        data.volatileReady = false;
        data.counter = 0;

        Thread writerThread = new Thread(() -> {
            try {
                Thread.sleep(500);
                data.counter = 200;
                data.volatileReady = true;
                System.out.println("写入线程:volatileReady = true, counter = 200");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Volatile-Writer");

        Thread readerThread = new Thread(() -> {
            int localCount = 0;
            while (!data.volatileReady) {
                localCount++;
            }
            System.out.println("读取线程:检测到volatileReady = true");
            System.out.println("读取线程:counter = " + data.counter +
                             " (应该能看到更新后的值)");
        }, "Volatile-Reader");

        readerThread.start();
        writerThread.start();

        readerThread.join(3000);
        if (readerThread.isAlive()) {
            System.out.println("读取线程超时,这不应该发生");
            readerThread.interrupt();
        } else {
            System.out.println("读取线程正常退出,volatile保证了可见性");
        }
    }
}

2. volatile 关键字深度解析

2.1 volatile 的基本使用

/**
 * volatile关键字的基本使用和特性演示
 */
public class VolatileBasics {

    // volatile变量:保证可见性和有序性,但不保证原子性
    private volatile boolean flag = false;
    private volatile int counter = 0;

    // 非volatile变量:用于对比
    private boolean normalFlag = false;
    private int normalCounter = 0;

    public static void main(String[] args) throws InterruptedException {
        VolatileBasics demo = new VolatileBasics();

        System.out.println("=== volatile关键字基础演示 ===");

        // 测试1:volatile保证可见性
        demo.testVisibility();

        Thread.sleep(1000);

        // 测试2:volatile不保证原子性
        demo.testAtomicity();

        Thread.sleep(1000);

        // 测试3:volatile防止指令重排序
        demo.testOrdering();
    }

    // 测试可见性
    private void testVisibility() throws InterruptedException {
        System.out.println("n--- 测试1:volatile可见性保证 ---");

        Thread writer = new Thread(() -> {
            try {
                Thread.sleep(500);
                flag = true;
                normalFlag = true;
                System.out.println("写入线程:设置 flag = true, normalFlag = true");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread volatileReader = new Thread(() -> {
            int count = 0;
            while (!flag) {
                count++;
            }
            System.out.println("volatile读取线程:检测到flag = true,循环次数: " + count);
        });

        Thread normalReader = new Thread(() -> {
            int count = 0;
            while (!normalFlag) {
                count++;
            }
            System.out.println("普通读取线程:检测到normalFlag = true,循环次数: " + count);
        });

        volatileReader.start();
        normalReader.start();
        writer.start();

        volatileReader.join(2000);
        normalReader.join(2000);

        if (volatileReader.isAlive()) {
            volatileReader.interrupt();
            System.out.println("volatile读取线程超时(不应该发生)");
        }

        if (normalReader.isAlive()) {
            normalReader.interrupt();
            System.out.println("普通读取线程超时(可能发生)");
        }
    }

    // 测试原子性(演示volatile不能保证原子性)
    private void testAtomicity() throws InterruptedException {
        System.out.println("n--- 测试2:volatile不能保证原子性 ---");

        final int THREAD_COUNT = 10;
        final int INCREMENTS_PER_THREAD = 1000;
        Thread[] threads = new Thread[THREAD_COUNT];

        // 重置计数器
        counter = 0;
        normalCounter = 0;

        // 创建多个线程增加volatile计数器
        for (int i = 0; i < THREAD_COUNT; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < INCREMENTS_PER_THREAD; j++) {
                    counter++;  // 非原子操作:读取-修改-写入
                    normalCounter++;
                }
            });
        }

        // 启动所有线程
        for (Thread thread : threads) {
            thread.start();
        }

        // 等待所有线程完成
        for (Thread thread : threads) {
            thread.join();
        }

        int expectedCount = THREAD_COUNT * INCREMENTS_PER_THREAD;
        System.out.println("预期计数: " + expectedCount);
        System.out.println("volatile计数器实际值: " + counter);
        System.out.println("普通计数器实际值: " + normalCounter);
        System.out.println("volatile计数器是否正确: " + (counter == expectedCount));
        System.out.println("普通计数器是否正确: " + (normalCounter == expectedCount));
    }

    // 测试有序性(防止指令重排序)
    private void testOrdering() {
        System.out.println("n--- 测试3:volatile防止指令重排序 ---");

        // 这个例子演示了volatile如何防止指令重排序
        // 在单例模式的懒汉式中,volatile是必需的
        Singleton instance1 = Singleton.getInstance();
        Singleton instance2 = Singleton.getInstance();

        System.out.println("单例模式测试完成");
        System.out.println("实例1是否等于实例2: " + (instance1 == instance2));
    }

    // 演示volatile在单例模式中的应用
    private static class Singleton {
        private static volatile Singleton instance;

        private Singleton() {
            // 防止反射创建实例
            if (instance != null) {
                throw new IllegalStateException("Singleton already initialized");
            }
        }

        public static Singleton getInstance() {
            if (instance == null) {  // 第一次检查
                synchronized (Singleton.class) {
                    if (instance == null) {  // 第二次检查
                        instance = new Singleton();
                    }
                }
            }
            return instance;
        }
    }
}

2.2 volatile 的底层实现原理

/**
 * volatile底层实现原理演示和分析
 */
public class VolatileImplementation {

    // volatile变量
    private volatile int volatileVar = 0;

    // 普通变量
    private int normalVar = 0;

    public static void main(String[] args) throws InterruptedException {
        System.out.println("=== volatile底层实现原理分析 ===");

        VolatileImplementation demo = new VolatileImplementation();

        // 演示内存屏障的效果
        demo.testMemoryBarriers();

        Thread.sleep(500);

        // 演示缓存一致性协议
        demo.testCacheCoherence();
    }

    // 测试内存屏障的效果
    private void testMemoryBarriers() throws InterruptedException {
        System.out.println("n--- 内存屏障效果演示 ---");

        // 设置变量
        volatileVar = 1;
        normalVar = 1;

        Thread reader1 = new Thread(() -> {
            int value1 = volatileVar;  // LoadLoad屏障
            int value2 = normalVar;    // 可能被重排序到volatileVar读取之前
            System.out.println("线程1读取:volatileVar=" + value1 + ", normalVar=" + value2);
        });

        Thread reader2 = new Thread(() -> {
            int value3 = normalVar;    // 普通读取
            int value4 = volatileVar;  // LoadLoad屏障确保normalVar读取在volatileVar之前
            System.out.println("线程2读取:normalVar=" + value3 + ", volatileVar=" + value4);
        });

        reader1.start();
        reader2.start();

        reader1.join();
        reader2.join();

        System.out.println("内存屏障确保了volatile读操作的有序性");
    }

    // 测试缓存一致性协议
    private void testCacheCoherence() throws InterruptedException {
        System.out.println("n--- 缓存一致性协议演示 ---");

        final int ITERATIONS = 100;

        Thread writer = new Thread(() -> {
            for (int i = 1; i <= ITERATIONS; i++) {
                volatileVar = i;
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
                if (i % 20 == 0) {
                    System.out.println("写入线程:volatileVar = " + i);
                }
            }
        });

        Thread reader = new Thread(() -> {
            int lastValue = 0;
            int readCount = 0;

            while (readCount < ITERATIONS) {
                int currentValue = volatileVar;
                if (currentValue != lastValue) {
                    readCount++;
                    lastValue = currentValue;
                    System.out.println("读取线程:检测到新值 " + currentValue +
                                     " (第" + readCount + "次读取)");
                }

                // 模拟其他工作
                try {
                    Thread.sleep(5);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        });

        writer.start();
        reader.start();

        writer.join();
        reader.join();

        System.out.println("缓存一致性协议确保了所有线程看到最新的volatile值");
    }
}

/**
 * volatile字节码分析助手
 */
public class VolatileBytecodeAnalysis {

    // 编译命令:javap -c VolatileBytecodeAnalysis
    public static void main(String[] args) {
        System.out.println("请使用 javap -c VolatileBytecodeAnalysis 查看字节码");
        System.out.println("对比volatile变量和普通变量的字节码差异");
    }

    private volatile int volatileField;
    private int normalField;

    public void writeVolatile() {
        volatileField = 1;  // 字节码中会有putfield指令,但JVM会插入内存屏障
    }

    public void writeNormal() {
        normalField = 1;    // 普通的putfield指令
    }

    public int readVolatile() {
        return volatileField;  // 字节码中会有getfield指令,但JVM会插入内存屏障
    }

    public int readNormal() {
        return normalField;    // 普通的getfield指令
    }
}

3. 指令重排序与内存屏障

3.1 指令重排序问题演示

/**
 * 指令重排序问题演示
 */
public class ReorderingProblem {

    private static int x = 0, y = 0;
    private static int a = 0, b = 0;

    public static void main(String[] args) throws InterruptedException {
        System.out.println("=== 指令重排序问题演示 ===");

        int reorderCount = 0;
        int totalTests = 1000000;

        for (int i = 0; i < totalTests; i++) {
            // 重置变量
            x = y = a = b = 0;

            // 创建两个线程
            Thread t1 = new Thread(() -> {
                a = 1;  // 操作1
                x = b;  // 操作2 - 可能被重排序到操作1之前
            });

            Thread t2 = new Thread(() -> {
                b = 1;  // 操作3
                y = a;  // 操作4 - 可能被重排序到操作3之前
            });

            t1.start();
            t2.start();

            t1.join();
            t2.join();

            // 检查是否发生了指令重排序
            if (x == 0 && y == 0) {
                reorderCount++;
                System.out.println("第" + (i + 1) + "次测试检测到指令重排序!x=" + x + ", y=" + y);

                // 只显示前几次
                if (reorderCount >= 5) {
                    break;
                }
            }
        }

        System.out.println("总测试次数: " + Math.min(totalTests, reorderCount == 0 ? totalTests : 5000));
        System.out.println("检测到重排序次数: " + reorderCount);

        if (reorderCount > 0) {
            System.out.println("指令重排序确实发生了!这说明需要内存屏障来保证有序性。");
        } else {
            System.out.println("在当前测试中未检测到明显的指令重排序(可能需要更多测试)");
        }
    }
}

3.2 内存屏障的详细分析

/**
 * 内存屏障详细分析和演示
 */
public class MemoryBarrierAnalysis {

    // 使用volatile来演示内存屏障的插入
    private volatile boolean flag = false;
    private int data = 0;
    private int ready = 0;

    public static void main(String[] args) throws InterruptedException {
        MemoryBarrierAnalysis demo = new MemoryBarrierAnalysis();

        System.out.println("=== 内存屏障详细分析 ===");

        // 演示不同类型的内存屏障
        demo.demoLoadLoadBarriers();

        Thread.sleep(500);

        demo.demoStoreStoreBarriers();

        Thread.sleep(500);

        demo.demoLoadStoreBarriers();

        Thread.sleep(500);

        demo.demoStoreLoadBarriers();
    }

    // LoadLoad屏障演示
    private void demoLoadLoadBarriers() throws InterruptedException {
        System.out.println("n--- LoadLoad屏障演示 ---");
        System.out.println("LoadLoad屏障:确保Load1的读取操作先于Load2及后续读取操作");

        data = 42;
        flag = true;  // volatile写

        Thread reader = new Thread(() -> {
            // LoadLoad屏障在这里插入(由volatile读触发)
            boolean flagValue = flag;  // Load1
            int dataValue = data;      // Load2 - 确保在Load1之后

            System.out.println("LoadLoad屏障效果:flag=" + flagValue + ", data=" + dataValue);
            System.out.println("确保了data的读取在flag读取之后进行");
        });

        reader.start();
        reader.join();
    }

    // StoreStore屏障演示
    private void demoStoreStoreBarriers() throws InterruptedException {
        System.out.println("n--- StoreStore屏障演示 ---");
        System.out.println("StoreStore屏障:确保Store1立刻对其他处理器可见,先于Store2及后续存储");

        Thread writer = new Thread(() -> {
            data = 100;     // Store1
            // StoreStore屏障在这里插入(由volatile写触发)
            ready = 1;      // Store2 - volatile写

            System.out.println("StoreStore屏障确保data=100的写入对ready=1的写入之前可见");
        });

        Thread observer = new Thread(() -> {
            while (ready != 1) {
                Thread.yield();
            }
            int dataValue = data;
            System.out.println("观察者线程看到ready=1,data=" + dataValue +
                             " (应该能看到data=100)");
        });

        observer.start();
        writer.start();

        writer.join();
        observer.join();
    }

    // LoadStore屏障演示
    private void demoLoadStoreBarriers() throws InterruptedException {
        System.out.println("n--- LoadStore屏障演示 ---");
        System.out.println("LoadStore屏障:确保Load1数据装载先于Store2及后续存储刷新到内存");

        flag = false;  // 重置
        data = 0;

        Thread processor = new Thread(() -> {
            boolean flagValue = flag;  // Load1
            // LoadStore屏障在这里插入
            data = 200;               // Store1 - 确保在Load之后

            System.out.println("LoadStore屏障:先读取flag,后写入data");
        });

        Thread flagSetter = new Thread(() -> {
            try {
                Thread.sleep(100);
                flag = true;  // 设置flag
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        flagSetter.start();
        processor.start();

        processor.join();
        flagSetter.join();
    }

    // StoreLoad屏障演示
    private void demoStoreLoadBarriers() throws InterruptedException {
        System.out.println("n--- StoreLoad屏障演示 ---");
        System.out.println("StoreLoad屏障:确保Store1立刻对其他处理器可见,先于Load2及后续装载");

        final boolean[] stopFlag = {false};

        Thread writer = new Thread(() -> {
            int counter = 0;
            while (!stopFlag[0]) {
                counter++;
                data = counter;      // Store1
                // StoreLoad屏障在这里插入(由volatile读写触发)
                boolean currentFlag = flag; // Load1

                if (counter % 100000 == 0) {
                    System.out.println("写入线程:data=" + counter + ", flag=" + currentFlag);
                }
            }
        });

        Thread reader = new Thread(() -> {
            try {
                Thread.sleep(100);
                flag = true;  // volatile写
                Thread.sleep(100);
                stopFlag[0] = true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        writer.start();
        reader.start();

        writer.join();
        reader.join();

        System.out.println("StoreLoad屏障确保存储操作对后续加载操作可见");
    }
}

实战练习

1. 基础练习 ⭐

任务: 使用 volatile 解决简单的可见性问题

/**
 * 基础练习:使用volatile解决可见性问题
 */
public class BasicExercise {

    // 任务1:使用volatile修复可见性问题
    private volatile boolean shutdownRequested = false;

    // 任务2:使用volatile实现简单的状态机
    private volatile String currentState = "INIT";

    public static void main(String[] args) throws InterruptedException {
        BasicExercise exercise = new BasicExercise();

        System.out.println("=== 基础练习:volatile可见性问题 ===");

        // 练习1:优雅停止线程
        exercise.exercise1_GracefulShutdown();

        Thread.sleep(1000);

        // 练习2:状态通知
        exercise.exercise2_StateNotification();
    }

    // 练习1:优雅停止线程
    private void exercise1_GracefulShutdown() throws InterruptedException {
        System.out.println("n--- 练习1:优雅停止工作线程 ---");

        Thread worker = new Thread(() -> {
            System.out.println("工作线程开始运行...");
            int taskCount = 0;

            while (!shutdownRequested) {
                taskCount++;

                // 模拟工作
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }

                // 定期报告状态
                if (taskCount % 10 == 0) {
                    System.out.println("工作线程:已完成 " + taskCount + " 个任务");
                }
            }

            System.out.println("工作线程:收到停止信号,正在清理资源...");
            try {
                Thread.sleep(200); // 模拟清理工作
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println("工作线程:已停止,总共处理了 " + taskCount + " 个任务");
        }, "Worker-Thread");

        worker.start();

        // 主线程运行一段时间后请求停止
        Thread.sleep(2000);
        System.out.println("主线程:请求停止工作线程");
        shutdownRequested = true;

        worker.join(1000);
        if (worker.isAlive()) {
            System.out.println("主线程:工作线程未及时响应,强制中断");
            worker.interrupt();
        } else {
            System.out.println("主线程:工作线程已优雅停止");
        }
    }

    // 练习2:状态通知
    private void exercise2_StateNotification() throws InterruptedException {
        System.out.println("n--- 练习2:基于volatile的状态通知 ---");

        String[] states = {"INIT", "READY", "PROCESSING", "COMPLETED"};

        Thread stateMonitor = new Thread(() -> {
            String lastState = "";
            while (!"COMPLETED".equals(currentState)) {
                if (!currentState.equals(lastState)) {
                    lastState = currentState;
                    System.out.println("状态监控器:检测到状态变化 -> " + lastState);
                }

                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            System.out.println("状态监控器:处理已完成");
        }, "State-Monitor");

        Thread stateChanger = new Thread(() -> {
            try {
                for (String state : states) {
                    currentState = state;
                    System.out.println("状态设置器:设置状态为 " + state);
                    Thread.sleep(300);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "State-Changer");

        stateMonitor.start();
        stateChanger.start();

        stateChanger.join();
        stateMonitor.join();

        System.out.println("练习2完成:状态通知机制工作正常");
    }
}

2. 进阶练习 ⭐⭐

任务: 实现一个基于 volatile 的自旋锁

/**
 * 进阶练习:基于volatile的自旋锁实现
 */
public class AdvancedExercise {

    public static void main(String[] args) throws InterruptedException {
        System.out.println("=== 进阶练习:volatile自旋锁 ===");

        // 测试自旋锁
        testSpinLock();

        Thread.sleep(1000);

        // 测试可重入自旋锁
        testReentrantSpinLock();
    }

    // 测试基础自旋锁
    private static void testSpinLock() throws InterruptedException {
        System.out.println("n--- 测试基础自旋锁 ---");

        SpinLock spinLock = new SpinLock();
        AtomicInteger counter = new AtomicInteger(0);
        int threadCount = 5;
        int incrementsPerThread = 1000;
        CountDownLatch latch = new CountDownLatch(threadCount);

        // 创建多个线程竞争锁
        for (int i = 0; i < threadCount; i++) {
            final int threadId = i;
            new Thread(() -> {
                try {
                    for (int j = 0; j < incrementsPerThread; j++) {
                        spinLock.lock();
                        try {
                            // 临界区:简单计数
                            counter.incrementAndGet();

                            // 模拟一些工作
                            Thread.sleep(1);
                        } finally {
                            spinLock.unlock();
                        }
                    }
                    System.out.println("线程 " + threadId + " 完成");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    latch.countDown();
                }
            }, "SpinLock-Thread-" + i).start();
        }

        latch.await();
        System.out.println("所有线程完成,最终计数: " + counter.get());
        System.out.println("预期计数: " + (threadCount * incrementsPerThread));
        System.out.println("计数正确: " + (counter.get() == threadCount * incrementsPerThread));
    }

    // 测试可重入自旋锁
    private static void testReentrantSpinLock() throws InterruptedException {
        System.out.println("n--- 测试可重入自旋锁 ---");

        ReentrantSpinLock reentrantLock = new ReentrantSpinLock();
        AtomicInteger recursiveCounter = new AtomicInteger(0);

        Thread recursiveThread = new Thread(() -> {
            reentrantLock.lock();
            try {
                System.out.println("外层锁获取成功");

                reentrantLock.lock();
                try {
                    System.out.println("内层锁获取成功");

                    reentrantLock.lock();
                    try {
                        System.out.println("最内层锁获取成功");
                        recursiveCounter.set(100);
                    } finally {
                        reentrantLock.unlock();
                        System.out.println("最内层锁释放");
                    }
                } finally {
                    reentrantLock.unlock();
                    System.out.println("内层锁释放");
                }
            } finally {
                reentrantLock.unlock();
                System.out.println("外层锁释放");
            }
        }, "Recursive-Thread");

        recursiveThread.start();
        recursiveThread.join();

        System.out.println("可重入测试完成,递归计数器值: " + recursiveCounter.get());
    }
}

/**
 * 基于volatile的简单自旋锁实现
 */
class SpinLock {
    private volatile boolean locked = false;

    public void lock() {
        // 自旋直到获取锁
        while (locked) {
            // 自旋等待
            Thread.onSpinWait(); // Java 9+ 提示处理器进入自旋等待状态
        }

        // 设置锁状态
        locked = true;
    }

    public void unlock() {
        locked = false;
    }
}

/**
 * 基于volatile的可重入自旋锁实现
 */
class ReentrantSpinLock {
    private volatile Thread owner = null;
    private volatile int recursionCount = 0;

    public void lock() {
        Thread currentThread = Thread.currentThread();

        // 如果已经持有锁,增加重入计数
        if (owner == currentThread) {
            recursionCount++;
            return;
        }

        // 自旋直到获取锁
        while (owner != null) {
            Thread.onSpinWait();
        }

        // 获取锁
        owner = currentThread;
        recursionCount = 1;
    }

    public void unlock() {
        Thread currentThread = Thread.currentThread();

        if (owner != currentThread) {
            throw new IllegalStateException("当前线程未持有锁");
        }

        recursionCount--;

        if (recursionCount == 0) {
            owner = null;
        }
    }
}

3. 挑战练习 ⭐⭐⭐

任务: 实现一个高性能的缓存系统,使用 volatile 保证数据一致性

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/**
 * 挑战练习:高性能缓存系统实现
 */
public class ChallengeExercise {

    public static void main(String[] args) throws InterruptedException {
        System.out.println("=== 挑战练习:高性能缓存系统 ===");

        // 测试缓存系统
        testCacheSystem();

        Thread.sleep(1000);

        // 性能测试
        performanceTest();
    }

    // 基础功能测试
    private static void testCacheSystem() throws InterruptedException {
        System.out.println("n--- 缓存系统功能测试 ---");

        CacheSystem<String, String> cache = new CacheSystem<>();

        // 测试基本的put/get操作
        cache.put("key1", "value1");
        cache.put("key2", "value2");

        System.out.println("获取key1: " + cache.get("key1"));
        System.out.println("获取key2: " + cache.get("key2"));
        System.out.println("获取不存在的key: " + cache.get("nonexistent"));

        // 测试并发访问
        int threadCount = 10;
        int operationsPerThread = 100;
        CountDownLatch latch = new CountDownLatch(threadCount);
        AtomicInteger successCount = new AtomicInteger(0);
        AtomicInteger missCount = new AtomicInteger(0);

        for (int i = 0; i < threadCount; i++) {
            final int threadId = i;
            new Thread(() -> {
                try {
                    for (int j = 0; j < operationsPerThread; j++) {
                        String key = "key" + (threadId * 10 + j % 10);
                        String value = "value" + j;

                        // 尝试获取
                        String cachedValue = cache.get(key);
                        if (cachedValue == null) {
                            // 缓存未命中,放入缓存
                            cache.put(key, value);
                            missCount.incrementAndGet();
                        } else {
                            // 缓存命中
                            successCount.incrementAndGet();
                        }

                        // 模拟一些处理时间
                        if (j % 20 == 0) {
                            Thread.sleep(1);
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    latch.countDown();
                }
            }, "Cache-Thread-" + i).start();
        }

        latch.await();

        System.out.println("并发测试完成:");
        System.out.println("缓存命中次数: " + successCount.get());
        System.out.println("缓存未命中次数: " + missCount.get());
        System.out.println("缓存命中率: " +
                         String.format("%.2f%%",
                             (double) successCount.get() / (successCount.get() + missCount.get()) * 100));

        // 测试缓存统计
        CacheStats stats = cache.getStats();
        System.out.println("缓存统计: " + stats);
    }

    // 性能测试
    private static void performanceTest() throws InterruptedException {
        System.out.println("n--- 性能测试 ---");

        CacheSystem<Integer, String> cache = new CacheSystem<>();
        int threadCount = 20;
        int operationsPerThread = 10000;

        System.out.println("启动 " + threadCount + " 个线程,每个执行 " + operationsPerThread + " 次操作");

        long startTime = System.nanoTime();
        CountDownLatch latch = new CountDownLatch(threadCount);

        for (int i = 0; i < threadCount; i++) {
            final int threadId = i;
            new Thread(() -> {
                try {
                    for (int j = 0; j < operationsPerThread; j++) {
                        int key = threadId * 1000 + (j % 1000);
                        String value = "value-" + key;

                        // 随机操作:70%读取,30%写入
                        if (Math.random() < 0.7) {
                            cache.get(key);
                        } else {
                            cache.put(key, value);
                        }
                    }
                } finally {
                    latch.countDown();
                }
            }, "Perf-Thread-" + i).start();
        }

        latch.await();
        long endTime = System.nanoTime();

        long totalOperations = threadCount * operationsPerThread;
        double duration = (endTime - startTime) / 1_000_000.0; // 毫秒
        double opsPerSecond = totalOperations / (duration / 1000.0);

        System.out.println("性能测试结果:");
        System.out.println("总操作数: " + totalOperations);
        System.out.println("总耗时: " + String.format("%.2f", duration) + " ms");
        System.out.println("吞吐量: " + String.format("%.0f", opsPerSecond) + " ops/sec");

        CacheStats finalStats = cache.getStats();
        System.out.println("最终缓存统计: " + finalStats);
    }
}

/**
 * 高性能缓存系统实现
 */
class CacheSystem<K, V> {

    // 使用volatile保证缓存状态的可见性
    private volatile CacheEntry<K, V>[] table;
    private volatile int size;
    private volatile int modCount;

    // 统计信息
    private final AtomicInteger hitCount = new AtomicInteger(0);
    private final AtomicInteger missCount = new AtomicInteger(0);
    private final AtomicInteger putCount = new AtomicInteger(0);

    // 缓存配置
    private static final int DEFAULT_CAPACITY = 16;
    private static final float LOAD_FACTOR = 0.75f;
    private final int capacity;

    @SuppressWarnings("unchecked")
    public CacheSystem() {
        this.capacity = DEFAULT_CAPACITY;
        this.table = new CacheEntry[capacity];
        this.size = 0;
        this.modCount = 0;
    }

    public void put(K key, V value) {
        if (key == null) {
            throw new IllegalArgumentException("Key cannot be null");
        }

        int index = getIndex(key);
        CacheEntry<K, V> entry = new CacheEntry<>(key, value);

        // 简单的线性探测处理冲突
        while (table[index] != null && !table[index].key.equals(key)) {
            index = (index + 1) % capacity;
        }

        if (table[index] == null) {
            size++;
        }

        table[index] = entry;
        putCount.incrementAndGet();
        modCount++;
    }

    public V get(K key) {
        if (key == null) {
            return null;
        }

        int index = getIndex(key);
        int startIndex = index;

        // 线性探测查找
        while (table[index] != null) {
            if (table[index].key.equals(key)) {
                hitCount.incrementAndGet();
                return table[index].value;
            }

            index = (index + 1) % capacity;
            if (index == startIndex) {
                break; // 遍历完整个表
            }
        }

        missCount.incrementAndGet();
        return null;
    }

    private int getIndex(K key) {
        return Math.abs(key.hashCode()) % capacity;
    }

    public CacheStats getStats() {
        int totalRequests = hitCount.get() + missCount.get();
        double hitRate = totalRequests == 0 ? 0.0 : (double) hitCount.get() / totalRequests;

        return new CacheStats(
            size,
            capacity,
            hitCount.get(),
            missCount.get(),
            putCount.get(),
            hitRate
        );
    }

    // 缓存条目
    private static class CacheEntry<K, V> {
        final K key;
        volatile V value;

        CacheEntry(K key, V value) {
            this.key = key;
            this.value = value;
        }
    }

    // 缓存统计信息
    public static class CacheStats {
        private final int size;
        private final int capacity;
        private final int hitCount;
        private final int missCount;
        private final int putCount;
        private final double hitRate;

        public CacheStats(int size, int capacity, int hitCount, int missCount,
                         int putCount, double hitRate) {
            this.size = size;
            this.capacity = capacity;
            this.hitCount = hitCount;
            this.missCount = missCount;
            this.putCount = putCount;
            this.hitRate = hitRate;
        }

        @Override
        public String toString() {
            return String.format(
                "CacheStats{size=%d, capacity=%d, hitCount=%d, missCount=%d, putCount=%d, hitRate=%.2f%%}",
                size, capacity, hitCount, missCount, putCount, hitRate * 100
            );
        }
    }
}

今日总结

核心要点回顾

  1. Java 内存模型 (JMM)

    • 抽象了主内存和工作内存的概念
    • 定义了 8 种原子操作来管理内存交互
    • 为并发编程提供了规范保证
  2. JMM 三大特性

    • 原子性:操作不可分割(需要 synchronized 或 atomic 类保证)
    • 可见性:变量修改对其他线程立即可见(volatile 或 synchronized 保证)
    • 有序性:程序执行顺序与代码顺序一致(volatile 或 synchronized 保证)
  3. volatile 关键字

    • 保证可见性和有序性,但不保证原子性
    • 通过插入内存屏障来实现有序性保证
    • 基于 MESI 等缓存一致性协议实现可见性
  4. happens-before 原则

    • 定义了操作间的偏序关系
    • 是判断数据是否存在竞争、线程是否安全的重要依据
    • 8 大规则为并发编程提供了理论基础
  5. 指令重排序与内存屏障

    • 编译器和处理器可能会进行指令重排序优化
    • 内存屏障可以禁止特定类型重排序
    • volatile 读/写会插入相应的内存屏障

最佳实践建议

  1. volatile 使用场景

    • 状态标志位(如停止标志)
    • 双重检查锁定(DCL)中的实例变量
    • 独立观察者的发布场景
    • 读写锁的状态变量
  2. 避免的陷阱

    • 不要依赖 volatile 保证复合操作的原子性
    • 不要在 volatile 变量上进行递增或递减操作
    • 理解 volatile 的性能开销(相对 synchronized 较小)
  3. 并发编程原则

    • 优先使用不可变对象
    • 使用适当的同步机制(synchronized、volatile、atomic 类)
    • 理解 happens-before 关系,避免数据竞争

下节预告

明天我们将深入学习 线程生命周期与状态转换,包括:

  • 线程的六大状态详解
  • 线程控制方法:sleep、join、yield、interrupt
  • 守护线程的特性与应用
  • 线程状态监控和调试技巧

课后作业

  1. 理论作业

    • 详细解释 JMM 的内存交互过程
    • 分析 volatile 与 synchronized 的异同点
    • 列举 happens-before 的 8 大规则并举例说明
  2. 实践作业

    • 实现一个基于 volatile 的任务调度器
    • 优化之前的生产者-消费者代码,使用 volatile 优化性能
    • 分析一个开源项目中 volatile 的使用情况
本站提供的所有下载资源均来自互联网,仅提供学习交流使用,版权归原作者所有。如需商业使用,请联系原作者获得授权。 如您发现有涉嫌侵权的内容,请联系我们 邮箱:[email protected]