Java并发控制:信号量Semaphore在资源有限场景的应用
各位朋友,大家好!今天我们来聊聊Java并发控制中一个非常重要的工具——信号量(Semaphore)。在实际应用中,我们经常会遇到资源有限的场景,例如数据库连接池、线程池、或者某种硬件资源等等。如何有效地管理这些资源,防止资源耗尽,保证系统的稳定性和性能,是并发编程中一个关键问题。而信号量,正是解决这类问题的利器。
1. 什么是信号量?
信号量(Semaphore)是一个计数器,用于控制对共享资源的访问。它可以被看作是一种“许可证”机制。每个信号量维护一个许可证的数量,线程在访问共享资源之前,需要先获取一个许可证;当线程完成任务后,释放许可证,将其归还给信号量。
- 获取许可证(acquire()): 如果信号量中存在许可证(计数器大于0),则线程可以获取许可证,计数器减1。如果计数器为0,则线程会被阻塞,直到有其他线程释放许可证。
- 释放许可证(release()): 线程释放许可证后,计数器加1。如果有线程因为等待许可证而被阻塞,则其中一个线程会被唤醒,继续执行。
信号量可以分为两类:
- 二元信号量(Binary Semaphore): 计数器只能是0或1,类似于互斥锁(Mutex)。
- 计数信号量(Counting Semaphore): 计数器可以是任意非负整数,用于控制对多个资源的并发访问。
2. 信号量的工作原理
信号量内部维护了一个计数器和一个等待队列。
- 计数器(permits): 表示当前可用的许可证数量。
- 等待队列(wait queue): 存储了所有等待获取许可证的线程。
当一个线程调用acquire()方法时,信号量会执行以下步骤:
- 检查计数器是否大于0。
- 如果计数器大于0,则将计数器减1,并允许线程继续执行。
- 如果计数器等于0,则将当前线程加入等待队列,并阻塞该线程。
当一个线程调用release()方法时,信号量会执行以下步骤:
- 将计数器加1。
- 检查等待队列是否为空。
- 如果等待队列不为空,则从等待队列中唤醒一个线程,让其获取许可证并继续执行。
3. Java中的Semaphore类
在Java中,java.util.concurrent.Semaphore类提供了对信号量的支持。它提供了以下主要方法:
| 方法 | 描述 |
|---|---|
Semaphore(int permits) |
构造函数,创建一个具有给定数量的许可证的信号量。 |
Semaphore(int permits, boolean fair) |
构造函数,创建一个具有给定数量的许可证的信号量,并指定是否使用公平策略。fair为true时,等待时间最长的线程会优先获取许可证。 |
acquire() |
获取一个许可证。如果当前没有可用的许可证,则阻塞,直到有其他线程释放许可证。 |
acquire(int permits) |
获取指定数量的许可证。如果当前没有足够数量的许可证,则阻塞,直到有其他线程释放足够的许可证。 |
tryAcquire() |
尝试获取一个许可证。如果当前有可用的许可证,则立即获取并返回true;否则立即返回false,不会阻塞。 |
tryAcquire(long timeout, TimeUnit unit) |
尝试在指定的时间内获取一个许可证。如果成功获取,则返回true;如果在超时时间内没有获取到,则返回false,不会继续阻塞。 |
release() |
释放一个许可证,增加信号量的许可证计数。 |
release(int permits) |
释放指定数量的许可证,增加信号量的许可证计数。 |
availablePermits() |
返回当前可用的许可证数量。 |
drainPermits() |
获取并返回立即可用的所有许可证的数量。 |
4. 信号量的应用场景
信号量在资源有限的并发场景中非常有用。以下是一些常见的应用场景:
- 数据库连接池: 控制同时访问数据库的连接数,防止连接数过多导致数据库崩溃。
- 线程池: 控制同时执行的任务数量,防止任务过多导致系统资源耗尽。
- 文件下载: 限制同时下载文件的线程数量,防止带宽被过度占用。
- 打印机: 限制同时使用打印机的用户数量,防止打印任务冲突。
- 共享资源: 保护对共享资源的并发访问,例如共享内存、共享文件等。
5. 代码示例:数据库连接池
下面是一个使用信号量实现的简单数据库连接池的例子:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
public class ConnectionPool {
private String url;
private String user;
private String password;
private int maxConnections;
private List<Connection> connections;
private Semaphore semaphore;
public ConnectionPool(String url, String user, String password, int maxConnections) {
this.url = url;
this.user = user;
this.password = password;
this.maxConnections = maxConnections;
this.connections = new ArrayList<>();
this.semaphore = new Semaphore(maxConnections); // 初始化信号量,许可证数量为maxConnections
initializeConnections();
}
private void initializeConnections() {
try {
for (int i = 0; i < maxConnections; i++) {
Connection connection = DriverManager.getConnection(url, user, password);
connections.add(connection);
}
} catch (SQLException e) {
e.printStackTrace();
}
}
public Connection getConnection() throws InterruptedException, SQLException {
semaphore.acquire(); // 获取许可证,如果当前没有可用连接,则阻塞
Connection connection = null;
synchronized (connections) {
if (!connections.isEmpty()) {
connection = connections.remove(0);
}
}
if (connection == null) {
connection = DriverManager.getConnection(url,user,password);
}
return connection;
}
public void releaseConnection(Connection connection) {
if (connection != null) {
synchronized (connections) {
connections.add(connection);
}
semaphore.release(); // 释放许可证,允许其他线程获取连接
}
}
public void close() throws SQLException {
for (Connection connection : connections) {
connection.close();
}
}
public static void main(String[] args) throws InterruptedException {
String url = "jdbc:mysql://localhost:3306/testdb";
String user = "root";
String password = "password";
int maxConnections = 5;
ConnectionPool connectionPool = new ConnectionPool(url, user, password, maxConnections);
// 模拟多个线程访问数据库
for (int i = 0; i < 10; i++) {
new Thread(() -> {
Connection connection = null;
try {
connection = connectionPool.getConnection();
System.out.println(Thread.currentThread().getName() + " 获取到连接");
// 模拟数据库操作
Thread.sleep(1000);
} catch (InterruptedException | SQLException e) {
e.printStackTrace();
} finally {
if(connection != null) {
connectionPool.releaseConnection(connection);
System.out.println(Thread.currentThread().getName() + " 释放连接");
}
}
}).start();
}
Thread.sleep(5000); // 等待所有线程执行完毕
try {
connectionPool.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
在这个例子中,ConnectionPool类维护了一个数据库连接的列表。semaphore控制同时可以获取的连接数量。getConnection()方法会尝试获取一个许可证,如果当前没有可用连接,则会阻塞。releaseConnection()方法会释放许可证,允许其他线程获取连接。
6. 代码示例:线程池任务调度
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class TaskScheduler {
private final int maxConcurrentTasks;
private final Semaphore semaphore;
private final ExecutorService executorService;
public TaskScheduler(int maxConcurrentTasks) {
this.maxConcurrentTasks = maxConcurrentTasks;
this.semaphore = new Semaphore(maxConcurrentTasks);
this.executorService = Executors.newCachedThreadPool(); // 或者使用其他类型的线程池
}
public void execute(Runnable task) {
try {
semaphore.acquire(); // 获取许可证,限制并发任务数量
executorService.execute(() -> {
try {
task.run();
} finally {
semaphore.release(); // 释放许可证
}
});
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public void shutdown() {
executorService.shutdown();
try {
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
}
public static void main(String[] args) throws InterruptedException {
int maxConcurrentTasks = 3;
TaskScheduler taskScheduler = new TaskScheduler(maxConcurrentTasks);
for (int i = 0; i < 10; i++) {
final int taskId = i;
taskScheduler.execute(() -> {
try {
System.out.println("Task " + taskId + " started by " + Thread.currentThread().getName());
Thread.sleep(2000); // 模拟任务执行时间
System.out.println("Task " + taskId + " finished by " + Thread.currentThread().getName());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
Thread.sleep(10000); // 等待一段时间让任务执行完毕
taskScheduler.shutdown();
}
}
这个例子中,TaskScheduler类使用信号量来限制同时执行的任务数量。execute()方法会尝试获取一个许可证,只有获取到许可证才能提交任务到线程池执行。任务执行完毕后,会释放许可证。
7. 公平性策略
Semaphore类提供了一个可选的fair参数,用于指定是否使用公平策略。
- 公平策略(fair = true): 按照线程请求许可证的顺序来分配许可证,等待时间最长的线程会优先获取许可证。
- 非公平策略(fair = false): 允许插队,即新请求的线程有可能比等待时间长的线程更快地获取许可证。
公平策略可以避免某些线程饥饿,但会降低整体的吞吐量。非公平策略可以提高吞吐量,但有可能导致某些线程长时间无法获取许可证。
在选择公平性策略时,需要根据具体的应用场景进行权衡。如果对公平性要求较高,则应该使用公平策略;如果对吞吐量要求较高,则可以使用非公平策略。默认情况下,Semaphore使用的是非公平策略。
8. 信号量的注意事项
- 许可证的释放: 必须确保每个获取许可证的线程最终都会释放许可证,否则会导致许可证泄露,最终导致所有线程都被阻塞。可以使用
try-finally块来确保许可证的释放。 - 并发访问: 信号量本身是线程安全的,但是在使用信号量保护的共享资源时,仍然需要注意并发访问的问题,可以使用
synchronized或其他并发控制机制来保护共享资源。 - 死锁: 避免多个线程互相等待对方释放许可证,导致死锁。
- 中断:
acquire()方法可以被中断,因此需要在代码中处理InterruptedException异常。 - 资源回收: 确保在使用完资源后,及时释放许可证,避免资源浪费。
9. 信号量 vs. 锁
信号量和锁都可以用于控制对共享资源的访问,但它们之间有一些重要的区别:
| 特性 | 信号量(Semaphore) | 锁(Lock/Mutex) |
|---|---|---|
| 目的 | 控制对多个资源的并发访问 | 控制对单个资源的独占访问 |
| 计数器 | 维护一个许可证计数器 | 只有两种状态:锁定和未锁定 |
| 获取和释放 | 可以由不同的线程获取和释放许可证 | 必须由同一个线程获取和释放锁 |
| 用途 | 资源池管理、流量控制 | 临界区保护、同步 |
| 灵活性 | 更加灵活,可以控制并发访问的数量 | 相对简单,只允许一个线程访问 |
| 复杂性 | 相对复杂,需要考虑许可证的数量和公平性策略 | 相对简单,只需要考虑锁定和解锁 |
| 可重入性 | 标准Semaphore本身不支持可重入,但可以基于Semaphore实现可重入锁 | Lock接口中的ReentrantLock支持可重入 |
简单来说,锁是排他性的,一次只允许一个线程访问资源;而信号量可以允许多个线程并发访问资源,只要许可证足够。
总结来说:信号量是管理资源的有效工具。
需要根据实际场景选择合适的并发控制方式。
理解信号量的工作原理和注意事项至关重要。