0%

并发工具包

并发工具包

并发工具包

内容

14-15Lock和Condition

Lock相比于synchronized解决了破坏死锁中的不可抢占条件。

Lock 用于解决互斥问题,Condition 用于解决同步问题。

如何解决:

  1. 能够响应中断
  2. 支持超时
  3. 支持非阻塞获取锁,获取失败返回获取失败的结果。
1
2
3
4
5
6
7
8
9
10
public class LockTest {
public static void main(String[] args) throws InterruptedException {
Lock lock = new ReentrantLock();
lock.lockInterruptibly();// 可中断锁

lock.tryLock(1L, TimeUnit.SECONDS);// 支持超时的锁

boolean b = lock.tryLock();// 非阻塞获取锁;尝试获取锁,获取不到就返回false,不会阻塞
}
}

可重入锁

  • 解决:防止funA()调用funB()产生死锁

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    public class LockTest {

    public synchronized void funA(){
    System.out.println("funA");
    funB();
    }

    public synchronized void funB(){
    System.out.println("funB");
    }

    public static void main(String[] args) {
    // 可重入锁案例
    LockTest lockTest = new LockTest();
    lockTest.funA();// funA调用funB时,因为锁可冲入,可以调用成功,不会产生死锁
    }
    }
  • 定义:可重入锁指的是在一个线程中可以多次获取同一把锁

  • 实现:每次加锁成功,请求计数器+1,每次解锁,请求计数器-1;第一次加锁时,对对象写入当前线程id;计数器为0时,锁未被占用

公平锁与非公平锁

  • 非公平锁
    • 优点:
    • 缺点
  • 公平锁
1
2
3
4
5
6
7
8
//ReentrantLock构造方法
public ReentrantLock() {// 非公平锁
sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

Lock可见性的实现

内部 volatile state,加锁时,读写status的值;

锁的最佳实践

  1. 只在更新对象的成员变量时加锁。
  2. 只在访问可变的成员变量时加锁
  3. 尽量不在修改别的对象的方法时加锁

Condition

Condition 实现了管程模型里面的条件变量.

两个condition条件实现阻塞队列。

Dubbo实现异步转同步

将TCP层的线程异步通过阻塞线程转为线程同步

16 Semaphore 信号量

解决:

  1. 并发的互斥和同步问题
  2. 多个线程同时使用一个数据库连接池、线程池

实现:

  • 1个计数器
  • 1个等待队列
  • 三个方法。 init(),acquire(),release()

使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(1);// 1表示只有一个许可证

new Thread(() -> {
try {
semaphore.acquire();// 获取许可证,计数器减1,如果计数器的值小于0,线程会被阻塞
System.out.println("线程1获取到许可证");
System.out.println(semaphore.toString());
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();// 释放许可证,计数器加1,如果计数器的值小于等于0,会唤醒一个等待的线程
}
}).start();

new Thread(() -> {
try {
semaphore.acquire();// 获取许可证
System.out.println("线程2获取到许可证");
System.out.println(semaphore.toString());
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();// 释放许可证
}
}).start();
}

信号量实现限流器

17 ReadWriteLock

使用场景:读多写少

类似于数据库的读和写。

读写不能升级为写锁

数据同步方案:

  1. MySQL作为数据源,通过近实时解析binlog识别数据变化
  2. 数据库和缓存双写

ReadWriteLock实现简单缓存demo

1
com.xiaoruiit.knowledge.point.javaconcurrent.concurrent.ReadWriteLockTest	

18 StampedLock

支持写锁,悲观读锁,乐观读

不支持重入,不支持条件变量

1
2
stampedLock.readLock()// 不支持中断
stampedLock.readLockInterruptibly();// 支持中断

低并发场景性能比ReadWriteLock更好一点

19CountDownLatch和CyclicBarrier 线程步调一致

可使用Future代替,使用更简单。

优化案例 TODO

20并发容器

Java容器分类:List、Map、Set、Queue

并发容器:

image-20230908103111740

  • List

    • CopyOnWriteArrayList

      缺点:

      • 写数据时,读数据存在短暂不一致
      • CopyOnWriteArrayList 迭代器是只读的,不支持增删改。

      实现:

      • 写数据时,先copy一份数据为新数组,新数组写入数据,引用从旧数组指向新数组;
      • 写数据时,可以从旧数组读数据。
      • 写操作互斥

      场景:写操作少,能容忍读写的短暂不一致。

  • Map

    • ConcurrentHashMap

      缺点:key无序,key和value不能为空

    • ConcurrentSkipListMap

      优点:运行复杂度稳定,O(logn)。key有序

      缺点:key和value不能为空

      场景:无法接收Concurrenthashmap偶尔的不稳定时,尝试ConcurrentSkipListMap

      实现:跳表。

  • Set

    • ConcueentSkipListSet

      对ConcurrentSkipListMap的简单封装

    • CopyOnWriteArraySet

  • Queue

    deque:双端队列;queue单端队列;Blocking:阻塞

    只有ArrayBlockingQueue 和LinkedBlockingQueue 是支持有界的 。

    使用其他无界队列时,一定要充分考虑是否存在导致 OOM 的隐患。

21原子类:无锁工具类典范

count++,无锁线程安全实现

去除加锁、解锁CPU消耗。

增加cpu计算消耗来提升响应时间。

1
2
3
4
5
6
7
8
9
10
public class Test {
AtomicLong count =
new AtomicLong(0);
void add10K() {
int idx = 0;
while(idx++ < 10000) {
count.getAndIncrement();
}
}
}

AtomicLong类的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
public final long getAndIncrement() {
return unsafe.getAndAddLong(this, valueOffset, 1L);
}

public final long getAndAddLong(Object o, long offset, long delta) {
long v;
do {
v = getLongVolatile(o, offset);
} while (!compareAndSwapLong(o, offset, v, v + delta));
return v;
}

public final native boolean compareAndSwapLong(Object o, long offset,long expected,long x);

CAS

原理:

1
2
3
4
1.获取值
2.计算
3.调用CPU指令,比较并交换。
4.比较并交换失败时,循环123。循环过程也称自旋

问题:

  1. ABA问题

    • 描述

      假设 count 原本是 A,线程 T1 在获取到值是A,执行修改时;线程 T2 更新成了 B,之后又被 T3 更新回了 A;这样线程T1 虽然看到的还是是A,但是其实已经被其他线程更新过了 。

    • 解决:增加版本号对比

一般来说,CAS适合轻量级的并发操作,也就是并发量并不多,而且等待时间不长的情况,否则就应该使用普通锁,进入阻塞状态,避免CPU空转。

不适用场景 todo:

应用场景:

​ 在Java 8之后,也引入了新的原子操作类LongAdder和DoubleAdder,更加适合写多读少的并发情景。

JAVA中原子类

22Executor线程池

生产者和消费者共用一个线程池,生产者把线程池里的线程用光了,导致消费不了。 出现类似死锁现象。这种情况下通过线程池不太容易看,需要去计数。 所有线程都等待,还没有死锁,就查查为什么会等待。

不同业务定义不同的线程池。线程数量各自计算,然后做压测,优化有性能瓶颈的业务。

线程池可以对线程设置有业务意义的名称,方便排查问题。

线程池要使用有界队列,防止OOM。

拒绝策略一般和降级策略结合使用。降级策略:发到MQ、redis,存到数据库、访问降级等。

Java线程池的使用为什么不用获取、释放的模式,而是生产者-消费者模式?

  1. 目的和使用场景:数据库线程池主要用于管理数据库连接和执行数据库操作的线程,以提高数据库的并发性能和资源利用率。它通常用于处理数据库的读写操作,如查询、更新、插入等。而Java线程池是用于管理Java应用程序中的线程,用于执行各种任务,如计算、IO操作、网络请求等。
  2. 线程资源:数据库线程池通常会预先创建一组数据库连接,这些连接可以被多个线程共享。每个线程在需要执行数据库操作时,从线程池中获取一个可用的数据库连接,并在操作完成后释放连接。Java线程池则是根据需要动态创建和销毁线程,根据任务的数量和系统负载来调整线程的数量。
  3. 线程管理:数据库线程池通常由数据库管理系统(DBMS)负责管理和维护,它会根据配置参数和系统负载来控制线程的数量和行为。DBMS会监控线程的状态、执行时间等指标,并根据需要进行线程的回收和重新创建。Java线程池则是由Java应用程序自己管理和控制,可以通过ThreadPoolExecutor等类来创建和配置线程池,并根据需要进行线程的创建和销毁。
  4. 任务调度:数据库线程池通常不涉及任务调度和优先级管理,它主要负责执行数据库操作。而Java线程池可以通过调度器(如ScheduledThreadPoolExecutor)来实现任务的定时执行和周期性执行,还可以通过设置任务的优先级来管理任务的执行顺序。

TODO Java并发编程实战第八章:线程池的使用

23Future

24CompletableFuture异步编程

Java 在 1.8 版本提供了CompletableFuture 来支持异步编程。

Java 9提供了更完善的Flow API. https://juejin.cn/post/6994712328197373989

1.8JDK completableFuture 优先于使用future,优先于countdownLatch。

  • 优点

    1. 无需手工维护线程

    2. 语义更清晰

    3. 代码简练,专注于业务逻辑。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      public static void main(String[] args){
      CompletableFuture<Void> f1 = CompletableFuture.runAsync(() -> {
      System.out.println("洗水壶");

      sleep(1);

      System.out.println("烧水");

      sleep(15);
      });

      CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {// 支持返回值
      System.out.println("洗茶壶");

      sleep(1);

      System.out.println("洗茶杯");

      sleep(2);

      System.out.println("拿茶叶");

      sleep(1);

      return "龙井";
      });

      CompletableFuture<String> f3 = f1.thenCombine(f2, (__, tf) -> {
      System.out.println("泡茶...");

      return "上茶:" + tf;
      });

      System.out.println(f3.join());

      }

      private static void sleep(int millis) {
      try {
      TimeUnit.SECONDS.sleep(millis);
      } catch (InterruptedException e) {
      throw new RuntimeException(e);
      }
      }
  • 注意

    默认情况下 CompletableFuture 会使用公共的 ForkJoinPool 线程池,这个线程池默认创建的线程数是 CPU 的核数。

    如果所有 CompletableFuture 共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,需要根据不同的业务类型创建不同的线程池,以避免互相干扰

四种条件&异常捕获

四种现实条件,同OA流程图绘制中的四种条件。

  1. 并行

  2. 串行

    1
    2
    3
    4
    f1.thenApply(); // 支持参数和返回值
    f1.thenAccept(); // 支持参数,不支持返回值
    f1.thenRun();// 不支持参数,也不支持返回值
    f1.thenCompose(); // 除了新建一个子流程,最终结果和thenApply相同
  3. thenCombine、thenAcceptBoth 和 runAfterBoth

  4. applyToEither、acceptEither 和runAfterEither

  • 异常处理

    exceptionally() 的使用非常类似于 try{}catch{}中的 catch{},但是由于支持链式编程方式,所以相对更简单。

    whenComplete() 和 handle() 系列方法就类似于 try{}finally{}中的 finally{}

回调函数

回调函数在处理异常以及复杂的异步任务关系时往往力不从心。

RxJava 解决了回调问题。Java 9 版本则提供了更加完备的 Flow API

25CompleteService 任务解耦

26Fork/Join 任务分治