国产精品电影_久久视频免费_欧美日韩国产激情_成年人视频免费在线播放_日本久久亚洲电影_久久都是精品_66av99_九色精品美女在线_蜜臀a∨国产成人精品_冲田杏梨av在线_欧美精品在线一区二区三区_麻豆mv在线看

Go 語言微服務(wù)框架 Kratos 集成第三方庫 kafka-go 操作消息隊(duì)列 Kafka

開發(fā) 架構(gòu)
Go 語言微服務(wù)框架 Kratos 不限制使用任何第三方庫,Go 語言操作消息隊(duì)列 Kafka 有很多優(yōu)秀的第三方庫,比如 sarama 和 kafka-go,我們在之前的文章中介紹過 Go 語言怎么使用 sarama 操作消息隊(duì)列 Kafka。

1.介紹

Go 語言微服務(wù)框架 Kratos 不限制使用任何第三方庫,Go 語言操作消息隊(duì)列 Kafka 有很多優(yōu)秀的第三方庫,比如 sarama 和 kafka-go,我們在之前的文章中介紹過 Go 語言怎么使用 sarama 操作消息隊(duì)列 Kafka。

本文我們介紹 Go 微服務(wù)框架 Kratos 怎么集成第三方庫 kafka-go[1] 操作消息隊(duì)列 Kafka。

2.Kratos 集成第三方庫 kafka-go

我們在本地搭建 Go 運(yùn)行環(huán)境,并安裝 kratos 工具,使用 kratos 工具創(chuàng)建項(xiàng)目 blog。

在 blog 項(xiàng)目中,集成第三方庫 kafka-go。

創(chuàng)建項(xiàng)目

示例代碼:

kratos new blog

安裝 kafka-go

go get github.com/segmentio/kafka-go

集成 Kafka Producer(生產(chǎn)者)和 Kafka Consumer(消費(fèi)者)

編寫文件 blog/internal/data/data.go

導(dǎo)入第三方庫:

import (
 "github.com/segmentio/kafka-go"
)

添加 Kafka Producer(生產(chǎn)者)和 Kafka Consumer(消費(fèi)者):

// Data .
type Data struct {
 // TODO wrapped database client
 dbEngine *xorm.Engine
 kp       *kafkaProducer
 kc       *KafkaConsumer
}

// NewData .
func NewData(c *conf.Data, logger log.Logger, dbEngin *xorm.Engine, kp *kafkaProducer, kc *KafkaConsumer) (*Data, func(), error) {
 cleanup := func() {
  log.NewHelper(logger).Info("closing the data resources")
 }
 return &Data{
  dbEngine: dbEngin,
  kp:       kp,
  kc:       kc,
 }, cleanup, nil
}

Kafka Producer(生產(chǎn)者):

type kafkaProducer struct {
 writer *kafka.Writer
}

func NewKafkaProducer(c *conf.Data) *kafkaProducer {
 brokers := c.Kafka.Brokers
 topic := c.Kafka.Topic
 writer := &kafka.Writer{
  Addr:     kafka.TCP(brokers...),
  Topic:    topic,
  Balancer: &kafka.LeastBytes{},
 }
 return &kafkaProducer{writer: writer}
}

func (p *kafkaProducer) SendMessage(ctx context.Context, key, value []byte) error {
 err := p.writer.WriteMessages(ctx, kafka.Message{
  Key:   key,
  Value: value,
 })
 if err != nil {
  return err
 }
 return nil
}

func (p *kafkaProducer) Close() error {
 return p.writer.Close()
}

Kafka Consumer(消費(fèi)者):

type KafkaConsumer struct {
 reader *kafka.Reader
}

func NewKafkaConsumer(c *conf.Data) *KafkaConsumer {
 brokers := c.Kafka.Brokers
 topic := c.Kafka.Topic
 groupId := c.Kafka.GroupId
 reader := kafka.NewReader(kafka.ReaderConfig{
  Brokers: brokers,
  Topic:   topic,
  GroupID: groupId,
 })
 return &KafkaConsumer{
  reader: reader,
 }
}

func (c *KafkaConsumer) Start(ctx context.Context) {
 for {
  msg, err := c.reader.ReadMessage(ctx)
  if err != nil {
   return
  }
  log.Debugf("key=%s || value=%s", string(msg.Key), string(msg.Value))
 }
}

func (c *KafkaConsumer) Close() error {
 return c.reader.Close()
}

生產(chǎn) kafka 消息的方法:

創(chuàng)建文件 blog/internal/data/kafka.go。

示例代碼:

func (u *userRepository) KafkaSendMessage(ctx context.Context, key []byte, value []byte) (err error) {
 defer u.data.kp.Close()
 // 設(shè)置超時(shí)時(shí)間
 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 defer cancel()
 err = u.data.kp.SendMessage(ctx, key, value)
 if err != nil {
  log.Errorf("KafkaSendMessage() || err=%v", err)
  return
 }
 return
}

閱讀上面這段代碼,我們可以發(fā)現(xiàn) KafkaSendMessage 方法封裝了生產(chǎn) kafka 消息的方法 u.data.kp.SendMessage。

需要注意的是,我們需要設(shè)置超時(shí)時(shí)間,否則,會(huì)返回錯(cuò)誤消息 context deadline exceeded。

添加 wire 提供者:

// ProviderSet is data providers.
var ProviderSet = wire.NewSet(NewData, NewGreeterRepo, NewDbEngine, NewUserRepository, NewKafkaProducer, NewKafkaConsumer)

生成 wire 代碼:

cd blog/cmd/blog
wire

3.操作 Kafka

在 Kratos 項(xiàng)目中,一般在項(xiàng)目的 biz 或 service 層使用 Kafka 的生產(chǎn)邏輯;在 service 層使用 Kafka 的消費(fèi)邏輯。

限于篇幅,我們以 Kafka 的生產(chǎn)邏輯為例,介紹怎么在 biz 層生產(chǎn) Kafka 消息。

編寫文件 blog/internal/biz/user.go,在 CreateUser 方法中添加生產(chǎn) Kafka 消息的代碼。

type UserRepository interface {
 Create(ctx context.Context, user *User) (int64, error)
 KafkaSendMessage(ctx context.Context, key []byte, value []byte) (err error)
}

func (u *UserUsecase) CreateUser(ctx context.Context, user *User) (id int64, err error) {
 id, err = u.userRepo.Create(ctx, user)
 if err != nil {
  return
 }
 if id > 0 {
  var b []byte
  b, err = json.Marshal(user)
  if err != nil {
   return
  }
  err = u.userRepo.KafkaSendMessage(ctx, []byte(user.Name), b)
  if err != nil {
   return
  }
 }
 return
}

閱讀上面這段代碼,我們可以發(fā)現(xiàn) UserRepository 接口中的方法 KafkaSendMessage,就是我們在 blog/internal/data/kafka.go 文件中實(shí)現(xiàn)的方法。

項(xiàng)目運(yùn)行和測試:

Kratos 運(yùn)行:

kratos run

curl 請求示例:

curl -H "Content-Type: application/json" -X POST -d '{"name":"mac", "email":"mac@gmail.com", "password":"123456"}' http://192.168.110.209:8000/user/create

kafka 消費(fèi)者:

kafka_2.13-3.9.0/bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
{"Id":10,"Name":"mac","Email":"mac@gmail.com","Password":"123456","Created":1735972949,"Updated":1735972949}

4.總結(jié)

本文我們通過示例代碼,介紹 Kratos 微服務(wù)框架怎么集成第三方庫 kafka-go,操作 Kafka。

參考資料

[1]kafka-go: https://github.com/segmentio/kafka-go

責(zé)任編輯:武曉燕 來源: Golang語言開發(fā)棧
相關(guān)推薦

2024-12-30 00:38:23

Go語言微服務(wù)

2025-08-04 01:22:00

Go 語言微服務(wù)Kratos

2024-12-23 00:22:55

2025-01-13 00:00:07

Go語言微服務(wù)

2025-01-20 00:10:00

Go語言Kratos

2021-10-11 06:38:52

Go開源庫語言

2015-04-27 19:32:16

Moxtra

2019-07-30 11:35:54

AndroidRetrofit

2015-11-05 16:44:37

第三方登陸android源碼

2025-10-20 07:17:10

Go語言微服務(wù)

2021-09-26 10:43:08

注冊Istio集成

2021-09-13 07:23:53

KafkaGo語言

2020-06-04 07:48:08

Istio服務(wù)注冊API Server

2014-07-22 10:56:45

Android Stu第三方類庫

2021-08-03 10:07:41

鴻蒙HarmonyOS應(yīng)用

2022-08-15 23:09:53

jsonGo語言

2010-11-08 09:51:34

jQueryJavaScript

2014-07-23 08:55:42

iOSFMDB

2022-01-14 09:57:14

鴻蒙HarmonyOS應(yīng)用

2011-07-25 14:14:49

iPhone SQLITE Pldatabase
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號

青青草成人网| 黄色网址在线免费| 国产日韩欧美三区| 欧美日韩国产成人在线| 欧美a级在线观看| 欧美一卡2卡三卡4卡5免费| 亚洲欧美另类图片| 一区二区三区免费网站| 缴情综合网五月天| 国产亚洲精品aa午夜观看| 国产免费裸体视频| 麻豆久久久久久| 热re99久久精品国99热蜜月| 9色国产精品| 国产精品一区而去| 影音先锋久久| 好吊妞www.84com只有这里才有精品| 成人在线丰满少妇av| 欧美专区在线视频| 国产欧美一区二区三区精品观看 | 久久要要av| 国产在线999| 天天插综合网| 精品欧美一区二区三区久久久| 欧美日韩国产欧| 国产精品.com| 在线视频免费在线观看一区二区| 91免费看网站| 午夜一级久久| 四虎影院一区二区三区| 精品亚洲免费视频| 成年网站在线免费观看| 亚洲美女在线一区| 国产综合视频一区二区三区免费| 疯狂做受xxxx高潮欧美日本| 91精彩视频在线观看| 欧美成人女星排名| 国产极品一区| 人妖精品videosex性欧美| 不卡在线一区二区| 久久涩涩网站| 国产在线播精品第三| 国产在线青青草| 亚洲欧美另类小说视频| 撸视在线观看免费视频| 精品成人免费观看| 中文字幕日本一区| 国产精品视频资源| 美女诱惑黄网站一区| 黄色三级中文字幕| 欧美经典三级视频一区二区三区| 在线欧美成人| 亚洲国产精品成人一区二区| 亚洲三级av| 成人永久免费| 成人sese在线| 青青草免费在线视频| 亚洲欧美在线一区| 国产一区二区亚洲| av电影一区二区三区| 亚洲老司机在线| 888av在线视频| 欧美中文字幕在线播放| 日韩黄色小视频| 成人图片小说| 日韩精品一区二区三区swag | 日韩中文字幕亚洲精品欧美| 专区另类欧美日韩| 中国av在线播放| 韩国三级日本三级少妇99| 亚洲另类黄色| 中文字幕国产传媒| 日韩欧美激情四射| 九一成人免费视频| 国产免费色视频| 亚洲一区二区综合| 亚洲成人激情社区| 91精品国产一区二区三区动漫| 不卡一区二区中文字幕| 福利成人在线观看| 97精品一区二区三区| 日本在线不卡视频| 日本高清视频网站www| 亚洲丝袜在线视频| 亚洲午夜精品久久久久久app| 日韩视频在线免费看| 在线不卡免费欧美| 国产精品亚洲片在线播放| 中文字幕一区二区三区四区五区人 | 91精品国产91久久久久游泳池 | 91成人网在线观看| 亚洲欧洲偷拍精品| 在线成年人视频| 久久大片网站| 黑人极品ⅴideos精品欧美棵| 亚洲福利精品在线| www国产亚洲精品久久网站| 国产视频一区二区不卡| 欧美日韩精品一区二区三区| 高潮按摩久久久久久av免费| 两个人hd高清在线观看| 视频在线精品一区| 精品国产乱子伦一区| 欧美理论在线播放| 超碰在线公开超碰在线| 日韩av不卡电影| 欧美mv日韩mv国产网站| 99久久99久久精品国产片果冻| 日韩一区二区三免费高清在线观看| 国产精品区一区| 欧美日韩高清一区二区不卡| 精品一区二区三区日韩| 四虎地址8848精品| 三级网站在线| 精品综合在线| 国产精品xxx视频| 久久久久久18| 久久久精品国产| 色综合久久精品亚洲国产| 亚洲成人av一区二区| 国产精品久久| 亚洲一区二区三区| 一区二区精品伦理...| 向日葵污视频在线观看| 亚洲人成电影网站色www| 天堂精品中文字幕在线| 99青草视频在线播放视| 成人午夜高潮视频| 亚洲成人一区二区在线观看| 精品久久对白| 国产福利一区视频| 日韩在线视频免费观看| 岛国一区二区三区| 九七电影院97理论片久久tvb| 糖心vlog在线免费观看| 亚洲剧情一区二区| 国产福利精品一区二区| 亚洲午夜天堂| 日韩a级黄色片| 久久影院模特热| 久久免费的精品国产v∧| 亚洲伊人精品酒店| mm1313亚洲国产精品无码试看| 裸体女人亚洲精品一区| 国产午夜亚洲精品羞羞网站| ccyy激情综合| 传媒视频在线| av电影成人| 欧美精品一区二区高清在线观看| 久草精品在线观看| 亚洲欧美综合久久久久久v动漫| 91淫黄看大片| 国产精品久久久久99| 欧美日韩在线播| 国产精品538一区二区在线| 51亚洲精品| 欧美女同网站| 亚洲欧美综合一区| 日韩在线小视频| 亚洲精品国产第一综合99久久| 欧美wwwww| 678在线观看视频| 美女一区二区三区视频| 91精品啪在线观看麻豆免费| 欧美日韩一级二级三级| 精品在线一区二区| 国内精品国产成人国产三级粉色| av在线电影网站| 美女主播视频一区| 日韩在线观看网站| 亚洲自拍另类综合| 国产情侣一区| 成人看片在线观看| 黄色仓库视频网站| 鲁丝片一区二区三区| 俺去啦;欧美日韩| 欧美日韩国产专区| 国内精品写真在线观看 | 国产午夜久久av| 成人福利视频在| 欧美12av| 久久久久久久一| 欧美日韩激情一区| 97se狠狠狠综合亚洲狠狠| 91精品国产麻豆国产在线观看| 91吃瓜在线观看| 国产精品粉嫩av| 亚洲精品国产系列| 欧美在线观看一区二区三区| 538在线一区二区精品国产| 久久久国产午夜精品| 欧美日韩国产在线一区| 亚洲人体在线| av小片在线| 成人看片app| 视频一区视频二区视频三区高| 91国产视频在线播放| 亚洲电影免费观看高清完整版在线| 自拍偷拍欧美激情| 国产精品18久久久久久久网站|