Java并发编程:Semaphore信号量深度解析
大家好,今天我们来深入探讨Java并发编程中的一个重要工具:Semaphore信号量。 Semaphore在并发编程中扮演着资源控制的角色,它允许我们限制同时访问特定资源的线程数量,从而避免资源竞争和数据损坏。 相比于锁机制,Semaphore提供了更细粒度的并发控制能力,尤其适用于控制对共享资源(例如数据库连接、文件句柄等)的并发访问。
1. Semaphore的本质:计数器与许可
Semaphore本质上是一个计数器,它维护着一定数量的“许可”(permit)。 线程想要访问受Semaphore保护的资源,必须先获取一个许可。 当线程获取许可时,计数器减1; 当线程释放许可时,计数器加1。 如果计数器为0,则试图获取许可的线程将被阻塞,直到有其他线程释放许可,计数器大于0时才能获取许可。
Semaphore主要有两个核心方法:
acquire(): 尝试获取一个许可。 如果许可可用(计数器大于0),则计数器减1,线程继续执行; 否则,线程将被阻塞,直到有许可可用。release(): 释放一个许可。 计数器加1,并唤醒等待许可的线程(如果有)。
Semaphore的构造方法
Semaphore类提供了几个构造方法,常用的有:
Semaphore(int permits): 创建一个具有给定许可数量的Semaphore。Semaphore(int permits, boolean fair): 创建一个具有给定许可数量的Semaphore,并指定是否使用公平策略。fair参数为true表示使用公平策略,即等待时间最长的线程优先获取许可;false表示使用非公平策略,线程获取许可的顺序是不确定的。
公平性
公平性是Semaphore的一个重要特性。 如果Semaphore被设置为公平模式,那么等待时间最长的线程将优先获得许可。 这可以避免某些线程一直被“饿死”的情况,提高系统的公平性。 但是,公平模式通常会带来一定的性能开销,因为需要维护等待队列。 非公平模式下,线程获取许可的顺序是不确定的,可能会导致某些线程饥饿,但通常性能更好。
2. Semaphore的应用场景:资源限制与流量控制
Semaphore最常见的应用场景是限制对共享资源的并发访问数量。 例如,限制数据库连接池的大小,限制同时访问某个文件的线程数量,或者限制某个API的调用频率。
2.1 限制数据库连接池大小
假设我们有一个数据库连接池,最多允许同时存在10个连接。 我们可以使用Semaphore来控制并发访问数据库连接的数量,避免连接耗尽。
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
public class DatabaseConnectionPool {
private final String url;
private final String user;
private final String password;
private final int maxConnections;
private final Semaphore semaphore;
private final List<Connection> connections = new ArrayList<>();
public DatabaseConnectionPool(String url, String user, String password, int maxConnections) {
this.url = url;
this.user = user;
this.password = password;
this.maxConnections = maxConnections;
this.semaphore = new Semaphore(maxConnections);
}
public Connection getConnection() throws InterruptedException, SQLException {
semaphore.acquire(); // 获取许可,如果连接池已满,则阻塞
Connection connection = DriverManager.getConnection(url, user, password);
connections.add(connection);
return connection;
}
public void releaseConnection(Connection connection) {
if (connection != null) {
try {
connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
connections.remove(connection);
semaphore.release(); // 释放许可
}
}
public void shutdown() throws SQLException {
for (Connection connection : connections) {
connection.close();
}
connections.clear();
semaphore.release(maxConnections - semaphore.availablePermits()); //释放所有剩余的许可
}
public static void main(String[] args) throws InterruptedException, SQLException {
DatabaseConnectionPool pool = new DatabaseConnectionPool("jdbc:mysql://localhost:3306/test", "root", "password", 10);
// 模拟多个线程并发访问数据库
for (int i = 0; i < 20; i++) {
new Thread(() -> {
Connection connection = null;
try {
connection = pool.getConnection();
System.out.println(Thread.currentThread().getName() + " acquired connection.");
// 模拟数据库操作
Thread.sleep(100);
} catch (InterruptedException | SQLException e) {
e.printStackTrace();
} finally {
pool.releaseConnection(connection);
System.out.println(Thread.currentThread().getName() + " released connection.");
}
}).start();
}
// 等待所有线程执行完成
Thread.sleep(5000);
pool.shutdown();
}
}
在这个例子中,Semaphore的许可数量等于数据库连接池的最大连接数。 当一个线程需要使用数据库连接时,它首先调用semaphore.acquire()获取许可。 如果连接池已满(semaphore的计数器为0),则线程将被阻塞,直到有其他线程释放连接。 当线程使用完连接后,它调用semaphore.release()释放许可,允许其他线程获取连接。
2.2 流量控制
Semaphore还可以用于流量控制,限制某个API的调用频率。 例如,我们希望限制某个API每秒最多被调用10次。 我们可以使用Semaphore来实现这个功能。
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class RateLimiter {
private final Semaphore semaphore;
private final int permitsPerSecond;
public RateLimiter(int permitsPerSecond) {
this.permitsPerSecond = permitsPerSecond;
this.semaphore = new Semaphore(permitsPerSecond);
startReplenishThread();
}
private void startReplenishThread() {
Thread replenishThread = new Thread(() -> {
while (true) {
try {
TimeUnit.SECONDS.sleep(1); // 每秒补充许可
semaphore.release(permitsPerSecond - semaphore.availablePermits()); // 补充许可
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});
replenishThread.setDaemon(true); // 设置为守护线程
replenishThread.start();
}
public boolean tryAcquire() {
return semaphore.tryAcquire();
}
public static void main(String[] args) throws InterruptedException {
RateLimiter rateLimiter = new RateLimiter(10);
for (int i = 0; i < 20; i++) {
if (rateLimiter.tryAcquire()) {
System.out.println("Request " + i + " processed.");
} else {
System.out.println("Request " + i + " rejected.");
}
Thread.sleep(50); // 模拟请求间隔
}
}
}
在这个例子中,Semaphore的初始许可数量等于每秒允许的调用次数。 我们启动一个后台线程,每秒钟向Semaphore中补充许可,使Semaphore的许可数量保持在最大值。 当一个线程需要调用API时,它调用semaphore.tryAcquire()尝试获取许可。 如果获取成功,则允许调用API; 否则,拒绝调用。
2.3 资源池的管理
Semaphore可以用于管理任何类型的资源池,例如线程池、对象池等。 通过控制Semaphore的许可数量,我们可以限制资源池的大小,避免资源耗尽。
3. Semaphore的进阶用法:多种许可获取与释放
除了acquire()和release()方法,Semaphore还提供了其他一些有用的方法,可以更灵活地控制并发访问。
acquire(int permits): 尝试获取指定数量的许可。 如果许可可用,则计数器减去指定的数量,线程继续执行; 否则,线程将被阻塞,直到有足够的许可可用。release(int permits): 释放指定数量的许可。 计数器加上指定的数量,并唤醒等待许可的线程(如果有)。tryAcquire(): 尝试获取一个许可,如果许可可用,则立即返回true,否则立即返回false,不会阻塞线程。tryAcquire(long timeout, TimeUnit unit): 尝试在指定的时间内获取一个许可。 如果在指定时间内获取到许可,则返回true,否则返回false。availablePermits(): 返回当前可用的许可数量。drainPermits(): 获取并返回所有可用的许可,并将计数器设置为0。 这可以用于一次性阻止所有线程访问资源。hasQueuedThreads(): 判断是否有线程在等待获取许可。getQueueLength(): 返回等待获取许可的线程数量。
3.1 批量许可获取与释放
acquire(int permits)和release(int permits)方法允许我们一次性获取或释放多个许可。 这在某些场景下非常有用。 例如,我们需要执行一个需要占用多个资源的复杂操作,可以一次性获取多个许可,确保操作能够顺利完成。
import java.util.concurrent.Semaphore;
public class BulkPermitExample {
private static final Semaphore semaphore = new Semaphore(10);
public static void main(String[] args) {
for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + " is trying to acquire 3 permits.");
semaphore.acquire(3); // 获取3个许可
System.out.println(Thread.currentThread().getName() + " acquired 3 permits.");
// 模拟需要占用多个资源的操作
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release(3); // 释放3个许可
System.out.println(Thread.currentThread().getName() + " released 3 permits.");
}
}).start();
}
}
}
在这个例子中,每个线程尝试获取3个许可。 由于Semaphore的初始许可数量为10,因此最多只能有3个线程同时获取到许可。
3.2 非阻塞许可获取:tryAcquire()
tryAcquire()方法允许我们尝试获取许可,如果许可不可用,则立即返回false,不会阻塞线程。 这可以用于实现非阻塞的资源访问。
import java.util.concurrent.Semaphore;
public class NonBlockingPermitExample {
private static final Semaphore semaphore = new Semaphore(1);
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 5; i++) {
new Thread(() -> {
if (semaphore.tryAcquire()) {
try {
System.out.println(Thread.currentThread().getName() + " acquired permit.");
// 模拟需要占用资源的操作
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
System.out.println(Thread.currentThread().getName() + " released permit.");
}
} else {
System.out.println(Thread.currentThread().getName() + " failed to acquire permit.");
}
}).start();
Thread.sleep(100); // 稍微延迟启动线程
}
}
}
在这个例子中,每个线程尝试获取一个许可。 由于Semaphore的初始许可数量为1,因此只有一个线程能够获取到许可。 其他线程由于无法获取到许可,会立即执行else分支的代码。
3.3 超时许可获取:tryAcquire(long timeout, TimeUnit unit)
tryAcquire(long timeout, TimeUnit unit)方法允许我们尝试在指定的时间内获取许可。 如果在指定时间内获取到许可,则返回true,否则返回false。 这可以用于避免线程长时间阻塞。
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class TimeoutPermitExample {
private static final Semaphore semaphore = new Semaphore(1);
public static void main(String[] args) throws InterruptedException {
// 先让一个线程占用许可
new Thread(() -> {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " acquired permit and will hold it for 5 seconds.");
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
System.out.println(Thread.currentThread().getName() + " released permit.");
}
}).start();
Thread.sleep(100); // 稍微延迟启动其他线程
for (int i = 0; i < 3; i++) {
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + " is trying to acquire permit with a timeout of 2 seconds.");
if (semaphore.tryAcquire(2, TimeUnit.SECONDS)) {
try {
System.out.println(Thread.currentThread().getName() + " acquired permit.");
// 模拟需要占用资源的操作
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
System.out.println(Thread.currentThread().getName() + " released permit.");
}
} else {
System.out.println(Thread.currentThread().getName() + " failed to acquire permit within the timeout.");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
}
在这个例子中,第一个线程占用许可并持有5秒钟。 其他线程尝试在2秒钟内获取许可。 由于第一个线程需要5秒钟才能释放许可,因此其他线程可能会在超时时间内无法获取到许可。
4. Semaphore与Lock的区别与选择
Semaphore和Lock都是用于控制并发访问的工具,但它们之间有一些重要的区别。
| 特性 | Semaphore | Lock |
|---|---|---|
| 本质 | 计数器,维护一定数量的许可 | 锁,用于保护临界区 |
| 适用场景 | 控制对共享资源的并发访问数量 | 保护临界区,实现互斥访问 |
| 许可数量 | 可以有多个许可 | 只有一个锁 |
| 所有权 | 没有明确的所有权概念 | 必须由获取锁的线程释放 |
| 灵活性 | 更加灵活,可以实现更细粒度的并发控制 | 相对简单,适用于简单的互斥场景 |
| 性能 | 在某些场景下可能比Lock性能更好 | 在简单的互斥场景下通常性能更好 |
| 可重入性 | 默认不支持可重入,但可以通过组合实现 | 通常支持可重入 |
选择Semaphore还是Lock
选择Semaphore还是Lock取决于具体的应用场景。
- 如果需要控制对共享资源的并发访问数量,例如限制数据库连接池的大小,或者限制某个API的调用频率,则应该使用Semaphore。
- 如果需要保护临界区,实现互斥访问,则应该使用Lock。
一般来说,如果可以使用Lock解决问题,则应该优先使用Lock,因为Lock更加简单易用。 只有在需要更细粒度的并发控制时,才应该考虑使用Semaphore。
5. Semaphore使用注意事项:避免错误使用
虽然Semaphore是一个强大的并发工具,但也需要注意一些事项,避免错误使用。
- 确保正确释放许可: 必须确保在所有情况下都释放许可,即使发生异常。 可以使用
try-finally块来保证许可的释放。 - 避免过度释放许可: 过度释放许可会导致计数器超过初始值,可能会导致并发问题。
- 注意公平性: 根据实际需求选择公平模式或非公平模式。 公平模式可以提高系统的公平性,但可能会带来一定的性能开销。
- 避免死锁: 在使用多个Semaphore时,需要注意避免死锁。 可以使用资源排序等方法来避免死锁。
- 注意线程中断:
acquire()方法可能会被中断。 需要处理InterruptedException异常,并在必要时恢复中断状态。 - 不要依赖
availablePermits()的精确值:availablePermits()方法返回的是一个近似值,不应该依赖它的精确值来进行复杂的逻辑判断。
6. 总结与回顾:有效管理并发资源
Semaphore是一个强大的并发工具,可以用于控制对共享资源的并发访问数量,实现流量控制,以及管理资源池。 它提供了比锁机制更细粒度的并发控制能力,适用于复杂的并发场景。 在使用Semaphore时,需要注意正确释放许可,避免过度释放许可,注意公平性,避免死锁,以及处理线程中断。 正确使用Semaphore可以有效地提高系统的并发性能和稳定性。 通过理解Semaphore的原理和应用场景,我们可以更好地掌握Java并发编程,编写出更高效、更可靠的多线程程序。