Java多线程进阶


在之前我们了解线程的基础工作方式,即不同核心处理不同线程任务。

而多线程在提供高性能的之际同样也带来了一些不足之处,例如不同线程之间既然相互独立,那又该如何实现线程通讯呢?以及多线程对访问相同资源时的竞争关系又如何解决?

在本篇文章中,让我们通过 VolatileThreadLocal 为媒介初步认识下多线程之间的交互。

一、Atomic类

1. 基本定义

在针对多线程数据原子性问题除了通过加锁的方式外, Java 提供了一系列原子类(如 AtomicInteger 等)以保证数据的原子性,其核心通过反射获取 Unsafe 实例从而实现 CAS 操作,保证了数据的原子性。

AtomicInteger 类为例,下表中列出其常用方法及其描述信息。

方法 作用
set() 为对象设置值。
get() 获取对象值。
incrementAndGet() 返回对象自增 1 后的值,同理还有 getAndIncrement()。
decrementAndGet() 返回对象自减 1 后的值,同理还有 getAndDecrement()。

同样以 AtomicInteger 举例,示例代码如下:

public void atomicDemo() {
    AtomicInteger atomicInteger = new AtomicInteger(1);
    // 修改值
    atomicInteger.set(2);
    System.out.println("get: " + atomicInteger.get());
    // 等价自增
    atomicInteger.incrementAndGet();
    System.out.println("incrementAndGet: " + atomicInteger);
    // 等价自减
    atomicInteger.decrementAndGet();
    System.out.println("decrementAndGet: " + atomicInteger);
}

2. 原子引用

除了自带的 AtomicInteger 等原子类,可通过 AtomicReference 为复杂对象提供原子操作。

AtomicReference 声明的对象操作与 AtomicInteger 等系统已经封装好的原子类类似,可在定义时通过 new 关键字设置初始化值,也可通过后续的 set() 方法设置值,其操作皆满足线程安全。

public void referenceDemo() {
    // 基本类型原子操作
    AtomicReference<Integer> atomicReference = new AtomicReference<>(0);
    atomicReference.updateAndGet(it -> it + 1);
    System.out.println("Get: " + atomicReference.get());

    // 复杂对象原子操作
    AtomicReference<User> userAtomic = new AtomicReference<>(new User("Alex"));
    userAtomic.getAndUpdate(it -> new User("Beth"));
    System.out.println("Get: " + userAtomic.get());
}

static class User {
    private String name;

    public User(String name) {
        this.name = name;
    }
}

二、Volatile

在了解 volatile 关键字之前,我们需要先初步了解一下 JVM 内存工作机制。

1. 内存机制

JVM 中内存可以粗暴的分为 主内存本地内存,默认所有的变量声明都存放在 主内存 中,当一个线程需要修改变量的值时需要先从 主内存 中以副本的形式读取至线程的 本地内存 ,完成更改后再将新值重新写回 主内存

而读取到写回这个过程中即存在时间差,若未能及时写回 主内存,其它进程获取的值仍为未发生改变之前的值,从而引发数据不一致问题。

2. 解决方案

为了解决上述提到的读写时间差造成的影响,在 Java 中提供了 volatile 关键字,即通过 volatile 声明的变量对任意一个线程都是实时可见的,同时避免数据的异步不一致与程序指令重排。

注意其只能保证变量的 可见性,即变量发生变化会立即更新主存数据,但若并发操作 volatile 变量仍会出现原子性问题,同时注意 volatile 只能修饰 类变量实例变量 ,对于 方法参数常量 等等均非法。

3. 指令重排

在上一点中讲到了 Volatile 可以禁止实现指令重排,首先介绍一下什么是指令重排。

在运行 Java 文件时,当代码行之间没有强依赖时它的执行顺序不一样是按照编写的顺序,如图示例在实际编译时 int b = 20 的执行可能会在 int a = 10 之前,因为二者并无强关联,谁先定义都不影响程序的正确执行。但 int sum = a + b 一定是在 ab 定义之后才会运行,因为 sum 的值强依赖于 ab,而实际编译顺序与代码定义顺序的不同的这个过程就是指令重排。

而由 Volatile 关键字修饰的变量其在写入之前需要先完成读取,同时写入完成之后需立即回写进主内存,其读写的顺序是确定且无法改变的,因此也就无法实现指令重排。

4. 示例演示

下面通过一个示例演示 volatile 的效果,示例中分别创建了两个读写线程,一个用于更新数据,另一个用于监控变量 num 的变化情况并打印输出。

完整的测试代码如下,分别执行 num 在没有使用 volatile 修饰与使用 volatile 修饰的情况。

public class VolatileExample {

    private volatile static int num = 0;
   
    public static void main(String[] args) throws InterruptedException {
        new Thread(() -> {
            int local = num;
            while (local < 3) {
                // 监控 num 值变化,记录最新值
                if (num != local) {
                    System.out.println("receive change: " + num);
                    local = num;
                }
            }
        }, "Reader").start();

        new Thread(() -> {
            int local = num;
            while (local < 3) {
                // 修改 num 值
                System.out.println("change to: " + ++local);
                num = local;

                try {
                    // 休眠使 Reader 获取变化
                    sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "Updater").start();
        // 等待线程结束
        TimeUnit.SECONDS.sleep(10);
    }
}

在未使用 volatile 的情况下,当 Updater 更新 num 的值并不是立刻写回主存,导致 Reader 未能读的数据仍是变更之前的变量,因此在判断 num != local 条件时永远是 false,最后打印的信息中 update 也因此缺失。

// 未使用 volatile
change to: 1
receive change: 1
change to: 2
change to: 3

而在使用 volatile 的情况下,因为实现了 num 变量的多线程可见性,Reader 线程也就能够实现监控 num 变量的实际变更状况。

// 使用 volatile
change to: 1
receive change: 1
change to: 2
receive change: 2
change to: 3
receive change: 3

5. 应用场景

volatile 通常用于多线程下的状态监控记录,最常见的即搭配锁实现单例模式。

如下实例代码中通过 volatilesynchronized 搭配实现了简易的单例模式,在多线程的情况下通过 volatile 保证了 instance 实例的可见性,同时利用 synchronized 保证对象不会被重复初始化。

主要注意这里在 synchronized 的同步块中需要二次判断实例对象,假设在并发情况下若 线程A 成功拿到锁并进行实例化,在此过程中其它线程正处于自旋取锁状态,当 线程A 完成对象实例化并通过 volatile 关键字将数据回写主存并释放锁之后,之前自旋状态的线程将会取得锁并执行同步块内容,若不进行二次判断则会造成重复实例化,且因为实例声明为 volatile 因此判断结果将为 false 退出同步块返回已经完成实例的对象。

public class Singleton {

    private volatile static Singleton instance = null;

    public static Singleton getInstance() {
        // 实例为空则获取锁
        if (instance == null) {
            synchronized (Singleton.class) {
                // synchronized 防止多线程同时初始化实例
                if (instance == null)
                    instance = new Singleton();
            }
        }
        return instance;
    }
}

三、ThreadLocal

ThreadLocal 正如名称所示,是每个线程单独的存储的空间,通常用于存储线程中的某些状态值。不同线程之间的 ThreadLocal 变量都独立存在无法相互通讯,子线程会继承父线程的 ThreadLocal 变量。

1. 基本操作

注意 ThreadLocal 在使用完成之后一定要通过 remove() 方法回收,因为 ThreadLocal 变量会随着线程的周期终止而回收,但在涉及到线程池时,每个线程在完成任务之后并返回线程池后不一定会被销毁而是进入空闲状态,意味着创建的 ThreadLocal 也将一直存在无法被垃圾回收,从而导致内存泄漏,因此在设置 ThreadLocal 操作时建议配合 try catch 并在 finally 中执行 remove() 释放。

方法 作用
ThreadLocal.withInitial() 在定义时为 ThreadLocal 设置默认值。
get() 为当前线程创建一个存储对象。
set() 修改当前线程其本地对象的值。
remove() 释放当前线程其本地对象。

2. 实现机制

首先先了解一下 ThreadLocal 的内部结构,其包含内部类 ThreadLocalMap,而其又包含 Entry 内部类并继承于 WeakReference

对象 Entryvalue 字段即为 ThreadLocal 保存的数据,之所以要继承 WeakReference 原因在于在 JavaWeakReference 创建的对象当其引用为空时,在下一轮 GC 后则会被回收,从而降低内存泄漏发生的可能。

ThreadLocal 的内部实现相对简单,当执行 set() 存储本地线程数据时,先获取当前线程,若 Entry 对象未创建则先实例化并存入数据,其 get()remove() 方法类似此时不详细展开。

public class ThreadLocal<T> {

    ThreadLocal.ThreadLocalMap threadLocals = null;
    
    public void set(T value) {
        // 获取当前线程
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null) {
            // entry 存在则存入数据
            map.set(this, value);
        } else {
            // 不存在则实例化
            createMap(t, value);
        }
    }

    ThreadLocalMap getMap(Thread t) {
        return t.threadLocals;
    }

    void createMap(Thread t, T firstValue) {
        t.threadLocals = new ThreadLocalMap(this, firstValue);
    }
}

3. 示例介绍

在下述的示例中,我们创建了一个 ThreadLocal<Integer> 对象,该对象可以存储 Integer 类型的值,当在不同的线程中通过 get() 方法将会为每个线程初始化一份单独的副本。接着我们创建两个子线程,并在每个子线程中通过 threadLocal.get()threadLocal.set() 方法获取值与设置值。

运行示例可以看到每个线程都有自己独立的值,并不会相互干扰,两个子线程各自输出的其设置的哈希值。

public class ThreadLocalExample {
    private static ThreadLocal<Integer> threadLocal = new ThreadLocal<>();

    public static void main(String[] args) {
        // 创建两个子线程
        Thread t1 = new Thread(new MyRunnable());
        Thread t2 = new Thread(new MyRunnable());
        // 启动子线程
        t1.start();
        t2.start();
        try {
            // 等待子线程执行完毕
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    static class MyRunnable implements Runnable {
        @Override
        public void run() {
            // 在子线程中获取值
            System.out.println("Thread value: " + threadLocal.get());
            // 在子线程中设置新值
            threadLocal.set(Thread.currentThread().hashCode());
            // 再次获取值
            System.out.println("Thread value after set: " + threadLocal.get());
        }
    }
}

4. 应用场景

当在并发操作中涉及非线程安全操作时,通常即使用 ThreadLocal 避免非线程安全对象的并发操作。

Java 中时间格式化 SimpleDateFormat 对象是非线程安全的,当多线程并发操作时将会抛出异常,此时即可通过 ThreadLocal 为每个线程定义单独的 SimpleDateFormat 对象。

如下示例中定义了容量为 5 的线程池模拟并发,并在每个线程中通过 SimpleDateFormat 实现时间格式化,若在 for 循环上一步定义公共的 SimpleDateFormat 对象并在每个子线程调用其实现时间转为将会抛出异常。

public class SafeDateFormatTest {
    
    private static final ThreadLocal<DateFormat> dateFormatThreadLocal = ThreadLocal.withInitial(() -> {
        return new SimpleDateFormat("yyyy-MM-dd");
    });

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(5);
        List<Future<Date>> results = new ArrayList<Future<Date>>();

        // perform 10 date conversions
        for (int i = 0; i < 10; i++) {
            results.add(threadPool.submit(() -> {
                Date date;
                try {
                    date = dateFormatThreadLocal.get().parse("2023-01-01");
                } catch (ParseException e) {
                    throw new RuntimeException(e);
                }
                return date;
            }));
        }
        threadPool.shutdown();

        // look at the results
        for (Future<Date> result : results) {
            System.out.println(result.get());
        }
    }
}

文章作者: 烽火戏诸诸诸侯
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 烽火戏诸诸诸侯 !
  目录