公司中有通过websocket实现添加用户、公司成功后的自动通知,还有一些关于流程通过后让用户及时处理的通知。类似这些功能,公司项目中使用了基于websocket技术的长链接的通信。 公司项目中的实现思路主要是通过长链接实时监听前端调用接口的情况,一旦接口调用成功,立即触发websocket中的handleTextMessage方法,里面大都是一些复杂的业务逻辑。这里还有一种使用@OnMessage注解的使用websocket的方法,但是还是感觉直接使用继承类的方式比较容易控制。关于两种方式的对比可以参考:
传统@ServerEndpoint方式开发WebSocket应用和SpringBoot构建WebSocket应用程序-CSDN博客
下面是根据公司项目实现功能的思路加入消息队列实现对分布式websocket的支持。
环境准备
rocketmq 4.9.2、rocketmq-dashboard、mongdb 5.2、maven 3.8.6、jdk8
代码实现
1、引入pom依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.32</version>
</dependency>
<!-- 有冲突删除-->
<!-- <dependency>-->
<!-- <groupId>org.apache.rocketmq</groupId>-->
<!-- <artifactId>rocketmq-client</artifactId>-->
<!-- <version>4.6.0</version>-->
<!-- </dependency>-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
2、配置文件修改
server.port=8090
# rocketmq
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=rocketmq-websocket-group
# mongodb
spring.data.mongodb.uri=mongodb://localhost:27017/test
3、实体类创建
@Data
@AllArgsConstructor
@NoArgsConstructor
@Document(collection = "message")
@Builder
public class Message {
// 指定序列化方式为字符串
@JsonSerialize(using = ToStringSerializer.class)
private ObjectId id;
private String msg;
/**
* 消息状态
*/
@Indexed
private Integer status;
@Field("send_date")
@Indexed
private Date sendDate;
@Field("read_date")
private Date readDate;
@Indexed
private User from;
@Indexed
private User to;
}
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class User {
private Long id;
private String username;
}
/**
* 模拟存储登录用户
*/
public class UserData {
public static final Map<Long,User> USER_MAP = new HashMap<>();
static {
USER_MAP.put(1001L, User.builder().id(1001L).username("zhangsan").build());
USER_MAP.put(1002L, User.builder().id(1002L).username("lisi").build());
USER_MAP.put(1003L, User.builder().id(1003L).username("wangwu").build());
USER_MAP.put(1004L, User.builder().id(1004L).username("zhaoliu").build());
USER_MAP.put(1005L, User.builder().id(1005L).username("sunqi").build());
}
}
4、mongodb数据操作
public interface MessageDao {
Message saveMessage(Message message);
UpdateResult updateMessageState(ObjectId id, Integer state);
}
@Service
public class MessageMongoService implements MessageDao {
@Resource
private MongoTemplate mongoTemplate;
@Override
public Message saveMessage(Message message) {
return mongoTemplate.save(message);
}
@Override
public UpdateResult updateMessageState(ObjectId id, Integer status) {
Query query = Query.query(Criteria.where("id").is(id));
Update update = Update.update("status", status);
if (status == 1) {
update.set("send_date", new Date());
} else if (status == 2) {
update.set("read_date", new Date());
}
return mongoTemplate.updateFirst(query, update, Message.class);
}
}
5、websocket配置类实现
@Configuration
@EnableWebSocket
public class WebsocketConfig implements WebSocketConfigurer {
@Resource
private MessageHandler messageHandler;
@Resource
private MessageHandshakeInterceptor messageHandshakeInterceptor;
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(messageHandler, "/ws/*") // 设置处理和路径
.setAllowedOrigins("*")
.addInterceptors(messageHandshakeInterceptor);// 设置拦截器
}
}
// 实现拦截器,拦截器可以实现前置和后置处理,这里还是挺方便的
@Component
public class MessageHandshakeInterceptor implements HandshakeInterceptor {
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
String path = request.getURI().getPath();
String[] ss = StringUtils.split(path, "/");
if (ss.length != 2) {
return false;
}
// 是否为数字
if (!StringUtils.isNumeric(ss[1])) {
return false;
}
attributes.put("uid", Long.valueOf(ss[1]));
return true;
}
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
}
}
6、重点:实现消息处理类
/**
* @author Ash
* @date 2024/6/24 16:50
* @description: 信息处理类 rocketmq是后期加入的,原本只实现的是TextWebSocketHandler
* 加入mq之后解决分布式的问题
*/
@Component
@RocketMQMessageListener(topic = "rocketmq-websocket-topic",// topic
consumerGroup = "message-consumer-group", // group
selectorExpression = "SEND_MSG",// tag
messageModel = MessageModel.BROADCASTING // 广播
)
@Slf4j
public class MessageHandler extends TextWebSocketHandler implements RocketMQListener<String> {
@Resource
private RocketMQTemplate rocketMQTemplate;
@Resource
private MessageDao messageDao;
private static final ObjectMapper MAPPER = new ObjectMapper();
private static final Map<Long, WebSocketSession> SESSIONS = new HashMap<>();
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
Long uid = (Long) session.getAttributes().get("uid");
// 将当前用户放到map中
SESSIONS.put(uid, session);
log.info("建立连接并存入uid:{}", uid);
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage textMessage) throws Exception {
log.info("开始处理消息。。。。");
Long uid = (Long) session.getAttributes().get("uid");
JsonNode jsonNode = MAPPER.readTree(textMessage.getPayload());
long toId = jsonNode.get("toId").asLong();
String msg = jsonNode.get("msg").asText();
Message message = Message.builder()
.from(UserData.USER_MAP.get(uid))
.to(UserData.USER_MAP.get(toId))
.msg(msg)
.build();
// 将消息保存到mongodb
message = messageDao.saveMessage(message);
String msgJson = MAPPER.writeValueAsString(message);
// 判断用户是否在线
WebSocketSession toSession = SESSIONS.get(toId);
if (toSession != null && toSession.isOpen()) {
// todo 需要和前端商定
toSession.sendMessage(new TextMessage(msgJson));
// 更新消息状态为已读
messageDao.updateMessageState(message.getId(), 2);
} else {
// 剩下的是以离线或者或可能在其他节点 放入mq中
rocketMQTemplate.convertAndSend("rocketmq-websocket-topic:SEND_MSG", msgJson);
}
}
/**
* 监听到消息后执行
* todo 这里还需要考虑防止消息丢失,消息重复提交等情况
* @param message
*/
@Override
public void onMessage(String message) {
try {
JsonNode jsonNode = MAPPER.readTree(message);
long toId = jsonNode.get("to").get("id").asLong();
WebSocketSession socketSession = SESSIONS.get(toId);
// 判断是否在线
if (socketSession != null && socketSession.isOpen()) {
// todo 需要和前端商定
socketSession.sendMessage(new TextMessage(message));
messageDao.updateMessageState(new ObjectId(jsonNode.get("id").asText()), 2);
} else {
// 不做处理
}
}catch (Exception e) {
e.printStackTrace();
}
}
}
测试
1、打开idea中允许项目多开的功能,模拟多个不同的服务;
2、使用websocket客户端测试工具分别连接不同的服务进行测试WebSocket在线测试工具 (wstool.js.org);