提到线程,肯定绕不开一个话题那就是锁,可以说没有线程锁整个线程世界将分崩离析。
线程锁的家族或许很庞大初看吓人一跳,但只要抓住本质其它一切都柳暗花明。所谓锁,其最为核心的一点即对资源的管控,不管锁怎么分类又或是如何演变,只要记住一个点:资源的控制。
就像饭店排队一样,每个桌子就是一个可访问资源,而每个食客就为一个线程,每个桌子就只能允许同样服务一位顾客,如果多名食客想要公用一张桌子,显然就会出现问题。而线程锁,即可以理解为服务员,保证每个桌子与食客之间的井然有序。
今天,就让我们学习一下
Java
中常用的线程锁
一、锁的介绍
1. 基本分类
Java
线程锁的分类如果进行细分的话是非常多的,这里仅对常遇到的一些分类简要介绍,注意下面分类之间并不是互斥的。
乐观锁
乐观锁顾名思义就是以更宽松的方式进行加锁,认为别的线程在持有锁的时候不进行值变化只做读取,因此允许多个对象同时持有锁,同伴版本号机制控制实现一致性。
悲观锁
悲观锁与乐观锁则恰好相反,任务任何进程在持有锁都可能对变量进行更改,因此在持锁期间其它进程均无法获取同一个锁实例。
自旋锁
自旋锁即当锁实例被其它进程获取而当前进程不断重复尝试获取锁(阻塞)的情况,则称此类行为是自旋锁。
公平锁
公平锁即当锁被进程占用时,其它进程若需要获取锁则进入排队等待,采取先到先得的分配机制,如通过
new ReentrantLock(true)
创建的即为公平锁。非公平锁
非公平锁即当锁被进程持有结束后,其它进程若需要获取锁则会随机进行分配,此机制可能导致线程饥饿问题,即最先请求的进程却一直无法获取锁,常见的
synchronized
与ReentrantLock
皆为非公平锁。可重入锁
可重入锁即一个锁实例可以被同一个进程重复获取,如
ReentrantLock
就是典型的可重入锁。不可重入锁
不可重入锁即锁实例无法被同一进程重复的获取,
synchronized
自身为可重入锁,但搭配计数器(CountdownLatch)
即可实现一个不可重入效果。
2. 锁的意义
在单线程中我们很清楚的知道哪个线程在操作变量,但在多线程中显然这一切就不那么明朗了,因为其有一个明显的问题,那就是变量冲突。
举个简单例子,程序中一共存在两个线程,分别需要针对变量 count
做 1000
次自增与自减,理论上当程序执行完毕后 count
的结果值应该为 0
,示例代码如下:
public MyTest {
private int count = 0;
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
for (int i = 0; i < 10000; i++) {
count += 1;
}
});
Thread t2 = new Thread(() -> {
for (int i = 0; i < 10000; i++) {
count -= 1;
}
});
t1.start();
t2.start();
// 阻塞线程
t1.join();
t2.join();
// 输出结果
System.out.println(count);
}
}
但执行上述程序却发现每次输出得到的结果却并非为 0
且每次结果都不相同,这其实是因为 Java
中的一个加减乘除在底层编译时其对应的步骤不止一步。
如最常用的自增 i = i + 1
,在底层编译实现时其对应着三个步骤: 读取
-> 增加
-> 写入
,而这三步中的任意一步出现问题都会导致最终的结果异常。回到上面的代码中,当 线程 1
进行自增操作时,实际上可能只做了 读取 增加
,在完成写入之前 线程 2
却获取变量实例完成自减导致自增 写入
失败,结果就是经过一次自增自减结果却为 -1
。
二、synchronized
1. 基本介绍
synchronized
是最基础的锁对象之一,通过在方法前添加 synchronized
可隐式锁住 this
关键字,使得当其它进程想要访问变量时则无法获取,只有在同步块代码执行结束后自动释放的锁其它进程才能再次获取,从而达到原子性的目的。
改造之前提到的累加器示例代码得到下述结果,运行输出的结果为 0
符合预期。
public MyTest {
private int count = 0;
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
for (int i = 0; i < 10000; i++) {
synchronized(this) { // 加锁
synCount += 1;
}
}
});
Thread t2 = new Thread(() -> {
for (int i = 0; i < 10000; i++) {
synchronized(this) { // 加锁
synCount -= 1;
}
}
});
t1.start();
t2.start();
t1.join();
t2.join();
// 输出 0
System.out.println(count);
}
}
2. 锁的优化
在使用 synchronized(this)
加锁时其针对目标为类层级,其相对较重也更耗费性能,当频繁使用为了更好的性能应该通过成员变量替换,如 JDK
中 Collections.synchronizedList()
即利用此方式实现线程安全集合。
再次改造上述的示例得到下述代码,其实现效果一致但相对的系统开销却更小。
public MyTest {
private int count = 0;
private Object object = new Object();
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
for (int i = 0; i < 10000; i++) {
synchronized(object) { // 加锁
synCount += 1;
}
}
});
Thread t2 = new Thread(() -> {
for (int i = 0; i < 10000; i++) {
synchronized(object) { // 加锁
synCount -= 1;
}
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(count);
}
}
3. 锁的升级
在 JDK1.8
中 JVM
会根据使用情况对 synchronized
的锁进行升级,它大体可以按照下面的路径:偏向锁
-> 轻量级锁
-> 重量级锁
。锁只能升级不能降级,所以一旦升级为重量级锁就只能依靠操作系统进行调度。
锁升级过程中关系最大的就是对象头里的 MarkWord
,它包含 Thread ID
、 Age
、 Biased
、 Tag
四个部分。其中 Biased
有 1bit
大小,Tag
有 2bit
,锁升级即依据 Thread Id
、 Biased
、 Tag
三个变量值进行。
(1) 偏向锁
在只有一个线程使用了锁的情况下,偏向锁能够保证更高的效率。
当第一个线程第一次访问同步块时,会先检测对象头 Mark Word
中的标志位 Tag
是否为 01
,以此判断此时对象锁是否处于无锁状态或者偏向锁状态(匿名偏向锁)。01
也是锁默认的状态,线程一旦获取了这把锁,就会把自己的线程 ID
写到 MarkWord
中,在其他线程来获取这把锁之前,锁都处于偏向锁状态。
(2) 轻量级锁
当下一个线程参与到偏向锁竞争时,会先判断 MarkWord
中保存的线程 ID
是否与这个线程 ID
相等,如果不相等,会立即撤销偏向锁,升级为轻量级锁。
参与竞争的每个线程,会在自己的线程栈中生成一个 LockRecord(LR)
,然后每个线程通过自旋 (CAS)
的方式(即不断的尝试取锁),将锁对象头中的 MarkWord
设置为指向自己的 LR
的指针,哪个线程设置成功,就意味着哪个线程获得锁。
当锁处于轻量级锁的状态时,就不能够再通过简单的对比 Tag
的值进行判断,每次对锁的获取,都需要通过自旋。当然自旋也是面向不存在锁竞争的场景,比如一个线程运行完了,另外一个线程去获取这把锁,但如果自旋失败达到一定的次数,锁就会膨胀为重量级锁。
(3) 重量级锁
重量级锁即为我们对 synchronized
的直观认识,这种情况下线程会挂起,进入到操作系统内核态等待操作系统的调度,然后再映射回用户态。系统调用是昂贵的,重量级锁的名称由此而来。
如果系统的共享变量竞争非常激烈,锁会迅速膨胀到重量级锁,这些优化就名存实亡。如果并发非常严重,可以通过参数 -XX:-UseBiasedLocking
禁用偏向锁,理论上会有一些性能提升,但实际上并不确定。
三、ReentrantLock
1. 源码剖析
在 Java
中提供了 ReentrantLock
实现更轻量化的加锁操作,其核心在于通过 CAS
操作记录线程锁状态。
首先先看一下 ReentrantLock
的类结构示意,其包含三个内部类 Sync
,FairSync
与 NonfairSync
,后两者正是 ReentrantLock
公平锁与非公平锁的实例,默认创建的为非公平锁。
当初始化 ReentrantLock
时默认创建非公平锁,同时也提供构造器创建公平锁。
以非公平锁 NonfairSync
为例,当执行加锁 lock()
时将调用 initialTryLock()
方法。
这里我把涉及到的源码拎出来,当线程调用 lock()
加锁执行 initialTryLock()
时,会调用 Unsafe
类的 compareAndSetInt()
方法更新变量 state
的值。变量 state
通过 volatile
保证了线程可见性,又利用 Unsafe
的 CAS
操作保证了操作了原子性,实现了线程安全的状态变更。
在同一线程的情况下,当第一次加锁时 CAS
方法 compareAndSetState()
会将 state
的值改为 1
并记录下锁的线程拥有者,而后续若再次通过 lock
加锁,由于 state
的值已经更改为 1
,因此 compareAndSetState(0, 1)
将返回 false
,且在同一线程内因此 state
将累加并记录更新。
在不同线程情况下,当 线程一
加锁并将 state
的值更新为 1
后,线程二
再次执行加锁操作由于 state
的值不为 0
因此 compareAndSetState(0, 1)
将返回 false
,又因此时处于不同线程所以 initialTryLock()
将会返回 false
,因此将会进入 lock()
方法中的 acquire(1)
中不断循环尝试取锁。
public class ReentrantLock implements Lock, java.io.Serializable {
static final class NonfairSync extends Sync {
final boolean initialTryLock() {
Thread current = Thread.currentThread();
if (compareAndSetState(0, 1)) { // first attempt is unguarded
// 记录 state 的拥有者线程
setExclusiveOwnerThread(current);
return true;
} else if (getExclusiveOwnerThread() == current) {
// 同一线程内多次加锁则 state 累加并记录更新
int c = getState() + 1;
if (c < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(c);
return true;
} else
// 执行此表示 state 已被其他线程更新
// 即当前锁对象已经被其他线程所持有
return false;
}
}
}
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
// volatile 保证线程可见性
private volatile int state;
// 获取 Unsafe 实例
private static final Unsafe U = Unsafe.getUnsafe();
// 获取 state 变量的内存偏移量
private static final long STATE
= U.objectFieldOffset(AbstractQueuedSynchronizer.class, "state");
// CAS 更新 state 的值,保证操作原子性
// 返回 true: expect == STATE
// 返回 false: expect != STATE
protected final boolean compareAndSetState(int expect, int update) {
return U.compareAndSetInt(this, STATE, expect, update);
}
}
2. 使用示例
我们知道一个锁只能同时被一个线程获取,因此当一个锁被其它线程持有时,使用 synchronized
将会一直尝试取锁直至获取锁对象,一旦逻辑异常极易出现死循环。而 ReentrantLock
除了基础加锁 lock()
外提供 tryLock()
方法可设置取锁超时时间,到期取锁失败将退出阻塞继续向后执行。
(1) lock()
通过 lock()
即可实现加锁操作,同一个线程对象只能持有一把锁,注意当使用 lock()
执行加锁操作若失败则会一直尝试取锁,导致程序陷入阻塞无法执行后续代码。
在使用时建议将代码块置于 try catch
中,并在 finally
中通过 unlock()
释放锁,防止程序异常导致未正常释放锁从而引发死锁问题。
public void demo1() throws Exception {
Lock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
// 加锁
lock.lock();
try {
sleep(10 * 1000);
} finally {
// 一定要释放锁
lock.unlock();
}
});
Thread t2 = new Thread(() -> {
// 锁被线程 1 持有无法获取
lock.lock();
System.out.println("get lock");
});
t1.start();
Thread.sleep(500);
t2.start();
t1.join();
t2.join();
}
(2) tryLock()
tryLock()
作用与 lock()
类似,但其可以设置取锁超时时间,在指定时间内若没有取到锁将继续返回 false
并继续执行剩余代码。
如在下述示例中 t2
在尝试加锁两秒后若未能加锁成功将会返回 false
并继续执行其后代码,若将其替换为 lock()
方法则其将进入死锁循环,导致程序资源的浪费。
public void demo2() throws Exception {
Lock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
// 加锁, 但不释放锁
lock.lock();
});
Thread t2 = new Thread(() -> {
try {
// 尝试加锁,两秒后未取锁继续执行后续内容
boolean isLock = lock.tryLock(2, TimeUnit.SECONDS);
System.out.println("carry on...");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
t1.start();
Thread.sleep(500);
t2.start();
t1.join();
t2.join();
}
四、ReadWriteLock
上述的两类锁虽然能够实现原子性,但一刀切的锁显然在效率上会略到折扣,因此 Java
中提供了一系列读写锁, ReadWriteLock
即为其中之一。
在查看需求大于修改的业务下,如针对论坛等功能,允许多用户同时查看但禁止多用户同时修改,通过读写锁将互斥逻辑实现剥离,实现更细粒度的并发控制,在保证线程安全的情况下实现不错的性能表现。
1. 读写特性
ReadWriteLock
中分为 读锁
和 写锁
,其特性如下:
读锁
之间是不互斥的,即读锁
可同时被多个线程持有。写锁
之间也是互斥的,在同一时刻最多仅有一个线程能持有写锁
。读锁
和写锁
之间是互斥的,即读锁
和写锁
不能同时持有,二者有且仅有一个处于持有状态。
2. 初始实例
ReadWriteLock
初始化方式如下所示,通过 ReentrantReadWriteLock
实例从而获取对应的读写锁。
public void init() {
// 声明一个读写锁
ReadWriteLock rwLock = new ReentrantReadWriteLock();
// 读锁,允许多线程持有
Lock rLock = rwLock.readLock();
// 写锁,仅允许单线程持有
Lock wLock = rwLock.writeLock();
}
3. 同步示例
了解的读写锁的基本概念之后通过简单的示例更深入明白其作用。
为了方便后续测试这里分别定义了一个 读线程
与 写线程
,并通过休眠操作模拟程序耗时。
class ReadThread extends Thread {
private String name;
public ReadThread(String name){
this.name = name;
}
@Override
public void run() {
rLock.lock();
try {
System.out.println(name + " do read.");
// Simulate program timing
TimeUnit.SECONDS.sleep(3);
System.out.println(name + " sleep over.");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
rLock.unlock();
}
}
}
class WriteThread extends Thread {
private String name;
public WriteThread(String name){
this.name = name;
}
@Override
public void run() {
wLock.lock();
try {
System.out.println(name + " do write.");
// Simulate program timing
TimeUnit.SECONDS.sleep(3);
System.out.println(name + " sleep over.");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
wLock.unlock();
}
}
}
根据上面声明的读写线程,下面分别提供了 双读锁
、 一读一写
以及 双写锁
三个示例。
具体的测试代码如下:
/**
* 双读锁示例,不互斥
*/
public void demo1() throws InterruptedException {
Thread reader1 = new ReadThread("Reader-1");
Thread reader2 = new ReadThread("Reader-2");
// Start thread
reader1.start();
TimeUnit.MILLISECONDS.sleep(200);
reader2.start();
// waiting for finish.
reader1.join();
reader2.join();
}
/**
* 一读一写,互斥
*/
public void demo2() throws InterruptedException {
Thread reader1 = new ReadThread("Reader-1");
Thread writer1 = new WriteThread("Writer-1");
// Start thread
reader1.start();
TimeUnit.MILLISECONDS.sleep(200);
writer1.start();
// waiting for finish.
reader1.join();
writer1.join();
}
/**
* 双写锁,互斥
*/
public void demo3() throws InterruptedException {
Thread writer1 = new WriteThread("Writer-1");
Thread writer2 = new WriteThread("Writer-2");
// Start thread
writer1.start();
TimeUnit.MILLISECONDS.sleep(200);
writer2.start();
// waiting for finish.
writer1.join();
writer2.join();
}
运行上述程序可以看到 demo1
中的读锁两个线程读取操作基本是在同时触发,而 demo2
在读线程在运行时写线程无法取到锁,只有在读线程退出释放锁后写线程才被触发。同理 demo3
也是在写线程释放锁后第二个写线程才被触发。
根据结果也验证了之前提出的结论,读锁之间允许共存,但读锁与写锁互斥,写锁之前同样互斥。
// demo1
Reader-1 do read.
Reader-2 do read.
Reader-1 sleep over.
Reader-2 sleep over.
// demo2
Reader-1 do read.
Reader-1 sleep over.
Writer-1 do write.
Writer-1 sleep over.
// demo3
Writer-1 do write.
Writer-1 sleep over.
Writer-2 do write.
Writer-2 sleep over.
五、StampedLock
StampedLock
同样分为读写锁,但其为乐观锁的一种,通过为锁提供版本号进行区别不同操作。
1. 悲观锁
StampedLock
中的 readLock()
与 writeLock()
即为悲观锁,具体效果与 ReentrantReadWriteLock
类似,这里不作详细介绍,具体方法参考下表。
方法 | 作用 |
---|---|
readLock() | 获取一个读锁,返回版本号用于释放锁或升级锁。 |
isReadLocked() | 判断当前是否存在读锁。 |
getReadLockCount() | 获取当前激活的读锁数量。 |
unlockRead() | 根据传入的版本号释放对应的读锁。 |
writeLock() | 获取一个写锁,返回版本号用于释放锁或升级锁。 |
isWriteLocked() | 判断当前是否存在写锁。 |
unlockWrite() | 根据传入的版本号释放对应的写锁。 |
2. 乐观锁
通过 tryOptimisticRead()
可返回一个版本号,注意此时并没有仍是无锁状态,通过 validate(stamp)
方法可以验证此时是否有写入发生即是否有写锁处于持有状态,可根据结果设计不同的处理逻辑
常用的接口方法与其对应的描述信息参考下表。
方法 | 作用 |
---|---|
tryOptimisticRead() | 乐观读,返回一个版本号。 |
validate() | 判断对应版本的获取之后是否获取过写锁。 |
tryConvertToReadLock() | 尝试将当前锁升级为读锁,返回 0 表示失败。 |
tryConvertToWriteLock() | 尝试将当前锁升级为写锁,返回 0 表示失败。 |
unlock() | 释放对应版本号的所有锁(读锁与写锁)。 |
如下即为由乐观锁升级为悲观锁的示例,其中 WriteThread
为获取写锁 writeLock
操作这里略去具体内容。
public class RwTest {
private static long stamp = 1L;
private static final StampedLock stampedLock = new StampedLock();
@Test
public void optimisticDemo() throws InterruptedException {
// Start write thread
Thread write = new WriteThread();
write.setName("Writer-1");
write.start();
TimeUnit.MILLISECONDS.sleep(100);
// Optimistic read
stamp = stampedLock.tryOptimisticRead();
try {
// 验证是否发生写操作
if (stampedLock.validate(stamp)) {
// 若是从乐观读升级为悲观读
stamp = stampedLock.tryConvertToReadLock(stamp);
if (stamp != 0L) {
// convert success
System.out.println("Is read lock: " + stampedLock.isReadLocked());
System.out.println("Convert read success, do read.");
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
stampedLock.unlock(stamp);
}
}
}
六、RedissonLock
在实际生产开发中,为了实现高可用我们通过将服务通过集群部署在不同节点上,而此时用户的单个操作可能就会被集群中的多个节点重复执行。针对这种情况,上面的两类锁显然就不太适用,因为二者都是针对单节点的服务而言,因此这时候就需要用到分布式锁。
用简单一点的大白话来讲,可以把整个集群环境当成一个主线程,而每个节点就是一个线程,分布式锁就是用于控制不同节点间的操作原子性。
1. 依赖导入
RedissonLock
是一款基于 Redis
实现的分布式锁,因此需要导入对应的依赖。
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.17.5</version>
</dependency>
2. 服务连接
下面详细介绍一下其基本使用,由于是基于 Redis
实现所以需要提前创建服务连接。
public void connect() {
Config config = new Config();
config.useSingleServer()
// Redis 服务地址
.setAddress("redis://127.0.0.1:6379")
// 设置密码
.setPassword("123456")
// 设置存储数据库
.setDatabase(2);
// 创建连接
redissonClient = Redisson.create(config);
}
3. 基本使用
RedissonLock
使用方式与 ReentrantLock
其实基本一致,不同点在于前者会将锁对象存储于 Redis
内存中,这里就不再具体举例,仅介绍一下其常用方法。
需要注意一点如果在加锁时未设置过期时间则默认为 30s
。
public void infoDemo() throws InterruptedException {
RLock lock = redissonClient.getLock("test");
// 加锁
lock.lock();
// 加锁,并指定超时时间
lock.lock(6000, TimeUnit.MILLISECONDS);
// 异步加锁
RFuture<Void> future = lock.lockAsync();
// 异步加锁, 指定锁超时时长
future = lock.lockAsync(6000, TimeUnit.MILLISECONDS);
// tryLock() 可以设置等待时间
boolean isL1 = lock.tryLock();
System.out.println("tryLock: " + isL1);
// 尝试获取锁 waitTime: 等待时间; leaseTime: 锁过期时间
// 在 5 秒内未取到锁返回 false, 取到锁则设置锁过期为 6 秒
boolean isL2 = lock.tryLock(5 * 1000, 6 * 1000, TimeUnit.MILLISECONDS);
System.out.println("tryLock: " + isL2);
// 解锁
lock.unlock();
// 是否加锁
System.out.println("is locked: " + lock.isLocked());
// 加锁次数
System.out.println("lock count: " + lock.getHoldCount());
}
4. 看门狗机制
RedissonLock
默认锁的过期时间为 30s
,如果在任务耗时超过 30s
则会出现提前释放锁的情况,但若手动设置过期时间,有时并无法准确估计任务耗时,又可能存在持锁时间过长造成资源浪费的情况。针对这种情况, RedissonLock
引入了看门狗机制 watchDog
,看门狗程序会固定间隔持锁情况,若锁仍在持有状态将自动续期加锁时间,从而实现动态加锁时长。
默认的看门狗时间为 30s
,即每隔 10s
会读取当前锁的状态,若锁仍为持有状态则自动延长加锁时间 30s
。当然你也可以在创建连接时通过 setLockWatchdogTimeout()
指定看门狗程序时间,如设置为 60s
则每隔 60/3=20s
会检查锁的状态,若锁未释放则延长 60s
。
下面看一个具体示例,在 线程 1
中创建锁时并没有指定过期时间,因此默认过期时间为 30s
,但是因为看门狗机制将会每隔 10s
读取一次锁状态,这里通过 sleep()
模拟任务耗时 60s
,因此当 30s
后锁过期时间又会自动续期到 30s
,因此 线程 2
将无法获取锁。只有当休眠结束后释放锁看门狗机制失效后 线程 2
才能获取锁。
public void watchDogDemo() throws InterruptedException {
RLock lock = redissonClient.getLock("test");
Thread t1 = new Thread(() -> {
// 不指定时间,默认 30s 过期
lock.lock();
try {
// 模拟任务耗时,看门狗会自动给续期
Thread.sleep(60 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 任务结束释放锁
lock.unlock();
}
}).start();
// 间隔启动第二个线程
Thread.sleep(500);
Thread t2 = new Thread(() -> {
// 无法拿到锁
lock.lock();
try {
System.out.println("get lock");
} finally {
// 任务结束释放锁
lock.unlock();
}
}).start();
}
七、线程工具
1. CountDownLatch
CountDownLatch
可以理解为计数器,通过巧妙的应用它我们即可控制线程的流程。
(1) 初始化
计数器的 await()
方法会判断当前计数器值是否为 0
,若否则会进入阻塞状态。
public void demo() {
// 初始化值为 1 的计数器
CountDownLatch latch = new CountDownLatch(1);
// 计数器减 1
latch.countDown();
// 获取计数器大小
latch.getCount()
try {
// 等待计数器归零, 进入阻塞
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end");
}
(2) 并发模拟
通过 CountDownLatch
可以快速实现并发事件的任务模拟,具体示例如下:
public void concurrentDemo() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3);
for (int i = 0; i < 3; i++) {
int finalI = i;
new Thread(() -> {
try {
// 阻塞,等待计数器归 0 模拟并发
latch.await();
// 需进行的并发操作
System.out.println(finalI);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
// 计数器递减
latch.countDown();
TimeUnit.SECONDS.sleep(1);
}
}
2. CyclicBarrier
CyclicBarrier
又译为栅栏,即可以实现批量的线程控制,当一组线程到达都到达该临界状态的时候触发。
(1) 初始化
通过 await()
方法实现线程阻塞的效果,当指定数量的线程进入该阻塞状态则触发 CyclicBarrier
中定义的事件,注意当该事件抛出异常时将会在 await()
处抛出 BrokenBarrierException
异常。
public void demo1() {
// 当指定数量的线程到达 await 状态则触发事件
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
// 定义事件
System.out.println("\nAll threads have reached the barrier!\n");
});
for (int i = 0; i <= 3; i++) {
new Thread(() -> {
String threadName = Thread.currentThread().getName();
System.out.println(threadName + " has reached the barrier.");
try {
// 进入阻塞,等待其它线程到达 await 状态
cyclicBarrier.await();
} catch (Exception e) {
// 若 CyclicBarrier 事件抛出异常则会抛到此处
e.printStackTrace();
}
System.out.println(threadName + " has continued executing.");
}, "Thread-" + i).start();
}
}
(2) 同步示例
了解了基本使用下面来看一个更贴合实际的案例。
通过 Random
库模拟生成了十万个随机数并实现求和的目的,为了达到更高效的目的这里将数组集合拆分为 3
个子数组并分别求和最后将 3
个任务结果相加即可。当每个子任务计算完成后通过 await()
进入阻塞,等待其它子任务完成计算,而当 3
个子任务都计算完成进入阻塞则触发 CyclicBarrier
的最后累加,将三个任务的结果相加得到最终结果。
public void demo2() throws InterruptedException {
List<Integer> list = new ArrayList<>();
for (int i = 1; i < 100000; i++) {
list.add(new Random().nextInt(100));
}
// 将数组拆分为 3 个子任务
List<List<Integer>> subLists = new ArrayList<>();
subLists.add(list.subList(0, 30000));
subLists.add(list.subList(30000, 70000));
subLists.add(list.subList(70000, 99999));
// 设置栅栏触发事件
List<Integer> result = new ArrayList<>();
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
AtomicInteger total = new AtomicInteger(0);
result.forEach(it -> {
total.getAndUpdate(i -> i + it);
});
System.out.println("\nAll task finish, result is : " + total);
});
// 提交计算子任务
for (int i = 0; i < 3; i++) {
int batch = i;
new Thread(() -> {
AtomicInteger sum = new AtomicInteger(0);
subLists.get(batch).forEach(sum::getAndAdd);
result.add(sum.get());
try {
String threadName = Thread.currentThread().getName();
System.out.println(threadName + " calculate finish, wait other task done.");
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}, "Thread-" + i).start();
}
TimeUnit.SECONDS.sleep(5);
}
3. Semaphore
Semaphore
能够实现限制线程访问数量的效果,可以粗暴的理解为一个阻塞的线程队列,只有指定数量的线程能同时访问资源,到达阈值后将陷入阻塞。
Semaphore
作用类似于普通的线程锁都是用于限制并发访问,不同是 Semaphore
允许被多线程同时持有。
(1) 初始化
Semaphore
同样提供 acquire()
与 tryAcquire()
两种方式访问资源。
public void demo1() throws InterruptedException {
// 初始化并指定并发大小
Semaphore semaphore = new Semaphore(3);
// 请求访问,并发数满则阻塞
semaphore.acquire();
// 请求访问,失败不会阻塞
boolean b1 = semaphore.tryAcquire();
boolean b2 = semaphore.tryAcquire(3, TimeUnit.SECONDS);
try {
System.out.println(UUID.randomUUID());
} finally {
// 释放资源
semaphore.release();
}
}
(2) 同步示例
我们知道常见的 IO
是阻塞的,因此在文件下载等业务场景下想实现多任务下载就必须通过多线程方式,但若没有合理限制并发数将导致系统资源被大量占用。这种情况可以通过创建一个指定大小的 IO
专用线程池资源来实现,但往往一个系统中还有其它服务需要使用线程如果为每个模块创建单独的线程池显然是不合适且耗费资源的。
Semaphore
则完美解决这个问题,通过其即可限制同一资源的线程并发数,下面看个例子让我们更好的理解:
/**
* 初始化大小为 3,即只能同时 3 个任务并发
*/
private static final Semaphore semaphore = new Semaphore(3);
public void demo2() throws InterruptedException {
for (int i = 0; i < 5; i++) {
new MyThread().start();
}
// 等待主线程中子任务结束
TimeUnit.SECONDS.sleep(25);
}
static class MyThread extends Thread {
@Override
public void run() {
try {
// 达到指定数量访问将阻塞
semaphore.acquire();
try {
System.out.println(UUID.randomUUID());
// 模拟耗时
TimeUnit.SECONDS.sleep(3);
} finally {
// 一定要释放!
semaphore.release();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
八、线程死锁
1. 死锁介绍
线程死锁顾名思义即两个线程相互等待对方释放锁对象导致两个线程都进入无限的循环等待中,在多线程任务中如果没有设计好恰当的锁的获取与释放,就十分容易出现死锁问题。
举个简单例子,user-1
向 user-2
进行转账,为了保证数据的一致性即不出现一方扣款一方却未收到的情况,在转账操作中需要同时对两个对象进行加锁操作,即对 user-1
加锁后再对 user-2
加锁,然后执行具体的转账业务操作。
将上述的文字逻辑转化为对应的代码后如下:
public void transfer(User fromUser, User toUser, double amount) {
synchronized(toUser) {
synchronized(fromUser) {
// 扣除来源用户余额
fromUser.debit(amount);
// 增加目标用户余额
toUser.credit(amount)
}
}
}
在上述的示例中,当同时存在 user-1
向 user-2
转账与 user-2
向 user-1
转账两个线程时,由于正好作用顺序相反,两个线程在上述代码中的第一个 synchronized
都加锁成功,此时 user-1
与 user-2
都将进入被锁状态。而到了第二个 synchronized
时,线程一持有 user-2
的锁并等待线程二释放 user-1
的锁,线程二持有 user-1
的锁等待线程一释放 user-2
的锁,二者都在等待对方解锁从而导致死锁状态。
2. 解决方案
想要规避死锁问题有两个常用的方式,第一种就是确定锁的执行顺序,第二种则是通过 ReentrantLock
的 tryLock()
等非阻塞加锁方式。
第二种方式较为简单,通过 tryLock()
方法实现超时退出从而规避死锁问题,当然弊端就是会导致其中一个任务执行失败。
下面着重对方式一的实现思路解析。
方式一的核心在于确定流程的加锁顺序,对于同一组操作要保证加锁顺序的一致性。如之前 user-1
向 user-2
与 user-2
向 user-1
转账的示例中,因保证其无论如何触发都保证先对对 user-1
加锁再对 user-2
加锁,或者顺序倒置,但重点的是要保证应用每次调用都使用这个顺序。
稍微修改上述的示例,通过计算对象的哈希值进行比对从而保证加锁顺序的一致,通过引入第三个加锁对象用于防止两个对象 hashCode()
一致的情况,相应的代码示例如下:
public final Object obj = new Object();
public void transfer(User fromUser, User toUser, double amount) {
int fromId = fromUser.hashCode();
int toId = toUser.hashCode();
if(fromId < toId) {
synchronized(toUser) {
synchronized(fromUser) {
fromUser.debit(amount);
toUser.credit(amount)
}
}
} else if (fromId > toId) {
synchronized(fromUser) {
synchronized(toUser) {
fromUser.debit(amount);
toUser.credit(amount)
}
}
} else {
synchronized(obj) {
synchronized(fromUser) {
synchronized(toUser) {
fromUser.debit(amount);
toUser.credit(amount)
}
}
}
}
}
参考链接: