Java AQS的共享模式:如何利用tryAcquireShared()实现资源的并发访问控制

Java AQS 共享模式:构建并发访问控制的基石

大家好,今天我们深入探讨Java AQS(AbstractQueuedSynchronizer)的共享模式,以及如何利用tryAcquireShared()方法来实现资源的并发访问控制。AQS是Java并发包java.util.concurrent的核心基石,理解AQS对于构建高性能、高可靠的并发应用至关重要。

1. AQS 简介:并发同步的抽象框架

AQS是一个抽象类,它提供了一个框架,用于构建锁和相关的同步器。它通过维护一个同步状态(state,一个volatile int变量)和一个FIFO等待队列来实现同步机制。AQS定义了两种模式:

  • 独占模式(Exclusive Mode): 只有一个线程可以持有资源。例如,ReentrantLock
  • 共享模式(Shared Mode): 多个线程可以同时持有资源。例如,SemaphoreCountDownLatch

我们今天的重点是共享模式。

2. 共享模式的核心方法:tryAcquireShared()

在共享模式中,tryAcquireShared(int arg) 方法是核心。它的作用是尝试以共享模式获取同步状态。它返回一个int值,这个值决定了后续的操作:

  • 正数(>= 1): 表示获取成功,并且剩余资源允许后续线程继续获取。这个正数可以理解为剩余的可用许可数量。
  • 零(0): 表示获取成功,但后续线程无法继续获取。通常意味着资源已经耗尽或者达到了某种限制。
  • 负数(< 0): 表示获取失败。线程需要进入等待队列。

3. 共享模式的流程

共享模式的获取流程大致如下:

  1. 尝试获取: 线程调用tryAcquireShared(int arg)尝试获取资源。
  2. 判断结果:
    • 成功(正数或零): 线程获取到资源,继续执行。如果返回值是正数,会唤醒等待队列中排在前面的线程,让他们也尝试获取资源。
    • 失败(负数): 线程进入等待队列,等待被唤醒。
  3. 唤醒和重试: 当资源可用时,AQS会唤醒等待队列中的线程。被唤醒的线程会再次尝试tryAcquireShared(int arg)

4. 代码示例:基于 AQS 实现一个简单的信号量(Semaphore)

为了更好地理解tryAcquireShared(),我们通过一个简单的信号量实现来演示。信号量允许一定数量的线程同时访问共享资源。

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class SimpleSemaphore {

    private final Sync sync;

    public SimpleSemaphore(int permits) {
        sync = new Sync(permits);
    }

    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public void release() {
        sync.releaseShared(1);
    }

    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -32249322224590012L;

        Sync(int permits) {
            setState(permits); // 初始化同步状态为 permits
        }

        @Override
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 || compareAndSetState(available, remaining)) {
                    return remaining; // 返回剩余许可数量
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next)) {
                    return true;
                }
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SimpleSemaphore semaphore = new SimpleSemaphore(3); // 允许3个线程同时访问

        for (int i = 0; i < 5; i++) {
            final int threadId = i;
            new Thread(() -> {
                try {
                    System.out.println("Thread " + threadId + " is waiting to acquire.");
                    semaphore.acquire();
                    System.out.println("Thread " + threadId + " acquired.");
                    Thread.sleep(2000); // 模拟访问共享资源
                    System.out.println("Thread " + threadId + " releasing.");
                    semaphore.release();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
            Thread.sleep(100); // 错开线程启动时间
        }
    }
}

代码解释:

  • SimpleSemaphore 类: 封装了信号量的逻辑。
  • Sync 类: 继承自 AbstractQueuedSynchronizer,实现了同步逻辑。
  • Sync(int permits) 构造函数: 初始化同步状态 statepermits,表示可用的许可数量。
  • tryAcquireShared(int acquires) 方法:
    • 在一个无限循环中尝试获取许可。
    • available = getState():获取当前可用的许可数量。
    • remaining = available - acquires:计算剩余的许可数量。
    • if (remaining < 0):如果剩余许可数量小于 0,表示获取失败,返回一个负数。
    • compareAndSetState(available, remaining):使用CAS操作尝试更新 state。如果更新成功,表示获取成功,返回剩余的许可数量 remaining
    • 如果CAS操作失败,表示有其他线程同时也在尝试获取许可,循环重试。
  • tryReleaseShared(int releases) 方法:
    • 在一个无限循环中尝试释放许可。
    • current = getState():获取当前的许可数量。
    • next = current + releases:计算释放后的许可数量。
    • compareAndSetState(current, next):使用CAS操作尝试更新 state。如果更新成功,表示释放成功,返回 true
    • 如果CAS操作失败,表示有其他线程同时也在尝试释放许可,循环重试。
  • acquire()release() 方法: 分别调用 acquireSharedInterruptibly()releaseShared(),这是AQS提供的方法,封装了线程入队和唤醒的逻辑。
  • main() 方法: 创建一个信号量,并启动 5 个线程来竞争资源。每个线程获取到信号量后,模拟访问共享资源,然后释放信号量。

运行结果分析:

运行这段代码,你会发现最多只有 3 个线程可以同时输出 "Thread X acquired.",因为我们创建的信号量的许可数量是 3。其他线程会阻塞在 semaphore.acquire() 方法中,直到有线程释放信号量。

5. tryAcquireShared() 的设计要点

  • CAS 操作: tryAcquireShared() 通常使用CAS(Compare and Swap)操作来原子性地更新同步状态 state。这是避免数据竞争的关键。
  • 自旋: 如果 CAS 操作失败,tryAcquireShared() 通常会进行自旋重试,直到成功或者确定无法获取资源。
  • 返回值: tryAcquireShared() 的返回值非常重要,它不仅指示了获取是否成功,还指示了后续线程是否可以继续获取资源。
  • 公平性: 默认情况下,AQS 是非公平的。这意味着即使有线程在等待队列中,新来的线程仍然可以尝试获取资源。如果需要公平性,可以考虑使用 FairSync,或者在 tryAcquireShared() 中加入判断逻辑,例如检查当前线程是否是队列的头部节点。

6. 共享模式的应用场景

共享模式在并发编程中有很多应用场景:

  • 资源池: 例如数据库连接池、线程池。可以使用信号量来控制同时连接数据库或执行任务的线程数量。
  • 流量控制: 限制对某个服务的并发请求数量,防止服务过载。
  • 读写锁: ReentrantReadWriteLock 中的读锁就是基于 AQS 共享模式实现的。多个线程可以同时持有读锁,但只有一个线程可以持有写锁。
  • 栅栏(CyclicBarrier): 允许一组线程互相等待,直到所有线程都到达某个点,然后所有线程才可以继续执行。

7. AQS 共享模式的优势

  • 灵活性: AQS 提供了一个高度灵活的框架,可以自定义同步逻辑。
  • 高性能: AQS 使用高效的 CAS 操作和等待队列来管理线程的同步和阻塞。
  • 可扩展性: AQS 可以很容易地扩展到支持更复杂的同步需求。
  • 避免死锁: 通过合理的线程入队和唤醒机制,AQS 可以有效地避免死锁的发生。

8. tryAcquireShared() 的源码分析(以ReentrantReadWriteLock的ReadLock为例)

虽然我们自己实现了一个简单的Semaphore,但要真正理解tryAcquireShared(),研究Java并发包中现有类的实现是最好的方式。我们以ReentrantReadWriteLockReadLocktryAcquireShared()方法为例进行分析。

        protected final int tryAcquireShared(int unused) {
            Thread current = Thread.currentThread();
            int c = getState();
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1; //如果存在写锁,并且不是当前线程持有,则获取失败
            int r = sharedCount(c);
            if (r == MAX)
                throw new Error("Maximum number of shared locks exceeded");
            if (readerShouldBlock()) //判断读线程是否需要阻塞
                return -1;
            if (compareAndSetState(c, c + SHARED_UNIT)) { //尝试增加读锁计数器
                HoldCounter holdCounter = holdCounter;
                if (holdCounter == null) {
                    firstReader = current;
                    holdCounter = new HoldCounter();
                } else if (holdCounter.tid != getThreadId(current)) {
                    firstReader = current;
                    holdCounter = new HoldCounter();
                } else {
                    holdCounter.count++;
                }
                this.holdCounter = holdCounter;
                return 1;
            }
            else {
                //自旋重试,直到成功或者readerShouldBlock返回true
                for (;;) {
                    c = getState();
                    if (exclusiveCount(c) != 0 &&
                        getExclusiveOwnerThread() != current)
                        return -1;
                    if (readerShouldBlock())
                        return -1;
                    if (sharedCount(c) == MAX)
                        throw new Error("Maximum number of shared locks exceeded");
                    if (compareAndSetState(c, c + SHARED_UNIT))
                        break;
                }
                ThreadLocalHoldCounter holdCounter = cachedHoldCounter;
                if (holdCounter == null)
                    cachedHoldCounter = holdCounter = new ThreadLocalHoldCounter();
                HoldCounter h = holdCounter.get();
                if (h == null) {
                    holdCounter.set(h = new HoldCounter());
                } else if (h.tid != getThreadId(current)) {
                    holdCounter.set(h = new HoldCounter());
                } else {
                    h.count++;
                }
                firstReader = current;
                return 1;
            }
        }

代码解释:

  • exclusiveCount(c) 用于从同步状态 c 中提取写锁的计数。如果写锁计数不为 0,说明有写锁被持有。
  • getExclusiveOwnerThread() 返回持有写锁的线程。
  • sharedCount(c) 用于从同步状态 c 中提取读锁的计数。
  • readerShouldBlock() 这是一个策略方法,用于判断当前线程是否应该被阻塞。这与读写锁的公平性策略有关。如果读写锁是非公平的,那么即使有等待的写线程,新来的读线程也可能尝试获取读锁。如果读写锁是公平的,那么新来的读线程必须等待前面的写线程释放写锁。
  • SHARED_UNIT 表示读锁计数器的增量,通常为 1。
  • HoldCounterThreadLocalHoldCounter 用于记录每个线程持有读锁的次数,这对于支持可重入的读锁是必要的。

分析要点:

  1. 优先判断写锁: tryAcquireShared() 首先判断是否存在写锁,并且当前线程是否持有写锁。如果存在写锁,并且不是当前线程持有,那么获取读锁失败。这是读写锁的基本原则:写锁优先于读锁。
  2. readerShouldBlock(): 这个方法体现了读写锁的公平性策略。如果 readerShouldBlock() 返回 true,那么即使没有写锁,当前线程也应该被阻塞,等待前面的写线程释放写锁。
  3. CAS 操作和自旋: tryAcquireShared() 使用 CAS 操作来增加读锁计数器。如果 CAS 操作失败,说明有其他线程也在尝试获取或释放读锁,需要自旋重试。
  4. HoldCounter: 为了支持可重入的读锁,需要记录每个线程持有读锁的次数。HoldCounterThreadLocalHoldCounter 就是用于实现这个功能的。

9. AQS 共享模式的注意事项

  • 正确处理返回值: tryAcquireShared() 的返回值至关重要,必须根据返回值来判断获取是否成功,以及是否需要唤醒其他线程。
  • 避免死锁: 在使用 AQS 时,需要仔细考虑死锁的可能性。例如,避免循环等待的情况。
  • 性能优化: 合理选择同步策略(例如公平性),避免不必要的竞争。
  • 理解 readerShouldBlock() 的作用: 在使用 ReentrantReadWriteLock 时,要理解 readerShouldBlock() 方法的作用,根据实际需求选择合适的公平性策略。

10. 总结:AQS 共享模式是并发控制的强大工具

今天我们详细学习了 AQS 的共享模式,并通过一个简单的信号量和 ReentrantReadWriteLock 的源码分析,深入理解了 tryAcquireShared() 方法的作用和实现。掌握 AQS 共享模式,能够帮助我们构建更加灵活、高效、可靠的并发应用。

资源共享,并发控制,AQS共享模式的核心价值

AQS 共享模式提供了强大的并发访问控制能力,通过 tryAcquireShared() 的灵活设计,可以应对各种复杂的并发场景。深入理解和熟练运用 AQS,是成为并发编程专家的必经之路。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注