`
阅读更多
说明:本篇文章是在阅读《Java 并发编程艺术》过程中的一些笔记和分析,由于本人能力有限,如果有书写错误的地方,欢迎各位大佬批评指正!我们互相交流,学习,共同进步!

该项目的地址:https://github.com/xiaoheng1/concurrent-programming

1.Java 中的锁,值的是 Lock, 它和 synchronized 实现的功能类似,但是其更加的强大,比如说超时获取锁等. 世上没有两全的东西,越是功能强大
的东西,那么它的复杂性肯定会增加. Lock 和 synchronized 相比,synchronized 隐式的获取,释放锁,使用起来更加的方法,但是 Lock 必须
手动的获取和释放锁.

注意:**当使用 Lock 的时候,不要将获取锁的过程写在 try 中,因为获取锁失败(发生异常),在抛出异常的时候,会导致锁无故释放**.

如何理解了?

下面代码是从 ReentrantLock 中 copy 出来的.

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

当在 try 块中获取锁时发生异常(没有获取成功),会调用 finally 块中的释放锁,分为3中情况:
(1)其他线程获取了锁,当前线程获取锁失败,判断 Thread.currentThread() != getExclusiveOwnerThread() 成立,抛出异常.
(2)当前线程之前获取了锁,现在调用 unlock 方法会导致之前获取的锁无故释放.
(3)当前线程没有获取锁,exclusiveThread = null, Thread.currentThread() != getExclusiveOwnerThread() 成立,抛出异常.

2. Lock 和 synchronized 相比,具备哪些特性了?
(1)尝试非阻塞获取锁:当前线程尝试获取锁,如果这一时刻没有线程获取锁,则成功获取锁.
(2)支持超时获取锁
(3)支持中断获取锁

Java Lock 的实现,依赖于 AQS(AbstractQueuedSynchronizer). 下面分析下 AQS 是如何实现的.

同步器的实现,使用了模板模式:

先来关注下 AbstractOwnableSynchronizer 这个类吧. 这个抽象类定义了一个变量 exclusiveOwnerThread,用于标记获取锁的线程.

AbstractQueuedSynchronizer 继承 AbstractOwnableSynchronizer 类,在排他锁的时候,用到了 exclusiveOwnerThread.

**AQS 在实现的过程中,依赖内部的一个同步队列(一个FIFO 的双向队列, head & tail) 来完成同步状态的管理,当前线程获取同步状态失败时,同步
器会将当前线程以及等待状态等信息封装成为一个节点(Node),并加入到同步队列中,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程
唤醒,使其在此尝试获取同步状态**.
   
关于这段话其实包含了很多隐含的信息. 如果您看过我上篇《Java并发编程基础》的话,你会发现其实很多的设计思路是一样的. 比如说会用到等待/通知
机制等.

该 Node 类节选自 AQS.

static final class Node {
    /** Marker to indicate a node is waiting in shared mode */
    static final Node SHARED = new Node();
    /** Marker to indicate a node is waiting in exclusive mode */
    static final Node EXCLUSIVE = null;

    /** waitStatus value to indicate thread has cancelled */
    static final int CANCELLED =  1;
    /** waitStatus value to indicate successor's thread needs unparking */
    static final int SIGNAL    = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    static final int CONDITION = -2;
    /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate
     */
    static final int PROPAGATE = -3;

    /**
     * Status field, taking on only the values:
     *   SIGNAL:     The successor of this node is (or will soon be)
     *               blocked (via park), so the current node must
     *               unpark its successor when it releases or
     *               cancels. To avoid races, acquire methods must
     *               first indicate they need a signal,
     *               then retry the atomic acquire, and then,
     *               on failure, block.
     *   CANCELLED:  This node is cancelled due to timeout or interrupt.
     *               Nodes never leave this state. In particular,
     *               a thread with cancelled node never again blocks.
     *   CONDITION:  This node is currently on a condition queue.
     *               It will not be used as a sync queue node
     *               until transferred, at which time the status
     *               will be set to 0. (Use of this value here has
     *               nothing to do with the other uses of the
     *               field, but simplifies mechanics.)
     *   PROPAGATE:  A releaseShared should be propagated to other
     *               nodes. This is set (for head node only) in
     *               doReleaseShared to ensure propagation
     *               continues, even if other operations have
     *               since intervened.
     *   0:          None of the above
     *
     * The values are arranged numerically to simplify use.
     * Non-negative values mean that a node doesn't need to
     * signal. So, most code doesn't need to check for particular
     * values, just for sign.
     *
     * The field is initialized to 0 for normal sync nodes, and
     * CONDITION for condition nodes.  It is modified using CAS
     * (or when possible, unconditional volatile writes).
     */
    volatile int waitStatus;

    /**
     * Link to predecessor node that current node/thread relies on
     * for checking waitStatus. Assigned during enqueuing, and nulled
     * out (for sake of GC) only upon dequeuing.  Also, upon
     * cancellation of a predecessor, we short-circuit while
     * finding a non-cancelled one, which will always exist
     * because the head node is never cancelled: A node becomes
     * head only as a result of successful acquire. A
     * cancelled thread never succeeds in acquiring, and a thread only
     * cancels itself, not any other node.
     */
    volatile Node prev;

    /**
     * Link to the successor node that the current node/thread
     * unparks upon release. Assigned during enqueuing, adjusted
     * when bypassing cancelled predecessors, and nulled out (for
     * sake of GC) when dequeued.  The enq operation does not
     * assign next field of a predecessor until after attachment,
     * so seeing a null next field does not necessarily mean that
     * node is at end of queue. However, if a next field appears
     * to be null, we can scan prev's from the tail to
     * double-check.  The next field of cancelled nodes is set to
     * point to the node itself instead of null, to make life
     * easier for isOnSyncQueue.
     */
    volatile Node next;

    /**
     * The thread that enqueued this node.  Initialized on
     * construction and nulled out after use.
     */
    volatile Thread thread;

    /**
     * Link to next node waiting on condition, or the special
     * value SHARED.  Because condition queues are accessed only
     * when holding in exclusive mode, we just need a simple
     * linked queue to hold nodes while they are waiting on
     * conditions. They are then transferred to the queue to
     * re-acquire. And because conditions can only be exclusive,
     * we save a field by using special value to indicate shared
     * mode.
     */
    Node nextWaiter;

    /**
     * Returns true if node is waiting in shared mode.
     */
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    /**
     * Returns previous node, or throws NullPointerException if null.
     * Use when predecessor cannot be null.  The null check could
     * be elided, but is present to help the VM.
     *
     * @return the predecessor of this node
     */
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

下面说下该类中比较重要的属性以及设计思路.

waitStatus:
(1)CANCELLED 值为1,由于在同步队列中等待的线程**等待超时或者被中断**,需要从同步队列中取消等待,节点进入该状态将不会发生变化.
(2)SIGNAL 值为 -1,后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将通知后继节点,使后继节点的线程得以运行.
(3)CONDITION 值为 -2,节点在等待队列中,节点线程等待在 Condition 上,当其他线程对 Condition 调用 signal() 方法后,该节点将会
从等待队列转移到同步队列中,加入到对同步状态的获取中.
(4)PROPAGATE 中为 -3,表示下一次共享式同步状态获取将会无条件的被传播下去.
(5)INITIAL 值为 0,初始状态.

其实从上面的状态就可以看出,AQS 在设计之初就已经考虑到了共享和排他两种锁了,同时也考虑到了等待/通知机制了.

prev & next 指向 Node 节点,构成双向队列.

nextWaiter 等待队列中的后继节点. 如果当前节点是共享的,那么这个字段将是一个 SHARED 常量,也就是说节点类型(独占或共享)和等待队列
中的后继节点共用同一个字段.

thread 获取同步状态的线程.

现在来分析下独占锁的获取,释放逻辑:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

这段代码主要完成了同步状态获取、节点构造、加入同步队列以及在同步队列中自旋等待的相关工作. 主要逻辑是:首先调用自定义同步器实现的
tryAcquire 方法,该方法保证线程安全的获取同步状态,如果同步状态获取失败,则构造同步节点(独占式 Node.EXCLUSIVE,同一时刻只能有
一个线程成功获取同步状态)并通过 addWaiter(Node node) 方法将该节点加入到同步队列的尾部,最后调用 acquireQueued 方法,使得该节点
以 "死循环" 的方式获取同步状态. 如果获取不到则阻塞节点中的线程,而被阻塞线程的唤醒主要依靠前驱节点的出队或阻塞线程被中断来实现.

上面那句话,总结来说:
1.尝试获取锁,如果获取不到锁,入队,则自旋获取锁,如果还是没有获取到,则调用 LockSupport.part 阻塞当前线程.

在 acquireQueued 方法中,我们可以看到只有前驱节点是头结点才能够尝试获取同步状态?为什么了?
(1)头结点是成功获取到同步状态的节点,而头节点的线程释放了同步状态后,将会唤醒其后继节点,后继节点的线程被唤醒后需要检查自己的前驱
节点是否是头节点.
(2)维护同步队列的FIFO 原则.

下面来分析下 release 方法:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

**当前线程获取同步状态并执行了相应的逻辑之后,使的后续节点能够继续获取同步状态,通过调用同步器的 release 方法可以释放同步状态,该方法
在释放了同步状态之后,会唤醒其后继节点(进而使后继节点重新尝试获取同步状态)**.

这段话现在不理解不要紧,后面我会把 acquire 和 release 方法结合起来加以说明.

其实 Lock 中独占锁的获取和释放是这么实现的.

1.当前线程 acquire 方法尝试获取锁,如果获取成功,则没有其他入队之说,直接执行当前线程的业务逻辑.
2.如果没有获取成功,那么事就多了,首先将该节点入队(同步队列),插入到队尾. 入队后,其实不是立马进行阻塞,而是先回自旋一会,为啥这么
设计了?假设获取锁的线程执行立马结束了,那么入队的节点是不是就没必要进行阻塞(阻塞和唤醒很耗时间的)?还就就是获取锁的一个一个来,也就是
说,离头结点越近的节点先获取锁,所以队首节点在还没有阻塞的时候,疯狂尝试获取锁,但是也不能一直这样吧?因为这样会很耗CPU资源,是在获取
不到,那么该阻塞,还是的阻塞.
3.接下来说下 shouldParkAfterFailedAcquire 这个方法. 这个方法怎么说了?shouldParkAfterFailedAcquire 中有这么一段判断:

if (ws == Node.SIGNAL)
    /*
     * This node has already set status asking a release
     * to signal it, so it can safely park.
     */
    return true;
   
这个如何理解了?我们知道,第一次进来的时候,不会触发该判断,直接是将初始状态置为 Node.SIGNAL. 那上面怎么理解了?我们知道 SIGNAL
是说后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将通知后继节点,使后继节点的线程得以运行. 如果说当前
节点的前驱节点就已经是 SIGNAL 了,那么本节点是不是可以放心的阻塞了(反正我的前驱节点会通知我)?

4.如果自旋一会还是获取同步状态,那么就调用 LockSupport.park 进行阻塞.

下面在来说下 release 方法:

1.调用 tryRelease 方法尝试释放同步状态,如果成功,则调用 unparkSuccessor(node) 方法唤醒阻塞的线程.


3.共享式同步状态获取

共享式同步状态获取与独占式获取最主要的区别在于同一时刻能否有多个线程同时获取到同步状态.

下面看下 acquireShared 方法.

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

当 tryAcquireShared 方法返回值 >= 0 时,表示能够获取到同步状态. 如果该方法返回一个负数,那么就是没有成功获取到锁,没有成功获取到
锁,那么就要自旋了,现在看下 doAcquireShared 方法. 和排他锁一样,先是节点入队(SHARE Node). 然后自旋获取锁,如果获取成功了,则
调用 setHeadAndPropagate 方法. 这里和独占锁是不一样的,独占锁是直接设置 head 即可. 但是共享锁,正如字面意思,可以支持多个线程同时
访问,所以,当你获取锁了,你不是可以向下传播下,唤醒更多的线程获取锁了?所以有时候说 JDK 的代码写的是真的精炼.setHeadAndPropagate
方法最终会调用 doReleaseShared 方法. 如果没有获取成功,则自我阻塞,和排他锁一样.

下面看下 releaseShared 方法:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

我们直接关注下 doReleaseShared 方法是如何做的.

private void doReleaseShared() {
    for (;;) {
        //唤醒操作由头结点开始,注意这里的头节点已经是上面新设置的头结点了
        //其实就是唤醒上面新获取到共享锁的节点的后继节点
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            //表示后继节点需要被唤醒
            if (ws == Node.SIGNAL) {
                //这里需要控制并发,因为入口有setHeadAndPropagate跟release两个,避免两次unpark
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;     
                //执行唤醒操作     
                unparkSuccessor(h);
            }
            //如果后继节点暂时不需要唤醒,则把当前节点状态设置为PROPAGATE确保以后可以传递下去
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;               
        }
        //如果头结点没有发生变化,表示设置完成,退出循环
        //如果头结点发生变化,比如说其他线程获取到了锁,为了使自己的唤醒动作可以传递,必须进行重试
        if (h == head)                  
            break;
    }
}

我们看到这段代码时,是不是很懵?其实这段代码要结合 setHeadAndPropagate 这个方法看就很明白了. 当头结点的状态是 SIGNAL,就表明
后续节点需要唤醒,调用 unparkSuccessor. 当状态为初始状态时,则修改当前状态为转播.

然后我们回过头来看下 setHeadAndPropagate 方法:

//两个入参,一个是当前成功获取共享锁的节点,一个就是tryAcquireShared方法的返回值,注意上面说的,它可能大于0也可能等于0
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; //记录当前头节点
    //设置新的头节点,即把当前获取到锁的节点设置为头节点
    //注:这里是获取到锁之后的操作,不需要并发控制
    setHead(node);
    //这里意思有两种情况是需要执行唤醒操作
    //1.propagate > 0 表示调用方指明了后继节点需要被唤醒
    //2.头节点后面的节点需要被唤醒(waitStatus<0),不论是老的头结点还是新的头结点
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        //如果当前节点的后继节点是共享类型或者没有后继节点,则进行唤醒
        //这里可以理解为除非明确指明不需要唤醒(后继等待节点是独占类型),否则都要唤醒
        if (s == null || s.isShared())
            //后面详细说
            doReleaseShared();
    }
}

当后继节点竞争到锁时,会调用 setHeadAndPropagate 方法. 这个方法会继续调用 doReleaseShared 唤醒后继节点.
    

关于中断获取锁以及超时获取锁和这大同小异,就不再分析了.


现在借助 AQS,我们自己实现一个锁.

实现一个锁,需要覆盖 tryAcquireXXX 和 tryReleaseXXX 方法.

public class TwinsLock implements Lock{
    private final Sync sync = new Sync(2);

    private static final class Sync extends AbstractQueuedSynchronizer {

        Sync(int count){
            if(count <=0 ) throw new IllegalStateException("count must be > 0");
            setState(count);
        }
        @Override
        protected int tryAcquireShared(int arg) {
            for(;;){
                int current = getState();
                int newCount = current - arg;
                if(newCount < 0 || compareAndSetState(current, newCount)) return newCount;
            }
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
            for (;;){
                int current = getState();
                int newCount = current + arg;
                if(compareAndSetState(current, newCount)) return true;
            }
        }
    }

    public void lock() {
        sync.acquireShared(1);
    }

    public void lockInterruptibly() throws InterruptedException {
        throw new RuntimeException("not support.");
    }

    public boolean tryLock() {
        throw new RuntimeException("not support.");
    }

    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    public void unlock() {
        sync.tryReleaseShared(1);
    }

    public Condition newCondition() {
        return null;
    }
}


5.重入锁
重入锁的含义是以获取锁的线程,再次访问临界资源的时候,不会被锁在外面,仅仅是 state + 1.
下面说下 ReentrantLock 是怎么实现的.
首先说一下 ReentrantLock 是一个重入锁,那它是如何实现的了?看如下代码

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

从这两个方法就可以看出,ReentrantLock 是如何实现可重入的,使用到了一个变量 exclusiveOwnerThread,用这个变量记录获取锁的线程,
那么下次一个线程过来获取锁的时候,判断下是否是同一个线程,如果不是,则返回 false,如果是,则 state + 1. 其实这个和 synchronized
的实现有那么一丢丢相似,synchronized 同样会在对象头中存放获取锁的线程,下次来的时候,先判断下来实现可重入.

然后说下 ReentrantLock 的公平获取和非公平获取.

公平性是针对获取锁而言的,如果说一个锁是公平的,那么它获取锁的顺序就是先来先得,对于非公平锁,只要线程能够设置成功state即可,才不管
你是不是先来了.

由于要保证公平性,所以公平锁的性能没有非公平锁那么高(TPS),因为公平锁要入队,出队,而非公平锁是只要来一个线程,先不管三七二十一,设置
状态再说,没设置成功,才入队.但是非公平锁可能造成线程饥饿.


6.读写锁

读写锁是说在同一时刻,允许多个读线程访问,但是在写线程访问时,所有的读线程和其他写线程均不可被访问.

如何实现了?

我们知道 AQS 中有一个 32 位的 state 字段. 现在用高 16 位表示读,低 16 位表示写.

线程进入读锁的条件:

1.**没有其他线程的写锁**
2.**没有写请求或者有写请求,但是调用线程和持有锁的线程是同一个线程**

线程进入写锁的条件:

1.**没有其他线程的读锁**
2.**没有其他线程的写锁**

重点分析下 tryRelease & tryAcquire 方法:
(1)tryRelease 方法当排他锁数量为 0 时,则设置没有线程获取锁.
(2)tryAcquire 方法,当 c != 0 && w == 0 说明有共享锁被持有了,如果current != getExclusiveOwnerThread()则说明有其他线程
持有写锁了,则获取锁失败.

tryReleaseShared & tryAcquireShared 方法:
(1)tryReleaseShared 当存在排他锁的时候,并且排他线程不等于当前线程时,获取失败.

tryWriteLock 方法:和 tryAcquire 方法实现很类似.

tryReadLock 方法:和 tryAcquireShared 方法实现很类似.

然后在看下 ReadLock & WriteLock.

ReadLock & WriterLock 只是简单的对 Sync 进行了包装.

最关键的是如下总结(包含了读写锁的设计思路):

1.**没有其他线程的写锁**
2.**没有写请求或者有写请求,但是调用线程和持有锁的线程是同一个线程**

线程进入写锁的条件:

1.**没有其他线程的读锁**
2.**没有其他线程的写锁**


7.LockSupport 工具

我们知道 Lock 中就使用到了 LockSupport 来阻塞和唤醒线程.
(1)park() 阻塞当前线程,如果调用 unpark(Thread thread) 方法或者当前线程被中断,才能从 park() 方法返回.
(2)parkNanos(time) 阻塞当前线程,最长不超过 time,在 park() 方法的基础上增加了超时返回.
(3)parkUntil(deadline) 阻塞当前线程,直到 deadline.
(4)unpark(Thread thread) 唤醒处于阻塞状态的线程 thread.


8. Condition 接口

我们知道,Java 中可以使用 wait 和 notify 来实现线程间的通讯,同样的 condition 可以配合 lock 实现线程间的通讯.

那么 condition 是如何实现的了?

每个 Condition 对象都包含着一个队列(等待队列),该队列是 Condition 对象实现等待/通知的关键.

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

说下 Condition 实现的大概思路:

Lock 配合 Condition 实现等待通知机制其实和 synchronized + wait/notify 实现等待通知机制是差不多的. condition 在实现等待通知
机制的时候,有一个同步队列(竞争锁和多个等待队列), 当调用 condition.wait() 方法后,获取锁的节点会转移到等待队列,释放锁并并唤醒后继
节点,然后阻塞自己,等待被唤醒(signal).
当其他线程调用 signal 方法时,会唤醒等待队列中的 first 节点,将该节点加入到同步队列的尾部,唤醒被阻塞的线程. 然后就回到了 wait 方法
阻塞那去了,阻塞在 wait 处的线程被唤醒后,它已经从等待队列转移到了同步队列,这时候,它以一个正常的 AQS 节点抢占锁,并清除取消节点.


参考:https://www.jianshu.com/p/1161d33fc1d0
0
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics