0%

并发设计模式

并发设计模式

05 并发设计模式

内容

28 利用不可变性解决并发问题-Immutability 模式

读和写同时存在,会产生并发问题。只有读不会有并发问题。

不变性:一旦创建,或者第一次赋值后,就不允许修改。

JDK中的不可变类:String、Long、Integer、Double

不可变类的实现:

  1. 属性都用final修饰,属性不能是一个对象。
  2. 方法只有只读方法。
  3. 类用final修饰,防止子类继承父类重写父类方法。

String中字符串操作的字符替换操作如何实现的不可变?

  1. 仅仅是原字符没有改变
  2. new 出来一个新字符串,通过返回值返回替换后的字符串。

亨元模式

  • 解决:去除经常使用的不可变重复对象,降低内存占用

  • 实现原理:创建对象时查询对象池中是否有,有的话,直接获取对象池中的对象;没有的话,创建出对象后,放入对象池中

  • 例子:Integer类型的创建,JVM启动时,将-128至127创建出来,这个范围的数,使用时直接返回这个数的地址。

  • 缺点:不适合做锁,对象池中的对象是共有的。

29 Copy On Write

  • 解决:提升读多写少,弱一致性时,读操作的速度。

  • 实现原理:修改时,先对原数据复制一份,修改复制的数据。

  • 缺点:更消耗内存。在GC算法成熟,硬件发达的现在不是问题了。数组较大时不适用

  • 例子

    Docker容器镜像设计,git设计,函数式编程、String.replace方法

    1. 操作系统进程复制

      子进程复制父进程时,先共享同一个地址空间;父进程或子进程需要写入的时候复制修改部分的地址空间。

      总结:延时复制,按需复制。

    2. RPC维护路由表

      image-20230925090436399

      • 路由表的使用

        • 每次 RPC 调用都需要通过负载均衡器来计算目标服务的 IP 和端口号,而负载均衡器需要通过路由表获取接口的所有路由信息,也就是说,
          每次 RPC 调用都需要访问路由表

        • 每次服务的上下线,需要新增或删除一条路由信息

        • 一致性

          一个服务提供方从上线到反馈到客户端的路由表里,即便有 5 秒钟,也能接受。

      • 路由表实现 todo

30线程本地存储

ThreadLocal结构

image-20231009172650851

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
class MyThread {
// 内部持有 ThreadLocalMap
ThreadLocal.ThreadLocalMap threadLocals = null;

public static MyThread currentThread() {
MyThread thread = new MyThread();
thread.threadLocals = new ThreadLocal.ThreadLocalMap();
return thread;
}
}

class ThreadLocal<T> {
public T get() {
// 首先获取线程持有的ThreadLocalMap
ThreadLocalMap map = MyThread.currentThread().threadLocals;
// 在 ThreadLocalMap 中查找变量
ThreadLocalMap.Entry e = map.getEntry(this);
return (T) e.value;
}

public void set(T value) {
MyThread t = MyThread.currentThread();
ThreadLocal.ThreadLocalMap map = getMap(t);
if (map != null){
// map.set(this, value);// 不同线程哈希表中的key(ThreadLocal引用)相同,但不同线程的哈希表都是各自自己的,是不同的
}

else
createMap(t, value);
}

void createMap(MyThread t, T firstValue) {
t.threadLocals = new ThreadLocal.ThreadLocalMap(this, firstValue);
}

ThreadLocal.ThreadLocalMap getMap(MyThread t) {
return t.threadLocals;
}

static class ThreadLocalMap {
// 内部是数组而不是 Map
Entry[] table;

// 根据 ThreadLocal 查找 Entry
Entry getEntry(ThreadLocal key) {
// 省略查找逻辑
return new Entry(key);
}

// Entry 定义
static class Entry extends WeakReference<ThreadLocal> {
Object value;

public Entry(ThreadLocal referent) {
super(referent);
}
}

ThreadLocalMap(){}

ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
// table = new ThreadLocal.ThreadLocalMap.Entry[INITIAL_CAPACITY];
// int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
// table[i] = new ThreadLocal.ThreadLocalMap.Entry(firstKey, firstValue);
// size = 1;
// setThreshold(INITIAL_CAPACITY);
}
}
}

31等待唤醒机制

32 Balking 多线程单例模式

  • 解决:多线程变量的改变和读取

  • 实现

    1
    2
    synchroized
    无需原子性时,可使用volatile

33 Thread-Per-Message模式

  • 解决:分工。

  • 例子:

    • 生活:教育小朋友委托给老师;买房子委托房产中介
    • 部分基于Thread实现分布式任务调度框架
  • 实现:

    • 原理:一个请求一个创建一个线程。

    • Thread实现

      • 缺点:与操作系统线程一致,创建销毁线程耗时,占用内存大,可能OOM,并发量低
    • Fiber协程实现

      创建成本相当于创建一个对象

    Java协程库Fiber

    GO从语言上支持协程

  • 问题:目前java语言中的协程是哪个Java版本开始提供,如何使用

34Worker-Thread模式

车间工人工作的模式

  • 解决:

  • 例子:

    • 生活中:车间的工人有活了就干活,没活了休息聊天
  • 实现原理:

  • 注意

    1. 线程死锁

      描述:A、B两个任务使用同一个线程池,B任务依赖A任务的结果。B任务把线程池占满后,所有B任务会一直等待。

      解决:A、B使用不同线程池

    2. 线程池超界、拒绝策略

35线程两阶段终止

子线程终止

  • stop()

    直接终止线程,如同直接终止运行的服务,电脑强制关机

  • interrupt()

    • 用途:终止内部一直循环执行某个逻辑的线程

    • 实现:主线程内改变子线程终止属性为ture,由开发者在子线程内判断此属性值来决定关闭线程时机。

    • 注意:捕获 InterruptedExpection异常时,需重置 interrupt标记位

    • 场景:用子线程定时采集系统数据,终止采集的实现可使用interrupt

    • 使用示例

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      class Proxy {
      boolean started = false;

      Thread rptThread;

      synchronized void start() {
      if (started) {// 保证线程只启动一次
      return;
      }
      started = true;
      rptThread = new Thread(() -> {
      while (!Thread.currentThread().isInterrupted()) {
      // 采集系统数据,存储
      System.out.println("Proxy采集系统数据,存储");
      // 每隔两秒采集
      try {
      Thread.sleep(2000);
      } catch (InterruptedException e) {
      // 重新设置线程中断状态
      Thread.currentThread().interrupt();
      }
      }
      // 执行到此处说明线程马上终止
      started = false;
      });
      rptThread.start();
      }

      synchronized void stop() {
      rptThread.interrupt();
      }
      }
  • interrupt + 自实现终止标记

    • 用途:调用第三方接口时,防止第三方接口捕获了异常,没有重置线程终止标记位

    • 使用示例:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      class Proxy2{
      boolean started = false;

      volatile boolean stop = false;

      Thread rptThread;

      synchronized void start() {
      if (started) {// 保证线程只启动一次
      return;
      }
      started = true;
      rptThread = new Thread(() -> {
      while (!stop) {
      // 采集系统数据,存储
      System.out.println("Proxy2采集系统数据,存储");
      // 每隔两秒采集
      try {
      Thread.sleep(10000);
      } catch (InterruptedException e) {
      System.out.println("Proxy2线程被中断");
      }
      }
      // 执行到此处说明线程马上终止
      started = false;
      });
      rptThread.start();
      }

      synchronized void stop() {
      stop = true;
      rptThread.interrupt();// 如果线程休眠,可唤醒线程
      }
      }

线程池终止

  • shutdown()

    不再接受任务,等待任务执行完后关闭线程池

  • shutdownNow()

    不再接受任务,并且中断线程池(interrupt())中正在执行的任务,返回被剥夺执行任务

36生产者-消费者模式

生产者线程生产任务,并将任务添加到任务队列中,而消费者线程从任务队列中获取任务并执行。

  • 优点

    解耦。灵活性高,通过固定交互数据结构,生产者和消费者可独立修改而不影响对方

​ 支持异步

​ 可以调整生产者、消费者个数来平衡生产者、消费者速度,进而提高资源使用率

​ 消费者可以批量消费生产者生产出的数据,进而提升性能。如:redis每秒刷盘策略

  • 缺点

    增加复杂度,可能造成队列堆积,或者消费者闲置浪费资源

问题

  1. CopyOnWriteArrayList为什么写时复制全部,linux的fork()函数是复制部分?

    CopyOnWriteArrayList和fork()函数的设计目标和使用场景不同。
    CopyOnWriteArrayList是为了提供线程安全的读写操作,并且在读多写少的场景下具有较好的性能。它的设计目标是在读取操作上提供高性能和并发性,并且保证读取操作的一致性。为了实现这个目标,CopyOnWriteArrayList在写操作时会创建一个新的副本,从而保持读取操作的一致性。
    而fork()函数是用于创建新进程的,它的设计目标是实现进程的复制。fork()函数使用写时复制技术来实现进程的快速复制,避免了不必要的内存复制开销。这种设计适用于进程的复制场景,但不适用于通用的集合操作。
    此外,CopyOnWriteArrayList是Java集合框架中的一部分,它是为了提供线程安全的集合操作而设计的。而fork()函数是操作系统提供的系统调用,用于创建新进程。它们的设计和实现是基于不同的需求和上下文。

  2. log4j的刷盘时机?

参考

《图解 Java 多线程设计模式》