AI 爆火背后,Spring Boot SSE 推送该怎么学?
2025-04-30 08:12 阅读(9)

最近 AI 爆火,与之相关的技术也成为香饽饽,SSE 推送就是其中之一。SSE,全称 Server-Sent Events,是 HTML5 Web API 的一员。它能让网页低延迟、高效地接收服务器实时更新,通过 HTTP 协议实现服务器主动向客户端推送数据。借助持久化 HTTP 长连接,服务器可以向客户端实时推送数据,不过客户端无法通过 SSE 向服务端回传数据。

很多小伙伴想上手 SSE 推送开发,却不知从何开始。别担心!本文将基于 Spring Boot,带大家实操 SSE 推送。不仅介绍了 Spring Boot 实现 SSE 推送的多种方式,还会兼顾 Spring Boot 2.4 前后版本的差异,助你快速掌握这项热门技术。

SSE推送实战:基于定时器实现


1、引入依赖:在项目中引入 webflux,仅需添加一行代码搞定依赖。

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>

2、编写控制器:构建一个定时发送数据的控制器。

  private final AtomicInteger counter = new AtomicInteger(0);

    @GetMapping(path = "/interval/stream",produces = MediaType.TEXT_EVENT_STREAM_VALUE+ "; charset=UTF-8")
    public Flux<ServerSentEvent<Integer>> streamSseMvc() {
        return Flux.interval(Duration.ofSeconds(1))
               .map(seq -> ServerSentEvent.<Integer>builder()
                       .data(counter.incrementAndGet())
                       .build()).takeUntil(event -> event.data() > 10).doOnComplete(() -> log.info("complete"));
    }

这段代码实现了每秒发送一次数据,当发送数据值大于11 时停止。


3、搭建 SSE 客户端页面:设计一个简单的 HTML 页面接收数据。

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>SSE Example</title>
</head>
<body>
    <div id="messages"></div>
    <script>
        const eventSource = new EventSource('/sse/interval/stream');
        eventSource.onmessage = function (event) {
            const messagesDiv = document.getElementById('messages');
            const newMessage = document.createElement('p');
            newMessage.textContent = `Received: ${event.data}`;
            messagesDiv.appendChild(newMessage);
        };
        eventSource.onerror = function (error) {
            console.error('EventSource failed:', error);
            eventSource.close();
        };
    </script>
</body>
</html>    

4、测试


浏览器访问客户端页面,会发现页面打印如下内容

Received: 1

Received: 2

Received: 3

Received: 4

Received: 5

Received: 6

Received: 7

Received: 8

Received: 9

Received: 10

Received: 11

看到这个内容,不知道大家会不会有疑问,为啥页面还会出现11,这主要是因为 takeUntil 操作符的工作机制。takeUntil 操作符是在满足指定条件后才停止发出元素,不过它会包含最后一个满足条件的元素。

具体而言,当 counter 的值为 10 时,下一次递增后变为 11,此时 event.data() > 10 条件成立,Flux 停止发出新的元素,但 11 这个元素已经被生成并发出了,所以客户端会接收到值为 11 的事件。

若不想让页面输出 11,可以把 takeUntil 的条件改为 event.data() >= 10,这样当 counter 的值达到 10 时,Flux 就会停止发出新的元素

更灵活的实现:基于发布订阅模式

定时器实现虽简单,但触发时间点不好控制。发布订阅模式则更灵活,生产者发送消息,消费者接收消息。

Spring Boot 2.4 之前版本

核心主要利用FluxProcessor,它有好几种实现类,我们常用大致有3种,UnicastProcessor、DirectProcessor、ReplayProcessor,这三种的实现大同小异,其主要区别在于


UnicastProcessor:适用于仅需一个订阅者的场景,像单个客户端接收实时数据更新的情况。

DirectProcessor:适合有多个订阅者的场景,比如多个客户端都要接收相同的实时消息。

ReplayProcessor:适合需要向新订阅者重播之前消息的场景,例如新客户端连接时需要获取历史消息。


本文就以DirectProcessor来讲解


1、创建发布订阅管理类:借助DirectProcessor实现消息发布与订阅。

public class DirectProcessorSsePublisherService implements SsePublisherService {

    private final DirectProcessor<String> processor = DirectProcessor.create();
    private final Flux<String> flux = processor.replay().autoConnect();

    @Override
    public Flux<String> getMessages() {
        return flux;
    }

    @Override
    public void publishMessage(String message) {
        processor.onNext(message);
    }

    @Override
    public void complete() {
        processor.onComplete();
    }
}

DirectProcessorSsePublisherService类的主要功能是作为一个消息发布服务,借助DirectProcessor和Flux来实现响应式消息的发布与订阅。外部类可以通过getMessages方法订阅消息流,通过publishMessage方法发布新消息,通过complete方法结束消息流。


2、设置定时器生产数据:定时生成数据并发布。

@Component
public class ProduceDataTask {

    @Autowired
    private SsePublisherService ssePublisherService;

    private final AtomicInteger counter = new AtomicInteger(0);


    @Scheduled(fixedRate = 5000)
    public void run(){
        int num = counter.incrementAndGet();
        System.out.println("num = " + num);
        ssePublisherService.publishMessage("hello-" + num);
        if (num > 10) {
            ssePublisherService.complete();
        }

    }
}

ProduceDataTask 类的主要功能是每隔 5 秒生成一个递增的整数,并将包含该整数的消息通过 SsePublisherService 发布出去,当生成的整数大于 10 时,结束消息发布。


3、编写消费控制器:接收并处理发布的消息。

@RestController
@RequestMapping("/sse")
@Slf4j
public class SseController {

    @Autowired
    private SsePublisherService ssePublisherService;

 


    @GetMapping(path = "/subscribe", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<String>> subscribe() {
        return ssePublisherService.getMessages()
                .map(message -> ServerSentEvent.<String>builder()
                        .data(message)
                        .build());
    }
    }

4、搭建 SSE 客户端页面:设计一个简单的 HTML 页面接收数据。


页面和方式一的页面雷同,就不贴代码了


5、测试


浏览器访问客户端页面,会发现页面打印如下内容

Received: hello-1

Received: hello-2

Received: hello-3

Received: hello-4

Received: hello-5

Received: hello-6

Received: hello-7

Received: hello-8

Received: hello-9

Received: hello-10

Spring Boot 2.4 及之后版本

1、使用 Sinks 管理发布订阅:核心类换成Sinks。

@Service
public class SinksSsePublisherService implements SsePublisherService {

    private final Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();

    @Override
    public Flux<String> getMessages() {
        return sink.asFlux();
    }

    @Override
    public void publishMessage(String message) {
        sink.tryEmitNext(message);
    }

    @Override
    public void complete() {
        sink.tryEmitComplete();
    }
}

SinksSsePublisherService 类的主要功能是作为一个消息发布服务,使用 Sinks 来管理消息的发布和订阅。外部代码可以通过 getMessages 方法订阅消息流,通过 publishMessage 方法发布新消息,通过 complete 方法结束消息流。同时,使用 onBackpressureBuffer() 策略来处理背压,确保在订阅者处理速度较慢时数据不会丢失。


2、生产数据与消费控制器:定时器和消费控制器代码与低版本相同。



3、测试:同样能看到推送的消息。


作者:linyb极客之路

链接:https://juejin.cn


https://www.zuocode.com