Java锁事之Unsafe、CAS、AQS知识点总结

原创 吴就业 138 0 2020-02-13

本文为博主原创文章,未经博主允许不得转载。

本文链接:https://wujiuye.com/article/be8f9499dee54d789c2d28fa3d22cf3c

作者:吴就业
链接:https://wujiuye.com/article/be8f9499dee54d789c2d28fa3d22cf3c
来源:吴就业的网络日记
本文为博主原创文章,未经博主允许不得转载。

本篇文章写于2020年02月13日,从公众号同步过来(博客搬家),本篇为原创文章。

Unsafe、CAS、AQS是我们了解Java中除synchronized之外的锁必须要掌握的重要知识点。CAS是一个比较和替换的原子操作,AQS的实现强依赖CAS,而在Java中,CAS操作需通过使用Unsafe提供的方法实现。

sun.misc.Unsafe

Java不能像C++中那样可以自己申请内存和释放内存,想要实现直接读取某内存地址中存储的数据,就必须要通过JNI调用C/C++方法。Java中的Unsafe类正是为我们提供了类似C++手动管理内存的能力。

Unsafe类实现了很多功能,如Volatile读写、直接内存操作、获取字段在对象中的偏移地址、线程调度、内存屏障。

Unsafe类是”final”的,不允许继承,且构造函数是private的,无法实例化。如果我们想使用Unsafe提供的功能,就必须要使用反射去获取Unsafe实例。

public static Unsafe getUnsafe() throws Exception{
    Field field = Unsafe.class.getDeclaredField("theUnsafe");
    field.setAccessible(true);
    Unsafe unsafe = (Unsafe) field.get(null);
    return unsafe;
}

如果想通过Unsafe自己实现一个锁,那么我们需要关心两个方法,一个是获取字段在对象中的偏移地址的方法,另一个则是CAS方法。使用Unsafe的objectFieldOffset方法可获取字段在对象中的偏移地址。

Field field = MyLock.class.getDeclaredField("state");
//获取字段在对象实例化之后的对象内偏移地址(对象头+偏移地址=字段地址)
long fieldOffsetAddress = unsafe.objectFieldOffset(field);

Conmpare And Swap

Java中Unsafe提供的compareAndSwapXXX方法,第一个参数是要修改的对象,第二个参数是要修改的字段的偏移地址,第三个参数是期望当前内存中存储的值,第四个参数是想要写入的新值。当且仅当期望值与当前内存值相等时,写入成功。

// 字段类型为引用类型
public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);
// 字段类型为int类型
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
// 字段类型为long类型
public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);

每次调用CAS之前都需要先获取一次当前内存中的最新值,作为期望值。字段需要使用volatile关键字声明,确保字段的可见性,能够获取到因被其它线程修改的最新值。

Unsafe提供的CAS方法底层是通过汇编指令cmpxchg实现的,cmpxchg指令实现原子性比较替换操作。

// exchange_value:改变值,新值
// compare_value:比较值,期望值
// dest: 内存地址
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {
  int mp = os::is_MP();
  // c语言中嵌入汇编代码实现
  __asm {
    mov edx, dest
    mov ecx, exchange_value
    mov eax, compare_value
    LOCK_IF_MP(mp)
    cmpxchg dword ptr [edx], ecx
  }
}

通过CAS可以实现乐观锁。先通过CAS尝试修改共享资源,当发现别人在修改时,再去加锁,通过自旋,直到CAS修改成功。悲观锁的定义是,总认为别人会修改,因此先上锁再修改。

synchronized、Lock都是悲观锁。虽然Lock是基于AQS实现的,而AQS使用CAS实现加锁,但使用Lock都只能是先调用lock方法获取锁才能去修改共享资源,使用完后必须调用unlock释放锁,因此Lock也是悲观锁。

CAS会存在 ABA问题。如线程1将值由A改为B后,线程3又将B改为A,由于线程2在线程1修改之前将获取到的当前值A作为期望值时,所以当线程1将B改为A后,线程2无需重新获取期望值CAS也能操作成功,于是又将A改为C。出现这种问题都是由于线程调度引起的。

例如链表,用CAS修改链表的表头,那么ABA问题将导致修改后的链表不是预期的链表。

cas修改链表表头的ABA问题

AbstractQueuedSynchronizer

AQS是一个抽象类,提供实现锁的模板方法,用于实现依赖先进先出(FIFO)等待队列的阻塞锁和相关同步器。AQS屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作。AQS提供独占式获取与释放同步状态、共享式获取与释放同步状态操作。

AQS内部维护一个volatile修饰的整型变量state,称为同步状态,且维护一个获取同步状态的等待队列,是一个双向链表。获取与释放锁其实就是获取与释放同步状态state。

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    /**
     * 同步状态
     */
      private volatile int state;
    /**
     * 等待队列的头节点
     */
    private transient volatile Node head;


    /**
     * 等待队列的尾节点
     */
    private transient volatile Node tail;
  }

以独占式获取或释放同步状态为例。当state为0时,表示当前没有任何线程占用锁,当有线程想要获取锁时,就通过CAS将state增加1。由于使用volatile确保线程的可见性,其它线程能够读到这个状态的改变。当有其它线程同时想要获取锁时,发现state不为0就会将当前线程封装为Node节点加入等待队列,通过自旋获取锁。

线程被挂起后,当前驱节点释放锁时,会唤醒其后继节点,继而使后继节点重新尝试获取锁。线程的挂起与唤醒是通过LockSupport实现的,而LockSupport也是通过Unsafe实现的。

获取锁的过程

图片来源于《Java并发编程的艺术》

AQS还有一个字段,保存当前持有锁的线程,也是用于实现可重入锁的关键,当state不为0,且当前持有锁的线程是自己时,就将state加1,成功获取锁,获取多少次锁就对应要调用多少次释放锁,将state减为0时才会真正的释放锁。

public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
    /**
     * 独占式获取到同步状态的线程
     */
    private transient Thread exclusiveOwnerThread;
}

ReentrantLock

ReentrantLock有一个带参数的构造方法,可以指定创建公平锁还是非公平锁。

如果是公平锁,则每次调用lock方法,先判断当前是否存在其它线程等待获取锁,如果是就将当前线程包装为Node放入等待队列,实现谁先来谁先获取到锁。

公平锁的lock方法、tryAcquire方法的实现

如果是非公平锁,则每次调用lock方法都会先CAS尝试获取锁,如果获取失败,再放入等待队列。

非公平锁的lock方法的实现

非公平锁的lock方法的实现

非公平锁与公平锁的代码实现区别很简单,非公平锁除了lock方法会先使用CAS尝试获取锁之外,tryAcquire方法的实现不会去判断当前是否已经有线程在等待获取锁,因此不管当前等待队列有多少线程在等待获取锁,只要CAS操作成功就能获取到锁。

非公平锁的tryAcquire方法的实现

非公平锁的tryAcquire方法的实现

非公平锁相比公平锁的优点是吞吐量更高,非公平锁会有更少的线程被挂起,缺点是会导致一些线程阻塞时间太长。

ConditionObject

AQS还实现了诸如synchronized的wait、notify的支持。AQS的内部类ConditionObject也维护了一个等待队列。

public class ConditionObject implements Condition{
        /** 条件队列的首节点 */
        private transient Node firstWaiter;
        /** 条件队列的最后一个节点 */
        private transient Node lastWaiter;
}

当外部获取到锁时调用await方法,会将当前线程从锁的等待队列中移出,并放入ConditionObject维护的条件等待队列,将线程挂起。当外部获取到锁的线程调用signal方法时,将条件等待队列中的第一个节点放入AQS的锁等待队列,并将线程唤醒。signalAll方法则是将当前条件等待队列中的所有节点按顺序放入AQS的锁等待队列。判断exclusiveOwnerThread是否等于当前线程可知当前线程是否持有锁。

ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
condition.await();
condition.signal();

关于共享锁与排他锁

共享锁是当前有线程占用锁时,其它线程还能以共享方式获取到这个锁;排他锁是当前有线程占用锁时,其它线程不能再获取到锁。

最常用的读写锁ReentrantReadWriteLock也是通过AQS实现的,读锁也叫共享锁,写锁也叫排他锁。通过AQS的同步状态state判断是否持有锁,通过acquire、release实现独占式获取与释放同步状态,通过acquireShared和releaseShared实现共享式获取与释放同步状态。

static final class Node {
        // 标志当前节点是共享式节点
        static final Node SHARED = new Node();
        // 标志当前节点是独占式节点
        static final Node EXCLUSIVE = null;


        // ===========  当前节点状态 ============
        // 取消
        static final int CANCELLED =  1;  
        // 等待触发    
        static final int SIGNAL    = -1;
        // 条件等待
        static final int CONDITION = -2;
        // 只能是head节点被设置
        // (到目前为止,我也没看明白它的作用是什么)
        static final int PROPAGATE = -3;
        
        // 当waitStatus为0时表示当前节点是新创建的节点,出队列的节点、队尾的节点、刚从条件队列中进入等待队列中的节点,都处于这种状态
        volatile int waitStatus;
}

在将当前线程包装为Node节点放入等待队列时,都是调用addWaiter方法实现的。调用addWaiter需要传入一个参数,这个参数就标志这个节点是一个共享式节点还是独占式节点。

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // 先尝试快速入队
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 失败再调用enq方法自旋入队
        enq(node);
        return node;
    }

添加共享式节点:

addWaiter(Node.SHARED)

添加共享式节点

添加独占式节点:

addWaiter(Node.EXCLUSIVE)

添加独占式节点

共享式获取同步状态调用acquireShared方法。

public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            // 尝试共享式获取同步状态失败则自旋获取
            doAcquireShared(arg);
}

先调用tryAcquireShared尝试获取同步状态,获取失败后再调用doAcquireShared方法,将当前线程包装为共享式节点放入等待队列,并自旋获取同步状态。

private void doAcquireShared(int arg) {
        // 将当前线程包装为共享式节点放入等待队列,
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // 如果当前节点的前驱节点是头节点,才去尝试共享式获取同步状态
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        // 获取同步状态成功,将当前节点设置为头节点,并传递tryAcquireShared返回的值
                        setHeadAndPropagate(node, r);
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                // 当前节点的前驱节点p的waitStatus为SIGNAL,
                // 则调用parkAndCheckInterrupt挂起当前线程等待被唤醒。
                // shouldParkAfterFailedAcquire如果前驱节点p的waitStatus小于等于0,则waitStatus会被设置为SIGNAL
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

当前节点的前驱节点是头节点且获取同步状态成功后,调用setHeadAndPropagate方法将当前节点设置为头节点,并传递tryAcquireShared返回的值(ReentrantReadWriteLock的返回值是1)。

private void setHeadAndPropagate(Node node, int propagate) {
        // 记录旧的头节点
        Node h = head; 
        // 设置node为新的头节点
        setHead(node);
        // tryAcquireShared获取同步状态成功返回的propagate大于0
        if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
            // 判断当前节点的下一个节点是否是共享式节点,或者为空
            // 如果是,则调用doReleaseShared
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
}

setHeadAndPropagate方法将当前节点设置为头节点,并判断下一个节点是否也是共享式节点,或者下一个节点为null,如果是,则调用doReleaseShared方法唤醒后继节点。

private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                // 获取当前头节点的waitStatus
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    // cas将头节点的状态设置为0先,避免多个线程同时调用releaseShared方法(releaseShared中也调用doReleaseShared)
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;
                    // 由cas将头节点的waitStatus成功设置为0的线程
                    // 负责唤醒头节点的后继节点
                    unparkSuccessor(h);
                }
                // 如果头节点的waitStatus为0,则继续调用CAS修改头节点的waitStatus为PROPAGATE
                // 如果waitStatus不为0或者CAS失败则继续循环
                // (到现在都没能看明白作用是什么,也翻了很多资料)
                else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue; 
            }
            if (h == head)                   
                break;
        }
}

调用doReleaseShared方法唤醒后继节点。如果后继节点是独占式节点,则后继节点调用tryAcquire独占式获取同步状态不会成功,直到当前所有共享式节点都调用了releaseShared(releaseShared调用tryReleaseShared)释放同步状态。

多个共享式线程连续获取同步状态的过程:

  1. 第一个节点的线程获取到同步状态后,将自己设置为头节点,并唤醒其后继节点;
  2. 第二个节点的线程获取到同步状态后又将自己设置为头节点,自然上一个节点就被移出队列了,接着唤醒其后继节点;
  3. 如果被唤醒的节点为独占式节点,则由于同步状态不为0,还会自旋,调用parkAndCheckInterrupt将自己挂起;
  4. 最后一个共享式获取同步状态的线程调用tryReleaseShared方法释放同步状态时,会调用一次doReleaseShared将当前头节点(最后一个共享式获取同步状态的节点)的后继节点换醒。
  5. 独占式节点成功获取到同步状态。

唤醒后继独占式节点的线程不一定是最后一个共享式获取同步状态的线程,但头节点一定是最后一个共享式获取同步状态的节点。

#后端

声明:公众号、CSDN、掘金的曾用名:“Java艺术”,因此您可能看到一些早期的文章的图片有“Java艺术”的水印。

文章推荐

使用Mybatis-Plus提高开发效率

使用`mybatis-plus`可以少写很多常用的`SQL`,通过继承`BaseMapper`使用,还可以动态拼接`SQL`。第一眼看到我还以为是`JPA`。

ElasticSearch高版本API的使用姿势

如何在`Java`项目中使用`elasticsearch-rest-high-level-client`。

64位JVM的Java对象头详解

在学习并发编程知识`synchronized`时,我们总是难以理解其实现原理,因为偏向锁、轻量级锁、重量级锁都涉及到对象头,所以了解`java`对象头是我们深入了解`synchronized`的前提条件。

Dubbo路由功能实现灰度发布及源码分析

灰度发布是实现新旧版本平滑过渡的一种发布方式,即让一部分服务更新到新版本,如果这部分服务没有什么问题,再将其它旧版本的服务更新。而实现简单的灰度发布我们可以使用版本号控制,每次发布都更新版本号,新更新的服务就不会调用旧的服务提供者。

JVM垃圾回收GC Root与安全点Safepoint

我看很多资料在介绍`GC Root`时,并没有说栈帧的操作数栈上引用的对象也是`GC Root`,包括我去翻阅《深入理解Java虚拟机》这本书也是一样。所以我才好奇。

服务提供者假死,记一次full gc问题排查

线上某服务一直运行很稳定,最近突然就`cpu`百分百,`rpc`远程调用全部失败,并走了`mock`逻辑。重启后,一个小时后问题又重现。于是`dump`线程栈信息,但不仔细看也看不出什么问题。于是就有了一番排查历程。