Java AQS的独占与共享模式:ReentrantLock与CountDownLatch的实现差异

好的,我们现在开始深入探讨Java AQS(AbstractQueuedSynchronizer)框架中的独占模式和共享模式,并以ReentrantLock和CountDownLatch为例,剖析它们在实现上的差异。

AQS:同步器的骨架

AQS是Java并发包java.util.concurrent的核心基石。它提供了一个框架,用于构建锁和同步器,极大地简化了并发编程的复杂性。AQS维护一个同步状态(state)和一个FIFO等待队列。同步状态是一个整数,可以使用原子操作进行修改。等待队列则用于管理那些试图获取同步状态但被阻塞的线程。

AQS定义了两种同步模式:

  • 独占模式(Exclusive Mode): 每次只允许一个线程持有锁。ReentrantLock就是基于独占模式实现的。
  • 共享模式(Shared Mode): 允许多个线程同时持有锁。Semaphore和CountDownLatch就是基于共享模式实现的。

ReentrantLock:独占模式的典型代表

ReentrantLock是一个可重入的互斥锁。这意味着如果一个线程已经持有了锁,它可以再次获取该锁而不会被阻塞。ReentrantLock的实现是基于AQS的独占模式。

ReentrantLock的AQS实现

ReentrantLock内部通常会包含一个Sync类,该类继承自AQS。Sync类又会有公平锁(FairSync)和非公平锁(NonfairSync)两种实现。我们先来看非公平锁。

import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.ReentrantLock;

public class NonfairReentrantLockExample {

    static class NonfairSync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            } else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }
    }

    private final NonfairSync sync = new NonfairSync();

    public void lock() {
        sync.acquire(1);
    }

    public void unlock() {
        sync.release(1);
    }

    public static void main(String[] args) throws InterruptedException {
        NonfairReentrantLockExample lockExample = new NonfairReentrantLockExample();

        Thread t1 = new Thread(() -> {
            lockExample.lock();
            try {
                System.out.println("Thread 1 acquired lock");
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lockExample.unlock();
                System.out.println("Thread 1 released lock");
            }
        });

        Thread t2 = new Thread(() -> {
            lockExample.lock();
            try {
                System.out.println("Thread 2 acquired lock");
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lockExample.unlock();
                System.out.println("Thread 2 released lock");
            }
        });

        t1.start();
        Thread.sleep(100); // 让t1先获取锁
        t2.start();

        t1.join();
        t2.join();

        System.out.println("Done");
    }
}

核心方法分析:

  • tryAcquire(int acquires): 尝试获取独占锁。
    • 如果state为0,表示锁未被占用,尝试使用CAS(Compare and Swap)操作将state设置为acquires(通常为1),并将当前线程设置为独占线程。如果CAS成功,则获取锁成功。
    • 如果state不为0,且当前线程是独占线程,表示重入锁,将state增加acquires
    • 如果state不为0,且当前线程不是独占线程,则获取锁失败。
  • tryRelease(int releases): 尝试释放独占锁。
    • 将state减去releases
    • 如果state变为0,表示锁完全释放,将独占线程设置为null,并返回true。
    • 否则,更新state,并返回false。
  • isHeldExclusively(): 判断当前线程是否持有独占锁。

acquire(int acquires)release(int releases):

ReentrantLock的lock()方法调用sync.acquire(1)unlock()方法调用sync.release(1)acquire()release()是AQS提供的方法,它们封装了获取和释放锁的逻辑,包括阻塞和唤醒线程。

公平锁的实现 (FairSync)

公平锁的实现与非公平锁的主要区别在于tryAcquire()方法。公平锁会先检查等待队列中是否有等待时间更长的线程,如果有,则不会尝试获取锁,从而保证了公平性。

static class FairSync extends AbstractQueuedSynchronizer {
    @Override
    protected boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() && // 关键:检查队列中是否有等待时间更长的线程
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

    // 其他方法与 NonfairSync 相同
}

hasQueuedPredecessors(): AQS提供的方法,用于检查等待队列中是否有等待时间更长的线程。

CountDownLatch:共享模式的经典案例

CountDownLatch是一个同步工具类,允许一个或多个线程等待直到在其他线程中执行的一组操作完成。CountDownLatch维护一个计数器,该计数器被初始化为一个正整数。当计数器达到零时,所有等待线程将被释放。

CountDownLatch的AQS实现

CountDownLatch的实现是基于AQS的共享模式。

import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.CountDownLatch;

public class CountDownLatchExample {

    static class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) {
            setState(count);
        }

        @Override
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c - 1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

    private final Sync sync;

    public CountDownLatchExample(int count) {
        sync = new Sync(count);
    }

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public void countDown() {
        sync.releaseShared(1);
    }

    public long getCount() {
        return sync.getState();
    }

    public static void main(String[] args) throws InterruptedException {
        int numberOfThreads = 3;
        CountDownLatchExample latch = new CountDownLatchExample(numberOfThreads);

        for (int i = 0; i < numberOfThreads; i++) {
            final int threadId = i;
            new Thread(() -> {
                try {
                    System.out.println("Thread " + threadId + " is working...");
                    Thread.sleep((long) (Math.random() * 2000)); // 模拟工作
                    System.out.println("Thread " + threadId + " finished.");
                    latch.countDown(); // 倒计时
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }

        latch.await(); // 主线程等待
        System.out.println("All threads have finished.  Main thread continues.");
    }
}

核心方法分析:

  • tryAcquireShared(int acquires): 尝试获取共享锁。
    • 如果state为0,表示计数器已经为0,允许所有等待线程通过,返回一个大于等于0的值(通常为1)。
    • 如果state不为0,表示计数器还未归零,不允许线程通过,返回一个小于0的值(通常为-1)。
  • tryReleaseShared(int releases): 尝试释放共享锁(减少计数器)。
    • 使用CAS操作将state减1。
    • 如果state变为0,表示计数器已经为0,返回true,唤醒所有等待线程。
    • 否则,返回false。

acquireSharedInterruptibly(int acquires)releaseShared(int releases):

CountDownLatch的await()方法调用sync.acquireSharedInterruptibly(1)countDown()方法调用sync.releaseShared(1)acquireSharedInterruptibly()releaseShared()是AQS提供的方法,它们封装了获取和释放共享锁的逻辑,包括阻塞和唤醒线程。 acquireSharedInterruptibly 允许在等待过程中被中断。

独占模式 vs. 共享模式:实现差异对比

特性 独占模式 (ReentrantLock) 共享模式 (CountDownLatch)
获取锁 tryAcquire() 返回 true/false,表示是否成功获取锁。 tryAcquireShared() 返回 >=0 或 <0,表示是否允许通过。
释放锁 tryRelease() 返回 true/false,表示是否完全释放锁。 tryReleaseShared() 返回 true/false,表示是否需要唤醒其他线程。
适用场景 互斥访问共享资源。 等待一组线程完成操作。
状态 (State) 表示锁的重入次数。 表示计数器的值。
获取线程数 每次只允许一个线程获取锁。 允许多个线程同时获取锁 (当状态满足条件时)。

AQS核心方法的差异:

  • tryAcquire vs. tryAcquireShared: 独占模式使用tryAcquire尝试获取锁,返回布尔值表示是否成功。共享模式使用tryAcquireShared尝试获取锁,返回整数值,其正负表示获取是否成功,更灵活地表达状态。
  • tryRelease vs. tryReleaseShared: 独占模式使用tryRelease释放锁,返回布尔值表示是否完全释放锁。共享模式使用tryReleaseShared释放锁,返回布尔值表示是否需要唤醒其他线程。

选择合适的同步模式

选择独占模式还是共享模式取决于具体的并发场景。

  • 如果需要互斥访问共享资源,应该选择独占模式,例如ReentrantLock。
  • 如果需要等待一组线程完成操作,应该选择共享模式,例如CountDownLatch。
  • 如果需要限制同时访问共享资源的线程数量,可以选择Semaphore(也是基于AQS的共享模式实现的)。

AQS的设计优势

AQS框架的设计具有以下优势:

  • 代码复用: AQS提供了一个通用的同步框架,可以避免重复编写同步逻辑。
  • 可扩展性: 可以通过继承AQS并重写其抽象方法来实现自定义的同步器。
  • 性能优化: AQS内部使用了CAS操作和FIFO队列,可以有效地提高并发性能。
  • 灵活性: AQS支持独占模式和共享模式,可以满足不同的并发需求。

总结:理解独占与共享模式的关键差异

ReentrantLock和CountDownLatch虽然都基于AQS框架,但它们分别代表了独占模式和共享模式。独占模式保证了对资源的互斥访问,而共享模式则允许多个线程并发执行。理解这两种模式的差异,有助于我们选择合适的同步工具,并编写高效、可靠的并发程序。

核心在于状态的控制

AQS的状态管理是区分独占和共享模式的关键。独占模式中,状态通常表示锁的持有者和重入次数;而在共享模式中,状态则代表某种共享资源的可利用程度或者计数器的剩余值。通过对状态的巧妙控制,AQS能够灵活地支持各种复杂的并发场景。

AQS的强大之处在于它的可定制性

AQS的强大之处在于它的可定制性。开发者可以通过继承AQS并重写其抽象方法,来实现各种自定义的同步器,满足不同的并发需求。这种灵活性使得AQS成为Java并发包中不可或缺的核心组件。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注