Java并发包中的非阻塞同步算法:CLH锁、MCS锁在高性能并发结构中的应用

Java并发包中的非阻塞同步算法:CLH锁、MCS锁在高性能并发结构中的应用

大家好,今天我们来深入探讨Java并发包中两种重要的非阻塞同步算法:CLH锁和MCS锁。这两种锁在构建高性能并发数据结构中扮演着关键角色,它们避免了传统锁机制带来的线程阻塞,从而提升了系统的整体吞吐量。

1. 阻塞与非阻塞同步

在传统的锁机制中,例如synchronized关键字和ReentrantLock,当一个线程尝试获取一个已经被其他线程持有的锁时,该线程会被阻塞,进入等待状态。直到锁被释放,该线程才能被唤醒并尝试重新获取锁。这种阻塞行为在高并发场景下会带来显著的性能开销,例如:

  • 上下文切换: 线程阻塞会导致操作系统进行上下文切换,保存和恢复线程的状态,这是一个昂贵的操作。
  • 优先级反转: 低优先级线程持有锁,高优先级线程等待锁,导致高优先级线程无法及时执行。
  • 死锁: 多个线程互相等待对方释放锁,导致所有线程都无法继续执行。

非阻塞同步算法的目标是避免线程阻塞,即使在并发冲突的情况下,线程也能继续执行,只是可能会进行重试或其他操作。常见的非阻塞算法包括:

  • 比较并交换(CAS): 原子性地比较内存中的值与预期值,如果相等则更新为新值。
  • 原子变量: 例如AtomicIntegerAtomicReference等,利用CAS操作提供原子性的读写操作。
  • 锁无关的数据结构: 设计数据结构时避免使用显式锁,利用CAS或其他原子操作来保证线程安全。

CLH锁和MCS锁就是属于锁无关数据结构中的一种,它们通过一种链表结构来维护等待锁的线程,避免了阻塞。

2. CLH锁(Craig, Landin, and Hagersten lock)

CLH锁是一种自旋锁,它使用一个隐式的队列来管理等待锁的线程。每个线程都持有队列中的一个节点,节点中保存了线程是否应该继续自旋的信息。

2.1 CLH锁的基本原理

  1. 队列结构: CLH锁维护一个FIFO队列,队列中的每个节点代表一个等待锁的线程。
  2. 节点状态: 每个节点包含一个状态位,表示前驱节点是否已经释放锁。
  3. 自旋等待: 线程通过检查前驱节点的状态位来判断是否可以获取锁。如果前驱节点未释放锁,线程就自旋等待,直到前驱节点释放锁。
  4. 释放锁: 释放锁时,当前节点将自己的状态位设置为已释放,通知后继节点可以尝试获取锁。

2.2 CLH锁的实现

下面是一个简单的CLH锁的Java实现:

import java.util.concurrent.atomic.AtomicReference;

public class CLHLock {

    private final AtomicReference<QNode> tail;
    private final ThreadLocal<QNode> myNode;
    private final ThreadLocal<QNode> myPred;

    public CLHLock() {
        tail = new AtomicReference<>(new QNode()); // 哨兵节点
        myNode = ThreadLocal.withInitial(QNode::new);
        myPred = new ThreadLocal<>();
    }

    public void lock() {
        QNode qnode = myNode.get();
        QNode pred = tail.getAndSet(qnode);
        myPred.set(pred);

        // 自旋等待前驱节点释放锁
        while (pred.locked) {
            // 自旋
        }
    }

    public void unlock() {
        QNode qnode = myNode.get();
        qnode.locked = false; // 释放锁,通知后继节点
        myNode.set(myPred.get()); // 重用节点
    }

    static class QNode {
        volatile boolean locked = true; // 默认锁定
    }

    public static void main(String[] args) throws InterruptedException {
        CLHLock lock = new CLHLock();
        int numThreads = 5;
        Thread[] threads = new Thread[numThreads];

        for (int i = 0; i < numThreads; i++) {
            final int threadId = i;
            threads[i] = new Thread(() -> {
                lock.lock();
                try {
                    System.out.println("Thread " + threadId + " acquired the lock.");
                    Thread.sleep(100); // 模拟持有锁的时间
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("Thread " + threadId + " releasing the lock.");
                    lock.unlock();
                }
            });
            threads[i].start();
        }

        for (Thread thread : threads) {
            thread.join();
        }
    }
}

代码解释:

  • tail: 一个原子引用,指向队列的尾部节点。
  • myNode: 一个ThreadLocal变量,每个线程都持有自己的QNode实例,用于表示该线程在队列中的节点。
  • myPred: 一个ThreadLocal变量,存储当前线程的前驱节点。
  • lock():
    1. 获取当前线程的QNode
    2. tail指向当前线程的QNode,并获取之前的tail节点作为前驱节点。
    3. 将前驱节点保存到myPred中。
    4. 自旋等待,直到前驱节点的locked状态变为false
  • unlock():
    1. 获取当前线程的QNode
    2. 将当前节点的locked状态设置为false,表示释放锁。
    3. 重用QNode,避免频繁创建对象。

2.3 CLH锁的优点和缺点

优点:

  • 公平性: CLH锁保证了FIFO的获取顺序,先请求的线程先获得锁。
  • 避免饥饿: 每个线程都有机会获得锁,避免了某个线程长期无法获得锁的情况。
  • 本地自旋: 线程只在自己的前驱节点上自旋,减少了对共享内存的竞争。
  • 空间复杂度低: 每个线程只需要一个节点,空间复杂度为O(n),其中n为线程数。

缺点:

  • 自旋等待: 线程在等待锁的过程中会一直自旋,消耗CPU资源。
  • 内存占用: 每个线程都需要一个节点,在高并发场景下会占用较多的内存。
  • 释放锁必须由持有锁的线程执行: 无法实现像ReentrantLock那样的条件变量。

3. MCS锁(Mellor-Crummey and Scott lock)

MCS锁也是一种自旋锁,它与CLH锁类似,也使用一个隐式的队列来管理等待锁的线程。但是,MCS锁的节点直接指向后继节点,而不是前驱节点。

3.1 MCS锁的基本原理

  1. 队列结构: MCS锁维护一个FIFO队列,队列中的每个节点代表一个等待锁的线程。
  2. 节点状态: 每个节点包含一个状态位,表示是否有后继节点正在等待锁。
  3. 自旋等待: 线程通过检查自己的节点状态位来判断是否可以获取锁。如果该状态位为true,表示有后继节点正在等待锁,线程就自旋等待,直到该状态位变为false
  4. 释放锁: 释放锁时,当前节点将自己的状态位设置为false,通知后继节点可以尝试获取锁。

3.2 MCS锁的实现

下面是一个简单的MCS锁的Java实现:

import java.util.concurrent.atomic.AtomicReference;

public class MCSLock {

    private final AtomicReference<QNode> tail;
    private final ThreadLocal<QNode> myNode;

    public MCSLock() {
        tail = new AtomicReference<>();
        myNode = ThreadLocal.withInitial(QNode::new);
    }

    public void lock() {
        QNode qnode = myNode.get();
        QNode pred = tail.getAndSet(qnode);

        if (pred != null) {
            qnode.locked = true;
            pred.next = qnode; // 链接后继节点
            // 自旋等待
            while (qnode.locked) {
                // 自旋
            }
        }
    }

    public void unlock() {
        QNode qnode = myNode.get();
        if (qnode.next == null) {
            // 检查是否有竞争
            if (tail.compareAndSet(qnode, null)) {
                return; // 没有竞争,直接返回
            } else {
                // 等待后继节点链接完成
                while (qnode.next == null) {
                    // 自旋
                }
            }
        }
        qnode.next.locked = false; // 释放锁,通知后继节点
        qnode.next = null; // help GC
    }

    static class QNode {
        volatile boolean locked = false;
        volatile QNode next = null;
    }

    public static void main(String[] args) throws InterruptedException {
        MCSLock lock = new MCSLock();
        int numThreads = 5;
        Thread[] threads = new Thread[numThreads];

        for (int i = 0; i < numThreads; i++) {
            final int threadId = i;
            threads[i] = new Thread(() -> {
                lock.lock();
                try {
                    System.out.println("Thread " + threadId + " acquired the lock.");
                    Thread.sleep(100); // 模拟持有锁的时间
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("Thread " + threadId + " releasing the lock.");
                    lock.unlock();
                }
            });
            threads[i].start();
        }

        for (Thread thread : threads) {
            thread.join();
        }
    }
}

代码解释:

  • tail: 一个原子引用,指向队列的尾部节点。
  • myNode: 一个ThreadLocal变量,每个线程都持有自己的QNode实例,用于表示该线程在队列中的节点。
  • lock():
    1. 获取当前线程的QNode
    2. tail指向当前线程的QNode,并获取之前的tail节点作为前驱节点。
    3. 如果前驱节点不为空,则设置当前节点的lockedtrue,并将前驱节点的next指向当前节点。
    4. 自旋等待,直到当前节点的locked状态变为false
  • unlock():
    1. 获取当前线程的QNode
    2. 如果当前节点的next为空,表示没有后继节点,则尝试将tail设置为null。如果CAS成功,表示没有竞争,直接返回。
    3. 如果CAS失败,表示有其他线程正在尝试获取锁,需要等待后继节点链接完成。
    4. 将后继节点的locked状态设置为false,表示释放锁。
    5. 将当前节点的next设置为null,帮助GC。

3.3 MCS锁的优点和缺点

优点:

  • 公平性: MCS锁保证了FIFO的获取顺序,先请求的线程先获得锁。
  • 避免饥饿: 每个线程都有机会获得锁,避免了某个线程长期无法获得锁的情况。
  • 本地自旋: 线程只在自己的节点上自旋,减少了对共享内存的竞争。
  • 更少的内存流量: MCS锁的释放操作只涉及一次内存写入操作,而CLH锁涉及两次。

缺点:

  • 自旋等待: 线程在等待锁的过程中会一直自旋,消耗CPU资源。
  • 释放锁的逻辑更复杂: 需要处理没有后继节点的情况,以及等待后继节点链接完成的情况。
  • 释放锁必须由持有锁的线程执行: 无法实现像ReentrantLock那样的条件变量。

4. CLH锁和MCS锁的比较

特性 CLH锁 MCS锁
队列结构 隐式FIFO队列,节点指向前驱节点 隐式FIFO队列,节点指向后继节点
自旋位置 在前驱节点的locked状态位上自旋 在自己的locked状态位上自旋
内存流量 获取和释放锁都涉及内存写入操作 释放锁时可能需要额外的CAS操作
节点重用 可以重用节点 不支持节点重用
实现复杂度 相对简单 相对复杂

5. CLH锁和MCS锁在高性能并发结构中的应用

CLH锁和MCS锁非常适合构建高性能的并发数据结构,例如:

  • 并发队列: 可以使用CLH锁或MCS锁来保护队列的头部和尾部,保证并发入队和出队的线程安全。
  • 并发哈希表: 可以使用CLH锁或MCS锁来保护哈希表的桶,保证并发插入和查找的线程安全。
  • 读写锁: 可以使用CLH锁或MCS锁来构建读写锁,允许多个线程同时读取数据,但只允许一个线程写入数据。
  • 线程池: 可以使用CLH锁或MCS锁来管理线程池的任务队列,保证任务的并发执行。

示例:基于CLH锁的并发队列

import java.util.concurrent.atomic.AtomicReference;

public class ConcurrentQueue<T> {

    private final AtomicReference<QNode> head = new AtomicReference<>(new QNode(null));
    private final AtomicReference<QNode> tail = new AtomicReference<>(head.get());
    private final CLHLock lock = new CLHLock();

    public void enqueue(T value) {
        QNode node = new QNode(value);
        lock.lock();
        try {
            tail.get().next = node;
            tail.compareAndSet(tail.get(), node);
        } finally {
            lock.unlock();
        }
    }

    public T dequeue() {
        lock.lock();
        try {
            QNode first = head.get().next;
            if (first == null) {
                return null; // Queue is empty
            }
            T value = (T) first.value;
            head.get().next = first.next;
            if (head.get().next == null) {
                tail.compareAndSet(tail.get(), head.get()); // Queue is now empty
            }
            return value;
        } finally {
            lock.unlock();
        }
    }

    private static class QNode {
        volatile Object value;
        volatile QNode next;

        public QNode(Object value) {
            this.value = value;
            this.next = null;
        }
    }
        static class CLHLock {

        private final AtomicReference<QNode> tail;
        private final ThreadLocal<QNode> myNode;
        private final ThreadLocal<QNode> myPred;

        public CLHLock() {
            tail = new AtomicReference<>(new QNode(null)); // 哨兵节点
            myNode = ThreadLocal.withInitial(QNode::new);
            myPred = new ThreadLocal<>();
        }

        public void lock() {
            QNode qnode = myNode.get();
            QNode pred = tail.getAndSet(qnode);
            myPred.set(pred);

            // 自旋等待前驱节点释放锁
            while (pred != null && pred.locked) {
                // 自旋
                // You might want to add a small delay or yield to avoid excessive CPU consumption
                // For example: Thread.yield();
            }
        }

        public void unlock() {
            QNode qnode = myNode.get();
            qnode.locked = false; // 释放锁,通知后继节点
            myNode.set(myPred.get()); // 重用节点
        }

        static class QNode {
            volatile boolean locked = true; // 默认锁定
             Object value;
             volatile QNode next;

            QNode(Object value) {
                this.value = value;
                this.next = null;
            }

            public QNode() {

            }
        }

    }
}

代码解释:

  • head: 原子引用,指向队列的头部节点。
  • tail: 原子引用,指向队列的尾部节点。
  • lock: CLH锁实例,用于保护队列的头部和尾部。
  • enqueue(T value): 入队操作,先获取锁,然后将新节点添加到队列的尾部,并更新tail
  • dequeue(): 出队操作,先获取锁,然后从队列的头部移除一个节点,并更新head

6. 选择合适的锁

在选择CLH锁和MCS锁时,需要考虑以下因素:

  • 并发程度: 在高并发场景下,CLH锁和MCS锁的性能通常优于传统的阻塞锁。
  • 内存占用: CLH锁和MCS锁都需要为每个线程分配节点,在高并发场景下会占用较多的内存。
  • CPU消耗: CLH锁和MCS锁都使用自旋等待,会消耗CPU资源。
  • 实现复杂度: MCS锁的实现比CLH锁更复杂。
  • 平台特性: 不同的硬件平台对自旋锁的优化程度不同。

一般来说,如果对公平性要求较高,且内存资源充足,可以选择CLH锁或MCS锁。如果对内存占用比较敏感,可以考虑使用其他更节省内存的并发算法。

总的来说,CLH锁和MCS锁都是非常有用的非阻塞同步算法,它们可以帮助我们构建高性能的并发数据结构。但是,在使用这些算法时,需要仔细考虑其优缺点,并根据具体的应用场景进行选择。

7. 总结:非阻塞同步算法的价值

今天我们深入探讨了CLH锁和MCS锁,这两种非阻塞同步算法在构建高性能并发数据结构中起着至关重要的作用。它们通过避免线程阻塞,有效提升了系统的整体吞吐量,但在实际应用中,我们需要权衡其优缺点,并结合具体场景做出明智的选择。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注