JAVA synchronized代码块锁粒度过大导致吞吐下降的拆分策略

JAVA synchronized 代码块锁粒度过大导致吞吐下降的拆分策略

大家好,今天我们要深入探讨一个在并发编程中经常遇到的问题:synchronized 代码块锁粒度过大导致吞吐量下降,以及如何通过有效的拆分策略来解决这个问题。

在多线程应用中,synchronized 关键字是实现互斥访问的重要手段。它可以确保在同一时刻只有一个线程可以访问被保护的代码块或方法。然而,过度使用 synchronized 或者不合理地设置锁的范围,会导致锁竞争加剧,线程阻塞,从而严重影响程序的吞吐量。

锁竞争与吞吐量

锁竞争是指多个线程试图同时获取同一个锁的情况。当一个线程持有锁时,其他试图获取该锁的线程会被阻塞,直到锁被释放。这种阻塞会导致线程上下文切换,而上下文切换本身也是一种开销。当锁竞争非常激烈时,大量的线程会处于阻塞状态,CPU 时间被浪费在上下文切换上,而不是执行实际的业务逻辑,最终导致吞吐量下降。

吞吐量是指单位时间内系统能够处理的请求或事务的数量。在并发编程中,高吞吐量是衡量系统性能的重要指标之一。

识别锁粒度过大的问题

在开始优化之前,首先需要识别出代码中锁粒度过大的问题。以下是一些常见的迹象:

  • CPU 利用率不高: 尽管系统中有多个线程在运行,但 CPU 利用率却很低,说明线程大部分时间都处于阻塞状态。
  • 响应时间过长: 用户的请求需要等待很长时间才能得到响应,表明线程在等待锁的过程中耗费了大量时间。
  • 性能监控工具显示锁竞争激烈: 使用诸如 JConsole、VisualVM 等性能监控工具,可以观察到大量的线程在争夺同一个锁。

锁粒度拆分策略

一旦确定了锁粒度过大是性能瓶颈,就可以考虑使用锁粒度拆分策略来优化代码。核心思想是将一个大的锁分解成多个小的锁,从而降低锁竞争的概率,提高并发度。以下是一些常用的拆分策略:

1. 基于数据结构的拆分

这种策略适用于对共享数据结构进行并发访问的场景。例如,如果多个线程需要访问一个 Map 对象,可以考虑使用 ConcurrentHashMap 替代 synchronized 包装的 HashMapConcurrentHashMap 内部使用了分段锁,将 Map 分成多个段,每个段拥有独立的锁。这样,不同的线程可以同时访问不同的段,从而提高并发度。

示例:

原始代码(锁粒度过大):

import java.util.HashMap;
import java.util.Map;

public class SynchronizedMapExample {

    private final Map<String, Integer> map = new HashMap<>();

    public synchronized void put(String key, Integer value) {
        map.put(key, value);
    }

    public synchronized Integer get(String key) {
        return map.get(key);
    }

    public static void main(String[] args) throws InterruptedException {
        SynchronizedMapExample example = new SynchronizedMapExample();

        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 10000; i++) {
                example.put("key" + i, i);
            }
        });

        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 10000; i++) {
                example.get("key" + i);
            }
        });

        t1.start();
        t2.start();

        t1.join();
        t2.join();

        System.out.println("Done");
    }
}

优化后的代码(使用 ConcurrentHashMap):

import java.util.concurrent.ConcurrentHashMap;
import java.util.Map;

public class ConcurrentHashMapExample {

    private final Map<String, Integer> map = new ConcurrentHashMap<>();

    public void put(String key, Integer value) {
        map.put(key, value);
    }

    public Integer get(String key) {
        return map.get(key);
    }

    public static void main(String[] args) throws InterruptedException {
        ConcurrentHashMapExample example = new ConcurrentHashMapExample();

        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 10000; i++) {
                example.put("key" + i, i);
            }
        });

        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 10000; i++) {
                example.get("key" + i);
            }
        });

        t1.start();
        t2.start();

        t1.join();
        t2.join();

        System.out.println("Done");
    }
}

在这个例子中,我们将 synchronized 包装的 HashMap 替换为 ConcurrentHashMap,从而利用其内部的分段锁机制来提高并发度。

适用场景: 频繁读写共享数据结构的场景.

2. 基于业务逻辑的拆分

如果 synchronized 代码块中包含了多个独立的业务操作,可以将这些操作分解成多个小的 synchronized 代码块,每个代码块只保护与其相关的资源。

示例:

原始代码(锁粒度过大):

public class BankAccount {

    private int balance;

    public BankAccount(int initialBalance) {
        this.balance = initialBalance;
    }

    public synchronized void deposit(int amount) {
        // 1. 记录交易日志
        logTransaction("Deposit: " + amount);

        // 2. 更新账户余额
        balance += amount;

        // 3. 发送通知
        sendNotification("Deposit successful!");
    }

    private void logTransaction(String message) {
        // 模拟记录日志操作
        System.out.println("Logging: " + message);
        try {
            Thread.sleep(10); // 模拟耗时操作
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void sendNotification(String message) {
        // 模拟发送通知操作
        System.out.println("Sending notification: " + message);
        try {
            Thread.sleep(5); // 模拟耗时操作
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public synchronized int getBalance() {
        return balance;
    }

    public static void main(String[] args) throws InterruptedException {
        BankAccount account = new BankAccount(100);

        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 100; i++) {
                account.deposit(10);
            }
        });

        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 100; i++) {
               account.getBalance();
            }
        });

        t1.start();
        t2.start();

        t1.join();
        t2.join();

        System.out.println("Final Balance: " + account.getBalance());
    }
}

优化后的代码(基于业务逻辑拆分):

public class BankAccount {

    private int balance;
    private final Object lock = new Object(); // 显式锁对象

    public BankAccount(int initialBalance) {
        this.balance = initialBalance;
    }

    public void deposit(int amount) {
        // 1. 记录交易日志 (不涉及共享资源,不需要同步)
        logTransaction("Deposit: " + amount);

        // 2. 更新账户余额 (需要同步)
        synchronized (lock) {
            balance += amount;
        }

        // 3. 发送通知 (不涉及共享资源,不需要同步)
        sendNotification("Deposit successful!");
    }

    private void logTransaction(String message) {
        // 模拟记录日志操作
        System.out.println("Logging: " + message);
        try {
            Thread.sleep(10); // 模拟耗时操作
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void sendNotification(String message) {
        // 模拟发送通知操作
        System.out.println("Sending notification: " + message);
        try {
            Thread.sleep(5); // 模拟耗时操作
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public int getBalance() {
        synchronized (lock) {
            return balance;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BankAccount account = new BankAccount(100);

        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 100; i++) {
                account.deposit(10);
            }
        });

        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 100; i++) {
               account.getBalance();
            }
        });

        t1.start();
        t2.start();

        t1.join();
        t2.join();

        System.out.println("Final Balance: " + account.getBalance());
    }
}

在这个例子中,我们将 deposit 方法中不涉及共享资源的操作(记录日志和发送通知)移出 synchronized 代码块,只保留更新账户余额的操作在 synchronized 代码块中。这样,其他线程就可以在记录日志和发送通知的同时,访问账户余额,从而提高并发度。 同时,也将 getBalance() 方法改为同步代码块。

适用场景: 同步代码块中包含多个独立操作,且只有部分操作需要同步的场景. 使用显式锁对象可以更灵活地控制锁的范围。

3. 读写锁分离

如果对共享资源的访问模式是读多写少,可以使用读写锁(ReadWriteLock)来提高并发度。读写锁允许多个线程同时读取共享资源,但只允许一个线程写入共享资源。这样,在读多写少的场景下,可以显著提高并发度。

示例:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteLockExample {

    private final Map<String, Integer> map = new HashMap<>();
    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private final java.util.concurrent.locks.Lock readLock = readWriteLock.readLock();
    private final java.util.concurrent.locks.Lock writeLock = readWriteLock.writeLock();

    public void put(String key, Integer value) {
        writeLock.lock();
        try {
            map.put(key, value);
        } finally {
            writeLock.unlock();
        }
    }

    public Integer get(String key) {
        readLock.lock();
        try {
            return map.get(key);
        } finally {
            readLock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ReadWriteLockExample example = new ReadWriteLockExample();

        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 10000; i++) {
                example.put("key" + i, i);
            }
        });

        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 10000; i++) {
                example.get("key" + i);
            }
        });

        Thread t3 = new Thread(() -> {
            for (int i = 0; i < 10000; i++) {
                example.get("key" + i);
            }
        });

        t1.start();
        t2.start();
        t3.start();

        t1.join();
        t2.join();
        t3.join();

        System.out.println("Done");
    }
}

在这个例子中,我们使用 ReadWriteLock 来保护 Map 对象。put 方法使用写锁,get 方法使用读锁。这样,多个线程可以同时读取 Map 对象,但只有一个线程可以写入 Map 对象,从而提高并发度。

适用场景: 读多写少的并发访问场景.

4. 锁剥离 (Lock Striping)

锁剥离是一种将一个锁分解成多个锁的技术,每个锁保护一部分共享资源。这种技术可以有效地减少锁竞争,提高并发度。ConcurrentHashMap 就是锁剥离的一个典型应用。

示例: 假设有一个需要同步访问的数组,可以将数组分成多个区域,每个区域使用一个独立的锁。

public class LockStripingExample {

    private final Object[] locks;
    private final int[] data;
    private final int numberOfLocks;

    public LockStripingExample(int arraySize, int numberOfLocks) {
        this.data = new int[arraySize];
        this.numberOfLocks = numberOfLocks;
        this.locks = new Object[numberOfLocks];
        for (int i = 0; i < numberOfLocks; i++) {
            locks[i] = new Object();
        }
    }

    public void increment(int index) {
        int lockIndex = Math.abs(index % numberOfLocks); // 根据索引计算锁的索引
        synchronized (locks[lockIndex]) {
            data[index]++;
        }
    }

    public int get(int index) {
        int lockIndex = Math.abs(index % numberOfLocks); // 根据索引计算锁的索引
        synchronized (locks[lockIndex]) {
            return data[index];
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int arraySize = 100;
        int numberOfLocks = 10;
        LockStripingExample example = new LockStripingExample(arraySize, numberOfLocks);

        Thread t1 = new Thread(() -> {
            for (int i = 0; i < arraySize; i++) {
                example.increment(i);
            }
        });

        Thread t2 = new Thread(() -> {
            for (int i = 0; i < arraySize; i++) {
                example.increment(i);
            }
        });

        t1.start();
        t2.start();

        t1.join();
        t2.join();

        int sum = 0;
        for(int i = 0; i < arraySize; i++) {
            sum += example.get(i);
        }

        System.out.println("Sum: " + sum);
    }
}

在这个例子中,我们将数组 data 分成 numberOfLocks 个区域,每个区域使用一个独立的锁。increment 方法根据索引计算锁的索引,并使用对应的锁来保护数组元素。这样,不同的线程可以同时访问不同的区域,从而提高并发度。

适用场景: 对数组或集合等数据结构进行并发访问,且可以根据索引或其他方式将数据分成多个区域的场景。

5. 使用原子类

java.util.concurrent.atomic 包提供了一系列原子类,例如 AtomicIntegerAtomicLongAtomicReference 等。这些类提供了原子性的操作,可以避免使用 synchronized 关键字。原子类通常基于 CAS (Compare and Swap) 操作实现,在高并发场景下,性能通常优于 synchronized

示例:

原始代码(使用 synchronized):

public class Counter {

    private int count = 0;

    public synchronized void increment() {
        count++;
    }

    public synchronized int getCount() {
        return count;
    }

    public static void main(String[] args) throws InterruptedException {
        Counter counter = new Counter();

        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 10000; i++) {
                counter.increment();
            }
        });

        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 10000; i++) {
                counter.increment();
            }
        });

        t1.start();
        t2.start();

        t1.join();
        t2.join();

        System.out.println("Count: " + counter.getCount());
    }
}

优化后的代码(使用 AtomicInteger):

import java.util.concurrent.atomic.AtomicInteger;

public class AtomicCounter {

    private AtomicInteger count = new AtomicInteger(0);

    public void increment() {
        count.incrementAndGet();
    }

    public int getCount() {
        return count.get();
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicCounter counter = new AtomicCounter();

        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 10000; i++) {
                counter.increment();
            }
        });

        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 10000; i++) {
                counter.increment();
            }
        });

        t1.start();
        t2.start();

        t1.join();
        t2.join();

        System.out.println("Count: " + counter.getCount());
    }
}

在这个例子中,我们将 int 类型的 count 变量替换为 AtomicInteger 类型的 count 变量,并使用 incrementAndGet 方法进行原子性的递增操作。这样,就可以避免使用 synchronized 关键字,提高并发度。

适用场景: 对单个变量进行原子性操作的场景,例如计数器、标志位等。

策略选择

选择合适的锁粒度拆分策略需要根据具体的应用场景和性能需求进行权衡。以下是一些建议:

策略 适用场景 优点 缺点
数据结构拆分 频繁读写共享数据结构的场景 提高并发度,减少锁竞争 实现相对复杂,需要选择合适的数据结构
业务逻辑拆分 同步代码块中包含多个独立操作,且只有部分操作需要同步的场景 减少锁的持有时间,提高并发度 需要仔细分析业务逻辑,确保拆分后的代码仍然是线程安全的
读写锁分离 读多写少的并发访问场景 允许多个线程同时读取共享资源,提高并发度 只适用于读多写少的场景,如果写操作频繁,性能反而会下降
锁剥离 对数组或集合等数据结构进行并发访问,且可以根据索引或其他方式将数据分成多个区域的场景 减少锁竞争,提高并发度 需要合理选择锁的数量,如果锁的数量太少,仍然会存在锁竞争;如果锁的数量太多,会增加内存开销
使用原子类 对单个变量进行原子性操作的场景,例如计数器、标志位等 避免使用 synchronized 关键字,提高并发度 只适用于对单个变量进行原子性操作的场景,无法保护多个变量之间的依赖关系

在实际应用中,可以结合多种策略来优化代码。例如,可以先使用业务逻辑拆分来减少锁的持有时间,然后再使用读写锁分离来提高读操作的并发度。

总结

通过合理的锁粒度拆分策略,可以有效地减少锁竞争,提高并发度,从而提升程序的吞吐量。在选择拆分策略时,需要根据具体的应用场景和性能需求进行权衡,并进行充分的测试,以确保优化后的代码能够达到预期的效果。

一些建议和思考

  • 避免过度优化: 不要为了优化而优化,只有当锁竞争真正成为性能瓶颈时,才需要考虑使用锁粒度拆分策略。
  • 进行性能测试: 在优化代码之前和之后,都要进行性能测试,以评估优化效果。
  • 注意线程安全: 在拆分锁粒度时,一定要确保拆分后的代码仍然是线程安全的。
  • 考虑代码可读性: 锁粒度拆分可能会增加代码的复杂性,因此需要在性能和可读性之间进行权衡。
  • 使用工具辅助分析: 利用JProfiler, VisualVM等工具进行锁竞争分析,找出最需要优化的热点代码。

希望今天的分享能够帮助大家更好地理解和应用锁粒度拆分策略。 感谢大家的聆听!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注