JAVA并发编程中AQS核心机制与多种同步器实现原理解析

JAVA并发编程中AQS核心机制与多种同步器实现原理解析

大家好,今天我们来深入探讨Java并发编程中的一个核心组件:AbstractQueuedSynchronizer (AQS),以及基于AQS构建的各种同步器。AQS是构建锁和其他同步工具的基础框架,理解AQS的原理对于编写高效、可靠的并发程序至关重要。

1. AQS:并发的基石

AQS,即抽象队列同步器,是一个用于构建锁和相关同步器的框架。它提供了一个FIFO队列来管理竞争同步状态的线程,并提供了一套模板方法供子类实现特定的同步语义。AQS自身并没有实现任何同步接口,而是定义了同步器应该如何工作,具体的同步语义由其子类来实现。

AQS的核心思想是:

  • 同步状态(State): AQS维护一个volatile int类型的state变量,用于表示同步状态。这个状态的具体含义由子类来定义,例如,在ReentrantLock中,state为0表示锁空闲,大于0表示锁被持有。
  • FIFO队列(CLH队列): 当线程竞争同步状态失败时,AQS会将这些线程加入到一个FIFO队列中,等待被唤醒。这个队列实际上是一个CLH(Craig, Landin, and Hagersten)变体的队列。
  • 模板方法: AQS提供了一系列模板方法,子类需要实现这些方法来定义如何获取和释放同步状态。这些方法包括tryAcquiretryReleaseisHeldExclusively等。

2. AQS的核心数据结构

AQS主要涉及以下几个核心数据结构:

  • state (volatile int): 代表同步状态。其值取决于同步器的具体语义。
  • head (volatile Node): 指向队列的头部节点,代表当前持有锁的线程或者即将获取锁的线程。
  • tail (volatile Node): 指向队列的尾部节点,新加入的线程会被添加到队列尾部。
  • Node: 代表等待队列中的一个线程。包含以下主要属性:

    • waitStatus (volatile int): 表示节点的状态。可能的状态包括:
      • CANCELLED (1): 表示线程获取锁的请求被取消。
      • SIGNAL (-1): 表示当前节点的后继节点需要被唤醒。
      • CONDITION (-2): 表示节点在条件队列中等待。
      • PROPAGATE (-3): 表示释放锁的操作需要传播到后续节点。
      • 0: 表示节点处于默认状态,等待被唤醒。
    • thread (Thread): 代表等待获取锁的线程。
    • prev (Node): 指向前驱节点。
    • next (Node): 指向后继节点。
    • nextWaiter (Node): 指向条件队列中的下一个节点(用于ConditionObject)。

3. AQS的工作流程

AQS的工作流程大致如下:

  1. 线程尝试获取同步状态: 线程调用同步器的acquire方法,该方法会调用AQS的tryAcquire模板方法。
  2. tryAcquire的实现: 子类需要实现tryAcquire方法来尝试获取同步状态。如果获取成功,tryAcquire返回true,线程继续执行。如果获取失败,tryAcquire返回false。
  3. 加入等待队列: 如果tryAcquire返回false,表示获取同步状态失败,线程会被封装成一个Node节点,并加入到AQS的等待队列(CLH队列)的尾部。
  4. 阻塞等待: 加入队列后,线程会进入阻塞状态,等待被唤醒。
  5. 唤醒: 当持有同步状态的线程释放同步状态时,会调用同步器的release方法,该方法会调用AQS的tryRelease模板方法。
  6. tryRelease的实现: 子类需要实现tryRelease方法来释放同步状态。如果释放成功,tryRelease返回true,AQS会唤醒等待队列中的一个或多个线程。
  7. 唤醒后重新尝试获取: 被唤醒的线程会再次尝试获取同步状态,如果获取成功,则退出等待队列,继续执行。如果获取失败,则重新进入阻塞状态。
  8. 中断处理: 如果线程在等待过程中被中断,AQS会处理中断请求,并将线程从等待队列中移除。

4. AQS的核心方法

AQS提供了一系列核心方法,供子类使用:

  • getState(): 获取当前同步状态。
  • setState(int newState): 设置当前同步状态。
  • compareAndSetState(int expect, int update): 使用CAS操作原子性地更新同步状态。
  • acquire(int arg): 以独占模式获取同步状态,忽略中断。
  • acquireInterruptibly(int arg): 以独占模式获取同步状态,响应中断。
  • tryAcquireNanos(int arg, long nanosTimeout): 以独占模式获取同步状态,响应中断,并支持超时。
  • release(int arg): 以独占模式释放同步状态。
  • acquireShared(int arg): 以共享模式获取同步状态,忽略中断。
  • acquireSharedInterruptibly(int arg): 以共享模式获取同步状态,响应中断。
  • tryAcquireSharedNanos(int arg, long nanosTimeout): 以共享模式获取同步状态,响应中断,并支持超时。
  • releaseShared(int arg): 以共享模式释放同步状态。
  • isHeldExclusively(): 判断当前线程是否独占式持有同步状态。

5. AQS的模板方法

子类需要实现以下模板方法来定义同步器的具体行为:

  • tryAcquire(int arg): 尝试以独占模式获取同步状态。成功返回true,失败返回false。
  • tryRelease(int arg): 尝试以独占模式释放同步状态。成功返回true,失败返回false。
  • tryAcquireShared(int arg): 尝试以共享模式获取同步状态。成功返回非负数,失败返回负数。
  • tryReleaseShared(int arg): 尝试以共享模式释放同步状态。如果释放成功,并且允许后续共享模式的线程获取同步状态,则返回true,否则返回false。
  • isHeldExclusively(): 判断当前线程是否独占式持有同步状态。

6. 基于AQS的同步器实现

Java并发包中提供了许多基于AQS实现的同步器,例如:

  • ReentrantLock: 可重入锁,支持独占模式。
  • ReentrantReadWriteLock: 可重入读写锁,支持共享模式(读锁)和独占模式(写锁)。
  • Semaphore: 信号量,用于控制同时访问特定资源的线程数量。
  • CountDownLatch: 倒计时门闩,允许一个或多个线程等待其他线程完成操作。
  • CyclicBarrier: 循环栅栏,允许一组线程互相等待,直到所有线程都到达某个屏障点。
  • FutureTask: 代表一个异步计算的结果。

7. ReentrantLock源码分析

ReentrantLock是一个可重入的独占锁,它基于AQS实现。下面我们分析ReentrantLock的关键源码:

import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public class ReentrantLock implements Lock, java.io.Serializable {

    private static final long serialVersionUID = -607776692572029626L;

    /** Synchronizer providing all implementation mechanics */
    private final Sync sync;

    /**
     * Base of synchronization control for this lock. Subclassed
     * into fair and nonfair versions below. Uses AQS state to
     * represent the number of holds on the lock.
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

        /**
         * Performs {@link Lock#lock}.  The main reason for using
         * separate methods is to allow limiting scope of final
         * variables.
         */
        abstract void lock();

        /**
         * Returns {@code true} if this sync is owned exclusively by
         * the given thread.  This is used to implement {@link
         * Condition#isHeldExclusively}.
         *
         * @param thread the thread
         * @return {@code true} if this sync is owned exclusively by
         *         the given thread
         */
        protected final boolean isHeldExclusively() {
            // While we must in general read state before owner,
            // we don't need to do so to check if current thread is owner
            return getExclusiveOwnerThread() == Thread.currentThread();
        }
    }

    /**
     * Synchronization control for fair locks
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -300089878970904665L;

        final void lock() {
            acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

    /**
     * Synchronization control for nonfair locks.
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 731615356378235015L;

        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

    /**
     * Performs all needd internal locking for both fair and nonfair
     * versions.
     * @param acquires the number of reconcursions of acquire. Decreases
     *          until becomes 0.
     * @return {@code true} if acquired.
     */
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

    /**
     * Creates an instance of {@code ReentrantLock}.
     * This is equivalent to using {@code ReentrantLock(false)}.
     */
    public ReentrantLock() {
        sync = new NonfairSync();
    }

    /**
     * Creates an instance of {@code ReentrantLock} with the
     * given fairness policy.
     *
     * @param fair {@code true} if this lock should use a fair ordering policy
     */
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

    /**
     * Acquires the lock.
     *
     * <p>Acquires the lock if it is not held by another thread and returns
     * immediately, setting the lock hold count to one.
     *
     * <p>If the current thread already holds this lock, then the hold
     * count is incremented and the method returns immediately.
     *
     * <p>If the lock is held by another thread, then the current
     * thread becomes disabled for thread scheduling purposes and lies
     * dormant until the lock has been acquired, at which time the lock
     * hold count is set to one.
     *
     * @throws InterruptedException if the current thread is
     *         interrupted while waiting to acquire the lock
     */
    public void lock() {
        sync.lock();
    }

    /**
     * Attempts to acquire the lock.
     *
     * <p>Acquires the lock if it is not held by another thread and
     * returns immediately with the value {@code true}, setting the lock hold
     * count to one. Even when this lock has been set to use a
     * fair ordering policy, a call to {@code tryLock()} <em>will</em>
     * immediately acquire the lock if it is available, whether or not
     * waiting threads are fair ordering policy. This
     * "barging" behavior can be useful in certain
     * circumstances, even though it breaks fairness. If you want to honor
     * the fairness setting for this lock, then use
     * {@link #tryLock(long, TimeUnit) tryLock(0, TimeUnit.SECONDS) }
     * which is almost equivalent (it also detects interruption).
     *
     * <p>If the current thread already holds this lock then the hold
     * count is incremented and the method returns {@code true}.
     *
     * <p>If the lock is held by another thread then this method will return
     * immediately with the value {@code false}.
     *
     * @return {@code true} if the lock was acquired
     *         {@code false} otherwise
     */
    public boolean tryLock() {
        return sync.nonfairTryAcquire(1);
    }

    /**
     * Releases the lock.
     *
     * <p>If the current thread is the holder of this lock, then the hold
     * count is decremented.  If the hold count is now zero, then the
     * lock is released.  If the current thread is not the holder of this
     * lock, then {@link IllegalMonitorStateException} is thrown.
     *
     * <p>Release of the lock happens in one of two ways:
     *
     * <ul>
     *
     * <li> If the current thread is the holder of this lock and the hold
     * count is decremented to zero, then the lock is released.
     *
     * <li> If the current thread is not the holder of this lock, then
     * {@link IllegalMonitorStateException} is thrown.
     *
     * </ul>
     *
     * @throws IllegalMonitorStateException if the current thread does not
     *         hold this lock
     */
    public void unlock() {
        sync.release(1);
    }

    /**
     * Returns a {@link Condition} instance for use with this {@link
     * Lock} instance.
     *
     * <p>The returned {@link Condition} instance supports the same
     * usages as do the {@link Object} monitor methods ({@link
     * Object#wait() wait}, {@link Object#notify() notify} and {@link
     * Object#notifyAll() notifyAll}) when used with the built-in
     * monitor lock.
     *
     * <ul>
     *
     * <li>If this lock is not held when any of the {@link
     * Condition} {@linkplain #await() waiting} methods are called,
     * then an {@link IllegalMonitorStateException} is thrown.
     *
     * <li>When the {@link Condition} {@linkplain #signal() signaling}
     * methods are called, then the current thread must be the holder
     * of the lock.
     *
     * </ul>
     *
     * <p>The returned {@link Condition} instance supports the same
     * usages as do the {@link Object} monitor methods ({@link
     * Object#wait() wait}, {@link Object#notify() notify} and {@link
     * Object#notifyAll() notifyAll}) when used with the built-in
     * monitor lock.
     *
     * @return a {@link Condition} instance for use with this {@link
     *         Lock} instance
     * @throws UnsupportedOperationException if this {@link Lock}
     *         implementation does not support conditions
     */
    public Condition newCondition() {
        return sync.newCondition();
    }

    /**
     * Queries the number of holds on this lock by the current thread.
     *
     * <p>A thread has a hold on a lock for each lock action that is not
     * matched by an unlock action.
     *
     * <p>The hold count information is only available for locks which are
     * held by the current thread. When the lock is not held by the
     * current thread, this method returns zero.
     *
     * <p>This operation is intended for use in monitoring of the lock and
     * is not intended for use in synchronization control.
     *
     * @return the number of holds on this lock by the current thread,
     *         or zero if this lock is not held by the current thread
     */
    public int getHoldCount() {
        return sync.getHoldCount();
    }

    /**
     * Queries if this lock is held by the current thread.
     *
     * <p>Analogous to a call to {@link #getHoldCount()}{@code  > 0}.
     *
     * @return {@code true} if this lock is held by the current thread,
     *         and {@code false} otherwise
     */
    public boolean isHeldByCurrentThread() {
        return sync.isHeldExclusively();
    }

    /**
     * Queries if this lock is held by any thread. This method is
     * designed for use in monitoring of the system state, not for
     * synchronization control.
     *
     * @return {@code true} if this lock is held by any thread,
     *         and {@code false} otherwise
     */
    public boolean isLocked() {
        return sync.isLocked();
    }

    /**
     * Returns {@code true} if this lock implements fairness.
     *
     * @return {@code true} if this lock implements fairness
     */
    public final boolean isFair() {
        return sync instanceof FairSync;
    }

    /**
     * Return the thread that currently owns this lock, or
     * {@code null} if not owned. When this method is called by a
     * thread that is not the owner, the return value reflects a
     * "best-effort" snapshot of the current owner. The returned value
     * is not guaranteed to be accurate, and may not reflect subsequent
     * changes in ownership. This method is designed for use in
     * monitoring of the system state, not for synchronization
     * control.
     *
     * @return the owner, or {@code null} if not owned
     */
    protected Thread getOwner() {
        return sync.getExclusiveOwnerThread();
    }

    /**
     * Queries whether any threads are waiting to acquire this lock.
     * Note that because cancellations due to interrupts and timeouts
     * may occur at any time, a {@code true} return does not guarantee
     * that any thread will ever acquire this lock.  This method is
     * designed primarily for use in monitoring of the system state.
     *
     * @return {@code true} if there may be waiting threads
     */
    public boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }

    /**
     * Queries whether the given thread is waiting to acquire this
     * lock. Note that because cancellations due to interrupts and
     * timeouts may occur at any time, a {@code true} return does not
     * guarantee that this thread will ever acquire this lock.  This
     * method is designed primarily for use in monitoring of the
     * system state.
     *
     * @param thread the thread
     * @return {@code true} if the given thread is queued waiting for this lock
     * @throws NullPointerException if the thread is null
     */
    public boolean hasQueuedThread(Thread thread) {
        return sync.isQueued(thread);
    }

    /**
     * Returns an estimate of the number of threads waiting to acquire
     * this lock.  The value is only an estimate because the actual
     * number of threads may change dynamically while this method
     * traverses internal data structures.  This method is designed
     * for use in monitoring of the system state, not for
     * synchronization control.
     *
     * @return the estimated number of threads waiting for this lock
     */
    public int getQueueLength() {
        return sync.getQueueLength();
    }

    /**
     * Returns a collection containing threads that may be waiting to
     * acquire this lock.  Because the actual set of threads may change
     * dynamically while constructing this collection, the returned
     * collection is only a best-effort estimate.  The elements of the
     * returned collection are in no particular order.  This method is
     * designed primarily for use in monitoring of the system state.
     *
     * @return the collection of threads
     */
    protected Collection<Thread> getQueuedThreads() {
        return sync.getQueuedThreads();
    }

    /**
     * Queries whether any threads are waiting on the given condition
     * associated with this lock. Note that because timeouts and
     * interrupts may occur at any time, a {@code true} return does not
     * guarantee that a future {@link Condition#signal} will awaken
     * any particular thread. This method is designed primarily for use
     * in monitoring of the system state.
     *
     * @param condition the condition
     * @return {@code true} if there are any waiting threads
     * @throws IllegalMonitorStateException if this lock is not held
     * @throws IllegalArgumentException if the given condition is
     *         not associated with this lock
     * @throws NullPointerException if the condition is null
     */
    public boolean hasWaiters(Condition condition) {
        if (condition == null)
            throw new NullPointerException();
        if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
            throw new IllegalArgumentException("not owner");
        return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition);
    }

    /**
     * Returns an estimate of the number of threads waiting on the
     * given condition associated with this lock. Note that because
     * timeouts and interrupts may occur at any time, the estimate
     * serves only as an upper bound on the actual number of waiters.
     * This method is designed for use in monitoring of the system
     * state, not for synchronization control.
     *
     * @param condition the condition
     * @return the estimated number of waiting threads
     * @throws IllegalMonitorStateException if this lock is not held
     * @throws IllegalArgumentException if the given condition is
     *         not associated with this lock
     * @throws NullPointerException if the condition is null
     */
    public int getWaitQueueLength(Condition condition) {
        if (condition == null)
            throw new NullPointerException();
        if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
            throw new IllegalArgumentException("not owner");
        return sync.getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject)condition);
    }

    /**
     * Returns a collection containing those threads that may be
     * waiting on the given condition associated with this lock.
     * Because the actual set of threads may change dynamically while
     * constructing this collection, the returned collection is only a
     * best-effort estimate. The elements of the returned collection
     * are in no particular order.  This method is designed primarily
     * for use in monitoring of the system state.
     *
     * @param condition the condition
     * @return the collection of threads
     * @throws IllegalMonitorStateException if this lock is not held
     * @throws IllegalArgumentException if the given condition is
     *         not associated with this lock
     * @throws NullPointerException if the condition is null
     */
    protected Collection<Thread> getWaitingThreads(Condition condition) {
        if (condition == null)
            throw new NullPointerException();
        if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
            throw new IllegalArgumentException("not owner");
        return sync.getWaitingThreads((AbstractQueuedSynchronizer.ConditionObject)condition);
    }

    /**
     * ReentrantLock的内部类,用于实现公平锁和非公平锁
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

        /**
         * 尝试获取锁,由子类实现
         */
        abstract void lock();

        /**
         * 判断当前线程是否独占式持有锁
         */
        protected final boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        /**
         * 创建一个与此锁关联的Condition对象
         */
        final ConditionObject newCondition() {
            return new ConditionObject(this);
        }

        /**
         * 获取当前线程持有锁的次数
         */
        final int getHoldCount() {
            return isHeldExclusively() ? getState() : 0;
        }

        /**
         * 判断锁是否被锁定
         */
        final boolean isLocked() {
            return getState() != 0;
        }

        /**
         * 获取独占式持有锁的线程
         */
        final Thread getExclusiveOwnerThread() {
            return getExclusiveOwnerThread();
        }

        /**
         * 判断是否有线程在等待获取锁
         */
        final boolean hasQueuedThreads() {
            return hasQueuedThreads();
        }

        /**
         * 判断给定的线程是否在等待获取锁
         */
        final boolean hasQueuedThread(Thread thread) {
            return isQueued(thread);
        }

        /**
         * 获取等待获取锁的线程数
         */
        final int getQueueLength() {
            return getQueueLength();
        }

        /**
         * 获取等待获取锁的线程集合
         */
        final Collection<Thread> getQueuedThreads() {
            return getQueuedThreads();
        }

        /**
         * 判断是否有线程在等待给定的Condition
         */
        final boolean hasWaiters(ConditionObject condition) {
            if (condition == null)
                throw new NullPointerException();
            return hasWaiters(condition);
        }

        /**
         * 获取等待给定Condition的线程数
         */
        final int getWaitQueueLength(ConditionObject condition) {
            if (condition == null)
                throw new NullPointerException();
            return getWaitQueueLength(condition);
        }

        /**
         * 获取等待给定Condition的线程集合
         */
        final Collection<Thread> getWaitingThreads(ConditionObject condition) {
            if (condition == null)
                throw new NullPointerException();
            return getWaitingThreads(condition);
        }
    }

    /**
     * 公平锁的实现
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -300089878970904665L;

        /**
         * 获取锁
         */
        final void lock() {
            acquire(1);
        }

        /**
         * 尝试获取锁,如果队列中有等待线程,则不获取
         */
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

    /**
     * 非公平锁的实现
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 731615356378235015L;

        /**
         * 获取锁,尝试直接获取锁,如果获取失败,则进入队列等待
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        /**
         * 尝试获取锁
         */
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

    /**
     * 非公平尝试获取锁
     */
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

    /**
     * 释放锁
     */
    public void unlock() {
        sync.release(1);
    }

    /**
     * 尝试释放锁
     */
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }

    public boolean isFair() {
        return sync instanceof FairSync;
    }

}
  • Sync抽象类: ReentrantLock内部定义了一个Sync抽象类,继承自AQS。Sync类定义了lock()方法,并实现了isHeldExclusively()方法。
  • FairSync和NonfairSync: Sync类有两个子类,分别是FairSync和NonfairSync,分别对应公平锁和非公平锁。
  • tryAcquire(int acquires): FairSync和NonfairSync都实现了tryAcquire方法。公平锁在tryAcquire方法中会先判断是否有其他线程在等待,如果有,则不获取锁,直接返回false,保证公平性。非公平锁则会直接尝试获取锁,如果获取成功,则返回true,否则返回false。
  • release(int releases): ReentrantLock的unlock方法调用了sync.release(1)。Sync类重写了AQS的tryRelease方法。tryRelease方法会将state减1,如果state减为0,表示锁被释放,则将exclusiveOwnerThread设置为null,并返回true。

8. Semaphore源码分析

Semaphore 信号量是用来控制同时访问特定资源的线程数量,它也基于AQS实现。


import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.Semaphore;

public class MySemaphore {

    private final Sync sync;

    /**
     * Synchronization mechanism for semaphore.  Uses AQS state
     * to represent permits. Subclass manages fairness.
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;

        Sync(int permits) {
            setState(permits);
        }

        final int getPermits() {
            return getState();
        }

        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState();
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next))
                    return;
            }
        }
    }

    /**
     * NonFair version
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);
        }

        protected final int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

    /**
     * Fair version
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;

        FairSync(int permits) {
            super(permits);
        }

        protected final int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }

     /**
      * Creates a {@code Semaphore} with the given number of
      * permits and nonfair fairness setting.
      *
      * @param permits the initial number of permits available.
      *        This value may be negative, in which case releases
      *        must occur before any acquires will be granted.
      */
    public MySemaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    /**
     * Creates a {@code Semaphore} with the given number of
     * permits and the indicated fairness setting.
     *
     * @param permits the initial number of permits available.
     *        This value may be negative, in which case releases
     *        must occur before any acquires will be granted.
     * @param fair {@code true}

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注