HTTP 輪詢 vs MQTT:SpringBoot 通信實(shí)踐
引言
在實(shí)時(shí)通信場(chǎng)景中,消息傳遞的效率、可靠性與資源占用一直是開(kāi)發(fā)者關(guān)注的核心。從早期的HTTP輪詢到如今廣泛應(yīng)用的MQTT協(xié)議,技術(shù)方案的演進(jìn)始終圍繞更高效地實(shí)現(xiàn)端到端通信這一目標(biāo)展開(kāi)。
技術(shù)演進(jìn):為什么從 HTTP 輪詢走向 MQTT?
HTTP 輪詢:簡(jiǎn)單但低效的被動(dòng)通信
HTTP 協(xié)議作為互聯(lián)網(wǎng)的基礎(chǔ)協(xié)議,基于請(qǐng)求 - 響應(yīng)模型設(shè)計(jì),天然適合客戶端主動(dòng)發(fā)起請(qǐng)求、服務(wù)端被動(dòng)返回?cái)?shù)據(jù)的場(chǎng)景。但在實(shí)時(shí)通信(如即時(shí)聊天、設(shè)備狀態(tài)監(jiān)控、消息推送)中,為了獲取實(shí)時(shí)更新的數(shù)據(jù),開(kāi)發(fā)者不得不采用輪詢方案,常見(jiàn)的實(shí)現(xiàn)方式有兩種:
(1)普通輪詢(Polling)
客戶端按照固定時(shí)間間隔(如1秒、5秒)向服務(wù)端發(fā)送HTTP請(qǐng)求,查詢是否有新數(shù)據(jù);服務(wù)端無(wú)論是否有數(shù)據(jù),都會(huì)立即返回響應(yīng)。
- 核心問(wèn)題:
資源浪費(fèi)嚴(yán)重:大部分請(qǐng)求是無(wú)效請(qǐng)求(服務(wù)端無(wú)新數(shù)據(jù)),卻占用了網(wǎng)絡(luò)帶寬、服務(wù)端連接數(shù)與 CPU 資源;
實(shí)時(shí)性差:數(shù)據(jù)更新的延遲等于輪詢間隔(如5秒輪詢,延遲最高可達(dá)5秒),無(wú)法滿足低延遲場(chǎng)景需求。
(2)長(zhǎng)輪詢(Long Polling)
為優(yōu)化普通輪詢的資源浪費(fèi)問(wèn)題,長(zhǎng)輪詢對(duì)邏輯進(jìn)行了調(diào)整:客戶端發(fā)送請(qǐng)求后,服務(wù)端不會(huì)立即返回響應(yīng),而是掛起請(qǐng)求(通常設(shè)置超時(shí)時(shí)間,如30秒);若期間有新數(shù)據(jù),服務(wù)端立即返回響應(yīng);客戶端收到響應(yīng)后,立即發(fā)起下一次長(zhǎng)輪詢。
- 核心問(wèn)題:
連接占用時(shí)間長(zhǎng):服務(wù)端需要維護(hù)大量掛起的HTTP連接,在高并發(fā)場(chǎng)景下會(huì)消耗大量?jī)?nèi)存與線程資源;
協(xié)議開(kāi)銷大:HTTP請(qǐng)求頭(如Cookie、User-Agent)通常占整個(gè)請(qǐng)求體積的70%以上,即使僅傳遞少量數(shù)據(jù),也需要攜帶完整的請(qǐng)求頭,帶寬利用率低;
不支持多對(duì)多通信:HTTP輪詢本質(zhì)是客戶端 - 服務(wù)端的點(diǎn)對(duì)點(diǎn)通信,無(wú)法直接實(shí)現(xiàn)設(shè)備間、客戶端間的消息轉(zhuǎn)發(fā)。
MQTT:為實(shí)時(shí)、低耗通信而生
MQTT(Message Queuing Telemetry Transport,消息隊(duì)列遙測(cè)傳輸)是1999年誕生的輕量級(jí)發(fā)布 / 訂閱(Pub/Sub)協(xié)議,最初用于石油管道監(jiān)控場(chǎng)景,如今已成為物聯(lián)網(wǎng)(IoT)、實(shí)時(shí)消息推送的主流協(xié)議。其設(shè)計(jì)目標(biāo)是在帶寬有限、網(wǎng)絡(luò)不穩(wěn)定的環(huán)境下,實(shí)現(xiàn)可靠的低功耗通信,核心特性完美解決了HTTP輪詢的痛點(diǎn):
特性 | 說(shuō)明 |
發(fā)布 / 訂閱模型 | 客戶端(發(fā)布者)不直接與接收者(訂閱者)通信,而是通過(guò) “主題(Topic)” 轉(zhuǎn)發(fā)消息,支持多對(duì)多通信; |
輕量級(jí)協(xié)議 | 協(xié)議頭最小僅 2 字節(jié),遠(yuǎn)低于 HTTP 的幾十 KB,帶寬利用率極高; |
持久化連接 | 客戶端與服務(wù)端建立一次 TCP 連接后,可長(zhǎng)期復(fù)用,無(wú)需頻繁建立連接,減少資源消耗; |
QoS 服務(wù)質(zhì)量 | 支持 3 級(jí)消息可靠性:QoS 0(最多一次)、QoS 1(至少一次)、QoS 2(恰好一次); |
斷開(kāi)重連與遺囑 | 客戶端異常斷開(kāi)時(shí),服務(wù)端可自動(dòng)觸發(fā)遺囑消息(Last Will and Testament),通知其他訂閱者; |
消息保留 | 服務(wù)端可保留某個(gè)主題的最新消息,新訂閱者上線后可直接獲取該消息,無(wú)需等待發(fā)布者再次發(fā)送; |
實(shí)踐案例
核心組件:
- MQTT服務(wù)端(Broker):負(fù)責(zé)接收、存儲(chǔ)、轉(zhuǎn)發(fā)消息,常見(jiàn)實(shí)現(xiàn)有Eclipse Mosquitto(開(kāi)源輕量)、EMQX(企業(yè)級(jí))、AWS IoT Core等;
- MQTT客戶端:SpringBoot應(yīng)用作為客戶端,實(shí)現(xiàn)發(fā)布消息與訂閱消息功能,常用客戶端庫(kù)為Eclipse Paho。
集成 MQTT 客戶端
引入依賴
<!-- MQTT客戶端 -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>配置 MQTT 連接參數(shù)
spring:
mqtt:
# MQTT服務(wù)端地址(tcp://ip:端口)
broker-url: tcp://localhost:1883
# 客戶端ID(必須唯一,建議添加隨機(jī)后綴避免沖突)
client-id: springboot-mqtt-client-${random.uuid}
# 用戶名(Mosquitto默認(rèn)無(wú)密碼,若配置了認(rèn)證需填寫(xiě))
username:
# 密碼
password:
# 默認(rèn)訂閱的主題(可配置多個(gè),用逗號(hào)分隔)
default-topics: test/topic, device/status
# QoS級(jí)別(0/1/2)
qos: 1
# 是否自動(dòng)重連
automatic-reconnect: true
# 連接超時(shí)時(shí)間(毫秒)
connection-timeout: 3000
# 保持連接心跳時(shí)間(秒)
keep-alive-interval: 60編寫(xiě) MQTT 配置類:初始化客戶端
@Configuration
@ConfigurationProperties(prefix = "spring.mqtt") // 綁定application.yml中的配置
@Data
public class MqttConfig {
private String brokerUrl;
private String clientId;
private String username;
private String password;
private String[] defaultTopics;
private int qos;
private boolean automaticReconnect;
private int connectionTimeout;
private int keepAliveInterval;
/**
* 初始化MQTT客戶端
*/
@Bean
public MqttClient mqttClient() throws MqttException {
// 1. 創(chuàng)建連接選項(xiàng)
MqttConnectOptions options = new MqttConnectOptions();
// 設(shè)置用戶名密碼(若服務(wù)端無(wú)認(rèn)證,可省略)
if (username != null && !username.isEmpty()) {
options.setUserName(username);
}
if (password != null && !password.isEmpty()) {
options.setPassword(password.toCharArray());
}
// 設(shè)置自動(dòng)重連、連接超時(shí)、心跳時(shí)間
options.setAutomaticReconnect(automaticReconnect);
options.setConnectionTimeout(connectionTimeout);
options.setKeepAliveInterval(keepAliveInterval);
// 禁用“清除會(huì)話”(確保斷開(kāi)重連后,未接收的消息能繼續(xù)接收)
options.setCleanSession(false);
// 2. 創(chuàng)建MqttClient實(shí)例(MemoryPersistence表示消息持久化到內(nèi)存)
MqttClient client = new MqttClient(brokerUrl, clientId, new MemoryPersistence());
// 3. 設(shè)置客戶端回調(diào)(處理連接成功、消息到達(dá)、連接丟失等事件)
client.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
// 連接丟失時(shí)觸發(fā)(可在這里實(shí)現(xiàn)重連邏輯,不過(guò)options已配置自動(dòng)重連)
System.out.println("MQTT連接丟失:" + cause.getMessage());
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
// 收到訂閱的消息時(shí)觸發(fā)
System.out.println("收到MQTT消息:");
System.out.println("主題:" + topic);
System.out.println("內(nèi)容:" + new String(message.getPayload()));
System.out.println("QoS級(jí)別:" + message.getQos());
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// 消息發(fā)布完成時(shí)觸發(fā)(僅QoS>0時(shí)有效)
try {
if (token.isComplete()) {
System.out.println("消息發(fā)布成功:" + token.getMessageId());
}
} catch (MqttException e) {
e.printStackTrace();
}
}
});
// 4. 連接服務(wù)端并訂閱默認(rèn)主題
client.connect(options);
if (defaultTopics != null && defaultTopics.length > 0) {
// 訂閱多個(gè)主題(第二個(gè)參數(shù)為QoS數(shù)組,與主題數(shù)組一一對(duì)應(yīng))
int[] qosArray = new int[defaultTopics.length];
for (int i = 0; i < defaultTopics.length; i++) {
qosArray[i] = qos;
}
client.subscribe(defaultTopics, qosArray);
System.out.println("MQTT連接成功,已訂閱主題:" + String.join(",", defaultTopics));
}
return client;
}
}編寫(xiě) MQTT 工具類:封裝發(fā)布 / 訂閱方法
@Component
public class MqttUtil {
@Resource
private MqttClient mqttClient;
/**
* 發(fā)布消息
* @param topic 主題
* @param payload 消息內(nèi)容
* @param qos QoS級(jí)別(0/1/2)
* @param retained 是否保留消息(true:服務(wù)端保留最新消息,新訂閱者可獲取)
*/
public void publish(String topic, String payload, int qos, boolean retained) throws MqttException {
if (!mqttClient.isConnected()) {
mqttClient.reconnect(); // 若連接斷開(kāi),先重連
}
// 創(chuàng)建MQTT消息
MqttMessage message = new MqttMessage(payload.getBytes());
message.setQos(qos);
message.setRetained(retained);
// 發(fā)布消息
mqttClient.publish(topic, message);
}
/**
* 訂閱主題(重載方法,使用默認(rèn)QoS)
*/
public void subscribe(String topic) throws MqttException {
subscribe(topic, 1); // 默認(rèn)QoS=1
}
/**
* 訂閱主題
*/
public void subscribe(String topic, int qos) throws MqttException {
if (!mqttClient.isConnected()) {
mqttClient.reconnect();
}
mqttClient.subscribe(topic, qos);
System.out.println("已訂閱主題:" + topic + "(QoS:" + qos + ")");
}
/**
* 取消訂閱主題
*/
public void unsubscribe(String topic) throws MqttException {
if (mqttClient.isConnected()) {
mqttClient.unsubscribe(topic);
System.out.println("已取消訂閱主題:" + topic);
}
}
}編寫(xiě)測(cè)試接口:驗(yàn)證 MQTT 功能
@RestController
@RequestMapping("/mqtt")
public class MqttController {
@Resource
private MqttUtil mqttUtil;
/**
* 發(fā)布MQTT消息接口
* @param topic 主題(如test/topic)
* @param message 消息內(nèi)容
* @param qos QoS級(jí)別(0/1/2,默認(rèn)1)
* @return 發(fā)布結(jié)果
*/
@PostMapping("/publish")
public String publish(
@RequestParam String topic,
@RequestParam String message,
@RequestParam(required = false, defaultValue = "1") int qos) {
try {
// 發(fā)布消息(retained=false:不保留消息)
mqttUtil.publish(topic, message, qos, false);
return"消息發(fā)布成功!主題:" + topic + ",內(nèi)容:" + message;
} catch (MqttException e) {
e.printStackTrace();
return"消息發(fā)布失敗:" + e.getMessage();
}
}
}



























