Java应用中的并发控制:信号量Semaphore在资源有限场景的应用

Java并发控制:信号量Semaphore在资源有限场景的应用

各位朋友,大家好!今天我们来聊聊Java并发控制中一个非常重要的工具——信号量(Semaphore)。在实际应用中,我们经常会遇到资源有限的场景,例如数据库连接池、线程池、或者某种硬件资源等等。如何有效地管理这些资源,防止资源耗尽,保证系统的稳定性和性能,是并发编程中一个关键问题。而信号量,正是解决这类问题的利器。

1. 什么是信号量?

信号量(Semaphore)是一个计数器,用于控制对共享资源的访问。它可以被看作是一种“许可证”机制。每个信号量维护一个许可证的数量,线程在访问共享资源之前,需要先获取一个许可证;当线程完成任务后,释放许可证,将其归还给信号量。

  • 获取许可证(acquire()): 如果信号量中存在许可证(计数器大于0),则线程可以获取许可证,计数器减1。如果计数器为0,则线程会被阻塞,直到有其他线程释放许可证。
  • 释放许可证(release()): 线程释放许可证后,计数器加1。如果有线程因为等待许可证而被阻塞,则其中一个线程会被唤醒,继续执行。

信号量可以分为两类:

  • 二元信号量(Binary Semaphore): 计数器只能是0或1,类似于互斥锁(Mutex)。
  • 计数信号量(Counting Semaphore): 计数器可以是任意非负整数,用于控制对多个资源的并发访问。

2. 信号量的工作原理

信号量内部维护了一个计数器和一个等待队列。

  • 计数器(permits): 表示当前可用的许可证数量。
  • 等待队列(wait queue): 存储了所有等待获取许可证的线程。

当一个线程调用acquire()方法时,信号量会执行以下步骤:

  1. 检查计数器是否大于0。
  2. 如果计数器大于0,则将计数器减1,并允许线程继续执行。
  3. 如果计数器等于0,则将当前线程加入等待队列,并阻塞该线程。

当一个线程调用release()方法时,信号量会执行以下步骤:

  1. 将计数器加1。
  2. 检查等待队列是否为空。
  3. 如果等待队列不为空,则从等待队列中唤醒一个线程,让其获取许可证并继续执行。

3. Java中的Semaphore类

在Java中,java.util.concurrent.Semaphore类提供了对信号量的支持。它提供了以下主要方法:

方法 描述
Semaphore(int permits) 构造函数,创建一个具有给定数量的许可证的信号量。
Semaphore(int permits, boolean fair) 构造函数,创建一个具有给定数量的许可证的信号量,并指定是否使用公平策略。fairtrue时,等待时间最长的线程会优先获取许可证。
acquire() 获取一个许可证。如果当前没有可用的许可证,则阻塞,直到有其他线程释放许可证。
acquire(int permits) 获取指定数量的许可证。如果当前没有足够数量的许可证,则阻塞,直到有其他线程释放足够的许可证。
tryAcquire() 尝试获取一个许可证。如果当前有可用的许可证,则立即获取并返回true;否则立即返回false,不会阻塞。
tryAcquire(long timeout, TimeUnit unit) 尝试在指定的时间内获取一个许可证。如果成功获取,则返回true;如果在超时时间内没有获取到,则返回false,不会继续阻塞。
release() 释放一个许可证,增加信号量的许可证计数。
release(int permits) 释放指定数量的许可证,增加信号量的许可证计数。
availablePermits() 返回当前可用的许可证数量。
drainPermits() 获取并返回立即可用的所有许可证的数量。

4. 信号量的应用场景

信号量在资源有限的并发场景中非常有用。以下是一些常见的应用场景:

  • 数据库连接池: 控制同时访问数据库的连接数,防止连接数过多导致数据库崩溃。
  • 线程池: 控制同时执行的任务数量,防止任务过多导致系统资源耗尽。
  • 文件下载: 限制同时下载文件的线程数量,防止带宽被过度占用。
  • 打印机: 限制同时使用打印机的用户数量,防止打印任务冲突。
  • 共享资源: 保护对共享资源的并发访问,例如共享内存、共享文件等。

5. 代码示例:数据库连接池

下面是一个使用信号量实现的简单数据库连接池的例子:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;

public class ConnectionPool {

    private String url;
    private String user;
    private String password;
    private int maxConnections;
    private List<Connection> connections;
    private Semaphore semaphore;

    public ConnectionPool(String url, String user, String password, int maxConnections) {
        this.url = url;
        this.user = user;
        this.password = password;
        this.maxConnections = maxConnections;
        this.connections = new ArrayList<>();
        this.semaphore = new Semaphore(maxConnections); // 初始化信号量,许可证数量为maxConnections
        initializeConnections();
    }

    private void initializeConnections() {
        try {
            for (int i = 0; i < maxConnections; i++) {
                Connection connection = DriverManager.getConnection(url, user, password);
                connections.add(connection);
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    public Connection getConnection() throws InterruptedException, SQLException {
        semaphore.acquire(); // 获取许可证,如果当前没有可用连接,则阻塞
        Connection connection = null;
        synchronized (connections) {
            if (!connections.isEmpty()) {
                connection = connections.remove(0);
            }
        }
        if (connection == null) {
                connection = DriverManager.getConnection(url,user,password);
        }

        return connection;
    }

    public void releaseConnection(Connection connection) {
        if (connection != null) {
            synchronized (connections) {
                connections.add(connection);
            }
            semaphore.release(); // 释放许可证,允许其他线程获取连接
        }
    }

    public void close() throws SQLException {
        for (Connection connection : connections) {
            connection.close();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        String url = "jdbc:mysql://localhost:3306/testdb";
        String user = "root";
        String password = "password";
        int maxConnections = 5;

        ConnectionPool connectionPool = new ConnectionPool(url, user, password, maxConnections);

        // 模拟多个线程访问数据库
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                Connection connection = null;
                try {
                    connection = connectionPool.getConnection();
                    System.out.println(Thread.currentThread().getName() + " 获取到连接");
                    // 模拟数据库操作
                    Thread.sleep(1000);
                } catch (InterruptedException | SQLException e) {
                    e.printStackTrace();
                } finally {
                    if(connection != null) {
                        connectionPool.releaseConnection(connection);
                        System.out.println(Thread.currentThread().getName() + " 释放连接");
                    }

                }
            }).start();
        }

        Thread.sleep(5000); // 等待所有线程执行完毕
        try {
            connectionPool.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}

在这个例子中,ConnectionPool类维护了一个数据库连接的列表。semaphore控制同时可以获取的连接数量。getConnection()方法会尝试获取一个许可证,如果当前没有可用连接,则会阻塞。releaseConnection()方法会释放许可证,允许其他线程获取连接。

6. 代码示例:线程池任务调度

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class TaskScheduler {

    private final int maxConcurrentTasks;
    private final Semaphore semaphore;
    private final ExecutorService executorService;

    public TaskScheduler(int maxConcurrentTasks) {
        this.maxConcurrentTasks = maxConcurrentTasks;
        this.semaphore = new Semaphore(maxConcurrentTasks);
        this.executorService = Executors.newCachedThreadPool(); // 或者使用其他类型的线程池
    }

    public void execute(Runnable task) {
        try {
            semaphore.acquire(); // 获取许可证,限制并发任务数量
            executorService.execute(() -> {
                try {
                    task.run();
                } finally {
                    semaphore.release(); // 释放许可证
                }
            });
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public void shutdown() {
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
                executorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int maxConcurrentTasks = 3;
        TaskScheduler taskScheduler = new TaskScheduler(maxConcurrentTasks);

        for (int i = 0; i < 10; i++) {
            final int taskId = i;
            taskScheduler.execute(() -> {
                try {
                    System.out.println("Task " + taskId + " started by " + Thread.currentThread().getName());
                    Thread.sleep(2000); // 模拟任务执行时间
                    System.out.println("Task " + taskId + " finished by " + Thread.currentThread().getName());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        Thread.sleep(10000); // 等待一段时间让任务执行完毕
        taskScheduler.shutdown();
    }
}

这个例子中,TaskScheduler类使用信号量来限制同时执行的任务数量。execute()方法会尝试获取一个许可证,只有获取到许可证才能提交任务到线程池执行。任务执行完毕后,会释放许可证。

7. 公平性策略

Semaphore类提供了一个可选的fair参数,用于指定是否使用公平策略。

  • 公平策略(fair = true): 按照线程请求许可证的顺序来分配许可证,等待时间最长的线程会优先获取许可证。
  • 非公平策略(fair = false): 允许插队,即新请求的线程有可能比等待时间长的线程更快地获取许可证。

公平策略可以避免某些线程饥饿,但会降低整体的吞吐量。非公平策略可以提高吞吐量,但有可能导致某些线程长时间无法获取许可证。

在选择公平性策略时,需要根据具体的应用场景进行权衡。如果对公平性要求较高,则应该使用公平策略;如果对吞吐量要求较高,则可以使用非公平策略。默认情况下,Semaphore使用的是非公平策略。

8. 信号量的注意事项

  • 许可证的释放: 必须确保每个获取许可证的线程最终都会释放许可证,否则会导致许可证泄露,最终导致所有线程都被阻塞。可以使用try-finally块来确保许可证的释放。
  • 并发访问: 信号量本身是线程安全的,但是在使用信号量保护的共享资源时,仍然需要注意并发访问的问题,可以使用synchronized或其他并发控制机制来保护共享资源。
  • 死锁: 避免多个线程互相等待对方释放许可证,导致死锁。
  • 中断: acquire()方法可以被中断,因此需要在代码中处理InterruptedException异常。
  • 资源回收: 确保在使用完资源后,及时释放许可证,避免资源浪费。

9. 信号量 vs. 锁

信号量和锁都可以用于控制对共享资源的访问,但它们之间有一些重要的区别:

特性 信号量(Semaphore) 锁(Lock/Mutex)
目的 控制对多个资源的并发访问 控制对单个资源的独占访问
计数器 维护一个许可证计数器 只有两种状态:锁定和未锁定
获取和释放 可以由不同的线程获取和释放许可证 必须由同一个线程获取和释放锁
用途 资源池管理、流量控制 临界区保护、同步
灵活性 更加灵活,可以控制并发访问的数量 相对简单,只允许一个线程访问
复杂性 相对复杂,需要考虑许可证的数量和公平性策略 相对简单,只需要考虑锁定和解锁
可重入性 标准Semaphore本身不支持可重入,但可以基于Semaphore实现可重入锁 Lock接口中的ReentrantLock支持可重入

简单来说,锁是排他性的,一次只允许一个线程访问资源;而信号量可以允许多个线程并发访问资源,只要许可证足够。

总结来说:信号量是管理资源的有效工具。
需要根据实际场景选择合适的并发控制方式。
理解信号量的工作原理和注意事项至关重要。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注