基于RocketMQ实现分布式WebSocket通信
2024-09-07 09:21 阅读(351)

公司中有通过websocket实现添加用户、公司成功后的自动通知,还有一些关于流程通过后让用户及时处理的通知。类似这些功能,公司项目中使用了基于websocket技术的长链接的通信。 公司项目中的实现思路主要是通过长链接实时监听前端调用接口的情况,一旦接口调用成功,立即触发websocket中的handleTextMessage方法,里面大都是一些复杂的业务逻辑。这里还有一种使用@OnMessage注解的使用websocket的方法,但是还是感觉直接使用继承类的方式比较容易控制。关于两种方式的对比可以参考:

传统@ServerEndpoint方式开发WebSocket应用和SpringBoot构建WebSocket应用程序-CSDN博客

下面是根据公司项目实现功能的思路加入消息队列实现对分布式websocket的支持。

环境准备

rocketmq 4.9.2、rocketmq-dashboard、mongdb 5.2、maven 3.8.6、jdk8

代码实现

1、引入pom依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.3.0</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>2.0.32</version>
</dependency>
​
<!--        有冲突删除-->
<!--        <dependency>-->
<!--            <groupId>org.apache.rocketmq</groupId>-->
<!--            <artifactId>rocketmq-client</artifactId>-->
<!--            <version>4.6.0</version>-->
<!--        </dependency>-->
​
​
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
​
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
​
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>
​
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>

2、配置文件修改

server.port=8090  
  
# rocketmq  
rocketmq.name-server=127.0.0.1:9876  
rocketmq.producer.group=rocketmq-websocket-group  

# mongodb  
spring.data.mongodb.uri=mongodb://localhost:27017/test

3、实体类创建

@Data  
@AllArgsConstructor  
@NoArgsConstructor  
@Document(collection = "message")  
@Builder  
public class Message {  
  
    // 指定序列化方式为字符串  
    @JsonSerialize(using = ToStringSerializer.class)  
    private ObjectId id;  
    private String msg;  
  
    /**  
     * 消息状态  
     */  
    @Indexed  
    private Integer status;  
  
    @Field("send_date")  
    @Indexed  
    private Date sendDate;  
  
    @Field("read_date")  
    private Date readDate;  
  
    @Indexed  
    private User from;  
  
    @Indexed  
    private User to;  
}
@Data  
@AllArgsConstructor  
@NoArgsConstructor  
@Builder  
public class User {  
  
    private Long id;  
    private String username;  
  
}
/**
* 模拟存储登录用户
*/
public class UserData {  
    public static final Map<Long,User> USER_MAP = new HashMap<>();  
  
    static {  
        USER_MAP.put(1001L, User.builder().id(1001L).username("zhangsan").build());  
        USER_MAP.put(1002L, User.builder().id(1002L).username("lisi").build());  
        USER_MAP.put(1003L, User.builder().id(1003L).username("wangwu").build());  
        USER_MAP.put(1004L, User.builder().id(1004L).username("zhaoliu").build());  
        USER_MAP.put(1005L, User.builder().id(1005L).username("sunqi").build());  
    }  
}

4、mongodb数据操作

public interface MessageDao {  
  
    Message saveMessage(Message message);  
  
    UpdateResult updateMessageState(ObjectId id, Integer state);  
}
@Service  
public class MessageMongoService implements MessageDao {  
  
    @Resource  
    private MongoTemplate mongoTemplate;  
  
    @Override  
    public Message saveMessage(Message message) {  
        return mongoTemplate.save(message);  
    }  
  
    @Override  
    public UpdateResult updateMessageState(ObjectId id, Integer status) {  
        Query query = Query.query(Criteria.where("id").is(id));  
        Update update = Update.update("status", status);  
        if (status == 1) {  
            update.set("send_date", new Date());  
        } else if (status == 2) {  
            update.set("read_date", new Date());  
        }  
  
       return mongoTemplate.updateFirst(query, update, Message.class);  
    }  
}

5、websocket配置类实现

@Configuration  
@EnableWebSocket  
public class WebsocketConfig implements WebSocketConfigurer {  
  
    @Resource  
    private MessageHandler messageHandler;  
    @Resource  
    private MessageHandshakeInterceptor messageHandshakeInterceptor;  
  
    @Override  
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {  
        registry.addHandler(messageHandler, "/ws/*") // 设置处理和路径  
                .setAllowedOrigins("*")  
                .addInterceptors(messageHandshakeInterceptor);// 设置拦截器  
    }  
  
}
// 实现拦截器,拦截器可以实现前置和后置处理,这里还是挺方便的
@Component  
public class MessageHandshakeInterceptor implements HandshakeInterceptor {  
    @Override  
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {  
        String path = request.getURI().getPath();  
        String[] ss = StringUtils.split(path, "/");  
        if (ss.length != 2) {  
            return false;  
        }  
        // 是否为数字  
        if (!StringUtils.isNumeric(ss[1])) {  
            return false;  
        }  
        attributes.put("uid", Long.valueOf(ss[1]));  
        return true;  
    }  
  
    @Override  
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {  
  
    }
}

6、重点:实现消息处理类

/**  
 * @author Ash  
 * @date 2024/6/24 16:50  
 * @description: 信息处理类 rocketmq是后期加入的,原本只实现的是TextWebSocketHandler  
 * 加入mq之后解决分布式的问题  
 */  
@Component  
@RocketMQMessageListener(topic = "rocketmq-websocket-topic",// topic  
        consumerGroup = "message-consumer-group", // group  
        selectorExpression = "SEND_MSG",// tag  
        messageModel = MessageModel.BROADCASTING // 广播  
)  
@Slf4j  
public class MessageHandler extends TextWebSocketHandler implements RocketMQListener<String> {  
  
    @Resource  
    private RocketMQTemplate rocketMQTemplate;  
  
    @Resource  
    private MessageDao messageDao;  
  
    private static final ObjectMapper MAPPER = new ObjectMapper();  
  
    private static final Map<Long, WebSocketSession> SESSIONS = new HashMap<>();  
  
  
    @Override  
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {  
        Long uid = (Long) session.getAttributes().get("uid");  
        // 将当前用户放到map中  
        SESSIONS.put(uid, session);  
        log.info("建立连接并存入uid:{}", uid);  
    }  
  
    @Override  
    protected void handleTextMessage(WebSocketSession session, TextMessage textMessage) throws Exception {  
        log.info("开始处理消息。。。。");  
        Long uid = (Long) session.getAttributes().get("uid");  
  
        JsonNode jsonNode = MAPPER.readTree(textMessage.getPayload());  
        long toId = jsonNode.get("toId").asLong();  
        String msg = jsonNode.get("msg").asText();  
  
        Message message = Message.builder()  
                .from(UserData.USER_MAP.get(uid))  
                .to(UserData.USER_MAP.get(toId))  
                .msg(msg)  
                .build();  
  
        // 将消息保存到mongodb  
        message = messageDao.saveMessage(message);  
  
        String msgJson = MAPPER.writeValueAsString(message);  
  
        // 判断用户是否在线  
        WebSocketSession toSession = SESSIONS.get(toId);  
        if (toSession != null && toSession.isOpen()) {  
            // todo 需要和前端商定  
            toSession.sendMessage(new TextMessage(msgJson));  
            // 更新消息状态为已读  
            messageDao.updateMessageState(message.getId(), 2);  
        } else {  
            // 剩下的是以离线或者或可能在其他节点 放入mq中  
            rocketMQTemplate.convertAndSend("rocketmq-websocket-topic:SEND_MSG", msgJson);  
        }  
  
    }  
  
    /**  
     * 监听到消息后执行  
     * todo 这里还需要考虑防止消息丢失,消息重复提交等情况  
     * @param message  
     */  
    @Override  
    public void onMessage(String message) {  
        try {  
            JsonNode jsonNode = MAPPER.readTree(message);  
            long toId = jsonNode.get("to").get("id").asLong();  
  
            WebSocketSession socketSession = SESSIONS.get(toId);  
            // 判断是否在线  
            if (socketSession != null && socketSession.isOpen()) {  
                // todo 需要和前端商定  
                socketSession.sendMessage(new TextMessage(message));  
                messageDao.updateMessageState(new ObjectId(jsonNode.get("id").asText()), 2);  
            } else {  
                // 不做处理  
            }  
        }catch (Exception e) {  
            e.printStackTrace();  
        }  
    }  
}

测试

1、打开idea中允许项目多开的功能,模拟多个不同的服务;

2、使用websocket客户端测试工具分别连接不同的服务进行测试WebSocket在线测试工具 (wstool.js.org);