JAVA使用ReentrantLock实现可重入锁的底层原理学习与实践

好的,下面我们开始深入探讨Java中ReentrantLock的底层原理及其应用。

讲座:Java ReentrantLock 的底层原理与实践

今天,我们将深入探讨Java并发编程中一个非常重要的工具:ReentrantLockReentrantLock,顾名思义,是一种可重入的互斥锁,它提供了比synchronized关键字更灵活、更强大的并发控制能力。我们将从ReentrantLock的基本概念入手,逐步剖析其底层实现原理,并通过实际代码示例展示如何在实际开发中使用它。

1. ReentrantLock 的基本概念

首先,我们需要理解什么是可重入锁。可重入锁是指,如果一个线程已经持有了某个锁,那么它可以多次获得该锁而不会被阻塞。每次获得锁,锁的计数器都会递增;每次释放锁,计数器都会递减。只有当计数器减为零时,锁才真正释放,允许其他线程获取。

ReentrantLock实现了Lock接口,它提供了与synchronized关键字类似的互斥功能,但具有更多的扩展特性,例如:

  • 公平锁与非公平锁: ReentrantLock可以配置为公平锁或非公平锁。公平锁按照线程请求锁的顺序来分配锁,先到先得;而非公平锁则允许线程“插队”,即如果某个线程尝试获取锁时,锁正好可用,那么即使有其他线程正在等待,该线程也可以立即获取锁。
  • 中断响应: 线程在等待ReentrantLock时,可以被中断,从而避免死锁。
  • 定时锁: 线程可以尝试在指定时间内获取锁,如果超时仍未获取到锁,则放弃等待。
  • 条件变量: ReentrantLock可以与Condition对象配合使用,实现更复杂的线程同步。

2. ReentrantLock 的底层实现:AQS

ReentrantLock的底层实现依赖于AbstractQueuedSynchronizer(AQS),也称为同步器。AQS是一个用于构建锁和同步器的框架。它使用一个int类型的状态变量(state)来表示同步状态,并维护一个FIFO(先进先出)的等待队列来管理等待锁的线程。

AQS定义了以下几种核心方法:

  • tryAcquire(int arg):尝试以独占模式获取同步状态。如果成功,则返回true;否则,返回false
  • tryRelease(int arg):尝试释放同步状态。如果成功,则返回true;否则,返回false
  • isHeldExclusively():判断当前线程是否独占式持有同步状态。
  • tryAcquireShared(int arg):尝试以共享模式获取同步状态。
  • tryReleaseShared(int arg):尝试以共享模式释放同步状态。

ReentrantLock通过继承AQS,并实现其tryAcquiretryRelease方法,来实现锁的获取和释放。

3. ReentrantLock 的源码分析

我们来分析一下ReentrantLock的关键源码,以深入理解其实现原理。

public class ReentrantLock implements Lock, java.io.Serializable {
    private final Sync sync;

    abstract static class Sync extends AbstractQueuedSynchronizer {
        abstract void lock();

        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

        @Override
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

        @Override
        protected final boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }
    }

    static final class NonfairSync extends Sync {
        @Override
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        @Override
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

    static final class FairSync extends Sync {
        @Override
        final void lock() {
            acquire(1);
        }

        @Override
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

    public ReentrantLock() {
        sync = new NonfairSync();
    }

    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

    public void lock() {
        sync.lock();
    }

    public void unlock() {
        sync.release(1);
    }

    public Condition newCondition() {
        return sync.newCondition();
    }

    public boolean isFair() {
        return sync instanceof FairSync;
    }
}
  • Sync 抽象类: ReentrantLock内部有一个Sync抽象类,它继承自AQS。Sync类定义了锁的基本行为,包括lock()方法(获取锁)和tryRelease()方法(释放锁)。Sync类还维护了当前持有锁的线程(exclusiveOwnerThread)和锁的重入计数器(state)。
  • NonfairSync 和 FairSync: ReentrantLock提供了两种同步器实现:NonfairSync(非公平锁)和FairSync(公平锁)。它们都继承自Sync,并实现了自己的lock()tryAcquire()方法。
  • lock() 方法: lock()方法是获取锁的关键。在非公平锁中,lock()方法首先尝试使用CAS(Compare and Swap)操作来获取锁。如果CAS操作成功,则将当前线程设置为锁的持有者,并返回。如果CAS操作失败,则调用AQS的acquire()方法,将当前线程加入等待队列,并阻塞等待。在公平锁中,lock()方法直接调用AQS的acquire()方法,将当前线程加入等待队列,并阻塞等待。
  • tryAcquire() 方法: tryAcquire()方法尝试获取锁,但不阻塞。它首先检查锁是否被其他线程持有。如果锁未被持有,则尝试使用CAS操作来获取锁。如果CAS操作成功,则将当前线程设置为锁的持有者,并返回true。如果锁已被持有,且持有者是当前线程,则增加锁的重入计数器,并返回true。否则,返回false。在公平锁中,tryAcquire()方法还会检查是否有其他线程正在等待锁,如果有,则不尝试获取锁,直接返回false
  • unlock() 方法: unlock()方法是释放锁的关键。它首先检查当前线程是否是锁的持有者。如果不是,则抛出IllegalMonitorStateException异常。如果是,则减少锁的重入计数器。如果计数器减为零,则将锁的持有者设置为null,并唤醒等待队列中的一个线程。

4. ReentrantLock 的应用示例

下面是一些使用ReentrantLock的实际代码示例。

示例 1:使用 ReentrantLock 实现线程安全的计数器

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Counter {
    private int count = 0;
    private final Lock lock = new ReentrantLock();

    public void increment() {
        lock.lock();
        try {
            count++;
        } finally {
            lock.unlock();
        }
    }

    public int getCount() {
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Counter counter = new Counter();
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 10000; i++) {
                counter.increment();
            }
        });

        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 10000; i++) {
                counter.increment();
            }
        });

        t1.start();
        t2.start();

        t1.join();
        t2.join();

        System.out.println("Count: " + counter.getCount()); // Expected: 20000
    }
}

在这个示例中,我们使用ReentrantLock来保护count变量,确保多个线程可以安全地对其进行递增操作。lock.lock()获取锁,lock.unlock()释放锁。finally块确保即使在try块中发生异常,锁也能被正确释放。

示例 2:使用 ReentrantLock 和 Condition 实现生产者-消费者模式

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ProducerConsumer {
    private final Queue<Integer> queue = new LinkedList<>();
    private final int capacity = 10;
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    public void produce(int item) throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() == capacity) {
                notFull.await(); // 等待队列不满
            }
            queue.offer(item);
            System.out.println("Produced: " + item);
            notEmpty.signal(); // 通知消费者队列不为空
        } finally {
            lock.unlock();
        }
    }

    public int consume() throws InterruptedException {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                notEmpty.await(); // 等待队列不为空
            }
            int item = queue.poll();
            System.out.println("Consumed: " + item);
            notFull.signal(); // 通知生产者队列未满
            return item;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        ProducerConsumer pc = new ProducerConsumer();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 20; i++) {
                    pc.produce(i);
                    Thread.sleep((long)(Math.random() * 100)); // Simulate production time
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 20; i++) {
                    pc.consume();
                    Thread.sleep((long)(Math.random() * 150)); // Simulate consumption time
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
    }
}

在这个示例中,我们使用ReentrantLockCondition对象来实现生产者-消费者模式。notFull条件变量用于生产者等待队列不满,notEmpty条件变量用于消费者等待队列不为空。await()方法使线程进入等待状态,signal()方法唤醒等待的线程。

5. 公平锁与非公平锁的比较

ReentrantLock可以配置为公平锁或非公平锁。

特性 公平锁 非公平锁
线程调度 按照线程请求锁的顺序来分配锁,先到先得。 允许线程“插队”,即如果某个线程尝试获取锁时,锁正好可用,那么即使有其他线程正在等待,该线程也可以立即获取锁。
性能 通常情况下,公平锁的性能比非公平锁差,因为公平锁需要维护等待队列,并按照顺序唤醒线程。 通常情况下,非公平锁的性能比公平锁好,因为它可以减少线程上下文切换的开销。
适用场景 适用于对公平性要求较高的场景,例如,需要避免某些线程长时间饥饿的情况。 适用于对性能要求较高的场景,例如,需要尽可能减少线程上下文切换的开销。
代码实现 FairSync 中,tryAcquire 方法会检查是否有其他线程正在等待锁 (hasQueuedPredecessors()),如果有,则不尝试获取锁。 NonfairSync 中,tryAcquire 方法不会检查是否有其他线程正在等待锁,而是直接尝试使用 CAS 操作来获取锁。
示例代码 java ReentrantLock fairLock = new ReentrantLock(true); // 创建一个公平锁 | java ReentrantLock unfairLock = new ReentrantLock(false); // 创建一个非公平锁java ReentrantLock unfairLock = new ReentrantLock(); // 默认创建非公平锁

选择公平锁还是非公平锁,取决于具体的应用场景。如果对公平性要求较高,可以选择公平锁;如果对性能要求较高,可以选择非公平锁。

6. ReentrantLock 的最佳实践

  • 始终在 finally 块中释放锁: 确保即使在 try 块中发生异常,锁也能被正确释放,避免死锁。
  • 避免长时间持有锁: 尽量减少持有锁的时间,避免阻塞其他线程。
  • 使用 tryLock() 方法: 如果不需要一直等待锁,可以使用 tryLock() 方法尝试在指定时间内获取锁。
  • 合理使用条件变量: 使用 Condition 对象可以实现更复杂的线程同步,例如生产者-消费者模式。
  • 选择合适的锁类型: 根据具体的应用场景选择公平锁或非公平锁。

7. ReentrantLock 与 synchronized 的比较

ReentrantLocksynchronized都是Java中用于实现线程同步的机制,但它们之间存在一些重要的区别。

特性 ReentrantLock synchronized
实现方式 基于 AQS(AbstractQueuedSynchronizer)实现,是 Java 类库提供的 API。 是 Java 语言的关键字,由 JVM 实现。
灵活性 提供了更灵活的锁控制,例如公平锁、非公平锁、中断响应、定时锁等。 相对简单,只能实现非公平锁。
功能性 除了基本的互斥功能外,还提供了条件变量(Condition)用于实现更复杂的线程同步。 只能通过 wait()notify/notifyAll() 方法来实现简单的线程同步,功能相对有限。
性能 在某些情况下,ReentrantLock 的性能可能比 synchronized 更好,特别是在竞争激烈的情况下。但早期的 synchronized 性能较差,在JDK1.6之后,JVM对synchronized 进行了大量的优化,使得在很多场景下,它的性能与ReentrantLock 相当甚至更好。
使用方式 需要显式地获取和释放锁,使用 lock()unlock() 方法。必须在 finally 块中释放锁,以确保锁一定会被释放。 使用 synchronized 关键字可以隐式地获取和释放锁,不需要手动释放。
可重入性 两者都是可重入锁。 两者都是可重入锁。
中断响应 可以响应中断,线程在等待锁的过程中可以被中断。 在等待锁的过程中不能被中断,只能等待锁被释放。
锁的类型 可以实现公平锁和非公平锁。 只能实现非公平锁。
监控 可以使用 LockSupport 类进行更细粒度的线程阻塞和唤醒操作。 只能使用 wait()notify/notifyAll() 方法进行线程阻塞和唤醒操作。

何时选择 ReentrantLock?

  • 需要公平锁。
  • 需要响应中断。
  • 需要定时锁。
  • 需要使用条件变量。
  • 需要在高并发场景下进行更细粒度的锁控制。

何时选择 synchronized?

  • 代码简单,只需要基本的互斥功能。
  • 不需要公平锁。
  • 不需要响应中断。
  • 不需要定时锁。
  • 对性能要求不高。

8. 结合AQS状态分析可重入的原理

AQS的状态(state)维护了锁的重入计数。当线程第一次获取锁时,AQS将state设置为1,并记录当前持有锁的线程。如果同一个线程再次尝试获取锁,AQS会检查当前线程是否是持有锁的线程,如果是,则将state递增,表示锁被重入一次。释放锁时,AQS会将state递减,当state减为0时,表示锁被完全释放,其他线程可以尝试获取锁。

代码示例:

class MyReentrantLock {
    private final Sync sync = new Sync();

    public void lock() {
        sync.acquire(1);
    }

    public void unlock() {
        sync.release(1);
    }

    private static class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int acquires) {
            Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            } else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int releases) {
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            int c = getState() - releases;
            if (c == 0) {
                setExclusiveOwnerThread(null);
                setState(0);
                return true;
            }
            setState(c);
            return false;
        }
    }

    public static void main(String[] args) {
        MyReentrantLock lock = new MyReentrantLock();
        lock.lock();
        System.out.println("First lock acquired");
        lock.lock();
        System.out.println("Second lock acquired");
        lock.unlock();
        System.out.println("First unlock done");
        lock.unlock();
        System.out.println("Second unlock done");
    }
}

在这个例子中,同一个线程连续两次获取锁,tryAcquire方法通过判断当前线程是否为持有锁的线程,如果是,则增加state的值,实现了锁的重入。

9. 总结与启示

ReentrantLock是Java并发编程中一个强大的工具,它提供了比synchronized关键字更灵活、更强大的并发控制能力。通过深入理解ReentrantLock的底层实现原理,我们可以更好地利用它来构建高效、可靠的并发应用程序。掌握AQS的原理是理解ReentrantLock的关键。选择合适的锁类型,并遵循最佳实践,可以帮助我们避免常见的并发问题,例如死锁、饥饿等。理解AQS和锁状态的变化是深入理解重入原理的基础。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注