国产精品电影_久久视频免费_欧美日韩国产激情_成年人视频免费在线播放_日本久久亚洲电影_久久都是精品_66av99_九色精品美女在线_蜜臀a∨国产成人精品_冲田杏梨av在线_欧美精品在线一区二区三区_麻豆mv在线看

RocketMQ控制臺消費者堆棧信息展示優(yōu)化分析

開發(fā) 前端
在同一個程序中創(chuàng)建兩個不同Group ID的消費端實例,在控制臺中查看一個Group ID下單個消費端堆棧信息,堆棧信息中包含了兩個Group ID消費端的堆棧信息,給排查問題造成了困擾。

背景介紹

專有云企業(yè)版v_3_12,消息隊列RocketMQ控制臺->Group管理,查看Group ID下單個消費端堆棧信息,期望只展示與該Group ID相關(guān)的堆棧信息,在以下場景與期望不符。

場景介紹

在同一個程序中創(chuàng)建兩個不同Group ID的消費端實例,在控制臺中查看一個Group ID下單個消費端堆棧信息,堆棧信息中包含了兩個Group ID消費端的堆棧信息,給排查問題造成了困擾。

示例代碼

pom

<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client</artifactId>
<version>1.8.8.3.Final</version>
</dependency>

code

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.batch.BatchMessageListener;
import com.aliyun.openservices.ons.api.bean.BatchConsumerBean;
import com.aliyun.openservices.ons.api.bean.Subscription;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class Main {
public static void main(String[] args){
String nameSrvAddr = "xxx";
String accessKey = "xxx";
String secretKey = "xxx";
String groupId1 = "Goup_ID_1";
String topic1 = "xxx_1";
String tag1 = "xxx_1";
BatchMessageListener batchMessageListener1 = (messages, context) -> Action.CommitMessage;
BatchConsumerBean batchConsumerBean1 = batchConsumerBean(nameSrvAddr,accessKey,secretKey,
groupId1,topic1,tag1,batchMessageListener1);
batchConsumerBean1.start();

String groupId2 = "Goup_ID_2";
String topic2 = "xxx_2";
String tag2 = "xxx_2";
BatchMessageListener batchMessageListener2 = (messages, context) -> Action.CommitMessage;
BatchConsumerBean batchConsumerBean2 = batchConsumerBean(nameSrvAddr,accessKey,secretKey,
groupId2,topic2,tag2,batchMessageListener2);
batchConsumerBean2.start();
}

private static BatchConsumerBean batchConsumerBean(String nameSrvAddr,String accessKey,String secretKey,String groupId,String topic,String tag,BatchMessageListener batchMessageListener){
BatchConsumerBean batchConsumerBean = new BatchConsumerBean();
Properties properties = new Properties();
properties.put(PropertyKeyConst.NAMESRV_ADDR,nameSrvAddr);
properties.put(PropertyKeyConst.AccessKey,accessKey);
properties.put(PropertyKeyConst.SecretKey,secretKey);
properties.put(PropertyKeyConst.GROUP_ID,groupId);
batchConsumerBean.setProperties(properties);

Subscription subscription = new Subscription();
subscription.setTopic(topic);
subscription.setExpression(tag);
Map<Subscription, BatchMessageListener> subscriptionTable = new HashMap<>();
subscriptionTable.put(subscription,batchMessageListener);

batchConsumerBean.setSubscriptionTable(subscriptionTable);
return batchConsumerBean;
}
}

分析過程

首先分析示例代碼中與BatchConsumerBean相關(guān)聯(lián)的對象,然后分析控制臺展示消費端堆棧信息的流程,最后分析下不同版本的RocketMQ Client SDK對消費端消費線程命名方式的變化。

BatchConsumerBean

示例代碼中創(chuàng)建了兩個BatchConsumerBean實例,與BatchConsumerBean實例相關(guān)聯(lián)的對象如下:

與BatchConsumerBean關(guān)聯(lián)的對象

從上圖看,BatchConsumerBean實例是比較重的,所以上面的示例代碼可以優(yōu)化為只創(chuàng)建一個BatchConsumerBean實例,與該問題不太相關(guān),暫時忽略;
上圖中與該問題直接相關(guān)的是ClientRemotingProcessor、MQClientInstance、DefaultMQPushConsumerImpl、ConsumerStatsManager,下面繼續(xù)分析。

堆棧信息展示流程

下面描述的是在瀏覽器請求一個Group ID單個消費端堆棧信息的流程。

堆棧信息展示流程

瀏覽器請求控制臺應(yīng)用

當在控制臺單機某個消費端堆棧信息的時候,瀏覽器會向控制臺應(yīng)用發(fā)起http請求,主要請求參數(shù)是:
GroupID,ClientId,其中每個MQClientInstance實例對應(yīng)一個ClientId。

控制臺應(yīng)用請求Broker

控制臺應(yīng)用收到瀏覽器請求后,主要進行以下操作:

String topic = MixAll.RETRY_GROUP_TOPIC_PREFIX + consumerGroup;
TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
List<BrokerData> brokerDatas = topicRouteData.getBrokerDatas();
if (brokerDatas != null) {
for (BrokerData brokerData : brokerDatas) {
String addr = brokerData.selectBrokerAddr();
if (addr != null) {
return this.mqClientInstance.getMQClientAPIImpl().getConsumerRunningInfo(addr, consumerGroup, clientId, jstack,timeoutMillis * 3);
}
}
}
  1. 根據(jù)%RETRY% + GroupIID查找對應(yīng)的TopicRouteData
  2. 從TopicRouteData中選擇一個Broker的地址發(fā)送getConsumerRunningInfo請求

Broker請求Consumer

Broker收到請求后,主要進行以下操作:

ClientChannelInfo clientChannelInfo = this.brokerController.getConsumerManager().findChannel(consumerGroup, clientId);
RemotingCommand newRequest = RemotingCommand.createRequestCommand(requestCode, null);
newRequest.setExtFields(request.getExtFields());
newRequest.setBody(request.getBody());
return this.brokerController.getBroker2Client().callClient(clientChannelInfo.getChannel(), newRequest);
  1. AdminBrokerProcessor響應(yīng)查詢請求
  2. 根據(jù)GroupID和ClientId找到對應(yīng)Consumer實例的channel socket
  3. 通過channel socket發(fā)送請求到Consumer實例

Consumer處理邏輯

Consumer收到請求后,主要進行以下操作:

ConsumerRunningInfo consumerRunningInfo = this.mqClientFactory.consumerRunningInfo(requestHeader.getConsumerGroup());
if (requestHeader.isJstackEnable()) {
Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces();
String jstack = UtilAll.jstack(map);
consumerRunningInfo.setJstack(jstack);
}
  1. 通過MQClientInstance實例請求Consumer實例的consumerRunningInfo方法獲取Consumer運行信息,如:pullRT、pullTPS、consumeRT、consumeOKTPS、consumeFailedTPS等信息
  2. 獲取JVM所有線程棧信息
  3. 將獲取到的ConsumerRunningInfo返回給Broker。

其中第2步【獲取JVM所有線程棧信息】就是我們需要查看的堆棧信息,目前控制臺主要展示了以ConsumeMessageThread__開頭的線程和RebalanceService線程,這塊期望只展示與該消費端相關(guān)的ConsumeMessageThread__線程和Rebalance線程,不期望將不相關(guān)的消費端線程也展示出來。

ConsumeMessageThread線程的命名

在當前版本中處理業(yè)務(wù)的消費者線程名的形式是:ConsumeMessageThread_數(shù)字,
ConsumeMessageConcurrentlyService類中相關(guān)代碼如下:

//該線程池用于處理業(yè)務(wù)邏輯
this.consumeExecutor = new ThreadPoolExecutor(
this.defaultMQPushConsumer.getConsumeThreadMin(),
this.defaultMQPushConsumer.getConsumeThreadMax(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.consumeRequestQueue,
new ThreadFactoryImpl("ConsumeMessageThread_"));

新版本中線程的命名中增加了GroupId,相關(guān)代碼如下:

String consumeThreadPrefix = null;
if (consumerGroup.length() > 100) {
consumeThreadPrefix = new StringBuilder("ConsumeMessageThread_").append(consumerGroup, 0, 100).append("_").toString();
} else {
consumeThreadPrefix = new StringBuilder("ConsumeMessageThread_").append(consumerGroup).append("_").toString();
}
this.consumeExecutor = new ThreadPoolExecutor(
this.defaultMQPushConsumer.getConsumeThreadMin(),
this.defaultMQPushConsumer.getConsumeThreadMax(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.consumeRequestQueue,
new ThreadFactoryImpl(consumeThreadPrefix));

線程名形式為:ConsumeMessageThread_GroupId__數(shù)字,從一定程度對以上問題進行了優(yōu)化。

總結(jié)

  1. ONS SDK對RocketMQ Client進行了封裝,更加方便業(yè)務(wù)的使用,Consumer對象比較重,需要根據(jù)業(yè)務(wù)采用合理的初始化方式
  2. ConsumerStatsManager記錄了消費端的一些統(tǒng)計信息
  3. ConsumeMessageConcurrentlyService對消費端線程命名進行了優(yōu)化?
責任編輯:武曉燕 來源: 今日頭條
相關(guān)推薦

2021-07-12 10:25:03

RocketMQ數(shù)據(jù)結(jié)構(gòu)kafka

2022-07-07 09:00:49

RocketMQ消費者消息消費

2022-11-08 07:36:17

RocketMQ消費者消息堆積

2024-01-24 09:00:31

SSD訂閱關(guān)系內(nèi)存

2024-04-22 00:00:00

RocketMQ優(yōu)化位點

2022-03-30 08:40:00

JavaScript控制臺

2022-05-09 11:15:05

RocketMQPULL 模式PUSH 模式

2011-07-22 17:05:56

IOS 控制臺 GDB

2011-07-06 15:25:33

Windows控制臺

2023-06-01 08:08:38

kafka消費者分區(qū)策略

2015-08-26 09:39:30

java消費者

2010-12-21 14:32:43

操作控制臺

2011-08-05 16:21:24

2011-07-22 16:25:38

CA TechnoloIT消費化

2022-03-14 11:05:01

RocketMQRedis緩存

2009-08-13 13:14:31

C#生產(chǎn)者和消費者

2011-08-08 10:55:14

IOS 控制臺 Consol

2009-06-15 09:50:34

JBoss控制臺

2017-10-15 10:24:27

開發(fā)

2015-06-15 11:29:34

數(shù)據(jù)中心綠色數(shù)據(jù)中心
點贊
收藏

51CTO技術(shù)棧公眾號

欧美在线黄色| 免费看日本一区二区| 一区视频在线播放| 欧美精品123| 激情视频亚洲| 欧美另类高清zo欧美| 特级毛片在线观看| 99天天综合性| 日本一区免费看| 91九色精品| 992tv成人免费影院| 视频精品导航| 亚洲福利视频久久| 91大神xh98hx在线播放| 亚洲综合视频网| 黄色三级视频在线| 国产毛片一区二区| 日韩美女一区| 亚洲午夜久久久久久尤物| 国产精品18久久久久久首页狼| 欧美一区二区三区婷婷| 亚洲成人网在线| 日韩经典av| 欧美日韩国产精品自在自线| 伪装者免费全集在线观看| 一区二区三区美女视频| gogo高清免费视频| 亚洲欧美日韩成人高清在线一区| 性chinese极品按摩| 欧美激情综合在线| 国产又黄又猛又粗又爽的视频| 成人视屏免费看| 日韩亚洲欧美视频| 不卡在线观看av| 国产资源在线视频| av在线不卡电影| 嫩草av久久伊人妇女超级a| 97久久人人超碰| 一道本视频在线观看| 国产精品日产欧美久久久久| 国产裸体免费无遮挡| 国产精品国产三级国产aⅴ中文 | 欧美精品一卡二卡| eeuss影院在线观看| 欧美视频在线一区| 97超碰资源站在线观看| 亚洲精品久久久久中文字幕二区| 在线免费av资源| 在线成人激情视频| 精品国产一区二| 国产99久久精品一区二区| 欧美韩国日本在线观看| 国产一区精品在线| 欧美96一区二区免费视频| 男人添女人下部视频免费| 久久亚洲欧美国产精品乐播| 成年人在线免费观看视频网站| 欧美日韩激情视频| 成人直播在线| 在线视频精品一| 久久97精品| 97超碰人人模人人爽人人看| 青青青伊人色综合久久| 北条麻妃69av| 亚洲综合免费观看高清在线观看| 国产一区二区三区福利| 日韩欧美高清在线| 免费一级欧美在线观看视频| 91国产美女在线观看| 一区二区在线影院| 最新av在线免费观看| 国产精品嫩草影院com| 国产免费a∨片在线观看不卡| 日韩成人av在线| 欧美性生活一级片| 久久av二区| 91欧美激情一区二区三区成人| 黄页视频在线免费观看| 亚洲第一中文字幕在线观看| 北条麻妃一区二区三区在线| 成人毛片网站| 99久久精品国产导航| 四虎精品在永久在线观看 | 中文字幕av专区| 欧美视频二区36p| 97久久香蕉国产线看观看| 日av在线播放中文不卡| 亚洲经典自拍| 少妇性l交大片| 88在线观看91蜜桃国自产| 99re久久| 国产精品99久久久久久久| 波多野结衣在线一区| 久热av在线| 欧美另类极品videosbestfree| 欧美日一区二区三区在线观看国产免| 欧美国产综合在线| 色综合久久综合网| 国色天香久久精品国产一区| 激情小说综合区| 国产精品国产三级国产aⅴ无密码| 懂色av中文在线| 午夜剧场成人观在线视频免费观看| 久久av一区| eeuss在线观看| 久久久www成人免费精品| 国产精品日韩精品欧美精品| 色琪琪原网站亚洲香蕉| 精品香蕉在线观看视频一| 亚洲高清资源在线观看| 国产真实乱子伦| 亚洲激情视频在线播放| 欧美 日韩 国产一区二区在线视频 | 1204国产成人精品视频| 免费试看一区| 亚洲国产一区二区a毛片| 国产第一亚洲| 欧美日韩在线观看一区二区三区| 亚洲日韩欧美一区二区在线| 丁香久久综合| 亚洲精品在线视频观看| 91国产福利在线| 亚洲伊人春色| 已婚少妇美妙人妻系列| 亚洲免费一在线| 久久资源在线| av免费观看一区二区| 国产在线观看精品一区二区三区| 国产精品久久免费看| 中文成人在线| 九色自拍视频在线观看| 亚洲第一网中文字幕| 亚洲一区区二区| av播放在线观看| 国产欧美韩国高清| 国产精品毛片久久久久久久| 高清av一区二区三区| 中文字幕剧情在线观看一区| 欧美精品三级日韩久久| 欧美激情一区| 伪装者在线观看完整版免费| 国产精品流白浆视频| 亚洲欧美激情一区二区| 老司机精品视频在线播放| 成人亚洲精品777777大片| 色综合久久久888| 国产日韩精品久久久| 日本免费一区二区三区视频| 久久久一本二本三本| 在线精品91av| 99久久亚洲一区二区三区青草| 欧美日韩国产网站| 久久精品xxx| 日韩在线资源网| 2023国产精品自拍| 日韩精品一级| siro系绝美精品系列| 国产成人精品久久二区二区91| 亚洲免费观看高清完整版在线| 伊甸园亚洲一区| 黄色一级影院| 91网站免费观看| 欧美日韩日日夜夜| 日韩一区精品字幕| 天堂中文在线播放| 日韩极品视频在线观看 | 九色|91porny| 超碰一区二区| 国产精品网站免费| 久久久免费在线观看| 日韩久久一区二区| 中国精品18videos性欧美| 视频免费一区| 中文字幕欧美日韩一区二区| 亚洲欧美在线第一页| 久久久一区二区三区| 婷婷精品在线| 成人精品一区二区三区免费| 亚洲欧美日韩精品久久久| 中文字幕亚洲欧美日韩在线不卡| 国产午夜精品一区二区三区嫩草 | 国产又猛又黄的视频| 国产精品麻豆va在线播放| 欧美日韩中文精品| 国产精品系列在线观看| 天堂精品久久久久| 日韩精品123| 在线视频不卡国产| 国产69精品99久久久久久宅男| 亚洲第一狼人社区| 久久久久久久欧美精品| 成人av色网站| 色网址在线观看| 亚洲韩国在线| 97视频在线播放| 91精品欧美久久久久久动漫 | 欧美激情亚洲视频| 欧美性猛交xxxx富婆| 国产在线精品视频| 久久99国产精品视频|