在系统设计中,通常需面对着一个相对棘手的问题:同一数据的多端访问,即我们常说的数据并发。
通俗一点来讲,当同一份数据在同一时候被多端访问或修改时,其最终态将不可预测。由此衍生,针对此类场景有着丰富的解决方式,稍后让我们来一一盘点。
一、多端并发
对于并发问题,线程锁无疑是最趁手的工具,在以往的博客中分享过线程的基本定义与示例,今天就让我们将其应用于实际场景之中。
1. 业务场景
首先让我们来假定一个业务场景,在系统中包含工单审批模块,审批环节多个用户都拥有审批权限,当用户操作审批之后将更新状态并完结工单。
如果你对业务足够敏感,在上述的的场景中便可发现一个问题,由于审批权限的开放给了多用户,当多个用户针对同一工单操作审批时,若无并发控制则工单终态将不可预测。
如两个用户同时发起操作,其中一个用户同意了工单,而另一用户拒绝了工单,最终工单的状态则可能为二者中的任意其一。这类不确定性对于业务功能而言显然不能接受,正确的效果应该是对其二中的一个用户返回审批中的消息提示等交互。
2. 基本思路
针对上述提到的场景,先让我们来看一个较为经典的解决方案。
造成数据不可预测性的原因即多端访问问题,那所要实现的即限制同一个数据的并发访问,可采取下述步骤:
- 定义缓存容器,用于存储当前进行的事务;
- 用户发起操作时,读取进程中事务后并存入自身提交的事务;
- 执行操作对应的动作,同时比对事务是否操作中,若是则跳过;
对于缓存容器则有着丰富的选择,在单机环境下可选 ConcurrentHashMap
与 SynchronizedSet
等 JDK 自带线程安全容器。而对于集群部署服务,则可以使用 Redis
缓存数据库。
3. 代码实现
下面就将上述的实现方案转化为相应的代码实现。
在缓存容器上,这里选用了 ConcurrentHashMap
实例,由于其线程安全的特性则通过 static
声明为静态变量后便可作为缓存使用。而针对锁则使用 ReentrantLock
,其基于 Unsafe
的 CAS
性能相对于 synchronized
关键字更优秀。
对应完整的实现代码如下:
public class ConcurrencyTest {
private static final ReentrantLock locker = new ReentrantLock();
private static final Map<String, Set<Integer>> map = new ConcurrentHashMap<>();
@Data
@NoArgsConstructor
@AllArgsConstructor
static class ActionTread implements Runnable {
private String key;
private Set<Integer> inputs;
@Override
public void run() {
Set<Integer> failed = new HashSet<>();
Set<Integer> success = new HashSet<>();
Set<Integer> actives;
try {
// 加锁读写缓存
locker.lock();
try {
// 读取激活状态数据
actives = map.values().stream()
.flatMap(Set::stream)
.collect(Collectors.toSet());
// 记录自身提交数据
map.put(key, inputs);
} finally {
locker.unlock();
}
for (Integer ele : inputs) {
// 判断元素是否处理中,是则跳过
if (actives.contains(ele)) {
failed.add(ele);
continue;
}
// 模拟业务耗时
TimeUnit.SECONDS.sleep(1);
success.add(ele);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
// 释放缓存
map.remove(key);
}
System.out.printf("Key: {%s}, Failed: {%s}, Success: {%s}%n", key, failed, success);
}
}
}
这里有一点需要注意,虽然 Map
容器为线程安全,但为了实现每个用户都读取都最新的缓存数据,针对缓存数据的读写两个步骤仍然需要加锁保证原子性。
以具体场景举例,user1
提交 [1,2]
后获取锁并读取缓存,此期间 user2
提交了 [2,3]
由于锁已经占用将进入阻塞,当 user1
写入缓存后释放锁,则 user2
成功获取锁并读取最新缓存得到 user1
提交的 [1,2]
,后续则可基于此缓存数据判断处理。
在操作数据锁时注意一点,lock()
需在 try
代码块之外,原因在于若放置于 try
内当获取锁异常时,执行 finally
中的 unlock()
将再次抛出异常。
locker.lock();
try {
// do something
} finally {
locker.unlock();
}
4. 测试用例
完成对应的代码实现后让我们编写相应的用例进行测试。
分别定义三个用户提交了三个集合,集合内容相互交互。在操作提交后,预期的效果即 Beth
提交的 3
被 Alex
占用仅 [4,5]
成功,类似的 Jack
用户仅 [6,7]
成功。
@Test
public void test1() throws InterruptedException {
Thread t1 = new Thread(new ActionTread("Alex", Set.of(1, 2, 3)));
Thread t2 = new Thread(new ActionTread("Beth", Set.of(3, 4, 5)));
Thread t3 = new Thread(new ActionTread("Jack", Set.of(5, 6, 7)));
started(List.of(t1, t2, t3));
// 阻塞主线程查看结果
TimeUnit.SECONDS.sleep(10);
}
private void started(List<Thread> threads) throws InterruptedException {
for (Thread t : threads) {
t.start();
TimeUnit.SECONDS.sleep(1);
}
}
运行上述用例代码,可以看到输出的内容的与所预期效果一致。
Key: {Alex}, Failed: {[]}, Success: {[1, 2, 3]}
Key: {Beth}, Failed: {[3]}, Success: {[4, 5]}
Key: {Jack}, Failed: {[5]}, Success: {[6, 7]}
二、单端并发
1. 场景分析
在刚才的示例中,我们解决了多用户的并发限制,实现了数据的原子性操作。
但在上述的场景下仍遗漏了一个环节,即缓存的管控依据以用户为维度,当同一用户针对同一数据发起连续操作时,限制此时将失效。
如同一用户提交 [1,2,3]
数据后并再次重复操作发起 [1,2,3]
,显然此时第二次将被限制提交实现,但第二次提交失败退出执行 map.remove()
将导致数据的提前释放,若此时用户再次发起 [1,2,3]
将会绕过限制。
2. 代码改造
因此,针对刚才提到的代码进行改造,在缓存容器上换成 SynchronizedSet
容器。
在缓存容器的读写部分,与之前的示例一致仍采用 ReentrantLock
锁方式实现。不同之处在于缓存的记录与释放,在释放缓存时仅移除成功部分,因为失败部分数据表明在线程中正在操作,应由发起方进行资源释放。
另外需注意一点,由于每次只释放成功部分数据,为防止意料外的异常崩溃在 catch
中当捕获程序异常时,仍需按提交内容全量释放,防止资源未正确清空释放。
改造后对应完整的实现代码如下:
public class ConcurrencyTest {
private static final ReentrantLock locker = new ReentrantLock();
private static final Set<Integer> set = Collections.synchronizedSet(new HashSet<>());
@Data
@NoArgsConstructor
@AllArgsConstructor
static class ActionTest implements Runnable {
private String key;
private Set<Integer> inputs;
@Override
public void run() {
Set<Integer> failed = new HashSet<>();
Set<Integer> success = new HashSet<>();
boolean hasException = false;
Set<Integer> actives;
try {
// 加锁读写缓存
locker.lock();
try {
// 复制缓存,切断引用
actives = new HashSet<>(set);
set.addAll(inputs);
} finally {
locker.unlock();
}
for (Integer ele : inputs) {
// 判断元素是否处理中,是则跳过
if (actives.contains(ele)) {
failed.add(ele);
continue;
}
// 模拟业务耗时
TimeUnit.SECONDS.sleep(1);
success.add(ele);
}
} catch (Exception e) {
// 发生异常按提交内容清空
hasException = true;
set.removeAll(inputs);
} finally {
if (!hasException) {
// 仅清除成功内容
set.removeAll(success);
}
}
System.out.printf("Key: {%s}, Failed: {%s}, Success: {%s}%n", key, failed, success);
}
}
}
3. 测试用例
同样的,按照刚才提到的单用户并发编写相应的测试用例代码,内容如下:
@Test
public void test1() throws InterruptedException {
Thread t1 = new Thread(new ActionTread("Alex", Set.of(1, 2, 3)));
Thread t2 = new Thread(new ActionTread("Alex", Set.of(1, 2, 3)));
Thread t3 = new Thread(new ActionTread("Alex", Set.of(1, 2)));
started(List.of(t1, t2, t3));
TimeUnit.SECONDS.sleep(10);
}
运行上述用例代码,可以看到后续两次重复提交都能按照设计被成功拦截。
Key: {Alex}, Failed: {[1, 2, 3]}, Success: {[]}
Key: {Alex}, Failed: {[1, 2]}, Success: {[]}
Key: {Alex}, Failed: {[]}, Success: {[1, 2, 3]}
三、队列应用
1. 思路分析
除了上述提到的基于缓存容器之外,针对并发场景也可利用队列先进先出的特性实现。
首先让我们从根本上来看可以发现数据不可预测是由于并发导致,那么如果从程序上避免并发问题即迎刃而解。
那又应该如何消除并发呢?原理其实并不复杂,通过队列将多端输入内容入列,再由统一的消费者读取队列后执行相应的操作,由此便从流程上消除了并发场景。
2. 数据生产
下面就将对应的方案转为实际代码实现。
同样的,对于数据的读写仍需要涉及 ReentrantLock
保存原子性,由于不再需要缓存容器,这里选择了 JDK
中自带的 SynchronousQueue
阻塞队列实例。倘若在集群环境中,则可替换选择 RabbitMQ
及 Kafka
等常见消息队列作为载体。
当用户提交操作时,由生产者将数据发送至阻塞队列,代码实现如下:
public class ConcurrencyTest {
private static final ReentrantLock locker = new ReentrantLock();
private static final SynchronousQueue<String> queue = new SynchronousQueue<>();
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class Producer implements Runnable {
private Set<String> inputs;
@Override
public void run() {
locker.lock();
try {
// 提交内容入队
for (String ele : inputs) {
queue.put(ele);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
locker.unlock();
}
}
}
}
3. 数据消费
在消费者方面,即启动守护线程监听队列数据,当存在数据写入时则由 take()
读取内容,再判断是否元素是否处理过即可。
在这里仍通过 ConcurrentHashMap
记录数据的处理次数,由于此时消费者仅单线程,理论上使用非线程安全容器也可,消费者逻辑代码实现如下:
public class ConcurrencyTest {
private static final Map<String, Integer> map = new ConcurrentHashMap<>();
public static class Consumer implements Runnable {
@Override
public void run() {
while (true) {
try {
// 读取队列
String ele = queue.take();
// 判断是否处理过
Integer count = map.get(ele);
if (Objects.nonNull(count)) {
continue;
}
// 记录数据操作次数
map.merge(ele, 1, Integer::sum);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
}
4. 测试用例
完成后编写相应的测试用例,通过生产者提交数据,每次提交内容都包含重复数据。
@Test
public void test1() throws InterruptedException {
Thread consumer = new Thread(new Consumer());
consumer.setDaemon(true);
consumer.start();
List<Thread> threads = List.of(
new Thread(new Producer(Set.of("a", "b"))),
new Thread(new Producer(Set.of("b", "c"))),
new Thread(new Producer(Set.of("c", "d")))
);
for (Thread t : threads) {
t.start();
}
TimeUnit.SECONDS.sleep(5);
System.out.println(map);
}
运行程序后可以看到输出内容每个元素计数器仅 1
,对于重复部分能够成功过滤。
{a=1, b=1, c=1, d=1}