JUC

一、Lock基本使用

ReentrantLock(重入锁)

ReentrantReadWriteLock(读写锁)

二、ReentrantLock原理

定义:
基本概念
1. 可重入,即同一个线程可以多次获取同一把锁而不导致死锁
2. 公平锁和非公平锁,**<font style="color:#DF2A3F;">默认是非公平锁</font>**
    1. 非公平锁代码体现,一开始会尝试去抢占锁,有compareAndSetState方法
    2. 公平锁代码体现,没有前面的抢占逻辑,没有compareAndSetState方法
设计原理
  1. 获得锁和唤醒过程
  2. 基于AQS(AbstractQueuedSynchronizer)实现
    1. 定义:AQS 使用一个 FIFO 队列来管理尝试获取锁但未能成功的线程,并使用了一个整型变量表示锁的状态
    2. AQS状态管理,AQS定义两个属性
      1. 有一个volatile修饰的整型变量state,state=0表示无锁,state可以重入获取
      2. execlusiveOwnerThread,存储当前获得锁线程
    3. 队列机制:
      1. 当一个线程尝试获取锁但失败时,它会被封装成一个Node节点加入到 AQS 维护的同步队列尾部。每个节点都有pre和next指针,以便于线程之间的协调。队列中的线程会按照 FIFO 的顺序尝试获取锁。
      2. 当持有同步状态的线程执行完任务后调用了相应的释放锁,AQS将从等待队列的头部开始,唤醒第一个节点对应的线程,并允许它再次尝试获取同步状态
      3. Node节点定义和属性
        1. waitStatus,节点状态,包括:SIGNAL、
        2. thread,当前线程
    4. 阻塞和唤醒原理:
      1. 通过lockSupport的park();和unPark(threadId);
    5. AQS为什么是安全的
      1. 原子变量操作
        AQS使用了一个单一的int类型的volatile变量state来表示同步状态。compareAndSetState(int expect, int update)方法进行访问和修改。其中,compareAndSetState方法利用了CAS(Compare And Swap)机制,这是一种底层硬件支持的原子操作,能够在不使用锁的情况下实现对共享变量的安全更新。这意味着即使多个线程同时尝试修改state,也能够确保任意时刻只有一个线程能成功修改其值,从而避免了竞态条件。
      2. 内存屏障
        为了防止编译器优化或CPU乱序执行导致的数据一致性问题,AQS在关键位置插入了内存屏障(Memory Barrier)。这些屏障确保了某些指令不会被重排序,并强制刷新缓存到主内存,使得所有线程都能看到最新的状态变更。这对于维护同步状态的一致性至关重要。
      3. FIFO等待队列
        当一个线程试图获取同步状态但失败时(例如,因为另一个线程已经持有锁),该线程将被封装成一个节点并加入到一个由AQS管理的FIFO双向链表中等待。这种队列结构确保了线程可以按照申请锁的顺序依次获得锁,避免了饥饿现象的发生。此外,AQS还通过自旋与阻塞相结合的方式提高了性能:如果预计很快就能获取到锁,线程可能会先进行短暂的自旋;否则,则会被挂起以节省资源。
      4. 线程的唤醒
        一旦持有锁的线程释放了同步状态,AQS会从等待队列头部移除第一个节点,并尝试将其对应的线程唤醒。这样做的好处是保证了公平性,即最先请求锁的线程将优先获得锁。同时,这也意味着在高并发环境下,虽然可能有多个线程竞争同一把锁,但由于存在这样一个有序的调度机制,依然可以保证系统的稳定性和高效性。
  3. 其他
    1. Sync:这是 <font style="color:rgb(44, 44, 54);">ReentrantLock</font> 的核心组件,继承自 AQS。它定义了加锁和解锁的基本行为。
    2. NonfairSync 和 FairSync:这两个类分别实现了非公平锁和公平锁的具体逻辑。它们都是 Sync 的子类,提供了不同的策略来处理锁的竞争。

三、Condition条件控制

Java中的Condition接口是并发包中的一部分,它为线程间的协调提供了比Object.wait()和Object.notify()更强大的功能。

基本概念
  1. Condition接口通常和Lock一起搭配使用,用来实现等待/通知模式。
同步队列和等待队列
  1. 同步队列,这是由<font style="color:rgb(44, 44, 54);">AbstractQueuedSynchronizer</font>(AQS)管理的一个FIFO队列,用于排队那些尝试获取锁但暂时未能成功的线程。(是一个双向链表
  2. 等待队列,这是由ConditionObject(实现了Condition接口的具体类)管理的另一个FIFO队列,用于排队那些调用了await()方法而进入等待状态的线程。( 是一个单向链表
执行细节
  1. 当一个已经持有锁的线程调用Condition对象的await()方法时

    ,会发生以下操作:

    1. 它会释放持有的锁
    2. 封装成一个Node.CONDITION节点类型,并加入到等待队列中(尾部),进入等待状态
    3. 直到其他线程调用同一个Condition的signal()或signalAll()方法
  2. 当某个线程调用Condition对象的signal()或signalAll()方法时

    ,会发生以下操作:

    1. 等待队列头部的一个线程会被选中,并从等待队列移除
    2. 重新插入到同步队列(AQS)的尾部,准备再次尝试竞争锁
    3. 被唤醒的线程不会立即执行,需要与其他已经在同步队列(AQS)的现成竞争锁

四、阻塞队列

BlockingQueue是Java并发包的中的一个接口,支持在多线程环境中数据的存储和传递。

核心特性
  • 阻塞插入和移除:如果队列满了,试图添加新元素的操作将会被阻塞(进入等待队列),直到有足够的空间(等待signal唤醒)。同样地,如果队列为空,试图移除元素的操作也会被阻塞(进入等待队列),直到队列中有元素(等待signal唤醒)。
  • 线程安全:所有操作都是线程安全的,无需额外同步机制。
  • 不允许存储null值:因为poll()方法返回null表示队列为空,所以不允许存储null值以避免混淆。
  • 可选的容量限制:有些实现允许指定最大容量,而其他则是无界的(例如LinkedBlockingQueue默认构造函数创建的是无界队列)
BlockingQueue原理
  1. BlockingQueue的内部通常通过锁(如

    <font style="color:rgb(44, 44, 54);">ReentrantLock</font>)和条件变量(Condition)来实现线程间的同步。当一个线程尝试对一个已满或已空的队列执行操作时,该线程会被挂起,直到另一个线程改变队列的状态(即增加或移除元素)。这个过程涉及到以下关键点:

    • :用于确保同一时刻只有一个线程能够修改队列状态。
    • 条件变量:用于让线程在特定条件下等待(比如队列为空或满),并且可以在条件满足时重新唤醒这些线程。
  2. 例如 ArrayBlockingQueue的实现原理
    1. 核心成员,主要包含 一个lock和两个Condition条件控制(一个Condition一个等待队列)
      1. final ReentrantLock lock; 重入锁,用来保护对队列的所有操作。
      2. private final Condition notEmpty; 当队列为空时,等待获取元素的线程会在此条件上等待。
      3. private final Condition notFull; 当队列已满时,等待插入元素的线程会在此条件上等待。
    2. 内部逻辑细节
      1. 环形数组:为了高效利用数组空间,ArrayBlockingQueue实际上实现了环形数组的概念。即当putIndex或takeIndex到达数组末尾时,它们会被重置为0,从而形成一个循环结构
      2. put
        1. count==items.length,表示队列满了,等待插入元素的线程会在此条件进入等待
        2. enqueue(),元素入队列,同时唤醒在notEmpty条件等待的线程
      3. take
        1. count==0,表示队列空了,等待获取元素的线程会在此条件上等待
        2. dequeue(),元素出队列,同时唤醒在notFull条件等待的线程

五、同步工具

1、CountDownLatch

CountDownLatch 是 Java 并发包J.U.C中的一个同步辅助类。它允许一个或多个线程等待,直到其他线程执行完一组操作后再继续执行

基本使用
CountDownLatch countDownLatch = new CountDownLatch(1); //创建对象并初始化计数器
countDownLatch.await(); // 现成会被挂起
countDownLatch.countDown(); // 减少计数器
基本原理
  • CountDownLatch 的核心原理是基于 AQS(AbstractQueuedSynchronizer)实现的,使用了AQS 的 state 字段作为计数器
  • 初始化:创建CountDownLatch对象,并设置初始值
  • 等待阶段:主线程调用 await() 方法,此时若计数器不为 0,则主线程会被放入 AQS 队列中等待
  • 任务执行阶段:每个子线程完成自己的任务后,会调用 <font style="color:rgb(44, 44, 54);">countDown()</font> 方法,这会使计数器减一
  • 解锁阶段:一旦计数器变为 0,AQS 会自动唤醒所有因为调用了 await() 而处于等待状态的线程,这些线程将继续执行
  • 区别
    • 独占锁的唤醒过程:先判断头节点状态是SIGNAL,如果是则修改为0,并唤醒头结点的下一个节点
    • 共享锁模式的节点状态是:PROPAGATE,处于这个状态下的节点,会对线程的唤醒进行传播(第一次唤醒了head节点(继续执行),head节点再唤醒第一个节点,第一个节点再唤醒第二个节点...以此类推)

2、Semaphore(信号量)

信号量(Semaphore)是操作系统中用于控制多个线程对共享资源访问的一种同步机制

基本概念和方法
  • 许可:当创建一个<font style="color:rgb(44, 44, 54);">Semaphore</font>实例时,需要指定一个许可数,这个数字代表了最多允许多少个线程同时访问受保护的资源。
  • 获取许可(acquire()方法:每当一个线程想要访问受保护的资源时,它必须首先调用<font style="color:rgb(44, 44, 54);">acquire()</font>方法尝试获得一个许可。如果当前还有可用的许可,则该线程可以继续执行;如果没有可用的许可(即所有许可都已被其他线程占用),则调用<font style="color:rgb(44, 44, 54);">acquire()</font>的线程将被阻塞,直到有其他线程释放了一个许可。
  • 释放许可(release()方法:一旦线程完成了对共享资源的访问,它应该调用<font style="color:rgb(44, 44, 54);">release()</font>方法来释放其持有的许可,使得其他等待的线程有机会获取许可并访问资源。
深入理解原理
  • 内部队列:当没有可用的许可时,试图获取许可的线程会被放入一个等待队列中。这个队列由AQS(AbstractQueuedSynchronizer)管理,确保线程能够有序地获取到许可。
  • 公平性:如果<font style="color:rgb(44, 44, 54);">Semaphore</font>被配置为公平模式,那么等待时间最长的线程将优先获得许可。这有助于避免饥饿问题,但可能会降低整体性能。
  • 非公平性默认情况下,<font style="color:#DF2A3F;">Semaphore</font>是非公平的,这意味着新到达的线程有机会“插队”直接尝试获取刚刚被释放的许可,而不需要排队等候。
重点思路
  • 唤醒过程:AQS会遍历等待队列中的节点,使用LockSupport.unpark()方法来恢复那些之前被挂起的线程。被唤醒的线程将再次尝试获取许可。如果此时有可用的许可(即信号量的许可计数大于0),那么该线程将成功获取许可并退出等待状态,继续执行其后续代码。如果没有可用的许可,则该线程可能会再次进入等待状态,或者根据实现细节进行短暂的自旋后再尝试获取

3、CyclicBarrier

<font style="color:rgb(44, 44, 54);">CyclicBarrier</font> 是 Java 并发包 <font style="color:rgb(44, 44, 54);">java.util.concurrent</font> 中的一个同步辅助类,它允许一组线程相互等待,直到所有线程都到达一个共同的屏障点(barrier)。CountDownLatch 不同的是,CyclicBarrier 在触发后可以重用,这意味着它可以用来实现循环或者重复的同步操作。

基本概念

  • CyclicBarrier(int parties):创建一个新的 CyclicBarrier,设置需要等待的线程数目为 parties
  • CyclicBarrier(int parties, Runnable barrierAction):除了设定需要等待的线程数目外,还可以提供一个 Runnable,当所有线程都到达屏障点时自动执行这个 Runnable。

原理

  • 阻塞过程
    1. 初始化:当你创建一个 <font style="color:rgb(44, 44, 54);">CyclicBarrier</font> 实例时,你需要指定参与同步的线程数量(称为 parties)。这个数字表示需要多少个线程调用 <font style="color:rgb(44, 44, 54);">await()</font> 方法后才能解除当前的屏障。
    2. 调用 **<font style="color:rgb(44, 44, 54);">await()</font>** 方法:当一个线程到达屏障点并调用 <font style="color:rgb(44, 44, 54);">await()</font> 方法时,该方法会首先将内部计数器减一,并检查是否所有参与的线程都已经到达屏障点。如果还没有,则当前线程会被阻塞,放入一个等待队列中。
    3. 进入等待状态:具体来说,当线程调用 <font style="color:rgb(44, 44, 54);">await()</font> 方法时,<font style="color:rgb(44, 44, 54);">CyclicBarrier</font> 会通过底层的同步机制(通常是基于 AQS)来挂起这个线程。这通常涉及到使用 <font style="color:rgb(44, 44, 54);">LockSupport.park()</font> 方法来暂停线程的执行,使它进入等待状态,直到被显式唤醒或由于其他原因(如中断)而恢复执行。
  • 唤醒过程
    1. 最后一个线程到达屏障点:一旦最后一个线程调用了 <font style="color:rgb(44, 44, 54);">await()</font> 方法,使得内部计数器达到零,<font style="color:rgb(44, 44, 54);">CyclicBarrier</font> 就知道现在所有线程都已经到达了屏障点。
    2. 触发屏障动作(如果有的话):如果在创建 <font style="color:rgb(44, 44, 54);">CyclicBarrier</font> 时提供了一个 <font style="color:rgb(44, 44, 54);">Runnable</font> 对象作为屏障动作,那么此时将会执行这个 <font style="color:rgb(44, 44, 54);">Runnable</font>。这个动作通常用于执行一些在所有线程到达屏障点之后需要进行的操作,比如合并结果或者更新共享状态等。
    3. 重置内部状态:在执行完屏障动作之后,<font style="color:rgb(44, 44, 54);">CyclicBarrier</font> 会重置其内部计数器为初始值(即你设定的 <font style="color:rgb(44, 44, 54);">parties</font>),以便它可以被再次使用。这是因为 <font style="color:#DF2A3F;">CyclicBarrier</font> 支持重用,也就是说,它可以在一轮同步完成后继续用于下一轮同步
    4. 唤醒所有等待的线程<font style="color:rgb(44, 44, 54);">CyclicBarrier</font> 会遍历队列中的所有节点,并使用 <font style="color:rgb(44, 44, 54);">LockSupport.unpark()</font> 方法逐个唤醒这些线程。这意味着之前被阻塞的所有线程现在都将从它们调用 <font style="color:rgb(44, 44, 54);">await()</font> 方法的地方恢复执行,继续执行它们后续的代码。
4、Atomic
在 Java 中,

<font style="color:rgb(44, 44, 54);">java.util.concurrent.atomic</font> 包提供了多种原子操作类,如 <font style="color:rgb(44, 44, 54);">AtomicInteger</font>, <font style="color:rgb(44, 44, 54);">AtomicLong</font>, <font style="color:rgb(44, 44, 54);">AtomicBoolean</font>, <font style="color:rgb(44, 44, 54);">AtomicReference</font> 等。这些类允许以原子方式执行某些操作,而无需显式地使用锁来保证线程安全。

定义
  • 原子类主要依赖于硬件层面的原子操作指令(如 CAS,Compare And Swap 或 Compare And Set)来实现无锁的同步机制
  • CAS 是一种乐观锁策略,它包括三个操作数:内存位置(V)、预期原值(A)和新值(B)。只有当内存位置处的值与预期原值相匹配时,才会将该位置的值更新为新值;否则不进行任何操作,并返回当前值。
  • 这种机制可以避免传统锁带来的高开销和死锁问题。
原理详解
  • 保证原子性:对 CAS 操作的支持。例如,在 <font style="color:rgb(44, 44, 54);">AtomicInteger</font> 中,<font style="color:rgb(44, 44, 54);">compareAndSet(int expect, int update)</font> 方法尝试将当前值与预期值比较,如果相等,则用新值替换旧值。这个过程是在硬件级别上完成的,保证了操作的原子性
  • 保证可见性:原子类还确保了内存可见性。也就是说,一旦某个线程修改了共享变量的值,其他线程能够立即看到这个修改。
其他问题
  • 引入了 ABA 问题,即一个值从 A 变为 B 再变回 A,这可能导致 CAS 认为没有发生过变化。
  • 为了应对这种情况,Java 提供了

    <font style="color:rgb(44, 44, 54);">AtomicStampedReference</font><font style="color:rgb(44, 44, 54);">AtomicMarkableReference</font> 来追踪引用的变化次数或标记位,从而避免 ABA 问题的影响

六、ConcurrentHashMap

ConcurrentHashMap的实现原理
  1. 整体架构
    1. JDK1.7是使用Segment和小数组HashEntry来实现,Segment本身基于ReentrantLock(重入锁)实现线程安全(即操作插入会锁住Segment)。所以我们把它称做分段锁
      1. JDK1.7

        的实现锁的粒度大,发生Hash冲突时性能差。

      2. 数据多需要遍历链表(性能不行)
    2. JDK1.8是由数组、单项链表、红黑数组成。默认会初始化一个长度为16的数组。
    3. ConcurrentHashMap的核心仍然是Hash表,存在hash冲突时,采用链式寻址法来解决hash冲突。当链表比较长时,数据元素的查询复杂度变成O(n)
    4. JDK1.8引入红黑树,即链表的长度大于8并且hash表的数组容量大于64时,链表会转换为红黑树(如果此时数组容量小于64会优先考虑ConcurrentHashMap扩容,避免过早避免过早地将链表转换为红黑树,相比链表,它需要更多的内存空间,并且节点的创建和维护成本也更高
    5. JDK1.8使用CAS加volatile或者Synchronized的方式来保证线程安全,锁的粒度小,查询性能也更高
      1. volatile+ CAS主要在初始化数组initTable时使用;
      2. Synchronized + CAS主要在put元素使用,主要是锁住头结点
  2. put设计过程
    1. 首先会判断容器是否为空,如果容器为空,则使用volatile加CAS来初始化
    2. 如果容器不为空,根据元素计算的hash值找到相应存储位置,如果存储位置为空,则利用CAS设置该节点
    3. 如果存储位置不为空,则使用Synchronized锁住头节点,遍历链表元素新增或者替换到链表中
    4. 最后再判断是否需要转化为红黑树
  3. size设计过程
    1. ConcurrentHashMap中有一个size()方法,用来获取总元素个数。但是在多线程场景中,在保证原子性的前提下,对元素个数的累加,性能非常低。ConcurrentHashMap有以下优化:
      1. 当线程竞争不激烈,直接通过CAS(baseCount + 1)来实现元素个数递增
      2. 当线程竞争激烈,使用一个数组CounterCell[]来维护元素个数。过程是:从数组随机选择一个下标,通过CAS实现原子递增,即CAS(CounterCell[x].value + 1)
    2. 因此,获取ConcurrentHashMap的元素总数,需要从两个变量累加:Sum = baseCount + CounterCell数组
    3. 此外,CounterCell竞争还是激烈,也会扩容
  4. ConcurrentHashMap扩容
    1. 扩容条件:
      1. 插入新元素到达容量阈值,每个ConcurrentHashMap示例都有一个负载因子,默认0.75。哈希表中的元素数量超过桶数乘以负载因子时,就会触发扩容。例如,初始化容量16,当存储的键值对数量达到 12(即 16 * 0.75)时,就会触发扩容。
      2. 链表的长度超过了阈值(默认8)且当前数组小于64,会优先考虑扩容
    2. 分段扩容:

      <font style="color:rgb(44, 44, 54);">ConcurrentHashMap</font> 在扩容时采用了分段迁移的方式,不是一次性地将所有旧桶中的元素迁移到新的桶中,而是每次只处理一部分。这样做可以减少锁的竞争,并允许其他读写操作同时进行

    3. 协助扩容:
      1. 检查当前桶是否正在被迁移
      2. 参与迁移,线程会从当前桶开始,向后遍历直到找到下一个未迁移的桶为止,然后将这些桶中的元素重新散列并移动到新的桶位置上
      3. 更新状态,完成迁移后,相应桶的状态会被标记为已迁移,以便后续的操作可以直接使用新的桶结构。
    4. 完成扩容:
      1. 当所有的桶都被迁移成功,原有的旧桶数组将被废弃,新的更大容量的桶数组成为主数组,扩容结束
    5. 扩容的过程中,如何保证读/写正常?
      1. 如果

        读写操作到达时,该桶还没有开始迁移,则直接在旧哈希表中读/写元素

      2. 如果

        读写操作到达时。检测到目标桶已经被标记为转发节点,则写操作会在新哈希表中进行相应的修改

  5. 为什么ConcurrentHashMap的key不允许为null?
    1. 防止在并发的场景中产生歧义