并发工具包
并发工具包
内容
14-15Lock和Condition
Lock相比于synchronized解决了破坏死锁中的不可抢占条件。
Lock 用于解决互斥问题,Condition 用于解决同步问题。
如何解决:
- 能够响应中断
- 支持超时
- 支持非阻塞获取锁,获取失败返回获取失败的结果。
1 | public class LockTest { |
可重入锁
解决:防止funA()调用funB()产生死锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17public class LockTest {
public synchronized void funA(){
System.out.println("funA");
funB();
}
public synchronized void funB(){
System.out.println("funB");
}
public static void main(String[] args) {
// 可重入锁案例
LockTest lockTest = new LockTest();
lockTest.funA();// funA调用funB时,因为锁可冲入,可以调用成功,不会产生死锁
}
}定义:可重入锁指的是在一个线程中可以多次获取同一把锁
实现:每次加锁成功,请求计数器+1,每次解锁,请求计数器-1;第一次加锁时,对对象写入当前线程id;计数器为0时,锁未被占用
公平锁与非公平锁
- 非公平锁
- 优点:
- 缺点
- 公平锁
1 | //ReentrantLock构造方法 |
Lock可见性的实现
内部 volatile state,加锁时,读写status的值;
锁的最佳实践
- 只在更新对象的成员变量时加锁。
- 只在访问可变的成员变量时加锁
- 尽量不在修改别的对象的方法时加锁
Condition
Condition 实现了管程模型里面的条件变量.
两个condition条件实现阻塞队列。
Dubbo实现异步转同步
将TCP层的线程异步通过阻塞线程转为线程同步
16 Semaphore 信号量
解决:
- 并发的互斥和同步问题
- 多个线程同时使用一个数据库连接池、线程池
实现:
- 1个计数器
- 1个等待队列
- 三个方法。 init(),acquire(),release()
使用:
1 | public static void main(String[] args) { |
信号量实现限流器
17 ReadWriteLock
使用场景:读多写少
类似于数据库的读和写。
读写不能升级为写锁
数据同步方案:
- MySQL作为数据源,通过近实时解析binlog识别数据变化
- 数据库和缓存双写
ReadWriteLock实现简单缓存demo
1 | com.xiaoruiit.knowledge.point.javaconcurrent.concurrent.ReadWriteLockTest |
18 StampedLock
支持写锁,悲观读锁,乐观读
不支持重入,不支持条件变量
1 | stampedLock.readLock()// 不支持中断 |
低并发场景性能比ReadWriteLock更好一点
19CountDownLatch和CyclicBarrier 线程步调一致
可使用Future代替,使用更简单。
优化案例 TODO
20并发容器
Java容器分类:List、Map、Set、Queue
并发容器:

List
CopyOnWriteArrayList
缺点:
- 写数据时,读数据存在短暂不一致
- CopyOnWriteArrayList 迭代器是只读的,不支持增删改。
实现:
- 写数据时,先copy一份数据为新数组,新数组写入数据,引用从旧数组指向新数组;
- 写数据时,可以从旧数组读数据。
- 写操作互斥
场景:写操作少,能容忍读写的短暂不一致。
Map
ConcurrentHashMap
缺点:key无序,key和value不能为空
ConcurrentSkipListMap
优点:运行复杂度稳定,O(logn)。key有序
缺点:key和value不能为空
场景:无法接收Concurrenthashmap偶尔的不稳定时,尝试ConcurrentSkipListMap
实现:跳表。
Set
ConcueentSkipListSet
对ConcurrentSkipListMap的简单封装
CopyOnWriteArraySet
Queue
deque:双端队列;queue单端队列;Blocking:阻塞
只有ArrayBlockingQueue 和LinkedBlockingQueue 是支持有界的 。
使用其他无界队列时,一定要充分考虑是否存在导致 OOM 的隐患。
21原子类:无锁工具类典范
count++,无锁线程安全实现
去除加锁、解锁CPU消耗。
增加cpu计算消耗来提升响应时间。
1 | public class Test { |
AtomicLong类的实现
1 | public final long getAndIncrement() { |
CAS
原理:
1 | 1.获取值 |
问题:
ABA问题
描述
假设 count 原本是 A,线程 T1 在获取到值是A,执行修改时;线程 T2 更新成了 B,之后又被 T3 更新回了 A;这样线程T1 虽然看到的还是是A,但是其实已经被其他线程更新过了 。
解决:增加版本号对比
一般来说,CAS适合轻量级的并发操作,也就是并发量并不多,而且等待时间不长的情况,否则就应该使用普通锁,进入阻塞状态,避免CPU空转。
不适用场景 todo:
应用场景:
在Java 8之后,也引入了新的原子操作类LongAdder和DoubleAdder,更加适合写多读少的并发情景。
JAVA中原子类
22Executor线程池
生产者和消费者共用一个线程池,生产者把线程池里的线程用光了,导致消费不了。 出现类似死锁现象。这种情况下通过线程池不太容易看,需要去计数。 所有线程都等待,还没有死锁,就查查为什么会等待。
不同业务定义不同的线程池。线程数量各自计算,然后做压测,优化有性能瓶颈的业务。
线程池可以对线程设置有业务意义的名称,方便排查问题。
线程池要使用有界队列,防止OOM。
拒绝策略一般和降级策略结合使用。降级策略:发到MQ、redis,存到数据库、访问降级等。
Java线程池的使用为什么不用获取、释放的模式,而是生产者-消费者模式?
- 目的和使用场景:数据库线程池主要用于管理数据库连接和执行数据库操作的线程,以提高数据库的并发性能和资源利用率。它通常用于处理数据库的读写操作,如查询、更新、插入等。而Java线程池是用于管理Java应用程序中的线程,用于执行各种任务,如计算、IO操作、网络请求等。
- 线程资源:数据库线程池通常会预先创建一组数据库连接,这些连接可以被多个线程共享。每个线程在需要执行数据库操作时,从线程池中获取一个可用的数据库连接,并在操作完成后释放连接。Java线程池则是根据需要动态创建和销毁线程,根据任务的数量和系统负载来调整线程的数量。
- 线程管理:数据库线程池通常由数据库管理系统(DBMS)负责管理和维护,它会根据配置参数和系统负载来控制线程的数量和行为。DBMS会监控线程的状态、执行时间等指标,并根据需要进行线程的回收和重新创建。Java线程池则是由Java应用程序自己管理和控制,可以通过ThreadPoolExecutor等类来创建和配置线程池,并根据需要进行线程的创建和销毁。
- 任务调度:数据库线程池通常不涉及任务调度和优先级管理,它主要负责执行数据库操作。而Java线程池可以通过调度器(如ScheduledThreadPoolExecutor)来实现任务的定时执行和周期性执行,还可以通过设置任务的优先级来管理任务的执行顺序。
TODO Java并发编程实战第八章:线程池的使用
23Future
24CompletableFuture异步编程
Java 在 1.8 版本提供了CompletableFuture 来支持异步编程。
Java 9提供了更完善的Flow API. https://juejin.cn/post/6994712328197373989
1.8JDK completableFuture 优先于使用future,优先于countdownLatch。
优点
无需手工维护线程
语义更清晰
代码简练,专注于业务逻辑。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44public static void main(String[] args){
CompletableFuture<Void> f1 = CompletableFuture.runAsync(() -> {
System.out.println("洗水壶");
sleep(1);
System.out.println("烧水");
sleep(15);
});
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {// 支持返回值
System.out.println("洗茶壶");
sleep(1);
System.out.println("洗茶杯");
sleep(2);
System.out.println("拿茶叶");
sleep(1);
return "龙井";
});
CompletableFuture<String> f3 = f1.thenCombine(f2, (__, tf) -> {
System.out.println("泡茶...");
return "上茶:" + tf;
});
System.out.println(f3.join());
}
private static void sleep(int millis) {
try {
TimeUnit.SECONDS.sleep(millis);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
注意
默认情况下 CompletableFuture 会使用公共的 ForkJoinPool 线程池,这个线程池默认创建的线程数是 CPU 的核数。
如果所有 CompletableFuture 共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,需要根据不同的业务类型创建不同的线程池,以避免互相干扰
四种条件&异常捕获
四种现实条件,同OA流程图绘制中的四种条件。
并行
串行
1
2
3
4f1.thenApply(); // 支持参数和返回值
f1.thenAccept(); // 支持参数,不支持返回值
f1.thenRun();// 不支持参数,也不支持返回值
f1.thenCompose(); // 除了新建一个子流程,最终结果和thenApply相同且
thenCombine、thenAcceptBoth 和 runAfterBoth
或
applyToEither、acceptEither 和runAfterEither
异常处理
exceptionally() 的使用非常类似于 try{}catch{}中的 catch{},但是由于支持链式编程方式,所以相对更简单。
whenComplete() 和 handle() 系列方法就类似于 try{}finally{}中的 finally{}
回调函数
回调函数在处理异常以及复杂的异步任务关系时往往力不从心。
RxJava 解决了回调问题。Java 9 版本则提供了更加完备的 Flow API