最近 AI 爆火,与之相关的技术也成为香饽饽,SSE 推送就是其中之一。SSE,全称 Server-Sent Events,是 HTML5 Web API 的一员。它能让网页低延迟、高效地接收服务器实时更新,通过 HTTP 协议实现服务器主动向客户端推送数据。借助持久化 HTTP 长连接,服务器可以向客户端实时推送数据,不过客户端无法通过 SSE 向服务端回传数据。
很多小伙伴想上手 SSE 推送开发,却不知从何开始。别担心!本文将基于 Spring Boot,带大家实操 SSE 推送。不仅介绍了 Spring Boot 实现 SSE 推送的多种方式,还会兼顾 Spring Boot 2.4 前后版本的差异,助你快速掌握这项热门技术。
SSE推送实战:基于定时器实现
1、引入依赖:在项目中引入 webflux,仅需添加一行代码搞定依赖。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
2、编写控制器:构建一个定时发送数据的控制器。
private final AtomicInteger counter = new AtomicInteger(0);
@GetMapping(path = "/interval/stream",produces = MediaType.TEXT_EVENT_STREAM_VALUE+ "; charset=UTF-8")
public Flux<ServerSentEvent<Integer>> streamSseMvc() {
return Flux.interval(Duration.ofSeconds(1))
.map(seq -> ServerSentEvent.<Integer>builder()
.data(counter.incrementAndGet())
.build()).takeUntil(event -> event.data() > 10).doOnComplete(() -> log.info("complete"));
}
这段代码实现了每秒发送一次数据,当发送数据值大于11 时停止。
3、搭建 SSE 客户端页面:设计一个简单的 HTML 页面接收数据。
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>SSE Example</title>
</head>
<body>
<div id="messages"></div>
<script>
const eventSource = new EventSource('/sse/interval/stream');
eventSource.onmessage = function (event) {
const messagesDiv = document.getElementById('messages');
const newMessage = document.createElement('p');
newMessage.textContent = `Received: ${event.data}`;
messagesDiv.appendChild(newMessage);
};
eventSource.onerror = function (error) {
console.error('EventSource failed:', error);
eventSource.close();
};
</script>
</body>
</html>
4、测试
浏览器访问客户端页面,会发现页面打印如下内容
Received: 1
Received: 2
Received: 3
Received: 4
Received: 5
Received: 6
Received: 7
Received: 8
Received: 9
Received: 10
Received: 11
看到这个内容,不知道大家会不会有疑问,为啥页面还会出现11,这主要是因为 takeUntil 操作符的工作机制。takeUntil 操作符是在满足指定条件后才停止发出元素,不过它会包含最后一个满足条件的元素。
具体而言,当 counter 的值为 10 时,下一次递增后变为 11,此时 event.data() > 10 条件成立,Flux 停止发出新的元素,但 11 这个元素已经被生成并发出了,所以客户端会接收到值为 11 的事件。
若不想让页面输出 11,可以把 takeUntil 的条件改为 event.data() >= 10,这样当 counter 的值达到 10 时,Flux 就会停止发出新的元素
更灵活的实现:基于发布订阅模式
定时器实现虽简单,但触发时间点不好控制。发布订阅模式则更灵活,生产者发送消息,消费者接收消息。
Spring Boot 2.4 之前版本
核心主要利用FluxProcessor,它有好几种实现类,我们常用大致有3种,UnicastProcessor、DirectProcessor、ReplayProcessor,这三种的实现大同小异,其主要区别在于
UnicastProcessor:适用于仅需一个订阅者的场景,像单个客户端接收实时数据更新的情况。
DirectProcessor:适合有多个订阅者的场景,比如多个客户端都要接收相同的实时消息。
ReplayProcessor:适合需要向新订阅者重播之前消息的场景,例如新客户端连接时需要获取历史消息。
本文就以DirectProcessor来讲解
1、创建发布订阅管理类:借助DirectProcessor实现消息发布与订阅。
public class DirectProcessorSsePublisherService implements SsePublisherService {
private final DirectProcessor<String> processor = DirectProcessor.create();
private final Flux<String> flux = processor.replay().autoConnect();
@Override
public Flux<String> getMessages() {
return flux;
}
@Override
public void publishMessage(String message) {
processor.onNext(message);
}
@Override
public void complete() {
processor.onComplete();
}
}
DirectProcessorSsePublisherService类的主要功能是作为一个消息发布服务,借助DirectProcessor和Flux来实现响应式消息的发布与订阅。外部类可以通过getMessages方法订阅消息流,通过publishMessage方法发布新消息,通过complete方法结束消息流。
2、设置定时器生产数据:定时生成数据并发布。
@Component
public class ProduceDataTask {
@Autowired
private SsePublisherService ssePublisherService;
private final AtomicInteger counter = new AtomicInteger(0);
@Scheduled(fixedRate = 5000)
public void run(){
int num = counter.incrementAndGet();
System.out.println("num = " + num);
ssePublisherService.publishMessage("hello-" + num);
if (num > 10) {
ssePublisherService.complete();
}
}
}
ProduceDataTask 类的主要功能是每隔 5 秒生成一个递增的整数,并将包含该整数的消息通过 SsePublisherService 发布出去,当生成的整数大于 10 时,结束消息发布。
3、编写消费控制器:接收并处理发布的消息。
@RestController
@RequestMapping("/sse")
@Slf4j
public class SseController {
@Autowired
private SsePublisherService ssePublisherService;
@GetMapping(path = "/subscribe", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> subscribe() {
return ssePublisherService.getMessages()
.map(message -> ServerSentEvent.<String>builder()
.data(message)
.build());
}
}
4、搭建 SSE 客户端页面:设计一个简单的 HTML 页面接收数据。
页面和方式一的页面雷同,就不贴代码了
5、测试
浏览器访问客户端页面,会发现页面打印如下内容
Received: hello-1
Received: hello-2
Received: hello-3
Received: hello-4
Received: hello-5
Received: hello-6
Received: hello-7
Received: hello-8
Received: hello-9
Received: hello-10
Spring Boot 2.4 及之后版本
1、使用 Sinks 管理发布订阅:核心类换成Sinks。
@Service
public class SinksSsePublisherService implements SsePublisherService {
private final Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
@Override
public Flux<String> getMessages() {
return sink.asFlux();
}
@Override
public void publishMessage(String message) {
sink.tryEmitNext(message);
}
@Override
public void complete() {
sink.tryEmitComplete();
}
}
SinksSsePublisherService 类的主要功能是作为一个消息发布服务,使用 Sinks 来管理消息的发布和订阅。外部代码可以通过 getMessages 方法订阅消息流,通过 publishMessage 方法发布新消息,通过 complete 方法结束消息流。同时,使用 onBackpressureBuffer() 策略来处理背压,确保在订阅者处理速度较慢时数据不会丢失。
2、生产数据与消费控制器:定时器和消费控制器代码与低版本相同。
3、测试:同样能看到推送的消息。
作者:linyb极客之路
链接:https://juejin.cn