国产精品电影_久久视频免费_欧美日韩国产激情_成年人视频免费在线播放_日本久久亚洲电影_久久都是精品_66av99_九色精品美女在线_蜜臀a∨国产成人精品_冲田杏梨av在线_欧美精品在线一区二区三区_麻豆mv在线看

簡(jiǎn)單易用的消息隊(duì)列框架的設(shè)計(jì)與實(shí)現(xiàn)

開發(fā) 開發(fā)工具
消息隊(duì)列在互聯(lián)網(wǎng)領(lǐng)域里得到了廣泛的應(yīng)用,它多應(yīng)用在異步處理、模塊之間的解偶和高并發(fā)的消峰等場(chǎng)景,消息隊(duì)列中表現(xiàn)最好的當(dāng)屬Apache開源項(xiàng)目Kafka,Kafka使用支持高并發(fā)的Scala語言開發(fā),利用操作系統(tǒng)的緩存原理達(dá)到高性能,并且天生具有可分區(qū),分布式的特點(diǎn),而且有不同語言的客戶端,使用起來非常的方便。

[[189769]]

1 背景介紹

消息隊(duì)列在互聯(lián)網(wǎng)領(lǐng)域里得到了廣泛的應(yīng)用,它多應(yīng)用在異步處理、模塊之間的解偶和高并發(fā)的消峰等場(chǎng)景,消息隊(duì)列中表現(xiàn)最好的當(dāng)屬Apache開源項(xiàng)目Kafka,Kafka使用支持高并發(fā)的Scala語言開發(fā),利用操作系統(tǒng)的緩存原理達(dá)到高性能,并且天生具有可分區(qū),分布式的特點(diǎn),而且有不同語言的客戶端,使用起來非常的方便。

Kclient是Kafka生產(chǎn)者客戶端和消費(fèi)者客戶端的一個(gè)簡(jiǎn)單易用的框架,它具有高效集成、高性能、高穩(wěn)定的高級(jí)特點(diǎn)。

在繼續(xù)閱讀kclient的功能特性、架構(gòu)設(shè)計(jì)和使用方法之前,讀者需要對(duì)Kafka進(jìn)行基本的學(xué)習(xí)和了解。如果讀者是Kafka的初學(xué)者,而且英文還不錯(cuò),也可以直接參考Kafka官方在線文檔:Kafka 0.8.2 Documentation,如果對(duì)英文不感性趣,可以在網(wǎng)上搜索Kakfa的中文資料進(jìn)行學(xué)習(xí),內(nèi)容需要涵蓋Kafka的使用向?qū)б约袄貌僮飨到y(tǒng)緩存的“空中接力”、持久化、分片機(jī)制、高可用等原理。

本文包含了背景介紹、功能特性、架構(gòu)設(shè)計(jì)、使用指南、API簡(jiǎn)介、后臺(tái)監(jiān)控和管理、消息處理機(jī)模板項(xiàng)目、以及性能壓測(cè)相關(guān)章節(jié)。如果你想使用kclient快速的構(gòu)建Kafka處理機(jī)服務(wù),請(qǐng)參考消息處理機(jī)模板項(xiàng)目章節(jié); 如果你想了解kclient的其他使用方式、功能特性、監(jiān)控和管理等,請(qǐng)參考功能特性、使用指南、API簡(jiǎn)介、后臺(tái)監(jiān)控和管理等章節(jié); 如果你想更深入的理解kclient的架構(gòu)設(shè)計(jì)和性能指標(biāo),請(qǐng)參考架構(gòu)設(shè)計(jì)和性能壓測(cè)章節(jié)。

2 功能特性

2.1 簡(jiǎn)單易用

簡(jiǎn)化了Kafka客戶端API的使用方法, 特別是對(duì)消費(fèi)端開發(fā),消費(fèi)端開發(fā)者只需要實(shí)現(xiàn)MessageHandler接口或者相關(guān)子類,在實(shí)現(xiàn)中處理消息完成業(yè)務(wù)邏輯,并且在主線程中啟動(dòng)封裝的消費(fèi)端服務(wù)器即可。它提供了各種常用的MessageHandler,框架自動(dòng)轉(zhuǎn)換消息到領(lǐng)域?qū)ο竽P突蛘逬SON對(duì)象等數(shù)據(jù)結(jié)構(gòu),讓開發(fā)者更專注于業(yè)務(wù)處理。如果使用服務(wù)源碼注解的方式聲明消息處理機(jī)的后臺(tái),可以將一個(gè)通用的服務(wù)方法直接轉(zhuǎn)變成具有完善功能的處理Kafka消息隊(duì)列的處理機(jī),使用起來極其簡(jiǎn)單,代碼看起來一目了然,在框架級(jí)別通過多種線程池技術(shù)保證了處理機(jī)的高性能。

在使用方面,它提供了多種使用方式:

  1. 直接使用Java API;
  2. 與Spring環(huán)境無縫集成;
  3. 服務(wù)源碼注解,通過注解聲明方式啟動(dòng)Kafka消息隊(duì)列的處理機(jī)。

除此之外,它基于注解提供了消息處理機(jī)的模板項(xiàng)目,可以根據(jù)模板項(xiàng)目通過配置快速開發(fā)Kafka的消息處理機(jī)。

2.2 高性能

為了在不同的業(yè)務(wù)場(chǎng)景下實(shí)現(xiàn)高性能, 它提供不同的線程模型:

適合輕量級(jí)服務(wù)的同步線程模型;

適合IO密集型服務(wù)的異步線程模型(細(xì)分為所有消費(fèi)者流共享線程池和每個(gè)流獨(dú)享線程池)。

另外,異步模型中的線程池也支持確定數(shù)量線程的線程池和線程數(shù)量可伸縮的線程池。

2.3 高穩(wěn)定性

框架級(jí)別處理了常見的異常,計(jì)入錯(cuò)誤日志,可用于錯(cuò)誤手工恢復(fù)或者洗數(shù)據(jù),并實(shí)現(xiàn)了優(yōu)雅關(guān)機(jī)和重啟等功能。

3 架構(gòu)設(shè)計(jì)

3.1 線程模型

1. 同步線程模型

在這種線程模型中,客戶端為每一個(gè)消費(fèi)者流使用一個(gè)線程,每個(gè)線程負(fù)責(zé)從Kafka隊(duì)列里消費(fèi)消息,并且在同一個(gè)線程里進(jìn)行業(yè)務(wù)處理。我們把這些線程稱為消費(fèi)線程,把這些線程所在的線程池叫做消息消費(fèi)線程池。這種模型之所以在消息消費(fèi)線程池處理業(yè)務(wù),是因?yàn)樗嘤糜谔幚磔p量級(jí)別的業(yè)務(wù),例如:緩存查詢、本地計(jì)算等。

2. 異步線程模型

在這種線程模型中,客戶端為每一個(gè)消費(fèi)者流使用一個(gè)線程,每個(gè)線程負(fù)責(zé)從Kafka隊(duì)列里消費(fèi)消息,并且傳遞消費(fèi)得到的消息到后端的異步線程池,在異步線程池中處理業(yè)務(wù)。我們?nèi)匀话亚懊尕?fù)責(zé)消費(fèi)消息的線程池稱為消息消費(fèi)線程池,把后面的異步線程池稱為異步業(yè)務(wù)線程池。這種線程模型適合重量級(jí)的業(yè)務(wù),例如:業(yè)務(wù)中有大量的IO操作、網(wǎng)絡(luò)IO操作、復(fù)雜計(jì)算、對(duì)外部系統(tǒng)的調(diào)用等。

后端的異步業(yè)務(wù)線程池又細(xì)分為所有消費(fèi)者流共享線程池和每個(gè)流獨(dú)享線程池。

1)所有消費(fèi)者流共享線程池

所有消費(fèi)者流共享線程池對(duì)比每個(gè)流獨(dú)享線程池,創(chuàng)建更少的線程池對(duì)象,能節(jié)省些許的內(nèi)存,但是,由于多個(gè)流共享同一個(gè)線程池,在數(shù)據(jù)量較大的時(shí)候,流之間的處理可能互相影響。例如,一個(gè)業(yè)務(wù)使用2個(gè)區(qū)和兩個(gè)流,他們一一對(duì)應(yīng),通過生產(chǎn)者指定定制化的散列函數(shù)替換默認(rèn)的key-hash, 實(shí)現(xiàn)一個(gè)流(區(qū))用來處理普通用戶,另外一個(gè)流(區(qū))用來處理VIP用戶,如果兩個(gè)流共享一個(gè)線程池,當(dāng)普通用戶的消息大量產(chǎn)生的時(shí)候,VIP用戶只有少量,并且排在了隊(duì)列的后頭,就會(huì)產(chǎn)生餓死的情況。這個(gè)場(chǎng)景是可以使用多個(gè)topic來解決,一個(gè)普通用戶的topic,一個(gè)VIP用戶的topic,但是這樣又要多維護(hù)一個(gè)topic,客戶端發(fā)送的時(shí)候需要顯式的進(jìn)行判斷topic目標(biāo),也沒有多少好處。

2)每個(gè)流獨(dú)享線程池

每個(gè)流獨(dú)享線程池使用不同的異步業(yè)務(wù)線程池來處理不同的流里面的消息,互相隔離,互相獨(dú)立,不互相影響,對(duì)于不同的流(區(qū))的優(yōu)先級(jí)不同的情況,或者消息在不同流(區(qū))不均衡的情況下表現(xiàn)會(huì)更好,當(dāng)然,創(chuàng)建多個(gè)線程池會(huì)多使用些許內(nèi)存,但是這并不是一個(gè)大問題。

另外,異步業(yè)務(wù)線程池支持確定數(shù)量線程的線程池和線程數(shù)量可伸縮的線程池。

  1. 核心業(yè)務(wù)硬件資源有保證,核心服務(wù)有專享的資源池,或者線上流量可預(yù)測(cè),請(qǐng)使用固定數(shù)量的線程池。
  2. 非核心業(yè)務(wù)一般混布,資源互相調(diào)配,線上流量不固定等情況請(qǐng)使用線程數(shù)量可伸縮的線程池。

3.2 異常處理

對(duì)于消息處理過程中產(chǎn)生的業(yè)務(wù)異常,當(dāng)前在業(yè)務(wù)處理的上層捕捉了Throwable, 在專用的錯(cuò)誤恢復(fù)日志中記錄出錯(cuò)的消息,后續(xù)可根據(jù)錯(cuò)誤恢復(fù)日志人工處理錯(cuò)誤消息,也可重做或者洗數(shù)據(jù)。TODO:考慮實(shí)現(xiàn)異常Listener體系結(jié)構(gòu), 對(duì)異常處理實(shí)現(xiàn)監(jiān)聽者模式,異常處理器可插拔等,默認(rèn)打印錯(cuò)誤日志。

由于默認(rèn)的異常處理中,捕捉異常,在專用的錯(cuò)誤回復(fù)日志中記錄錯(cuò)誤,并且繼續(xù)處理下一個(gè)消息。考慮到可能上線失敗,或者上游消息格式出錯(cuò),導(dǎo)致所有消息處理都出錯(cuò),堆滿錯(cuò)誤恢復(fù)日志的情況,我們需要訴諸于報(bào)警和監(jiān)控系統(tǒng)來解決。

3.3 優(yōu)雅關(guān)機(jī)

由于消費(fèi)者本身是一個(gè)事件驅(qū)動(dòng)的服務(wù)器,類似Tomcat,Tomcat接收HTTP請(qǐng)求返回HTTP響應(yīng),Consumer則接收Kafka消息,然后處理業(yè)務(wù)后返回,也可以將處理結(jié)果發(fā)送到下一個(gè)消息隊(duì)列。所以消費(fèi)者本身是非常復(fù)雜的,除了線程模型,異常處理,性能,穩(wěn)定性,可用性等都是需要思考點(diǎn)。既然消費(fèi)者是一個(gè)后臺(tái)的服務(wù)器,我們需要考慮如何優(yōu)雅的關(guān)機(jī),也就是在消費(fèi)者服務(wù)器在處理消息的時(shí)候,如果關(guān)機(jī)才能不導(dǎo)致處理的消息中斷而丟失。

優(yōu)雅關(guān)機(jī)的重點(diǎn)在于解決如下3個(gè)問題:

  1. 如何知道JVM要退出?
  2. 如何阻止Daemon的線程在JVM退出被殺掉而導(dǎo)致消息丟失?
  3. 如果Worker線程在阻塞,如何喚起并退出?

第一個(gè)問題:如果一個(gè)后臺(tái)程序運(yùn)行在控制臺(tái)的前臺(tái),通過Ctrl + C可以發(fā)送退出信號(hào)給JVM,也可以通過kill -2 PS_IS 或者 kill -15 PS_IS發(fā)送退出信號(hào),但是不能發(fā)送kill -9 PS_IS, 否則進(jìn)程會(huì)無條件強(qiáng)制退出。JVM收到退出信號(hào)后,會(huì)調(diào)用注冊(cè)的鉤子,我們通過的注冊(cè)的JVM退出鉤子進(jìn)行優(yōu)雅關(guān)機(jī)。

第二個(gè)問題:線程分為Daemon線程和非Daemon線程,一個(gè)線程默認(rèn)繼承父線程的Daemon屬性,如果當(dāng)前線程池是由Daemon線程創(chuàng)建的,則Worker線程是Daemon線程。如果Worker線程是Daemon線程,我們需要在JVM退出鉤子中等待Worker線程完成當(dāng)前手頭處理的消息,再退出JVM。如果不是Daemon線程,即使JVM收到退出信號(hào),也得等待Worker線程退出后再退出,不會(huì)丟掉正在處理的消息。

第三個(gè)問題:在Worker線程從Kafka服務(wù)器消費(fèi)消息的時(shí)候,Worker線程可能處于阻塞,這時(shí)需要中斷線程以退出,沒有消息被丟掉。在Worker線程處理業(yè)務(wù)時(shí)有可能有阻塞,例如:IO,網(wǎng)絡(luò)IO,在指定退出時(shí)間內(nèi)沒有完成,我們也需要中斷線程退出,這時(shí)會(huì)產(chǎn)生一個(gè)InterruptedException, 在異常處理的默認(rèn)處理器中被捕捉,并寫入錯(cuò)誤日志,Worker線程隨后退出。

4 使用指南

kclient提供了三種使用方法,對(duì)于每一種方法,按照下面的步驟可快速構(gòu)建Kafka生產(chǎn)者和消費(fèi)者程序。

4.1 前置步驟

1.下載源代碼后在項(xiàng)目根目錄執(zhí)行如下命令安裝打包文件到你的Maven本地庫。

  1. mvn install 

2.在你的項(xiàng)目pom.xml文件中添加對(duì)kclient的依賴。

  1. <dependency> 
  2.     <groupId>com.robert.kafka</groupId> 
  3.     <artifactId>kclient-core</artifactId> 
  4.     <version>0.0.1</version> 
  5. </dependency> 

3.根據(jù)Kafka官方文檔搭建Kafka環(huán)境,并創(chuàng)建兩個(gè)Topic, test1和test2。

4.然后,從Kafka安裝目錄的config目錄下拷貝kafka-consumer.properties和kafka-producer.properties到你的項(xiàng)目類路徑下,通常是src/main/resources目錄。

4.2 Java API

Java API提供了最直接,最簡(jiǎn)單的使用kclient的方法。

構(gòu)建Producer示例:

  1. KafkaProducer kafkaProducer = new KafkaProducer("kafka-producer.properties""test"); 
  2.  
  3. for (int i = 0; i < 10; i++) { 
  4.     Dog dog = new Dog(); 
  5.     dog.setName("Yours " + i); 
  6.     dog.setId(i); 
  7.     kafkaProducer.sendBean2Topic("test", dog); 
  8.  
  9.     System.out.format("Sending dog: %d \n", i + 1); 
  10.  
  11.     Thread.sleep(100); 

構(gòu)建Consumer示例:

  1. DogHandler mbe = new DogHandler(); 
  2.  
  3. KafkaConsumer kafkaConsumer = new KafkaConsumer("kafka-consumer.properties""test", 1, mbe); 
  4. try { 
  5.     kafkaConsumer.startup(); 
  6.  
  7.     try { 
  8.         System.in.read(); 
  9.     } catch (IOException e) { 
  10.         e.printStackTrace(); 
  11.     } 
  12. } finally { 
  13.     kafkaConsumer.shutdownGracefully(); 
  1. public class DogHandler extends BeanMessageHandler<Dog> { 
  2.     public DogHandler() { 
  3.         super(Dog.class); 
  4.     } 
  5.  
  6.     protected void doExecuteBean(Dog dog) { 
  7.         System.out.format("Receiving dog: %s\n", dog); 
  8.     } 

4.3 Spring環(huán)境集成

kclient可以與Spring環(huán)境無縫集成,你可以像使用Spring Bean一樣來使用KafkaProducer和KafkaConsumer。

構(gòu)建Producer示例:

  1. ApplicationContext ac = new ClassPathXmlApplicationContext("kafka-producer.xml"); 
  2.  
  3. KafkaProducer kafkaProducer = (KafkaProducer) ac.getBean("producer"); 
  4.  
  5. for (int i = 0; i < 10; i++) { 
  6.     Dog dog = new Dog(); 
  7.     dog.setName("Yours " + i); 
  8.     dog.setId(i); 
  9.     kafkaProducer.send2Topic("test", JSON.toJSONString(dog)); 
  10.  
  11.     System.out.format("Sending dog: %d \n", i + 1); 
  12.  
  13.     Thread.sleep(100); 
  1. <bean name="producer" class="com.robert.kafka.kclient.core.KafkaProducer" init-method="init"
  2.     <property name="propertiesFile" value="kafka-producer.properties"/> 
  3.     <property name="defaultTopic" value="test"/> 
  4. </bean> 

構(gòu)建Consumer示例:

  1. ApplicationContext ac = new ClassPathXmlApplicationContext( 
  2.         "kafka-consumer.xml"); 
  3.  
  4. KafkaConsumer kafkaConsumer = (KafkaConsumer) ac.getBean("consumer"); 
  5. try { 
  6.     kafkaConsumer.startup(); 
  7.  
  8.     try { 
  9.         System.in.read(); 
  10.     } catch (IOException e) { 
  11.         e.printStackTrace(); 
  12.     } 
  13. } finally { 
  14.     kafkaConsumer.shutdownGracefully(); 
  1. public class DogHandler extends BeanMessageHandler<Dog> { 
  2.     public DogHandler() { 
  3.         super(Dog.class); 
  4.     } 
  5.  
  6.     protected void doExecuteBean(Dog dog) { 
  7.         System.out.format("Receiving dog: %s\n", dog); 
  8.     } 
  1. <bean name="dogHandler" class="com.robert.kafka.kclient.sample.api.DogHandler" /> 
  2.  
  3. <bean name="consumer" class="com.robert.kafka.kclient.core.KafkaConsumer" init-method="init"
  4.     <property name="propertiesFile" value="kafka-consumer.properties" /> 
  5.     <property name="topic" value="test" /> 
  6.     <property name="streamNum" value="1" /> 
  7.     <property name="handler" ref="dogHandler" /> 
  8. </bean> 

4.4 服務(wù)源碼注解

kclient提供了類似Spring聲明式的編程方法,使用注解聲明Kafka處理器方法,所有的線程模型、異常處理、服務(wù)啟動(dòng)和關(guān)閉等都由后臺(tái)服務(wù)自動(dòng)完成,極大程度的簡(jiǎn)化了API的使用方法,提高了開發(fā)者的工作效率。

注解聲明Kafka消息處理器:

  1. @KafkaHandlers 
  2. public class AnnotatedDogHandler { 
  3.     @InputConsumer(propertiesFile = "kafka-consumer.properties", topic = "test", streamNum = 1) 
  4.     @OutputProducer(propertiesFile = "kafka-producer.properties", defaultTopic = "test1"
  5.     public Cat dogHandler(Dog dog) { 
  6.         System.out.println("Annotated dogHandler handles: " + dog); 
  7.  
  8.         return new Cat(dog); 
  9.     } 
  10.  
  11.     @InputConsumer(propertiesFile = "kafka-consumer.properties", topic = "test1", streamNum = 1) 
  12.     public void catHandler(Cat cat) throws IOException { 
  13.         System.out.println("Annotated catHandler handles: " + cat); 
  14.  
  15.         throw new IOException("Man made exception."); 
  16.     } 
  17.  
  18.     @ErrorHandler(exception = IOException.class, topic = "test1"
  19.     public void ioExceptionHandler(IOException e, String message) { 
  20.         System.out.println("Annotated excepHandler handles: " + e); 
  21.     } 

注解啟動(dòng)程序:

  1. public static void main(String[] args) { 
  2.     ApplicationContext ac = new ClassPathXmlApplicationContext( 
  3.             "annotated-kafka-consumer.xml"); 
  4.  
  5.     try { 
  6.         System.in.read(); 
  7.     } catch (IOException e) { 
  8.         e.printStackTrace(); 
  9.     } 

注解Spring環(huán)境配置:

  1. <bean name="kclientBoot" class="com.robert.kafka.kclient.boot.kclientBoot" init-method="init"/> 
  2.  
  3. <context:component-scan base-package="com.robert.kafka.kclient.sample.annotation" /> 

5 API簡(jiǎn)介

5.1 Producer API

KafkaProducer類提供了豐富的API來發(fā)送不同類型的消息,它支持發(fā)送字符串消息,發(fā)送一個(gè)普通的Bean,以及發(fā)送JSON對(duì)象等。在這些API中可以指定發(fā)送到某個(gè)Topic,也可以不指定而使用默認(rèn)的Topic。對(duì)于發(fā)送的數(shù)據(jù),支持帶Key值的消息和不帶Key值的消息。

發(fā)送字符串消息:

  1. public void send(String message); 
  2. public void send2Topic(String topicName, String message);  
  3. public void send(String key, String message);  
  4. public void send2Topic(String topicName, String key, String message);  
  5. public void send(Collection<String> messages);  
  6. public void send2Topic(String topicName, Collection<String> messages);  
  7. public void send(Map<String, String> messages);  
  8. public void send2Topic(String topicName, Map<String, String> messages); 

發(fā)送Bean消息:

  1. public <T> void sendBean(T bean);  
  2. public <T> void sendBean2Topic(String topicName, T bean);  
  3. public <T> void sendBean(String key, T bean);  
  4. public <T> void sendBean2Topic(String topicName, String key, T bean);  
  5. public <T> void sendBeans(Collection<T> beans);  
  6. public <T> void sendBeans2Topic(String topicName, Collection<T> beans);  
  7. public <T> void sendBeans(Map<String, T> beans);  
  8. public <T> void sendBeans2Topic(String topicName, Map<String, T> beans); 

發(fā)送JSON對(duì)象消息:

  1. public void sendObject(JSONObject jsonObject);  
  2. public void sendObject2Topic(String topicName, JSONObject jsonObject);  
  3. public void sendObject(String key, JSONObject jsonObject);  
  4. public void sendObject2Topic(String topicName, String key, JSONObject jsonObject);  
  5. public void sendObjects(JSONArray jsonArray);  
  6. public void sendObjects2Topic(String topicName, JSONArray jsonArray);  
  7. public void sendObjects(Map<String, JSONObject> jsonObjects);  
  8. public void sendObjects2Topic(String topicName, Map<String, JSONObject> jsonObjects); 

5.2 Consumer API

KafkaConsumer類提供了豐富的構(gòu)造函數(shù)用來指定Kafka消費(fèi)者服務(wù)器的各項(xiàng)參數(shù),包括線程池策略,線程池類型,流數(shù)量等等。

使用PROPERTIES文件初始化:

  1. public KafkaConsumer(String propertiesFile, String topic, int streamNum, MessageHandler handler); 
  2. public KafkaConsumer(String propertiesFile, String topic, int streamNum, int fixedThreadNum, MessageHandler handler); 
  3. public KafkaConsumer(String propertiesFile, String topic, int streamNum, int fixedThreadNum, boolean isSharedThreadPool, MessageHandler handler); 
  4. public KafkaConsumer(String propertiesFile, String topic, int streamNum, int minThreadNum, int maxThreadNum, MessageHandler handler); 
  5. public KafkaConsumer(String propertiesFile, String topic, int streamNum, int minThreadNum, int maxThreadNum, boolean isSharedThreadPool,MessageHandler handler); 

使用PROPERTIES對(duì)象初始化:

  1. public KafkaConsumer(Properties properties, String topic, int streamNum, MessageHandler handler); 
  2. public KafkaConsumer(Properties properties, String topic, int streamNum, int fixedThreadNum, MessageHandler handler); 
  3. public KafkaConsumer(Properties properties, String topic, int streamNum, int fixedThreadNum, boolean isSharedThreadPool, MessageHandler handler); 
  4. public KafkaConsumer(Properties properties, String topic, int streamNum, int minThreadNum, int maxThreadNum, MessageHandler handler); 
  5. public KafkaConsumer(Properties properties, String topic, int streamNum, int minThreadNum, int maxThreadNum, boolean isSharedThreadPool,MessageHandler handler); 

5.3 消息處理器

消息處理器結(jié)構(gòu)提供了一個(gè)基本接口,并且提供了不同的抽象類實(shí)現(xiàn)不同層次的功能,讓功能得到最大化的重用,并且互相解偶,開發(fā)者可以根據(jù)需求選擇某一個(gè)抽象類來繼承和使用。

接口定義:

  1. public interface MessageHandler { 
  2.     public void execute(String message); 

安全處理異常抽象類:

  1. public abstract class SafelyMessageHandler implements MessageHandler { 
  2.     public void execute(String message) { 
  3.         try { 
  4.             doExecute(message); 
  5.         } catch (Throwable t) { 
  6.             handleException(t, message); 
  7.         } 
  8.     } 
  9.  
  10.     protected void handleException(Throwable t, String message) { 
  11.         for (ExceptionHandler excepHandler : excepHandlers) { 
  12.             if (t.getClass() == IllegalStateException.class 
  13.                     && t.getCause() != null 
  14.                     && t.getCause().getClass() == InvocationTargetException.class 
  15.                     && t.getCause().getCause() != null
  16.                 t = t.getCause().getCause(); 
  17.  
  18.             if (excepHandler.support(t)) { 
  19.                 try { 
  20.                     excepHandler.handle(t, message); 
  21.                 } catch (Exception e) { 
  22.                     log.error( 
  23.                             "Exception hanppens when the handler {} is handling the exception {} and the message {}. Please check if the exception handler is configured properly."
  24.                             excepHandler.getClass(), t.getClass(), message); 
  25.                     log.error( 
  26.                             "The stack of the new exception on exception is, "
  27.                             e); 
  28.                 } 
  29.             } 
  30.         } 
  31.     } 
  32.  
  33. protected abstract void doExecute(String message); 

面向類型的抽象類:

  1. public abstract class BeanMessageHandler<T> extends SafelyMessageHandler {...} 
  2. public abstract class BeansMessageHandler<T> extends SafelyMessageHandler {...} 
  3. public abstract class DocumentMessageHandler extends SafelyMessageHandler {...} 
  4. public abstract class ObjectMessageHandler extends SafelyMessageHandler {...} 
  5. public abstract class ObjectsMessageHandler extends SafelyMessageHandler {...} 

5.4 消息處理器注解

正如上面使用指南第三部分服務(wù)源碼注解所講述的那樣,kclient可以通過注解來聲明Kafka消息處理器,kclient提供了@KafkaHandlers、@InputConsumer、@OutputProducer和@ErrorHandler等注解。

@KafkaHandlers:

  1. @Target({ ElementType.TYPE }) 
  2. @Retention(RetentionPolicy.RUNTIME) 
  3. @Documented 
  4. @Component 
  5. public @interface KafkaHandlers { 

@InputConsumer:

  1. @Target({ ElementType.METHOD }) 
  2. @Retention(RetentionPolicy.RUNTIME) 
  3. @Documented 
  4. public @interface InputConsumer { 
  5.     String propertiesFile() default ""
  6.  
  7.     String topic() default ""
  8.  
  9.     int streamNum() default 1; 
  10.  
  11.     int fixedThreadNum() default 0; 
  12.  
  13.     int minThreadNum() default 0; 
  14.  
  15.     int maxThreadNum() default 0; 

@OutputProducer:

  1. @Target({ ElementType.METHOD }) 
  2. @Retention(RetentionPolicy.RUNTIME) 
  3. @Documented 
  4. public @interface OutputProducer { 
  5.     String propertiesFile() default ""
  6.  
  7.     String defaultTopic() default ""

@ErrorHandler:

  1. @Target({ ElementType.METHOD }) 
  2. @Retention(RetentionPolicy.RUNTIME) 
  3. @Documented 
  4. public @interface ErrorHandler { 
  5.     Class<? extends Throwable> exception() default Throwable.class; 
  6.  
  7.     String topic() default ""

6 消息處理機(jī)模板項(xiàng)目

6.1 快速開發(fā)向?qū)?/strong>

通過下面步驟可以快速開發(fā)Kafka處理機(jī)服務(wù)。

1.從本項(xiàng)目下載kclient-processor項(xiàng)目模板,并且根據(jù)業(yè)務(wù)需要修改pom.xml后導(dǎo)入Eclipse。

2.根據(jù)業(yè)務(wù)需要更改com.robert.kclient.app.handler.AnimalsHandler類名稱,并且根據(jù)業(yè)務(wù)需要修改處理器的注解。這里,可以導(dǎo)入業(yè)務(wù)服務(wù)對(duì)消息進(jìn)行處理。

  1. @KafkaHandlers 
  2. public class AnimalsHandler { 
  3.     @InputConsumer(propertiesFile = "kafka-consumer.properties", topic = "test", streamNum = 1) 
  4.     @OutputProducer(propertiesFile = "kafka-producer.properties", defaultTopic = "test1"
  5.     public Cat dogHandler(Dog dog) { 
  6.         System.out.println("Annotated dogHandler handles: " + dog); 
  7.  
  8.         return new Cat(dog); 
  9.     } 
  10.  
  11.     @InputConsumer(propertiesFile = "kafka-consumer.properties", topic = "test1", streamNum = 1) 
  12.     public void catHandler(Cat cat) throws IOException { 
  13.         System.out.println("Annotated catHandler handles: " + cat); 
  14.  
  15.         throw new IOException("Man made exception."); 
  16.     } 
  17.  
  18.     @ErrorHandler(exception = IOException.class, topic = "test1"
  19.     public void ioExceptionHandler(IOException e, String message) { 
  20.         System.out.println("Annotated excepHandler handles: " + e); 
  21.     } 

3.通過mvn package既可以打包包含Spring Boot功能的自啟動(dòng)jar包。

4.通過java -jar kclient-processor.jar即可啟動(dòng)服務(wù)。

6.2 后臺(tái)監(jiān)控和管理

kclient模板項(xiàng)目提供了后臺(tái)管理接口來監(jiān)控和管理消息處理服務(wù)。

1.歡迎詞 - 用來校驗(yàn)服務(wù)是否啟動(dòng)成功。

curl http://localhost:8080/

2.服務(wù)狀態(tài) - 顯示處理器數(shù)量。

curl http://localhost:8080/status

3.重啟服務(wù) - 重新啟動(dòng)服務(wù)。

curl http://localhost:8080/restart

7 性能壓測(cè)

Benchmark應(yīng)該覆蓋推送QPS、接收處理QPS以及單線程、多線程生產(chǎn)者的用例。

用例1: 輕量級(jí)服務(wù)同步線程模型和異步線程模型的性能對(duì)比。

用例2: 重量級(jí)服務(wù)同步線程模型和異步線程模型的性能對(duì)比。

用例3: 重量級(jí)服務(wù)異步線程模型中所有消費(fèi)者流共享線程池和每個(gè)流獨(dú)享線程池的性能對(duì)比。

用例4: 重量級(jí)服務(wù)異步線程模型中每個(gè)流獨(dú)享線程池的對(duì)比的確定數(shù)量線程的線程池和線程數(shù)量可伸縮的線程池的性能對(duì)比。

由于筆者在發(fā)文的時(shí)候還沒有時(shí)間完成前面四種場(chǎng)景的壓測(cè),暫時(shí)留給讀者親自動(dòng)手,作為一個(gè)練習(xí)。

8 更多思考

盡管本文設(shè)計(jì)和實(shí)現(xiàn)的kclient項(xiàng)目提供了許多高級(jí)功能,并且使用起來方便,而且筆者在幾家公司里在線上進(jìn)行了應(yīng)用,已經(jīng)發(fā)揮了不少的作用,但是,還有一些細(xì)節(jié)需要提高。

kclient處理器項(xiàng)目中管理Restful服務(wù)暫時(shí)只提供了獲得狀態(tài)的API,需要進(jìn)行進(jìn)一步的豐富,增加對(duì)線程池的監(jiān)控,以及消息處理性能的監(jiān)控。

當(dāng)前注解@ErrorHandler里面的exception參數(shù)為必選,完全可以使用方法第一參數(shù)進(jìn)行推導(dǎo),簡(jiǎn)化開發(fā)人員配置的工作。

模板項(xiàng)目還不完善,需要增加啟動(dòng)和關(guān)閉腳本,這樣讀者可以直接拷貝使用。

盡管線上應(yīng)用已經(jīng)證明了kclient沒有性能問題,但是開發(fā)一款中間件系統(tǒng)是需要閉環(huán)的,需要盡快把性能壓測(cè)這塊昨晚并且形成壓測(cè)報(bào)表。

點(diǎn)擊《簡(jiǎn)單易用的消息隊(duì)列框架的設(shè)計(jì)與實(shí)現(xiàn)》閱讀原文。

【本文為51CTO專欄作者“李艷鵬”的原創(chuàng)稿件,轉(zhuǎn)載可通過作者簡(jiǎn)書號(hào)(李艷鵬)或51CTO專欄獲取聯(lián)系】

戳這里,看該作者更多好文

責(zé)任編輯:武曉燕 來源: 51CTO專欄
相關(guān)推薦

2023-09-12 14:58:00

Redis

2023-11-13 08:37:33

消息中間件分布式架構(gòu)

2023-12-30 13:47:48

Redis消息隊(duì)列機(jī)制

2024-05-07 09:02:47

2022-01-21 19:22:45

RedisList命令

2022-01-15 07:20:18

Redis List 消息隊(duì)列

2024-02-26 07:43:10

大語言模型LLM推理框架

2010-02-26 13:14:39

Java日志系統(tǒng)

2015-11-10 18:04:22

FileMaker

2024-10-16 15:11:58

消息隊(duì)列系統(tǒng)設(shè)計(jì)

2024-10-25 08:41:18

消息隊(duì)列RedisList

2021-12-16 13:04:41

消息隊(duì)列緩存

2022-09-07 21:43:34

云原生存儲(chǔ)技術(shù)消息隊(duì)列

2025-03-12 07:55:46

2022-05-14 23:49:32

Python數(shù)據(jù)計(jì)算技巧

2025-11-11 09:25:19

2025-10-20 04:00:00

2022-04-03 15:44:55

Vue.js框架設(shè)計(jì)設(shè)計(jì)與實(shí)現(xiàn)

2024-03-22 12:10:39

Redis消息隊(duì)列數(shù)據(jù)庫

2009-08-06 16:21:09

點(diǎn)對(duì)點(diǎn)消息隊(duì)列
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)

国产原创一区二区三区| 亚洲一区二区三区在线| 亚洲人成亚洲精品| 久久精品人人做人人综合| 日韩欧美区一区二| 久操成人在线视频| 97人人澡人人爽| 神马伦理电影| 亚洲欧洲免费| 中文字幕一区二区三区四区 | 日韩国产高清影视| 日韩欧美高清一区| 久久综合久久久| 久草在线资源视频| 精品视频一二| ww亚洲ww在线观看国产| 亚洲国产精品字幕| a级片一区二区| 久久久久久久久免费视频| 羞羞视频在线观看欧美| 大伊人狠狠躁夜夜躁av一区| 国产一区二区黄| 国内视频一区二区| 五月婷婷在线视频| 美日韩一区二区| 亚洲精品午夜精品| 茄子视频成人免费观看| 国产精品一区二区三区四区在线观看 | 色综合桃花网| 26uuu精品一区二区| 国产一区视频在线播放| 亚洲丝袜一区| xfplay精品久久| 国产激情999| 91精品久久久久久粉嫩| 国产色产综合产在线视频| 国产精品国模在线| 女囚岛在线观看| 国产精品狼人久久影院观看方式| 国产精品免费视频一区二区| 成人三级小说| 一区二区三区视频在线看| 精品一区二区三区在线观看国产| 久久99久久99精品免观看粉嫩| 九色自拍视频在线观看| 91精品啪在线观看国产18| 亚洲欧美一区二区三区情侣bbw| 久久婷五月综合| 国产高清欧美| 中日韩美女免费视频网址在线观看 | 国产天堂素人系列在线视频| 久久99日本精品| 国产精品91在线| 欧美视频免费看| 色欧美日韩亚洲| 在线视频日韩一区| 91久久国产精品| 国产导航在线| 小嫩嫩精品导航| 亚洲国产精品黑人久久久| 99精品中文字幕在线不卡| 亚洲精品666| 极品一区美女高清| 欧美一二三在线| 116美女写真午夜一级久久| 国产91高潮流白浆在线麻豆| 人人澡人人澡人人看欧美| 香蕉久久免费电影| 欧美日韩精品福利| 午夜国产一区二区三区| 老司机一区二区三区| 国产片一区二区| 欧美性猛交xxxx黑人交| 国产盗摄视频在线观看| 成人中文字幕视频| 91国产免费观看| 老司机午夜网站| 少妇一区二区视频| 精品国产乱码久久久久久牛牛| 好男人www社区| 麻豆精品91| 国产福利精品在线| 国产精品欧美一区二区三区不卡| 一区二区三区日韩精品| 亚洲激情一区二区三区| 一道本一区二区三区| 亚洲欧洲另类国产综合| 日本午夜在线亚洲.国产| 99精产国品一二三产品香蕉| 亚洲区国产区| 国内外成人免费激情在线视频| 美女精品导航| 亚洲一级片在线观看| 福利二区91精品bt7086| 欧洲成人一区二区| 亚洲aaa级| 久热精品视频在线免费观看| h视频在线免费观看| 亚洲欧洲日韩在线| 99视频精品免费| 成年人午夜久久久| 精品日韩美女| 亚洲影视综合| 国产亚洲欧美一区二区| 日韩激情啪啪| 日韩暖暖在线视频| 欧美成人家庭影院| 久久精品美女视频网站| 深夜福利一区| 国产精品视频99| 色综合色综合| 成人网页在线免费观看| 9999久久久久| 欧美激情精品久久久久久久变态| 奇米777日韩| 欧美成人手机在线| 精品视频久久| 亚洲一区在线播放| www.av毛片| av电影在线观看不卡| 杨幂一区欧美专区| 成人综合婷婷国产精品久久 | 亚洲欧美一区二区三区极速播放| 色总=综合色| 亚洲专区一二三| 成人福利网站| 在线日韩日本国产亚洲| 国产精选久久| 日韩有码视频在线| 日韩中文字幕一区二区三区| 日韩一级免费在线观看| 日韩欧美一区二区三区久久| 天海翼一区二区三区四区在线观看| 亚洲人成精品久久久久| 亚洲 日韩 国产第一区| 欧美日本一区二区视频在线观看 | 亚洲欧美区自拍先锋| 美女做暖暖视频免费在线观看全部网址91 | 国产精品午夜久久| 人成免费电影一二三区在线观看| 亚洲免费伊人电影| 麻豆传媒视频在线观看| 日韩中文字幕视频| 久久免费大视频| 亚洲一区二区蜜桃| 国产精品久久久久婷婷| 国产三级香港三韩国三级| 色综合激情五月| 免费动漫网站在线观看| 亚洲免费视频成人| 污视频在线观看免费| 国产精品欧美一区二区三区| 福利网在线观看| 免费一级欧美片在线观看| 欧美精品成人在线| 日韩三区在线| 日韩午夜三级在线| 日韩中文在线视频| 国内精品卡一卡二卡三新区| 国产在线精品免费| 国产91精品黑色丝袜高跟鞋| 可以在线观看的av网站| 欧美亚洲一区二区在线| 深夜在线视频| 欧美最猛性xxxx| 精品成人国产| 亚洲不卡中文字幕无码| 亚洲国产精品人人做人人爽| 女囚岛在线观看| 综合网日日天干夜夜久久| 国产精品白浆| 欧洲亚洲女同hd| 欧美亚洲一区| 中国国产一级毛片| 亚洲激情免费观看| 依依综合在线| 国产国语刺激对白av不卡| 久久综合婷婷| 毛片网站在线| 蜜臀久久99精品久久久久久宅男| 亚洲情侣在线| 中文字幕一二三区在线观看| 亚洲美女精品久久| 日韩精品午夜视频| 午夜视频免费在线观看| 精品国产区一区二区三区在线观看 | 久久综合精品一区| 婷婷开心激情综合| 亚洲欧美国产精品桃花| 天天操天天干天天综合网| 美女久久久久久| 亚洲自拍偷拍福利| 久久精品一区二区三区av| 蜜桃传媒在线观看免费进入 | 欧美精品一区二区三区久久| 欧美日韩亚洲视频| 久久久久久久久丰满| 五月婷婷之综合激情| 北条麻妃99精品青青久久| 成人av资源在线|