JAVA WebSocket 广播消息异常?使用 ConcurrentHashMap 管理会话连接

JAVA WebSocket 广播消息异常?ConcurrentHashMap 管理会话连接?

大家好,今天我们来深入探讨一个常见的 WebSocket 开发问题:在使用 ConcurrentHashMap 管理 WebSocket 会话连接时,如何避免和处理广播消息过程中可能出现的异常。WebSocket 为我们提供了实时的双向通信能力,而广播消息则是 WebSocket 应用中一个非常普遍的需求,例如实时聊天、在线游戏、监控系统等。然而,在高并发场景下,不当的处理方式很容易导致广播消息失败,甚至影响整个应用的稳定性。

WebSocket 基础回顾

首先,我们快速回顾一下 WebSocket 的基础概念:

  • WebSocket 协议: 一种在单个 TCP 连接上进行全双工通信的协议。与传统的 HTTP 请求-响应模式不同,WebSocket 建立连接后可以保持长连接,服务器可以主动向客户端推送数据。

  • WebSocket 会话: 代表客户端与服务器之间建立的 WebSocket 连接。在 Java 中,通常使用 javax.websocket.Session 接口来表示。

  • Endpoint: WebSocket 服务端点,负责处理客户端的连接请求和消息。需要使用 @ServerEndpoint 注解进行标记。

一个简单的 WebSocket 服务端点可能如下所示:

import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;

@ServerEndpoint("/websocket")
public class MyWebSocketEndpoint {

    @OnOpen
    public void onOpen(Session session) {
        System.out.println("Client connected: " + session.getId());
    }

    @OnMessage
    public void onMessage(String message, Session session) {
        System.out.println("Received message: " + message + " from " + session.getId());
        try {
            session.getBasicRemote().sendText("Server received: " + message);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @OnClose
    public void onClose(Session session) {
        System.out.println("Client disconnected: " + session.getId());
    }

    @OnError
    public void onError(Throwable error, Session session) {
        System.err.println("Error occurred: " + error.getMessage());
    }
}

使用 ConcurrentHashMap 管理 WebSocket 会话

为了实现广播消息,我们需要维护一个存储所有活跃 WebSocket 会话的集合。ConcurrentHashMap 是一个线程安全的哈希表,非常适合在多线程环境下使用。

import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Map;

@ServerEndpoint("/websocket")
public class MyWebSocketEndpoint {

    private static Map<String, Session> sessions = new ConcurrentHashMap<>();

    @OnOpen
    public void onOpen(Session session) {
        sessions.put(session.getId(), session);
        System.out.println("Client connected: " + session.getId());
    }

    @OnMessage
    public void onMessage(String message, Session session) {
        System.out.println("Received message: " + message + " from " + session.getId());
        broadcast(message); // 广播消息
    }

    @OnClose
    public void onClose(Session session) {
        sessions.remove(session.getId());
        System.out.println("Client disconnected: " + session.getId());
    }

    @OnError
    public void onError(Throwable error, Session session) {
        System.err.println("Error occurred: " + error.getMessage());
        sessions.remove(session.getId());
    }

    private void broadcast(String message) {
        sessions.forEach((sessionId, session) -> {
            try {
                session.getBasicRemote().sendText(message);
            } catch (IOException e) {
                System.err.println("Error sending message to session " + sessionId + ": " + e.getMessage());
                // 如何处理异常?
            }
        });
    }
}

在这个示例中,我们使用 ConcurrentHashMap sessions 来存储所有 WebSocket 会话。onOpen 方法将新的会话添加到 sessions 中,onClose 方法从 sessions 中移除断开的会话。broadcast 方法遍历 sessions,将消息发送到每个会话。

广播消息异常的常见原因

broadcast 方法中,我们捕获了 IOException,这是广播消息过程中最常见的异常。IOException 的原因有很多,包括:

  • 客户端连接中断: 客户端可能在服务器发送消息之前断开了连接。
  • 网络问题: 网络不稳定可能导致消息发送失败。
  • 客户端处理能力不足: 客户端可能无法及时处理服务器发送的消息,导致连接超时或缓冲区溢出。

处理广播消息异常的策略

当广播消息失败时,我们不能简单地忽略异常。否则,可能会导致以下问题:

  • 消息丢失: 部分客户端可能无法收到消息。
  • 资源泄漏: 如果会话没有正确关闭,可能会导致资源泄漏。
  • 应用崩溃: 在极端情况下,未处理的异常可能导致应用崩溃。

以下是一些处理广播消息异常的策略:

  1. 删除无效会话:IOException 发生时,最重要的是要判断该会话是否已经失效。如果会话已经失效,应该从 sessions 中移除该会话,防止后续继续尝试向该会话发送消息。

    private void broadcast(String message) {
        sessions.forEach((sessionId, session) -> {
            try {
                session.getBasicRemote().sendText(message);
            } catch (IOException e) {
                System.err.println("Error sending message to session " + sessionId + ": " + e.getMessage());
                // 移除无效会话
                if (session.isOpen()) {
                    try {
                        session.close(); //尝试关闭会话
                    } catch (IOException ex) {
                        System.err.println("Error closing session " + sessionId + ": " + ex.getMessage());
                    }
                }
                sessions.remove(sessionId);
            }
        });
    }
  2. 重试机制: 对于一些临时的网络问题,可以尝试重试发送消息。但是,需要注意重试的次数和间隔,避免无限重试导致死循环。 此外,重试机制需要防止消息乱序,尤其是在需要保证消息顺序的场景下。

    private void broadcast(String message) {
        int maxRetries = 3;
        long retryInterval = 1000; // 1 second
    
        sessions.forEach((sessionId, session) -> {
            boolean sent = false;
            int retryCount = 0;
    
            while (!sent && retryCount < maxRetries) {
                try {
                    session.getBasicRemote().sendText(message);
                    sent = true;
                } catch (IOException e) {
                    System.err.println("Error sending message to session " + sessionId + ", retry " + (retryCount + 1) + ": " + e.getMessage());
                    retryCount++;
                    try {
                        Thread.sleep(retryInterval);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        break; // 中断重试
                    }
                    if (session.isOpen()) {
                        try {
                            session.close(); //尝试关闭会话
                        } catch (IOException ex) {
                            System.err.println("Error closing session " + sessionId + ": " + ex.getMessage());
                        }
                    }
                    sessions.remove(sessionId);
    
                }
            }
    
            if (!sent) {
                System.err.println("Failed to send message to session " + sessionId + " after " + maxRetries + " retries.");
                // 移除无效会话
                if (session.isOpen()) {
                    try {
                        session.close(); //尝试关闭会话
                    } catch (IOException ex) {
                        System.err.println("Error closing session " + sessionId + ": " + ex.getMessage());
                    }
                }
                sessions.remove(sessionId);
            }
        });
    }
  3. 异步发送: 使用异步发送可以避免阻塞广播线程,提高应用的响应速度。可以使用 session.getAsyncRemote().sendText(message) 方法进行异步发送。

    private void broadcast(String message) {
        sessions.forEach((sessionId, session) -> {
            session.getAsyncRemote().sendText(message, result -> {
                if (!result.isOK()) {
                    System.err.println("Error sending message to session " + sessionId + ": " + result.getException().getMessage());
                    // 移除无效会话
                    if (session.isOpen()) {
                        try {
                            session.close(); //尝试关闭会话
                        } catch (IOException ex) {
                            System.err.println("Error closing session " + sessionId + ": " + ex.getMessage());
                        }
                    }
                    sessions.remove(sessionId);
                }
            });
        });
    }

    异步发送允许你使用 SendHandler 接口来处理发送结果。 SendHandleronResult 方法将在消息发送完成或失败时被调用。

  4. 限流: 为了防止服务器被过多的并发请求压垮,可以实施限流策略。可以使用令牌桶算法或漏桶算法来实现限流。

  5. 心跳检测: 通过定期发送心跳消息,可以检测客户端是否仍然在线。如果客户端长时间没有响应心跳消息,可以认为该会话已经失效,并将其从 sessions 中移除。

    客户端心跳示例(假设每30秒发送一次ping消息):

    let websocket = new WebSocket("ws://localhost:8080/websocket");
    
    websocket.onopen = function(event) {
      console.log("Connected to WebSocket server.");
      setInterval(function() {
        if (websocket.readyState === WebSocket.OPEN) {
          websocket.send("ping"); // 发送心跳消息
        }
      }, 30000); // 每30秒发送一次
    };
    
    websocket.onmessage = function(event) {
      console.log("Received: " + event.data);
    };
    
    websocket.onclose = function(event) {
      console.log("Disconnected from WebSocket server.");
    };

    服务器端心跳处理:

    import javax.websocket.*;
    import javax.websocket.server.ServerEndpoint;
    import java.io.IOException;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.Map;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    
    @ServerEndpoint("/websocket")
    public class MyWebSocketEndpoint {
    
        private static Map<String, Session> sessions = new ConcurrentHashMap<>();
        private static final long HEARTBEAT_INTERVAL = 60 * 1000; // 60 seconds
        private long lastPongTime = System.currentTimeMillis();
        private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    
        @OnOpen
        public void onOpen(Session session) {
            sessions.put(session.getId(), session);
            System.out.println("Client connected: " + session.getId());
    
            // Schedule a task to check for heartbeats
            scheduler.scheduleAtFixedRate(() -> {
                if (System.currentTimeMillis() - lastPongTime > HEARTBEAT_INTERVAL * 2) {
                    System.out.println("Closing session due to heartbeat timeout: " + session.getId());
                    try {
                        session.close();
                    } catch (IOException e) {
                        System.err.println("Error closing session: " + e.getMessage());
                    } finally {
                        sessions.remove(session.getId());
                    }
                }else {
                    try {
                        session.getAsyncRemote().sendText("ping");
                    } catch (Exception e) {
                        System.err.println("Error sending ping to session: " + session.getId() + e.getMessage());
                    }
                }
            }, 0, HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);
    
        }
    
        @OnMessage
        public void onMessage(String message, Session session) {
            if ("ping".equals(message)) {
                // 忽略客户端发送的ping消息,仅用于避免控制台打印
            } else if ("pong".equals(message)) {
                lastPongTime = System.currentTimeMillis(); // 收到客户端的pong消息
                System.out.println("Received pong from client: " + session.getId());
    
            } else {
                System.out.println("Received message: " + message + " from " + session.getId());
                broadcast(message);
            }
        }
    
        @OnClose
        public void onClose(Session session) {
            sessions.remove(session.getId());
            System.out.println("Client disconnected: " + session.getId());
            scheduler.shutdown();
        }
    
        @OnError
        public void onError(Throwable error, Session session) {
            System.err.println("Error occurred: " + error.getMessage());
            sessions.remove(session.getId());
            scheduler.shutdown();
        }
    
        private void broadcast(String message) {
            sessions.forEach((sessionId, session) -> {
                session.getAsyncRemote().sendText(message, result -> {
                    if (!result.isOK()) {
                        System.err.println("Error sending message to session " + sessionId + ": " + result.getException().getMessage());
                        // 移除无效会话
                        if (session.isOpen()) {
                            try {
                                session.close(); //尝试关闭会话
                            } catch (IOException ex) {
                                System.err.println("Error closing session " + sessionId + ": " + ex.getMessage());
                            }
                        }
                        sessions.remove(sessionId);
                    }
                });
            });
        }
    }

    客户端需要回复 pong 消息:

    websocket.onmessage = function(event) {
      console.log("Received: " + event.data);
      if (event.data === "ping") {
        websocket.send("pong"); // 回复pong消息
      } else {
        // 处理其他消息
      }
    };
  6. 日志记录: 记录详细的日志信息,包括异常发生的时间、会话 ID、错误消息等。这有助于分析问题和排查错误。

代码示例:综合应用

以下是一个综合应用上述策略的代码示例:

import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

@ServerEndpoint("/websocket")
public class MyWebSocketEndpoint {

    private static Map<String, Session> sessions = new ConcurrentHashMap<>();
    private static final int MAX_RETRIES = 3;
    private static final long RETRY_INTERVAL = 1000; // 1 second
    private static final long HEARTBEAT_INTERVAL = 60 * 1000; // 60 seconds
    private long lastPongTime = System.currentTimeMillis();
    private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    @OnOpen
    public void onOpen(Session session) {
        sessions.put(session.getId(), session);
        System.out.println("Client connected: " + session.getId());

        // Schedule a task to check for heartbeats
        scheduler.scheduleAtFixedRate(() -> {
            if (System.currentTimeMillis() - lastPongTime > HEARTBEAT_INTERVAL * 2) {
                System.out.println("Closing session due to heartbeat timeout: " + session.getId());
                try {
                    session.close();
                } catch (IOException e) {
                    System.err.println("Error closing session: " + e.getMessage());
                } finally {
                    sessions.remove(session.getId());
                }
            }else {
                try {
                    session.getAsyncRemote().sendText("ping");
                } catch (Exception e) {
                    System.err.println("Error sending ping to session: " + session.getId() + e.getMessage());
                }
            }
        }, 0, HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);
    }

    @OnMessage
    public void onMessage(String message, Session session) {
        if ("ping".equals(message)) {
            // 忽略客户端发送的ping消息,仅用于避免控制台打印
        } else if ("pong".equals(message)) {
            lastPongTime = System.currentTimeMillis(); // 收到客户端的pong消息
            System.out.println("Received pong from client: " + session.getId());

        } else {
            System.out.println("Received message: " + message + " from " + session.getId());
            broadcast(message);
        }

    }

    @OnClose
    public void onClose(Session session) {
        sessions.remove(session.getId());
        System.out.println("Client disconnected: " + session.getId());
        scheduler.shutdown();
    }

    @OnError
    public void onError(Throwable error, Session session) {
        System.err.println("Error occurred: " + error.getMessage());
        sessions.remove(session.getId());
        scheduler.shutdown();
    }

    private void broadcast(String message) {
        sessions.forEach((sessionId, session) -> {
            sendMessageWithRetry(session, message, MAX_RETRIES, RETRY_INTERVAL);
        });
    }

    private void sendMessageWithRetry(Session session, String message, int maxRetries, long retryInterval) {
        for (int i = 0; i < maxRetries; i++) {
            try {
                session.getAsyncRemote().sendText(message, result -> {
                    if (!result.isOK()) {
                        System.err.println("Error sending message to session " + session.getId() + ": " + result.getException().getMessage());
                        removeInvalidSession(session);
                    }
                });
                return; // 发送成功,退出重试
            } catch (Exception e) {
                System.err.println("Attempt " + (i + 1) + " to send message to session " + session.getId() + " failed: " + e.getMessage());
                try {
                    Thread.sleep(retryInterval);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break; // 中断重试
                }
            }
        }
        System.err.println("Failed to send message to session " + session.getId() + " after " + maxRetries + " retries.");
        removeInvalidSession(session);
    }

    private void removeInvalidSession(Session session) {
        if (session.isOpen()) {
            try {
                session.close();
            } catch (IOException ex) {
                System.err.println("Error closing session " + session.getId() + ": " + ex.getMessage());
            }
        }
        sessions.remove(session.getId());
    }
}

其他注意事项

  • 选择合适的并发集合: 除了 ConcurrentHashMap,还可以使用其他线程安全的集合,例如 CopyOnWriteArrayList。选择哪种集合取决于具体的应用场景。例如,如果读操作远多于写操作,CopyOnWriteArrayList 可能更适合。
  • 会话超时: 可以设置会话超时时间,自动关闭长时间不活动的会话,释放资源。
  • 监控: 监控 WebSocket 连接的数量、消息发送速率、错误率等指标,及时发现和解决问题。

总结关键点

使用 ConcurrentHashMap 可以方便地管理 WebSocket 会话,但在广播消息时需要考虑异常处理。 通过删除无效会话、重试机制、异步发送和心跳检测等策略,可以提高应用的健壮性和可靠性。 选择合适的并发集合、设置会话超时和进行监控也是重要的最佳实践。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注