高并發(fā)!Spring Boot 響應(yīng)式 SSE 實(shí)時(shí)推送,單機(jī)吞吐量10萬+
環(huán)境:SpringBoot3.4.2
1. 簡(jiǎn)介
在如今互聯(lián)網(wǎng)應(yīng)用中,實(shí)時(shí)消息推送的需求日益增長(zhǎng),例如股票行情、在線聊天室消息更新等場(chǎng)景。Spring Boot 響應(yīng)式 SSE(Server-Sent Events)技術(shù)為此提供了高效解決方案。
SSE 允許服務(wù)器通過一個(gè)持久化的 HTTP 連接,將數(shù)據(jù)實(shí)時(shí)推送給客戶端,無需客戶端頻繁輪詢。在 Spring Boot 響應(yīng)式編程模型下,結(jié)合 WebFlux,能充分發(fā)揮 SSE 的優(yōu)勢(shì),輕松應(yīng)對(duì)高并發(fā)場(chǎng)景。
要基于WebFlux 響應(yīng)式技術(shù)通過SSE實(shí)現(xiàn)實(shí)時(shí)廣播消息,我們首先需要了解一個(gè)核心組件Sink。
什么是 Sinks?
Sinks 既是發(fā)布者(publisher),又是訂閱者(subscriber)。多個(gè)數(shù)據(jù)流可以通過一端發(fā)送數(shù)據(jù),而另一端則像一個(gè) Flux,訂閱者可以在其中觀察元素。
Sinks 的關(guān)鍵特性:
- 發(fā)布者(Publisher):該對(duì)象允許使用 emit 或 tryEmit 方法將數(shù)據(jù)推送到數(shù)據(jù)流中。
- 訂閱者(Subscriber):Sink 中的數(shù)據(jù)可以通過 asFlux() 或 asMono() 方法傳播給訂閱者。
Sinks 如何工作?
- 生成數(shù)據(jù):調(diào)用 tryEmitNext()、tryEmitComplete() 或 tryEmitError() 等方法,用于發(fā)送數(shù)據(jù)、完成數(shù)據(jù)流或報(bào)告錯(cuò)誤。
- 消費(fèi)數(shù)據(jù):訂閱者使用 asFlux() 或 asMono() 連接到 Sink,并在數(shù)據(jù)到達(dá)時(shí)接收數(shù)據(jù)。
Sinks 的類型
Sinks 種類繁多,適用于不同的使用場(chǎng)景:
- Sinks.Many:允許發(fā)送多個(gè)元素,有以下幾種選項(xiàng):
unicast() —— 僅向一個(gè)訂閱者提供數(shù)據(jù)
multicast() —— 允許數(shù)據(jù)同時(shí)傳遞給多個(gè)訂閱者
replay() —— 新訂閱者會(huì)接收到在他們訂閱之前發(fā)送的最新元素。
- Sinks.One:用于發(fā)送單個(gè)元素。
如下 Sink 工作原理:
圖片
了解了這最核心的Sink組件后,接下來,我們將通過一個(gè)完整的示例演示。
2.實(shí)戰(zhàn)案例
2.1 準(zhǔn)備環(huán)境
定義接收消息的對(duì)象
public class Message {
private Integer id ;
private String author ;
private String time ;
private String message ;
// getters, setters
}2.2 定義Sink
根據(jù)上面的介紹,定義Sink實(shí)現(xiàn)消息的發(fā)布及訂閱,所有訂閱者都可以通過該Sink獲取實(shí)時(shí)最新的消息。
@Configuration
public class SinkConfig {
@Bean
Sinks.Many<Message> sink() {
return Sinks.many().replay().limit(1) ;
}
}解釋:
Sinks.Many: 這是一個(gè)能夠處理多個(gè) Message 類型元素的接收器(Sink)- replay().limit: 確保新訂閱者在連接時(shí)能夠獲取到已發(fā)布的最后一項(xiàng)數(shù)據(jù)。這對(duì)于那些希望立即獲取最新數(shù)據(jù)的新訂閱者來說非常有意義。
2.3 消息訂閱/發(fā)布
@Service
public class MessageService {
private final Sinks.Many<Message> messageSink ;
public MessageService(Many<Message> messageSink) {
this.messageSink = messageSink;
}
public Mono<Message> saveMessage(Mono<Message> message) {
return message.doOnNext(messageSink::tryEmitNext) ;
}
public Flux<Message> messageStream() {
return messageSink.asFlux() ;
}
}解釋:
- tryEmitNext: 嘗試發(fā)送一個(gè)非空元素,生成一個(gè) onNext 信號(hào)。此次嘗試的結(jié)果會(huì)以 EmitResult 的形式表示,該結(jié)果可能指示出錯(cuò)誤情況。
- asFlux: 返回此 Sink 的一個(gè) Flux 視圖。每次調(diào)用都會(huì)返回同一個(gè)實(shí)例。
2.4 Controller接口
@RestController
@RequestMapping("/messages")
public class MessageController {
private final AtomicInteger count = new AtomicInteger() ;
private final MessageService messageService;
public MessageController(MessageService messageService) {
this.messageService = messageService;
}
@GetMapping("/send")
public Mono<Message> sendMessage(String message) {
String time = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(LocalDateTime.now()) ;
Message msg = new Message(count.incrementAndGet(), "Pack", time, message) ;
return messageService.saveMessage(Mono.just(msg)) ;
}
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Message> messageStream() {
return messageService.messageStream() ;
}
}以上我們就完成了后端接口的所有開發(fā)工作。接下來,我們實(shí)現(xiàn)前端頁(yè)面及功能。
2.5 前端頁(yè)面
HTML部分
<div class="container">
<h1>實(shí)時(shí)消息</h1>
<div class="controls">
<div class="buttons">
<button id="startBtn">實(shí)時(shí)監(jiān)聽消息</button>
<button id="stopBtn" disabled>停止消息監(jiān)聽</button>
</div>
</div>
<table>
<thead>
<tr>
<th>編號(hào)</th>
<th>作者</th>
<th>時(shí)間</th>
<th>內(nèi)容</th>
</tr>
</thead>
<tbody id="messages"></tbody>
</table>
</div>CSS樣式
body {font-family: 'Roboto', sans-serif;margin: 0;background-color: #f5f5f5;display: flex;justify-content: center;align-items: flex-start;padding: 20px;}
.container {width: 90%;max-width: 1000px;margin: 0 auto;text-align: center;}
h1 {font-size: 2rem;font-weight: 500;color: #4285f4;margin-bottom: 20px;}
.controls {display: flex;justify-content: space-between;margin-bottom: 20px;}
.buttons {display: flex;gap: 10px;}
button {padding: 8px 16px;background-color: #4285f4;color: white;border: none;border-radius: 5px;font-size: 1rem;cursor: pointer;}
button:disabled {background-color: #ccc;cursor: not-allowed;}
table {width: 100%;border-collapse: collapse;background-color: white;box-shadow: 0 4px 12px rgba(0, 0, 0, 0.1);border-radius: 8px;overflow: hidden;}
th, td {padding: 15px;text-align: left;border-bottom: 1px solid #ddd;}
th {background-color: #f1f1f1;font-weight: 500;color: #333;}
tr:hover {background-color: #f9f9f9;}JavaScript
<script>
let eventSource;
const messageContainer = document.getElementById('messages');
const startBtn = document.getElementById('startBtn');
const stopBtn = document.getElementById('stopBtn');
function startStream() {
const url = `http://localhost:8080/messages/stream`;
eventSource = new EventSource(url);
eventSource.onmessage = event => {
const data = JSON.parse(event.data);
if (data) {
const message = `
<tr>
<td class="${data.type}">${data.id}</td>
<td>${data.author}</td>
<td>${data.time}</td>
<td>${data.message}</td>
</tr>
`;
messageContainer.insertAdjacentHTML('afterbegin', message);
}
};
startBtn.disabled = true;
stopBtn.disabled = false;
}
function stopStream() {
if (eventSource) {
eventSource.close();
startBtn.disabled = false;
stopBtn.disabled = true;
}
}
startBtn.addEventListener('click', startStream);
stopBtn.addEventListener('click', stopStream);
</script>最終頁(yè)面效果如下:
圖片
2.6 測(cè)試
下面最終展示效果:
圖片

































