国产精品电影_久久视频免费_欧美日韩国产激情_成年人视频免费在线播放_日本久久亚洲电影_久久都是精品_66av99_九色精品美女在线_蜜臀a∨国产成人精品_冲田杏梨av在线_欧美精品在线一区二区三区_麻豆mv在线看

用 Go 實現一個支持任務下發的后臺服務:Gin + Machinery v2 完整實戰

開發 后端
本文將帶你一步一步使用Go語言,結合Gin框架與Machinery v2實現一個完整的 “任務下發服務”,并支持通過REST API發起任務。

在日常開發當中,我們經常希望通過消息通信機制來異步執行任務,例如發郵件、生成報表、風控計算等。這種場景中,使用“任務隊列”框架來解耦主業務流程是一種最佳實踐。

本文將帶你一步一步使用Go語言,結合Gin框架與Machinery v2實現一個完整的 “任務下發服務”,并支持通過REST API發起任務。廢話不多說,開始今天的內容吧,let's Go!!!

核心目標

項目開始前我們先設定一個小目標,具體項如下所示:

  • 使用 Gin 提供一個 HTTP 接口,用于接收任務參數
  • 使用 Machinery v2 執行后臺任務(通過 Redis 通信)
  • 使用消息隊列解耦 API 層與實際執行邏輯

整體流程

用戶請求 -> Gin Server -> Redis Machinery -> Worker

用戶通過HTTP請求提交任務參數,Gin服務將任務發送到Machinery任務隊列中,后續由Worker異步消費任務并執行。

準備工作

(1) 安裝 Redis

本地環境可以直接通過 Docker 運行:

docker run -d --name redis -p 6379:6379 redis

(2) 初始化 Go 項目

go mod init machinery-gin
go get github.com/gin-gonic/gin
go get github.com/RichardKnop/machinery/v2

(3) 創建對應目錄和代碼文件

?  machinery-gin tree .
.
├── cmd
│   ├── api
│   │   └── main.go
│   └── worker
│       └── main.go
├── config
│   └── config.go
├── controller
│   └── task_controller.go
├── go.mod
├── go.sum
├── router
│   └── router.go
├── scheduler
│   └── manager.go
├── service
│   └── task_service.go
└── tasks
    ├── handler.go
    └── registry.go


10 directories, 11 files

核心模塊代碼實現

(1) config/config.go

package config


import "github.com/RichardKnop/machinery/v2/config"


func GetMachineryConfig() *config.Config {
	return &config.Config{
		Broker:        "redis://localhost:6379",
		DefaultQueue:  "machinery_tasks",
		ResultBackend: "redis://localhost:6379",
	}
}

(2) tasks/handler.go

package tasks


import (
	"fmt"
	"time"
)


func PrintMessage(msg string) error {
	fmt.Printf("?? Task Received: %s at %s\n", msg, time.Now().Format(time.RFC3339))
	return nil
}

(3) tasks/registry.go

package tasks


import "github.com/RichardKnop/machinery/v2"


func RegisterTasks(server *machinery.Server) error {
	return server.RegisterTasks(map[string]interface{}{
		"print_message": PrintMessage,
	})
}

(4) scheduler/manager.go

package scheduler


import (
	"sync"
	"time"


	"github.com/RichardKnop/machinery/v2"
	"github.com/RichardKnop/machinery/v2/tasks"
)


type ScheduledTask struct {
	Name     string
	Interval time.Duration
	Msg      string
	Paused   bool
	StopChan chan struct{}
}


var (
	tasksMap = make(map[string]*ScheduledTask)
	mu       sync.Mutex
)


func AddScheduledTask(server *machinery.Server, name, msg string, interval time.Duration) {
	mu.Lock()
	defer mu.Unlock()


	if _, exists := tasksMap[name]; exists {
		return
	}


	t := &ScheduledTask{
		Name:     name,
		Interval: interval,
		Msg:      msg,
		StopChan: make(chan struct{}),
	}
	tasksMap[name] = t


	go func(task *ScheduledTask) {
		ticker := time.NewTicker(task.Interval)
		defer ticker.Stop()


		for {
			select {
			case <-ticker.C:
				if !task.Paused {
					signature := &tasks.Signature{
						Name: "print_message",
						Args: []tasks.Arg{
							{Type: "string", Value: task.Msg},
						},
					}
					server.SendTask(signature)
				}
			case <-task.StopChan:
				return
			}
		}
	}(t)
}


func PauseTask(name string) {
	mu.Lock()
	defer mu.Unlock()
	if task, ok := tasksMap[name]; ok {
		task.Paused = true
	}
}


func ResumeTask(name string) {
	mu.Lock()
	defer mu.Unlock()
	if task, ok := tasksMap[name]; ok {
		task.Paused = false
	}
}


func StopTask(name string) {
	mu.Lock()
	defer mu.Unlock()
	if task, ok := tasksMap[name]; ok {
		close(task.StopChan)
		delete(tasksMap, name)
	}
}

(5) service/task_service.go

package service


import (
	"time"


	"github.com/RichardKnop/machinery/v2"
	"machinery-gin/scheduler"
)


func ScheduleNewTask(server *machinery.Server, name, msg string, intervalSec int) {
	scheduler.AddScheduledTask(server, name, msg, time.Duration(intervalSec)*time.Second)
}


func PauseScheduledTask(name string) {
	scheduler.PauseTask(name)
}


func ResumeScheduledTask(name string) {
	scheduler.ResumeTask(name)
}


func StopScheduledTask(name string) {
	scheduler.StopTask(name)
}

(6) controller/task_controller.go

package controller


import (
	"net/http"


	"github.com/RichardKnop/machinery/v2"
	"github.com/gin-gonic/gin"
	"machinery-gin/service"
)


func TaskHandler(server *machinery.Server) gin.HandlerFunc {
	return func(c *gin.Context) {
		var req struct {
			Name     string `json:"name"`
			Interval int    `json:"interval"`
			Msg      string `json:"msg"`
		}


		if err := c.ShouldBindJSON(&req); err != nil {
			c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
			return
		}


		service.ScheduleNewTask(server, req.Name, req.Msg, req.Interval)
		c.JSON(http.StatusOK, gin.H{"status": "task scheduled"})
	}
}


func PauseHandler() gin.HandlerFunc {
	return func(c *gin.Context) {
		name := c.Query("name")
		service.PauseScheduledTask(name)
		c.JSON(http.StatusOK, gin.H{"status": "paused"})
	}
}


func ResumeHandler() gin.HandlerFunc {
	return func(c *gin.Context) {
		name := c.Query("name")
		service.ResumeScheduledTask(name)
		c.JSON(http.StatusOK, gin.H{"status": "resumed"})
	}
}


func StopHandler() gin.HandlerFunc {
	return func(c *gin.Context) {
		name := c.Query("name")
		service.StopScheduledTask(name)
		c.JSON(http.StatusOK, gin.H{"status": "stopped"})
	}
}

(7) router/router.go

package routes


import (
	"github.com/RichardKnop/machinery/v2"
	"github.com/gin-gonic/gin"
	"machinery-gin/controller"
)


func SetupRouter(server *machinery.Server) *gin.Engine {
	r := gin.Default()


	r.POST("/task/start", controller.TaskHandler(server))
	r.POST("/task/pause", controller.PauseHandler())
	r.POST("/task/resume", controller.ResumeHandler())
	r.POST("/task/stop", controller.StopHandler())


	return r
}

(8) cmd/api/main.go (啟動Gin API服務)

package main


import (
	server "github.com/RichardKnop/machinery/v2"
	redisbackend "github.com/RichardKnop/machinery/v2/backends/redis"
	redisbroker "github.com/RichardKnop/machinery/v2/brokers/redis"
	eagerlock "github.com/RichardKnop/machinery/v2/locks/eager"
	"machinery-gin/config"
	"machinery-gin/router"
	"machinery-gin/tasks"
)


func main() {
	cfg := config.GetMachineryConfig()


	broker := redisbroker.New(cfg, "localhost:6379", "", "", 0)
	backend := redisbackend.New(cfg, "localhost:6379", "", "", 0)
	lock := eagerlock.New()
	machineryServer := server.NewServer(cfg, broker, backend, lock)


	_ = tasks.RegisterTasks(machineryServer)
	r := routes.SetupRouter(machineryServer)
	r.Run(":9311")
}

(9) cmd/worker/main.go (啟動Worker消費者)

package main


import (
	server "github.com/RichardKnop/machinery/v2"
	redisbackend "github.com/RichardKnop/machinery/v2/backends/redis"
	redisbroker "github.com/RichardKnop/machinery/v2/brokers/redis"
	eagerlock "github.com/RichardKnop/machinery/v2/locks/eager"
	"machinery-gin/config"
	"machinery-gin/tasks"
)


func main() {
	cfg := config.GetMachineryConfig()
	broker := redisbroker.New(cfg, "localhost:6379", "", "", 0)
	backend := redisbackend.New(cfg, "localhost:6379", "", "", 0)
	lock := eagerlock.New()
	machineryServer := server.NewServer(cfg, broker, backend, lock)
	_ = tasks.RegisterTasks(machineryServer)


	worker := machineryServer.NewWorker("worker_name", 10)
	_ = worker.Launch()
}

測試程序

啟動 API 服務和 Worker:

go run cmd/api/main.go
go run cmd/worker/main.go

測試命令如下所示:

?  ~ curl -X POST http://localhost:9311/task/start -H 'Content-Type: application/json' -d '{
  "name": "hello-task",
  "interval": 5,
  "msg": "Hello from Machinery"
}'
{"status":"task scheduled"}%                                                       
 ?  ~ curl -X POST http://localhost:9311/task/pause\?name\=hello-task
{"status":"paused"}%
?  ~ curl -X POST http://localhost:9311/task/resume\?name\=hello-task
{"status":"resumed"}%                                                                 
?  ~ curl -X POST http://localhost:9311/task/stop\?name\=hello-task
{"status":"stopped"}%

測試結果如下所示:

總結

我們已經實現了任務的“下發與執行”,“暫停/恢復”后續可以進一步擴展:

  • 支持任務列表,任務詳情
  • 支持“周期定時任務(調度器)”
  • 支持任務執行狀態查詢/UI管理面板
責任編輯:趙寧寧 來源: 馬嘍編程筆記
相關推薦

2025-09-15 08:49:44

GoJSONAPI

2022-05-22 13:55:30

Go 語言

2025-05-20 09:39:57

GogRPC微服務

2024-01-08 08:36:29

HTTPGo代理服務器

2024-01-02 13:58:04

GoREST API語言

2024-05-10 08:47:22

標準庫v2Go

2024-03-15 15:20:10

并發服務IP

2025-03-06 08:54:24

泛型類型MapGo1

2010-08-05 17:00:04

RIP V2協議

2010-08-06 14:07:21

RIP V2

2014-04-14 15:54:00

print()Web服務器

2022-03-06 19:57:50

狀態機easyfsm項目

2023-05-10 08:05:41

GoWeb應用

2021-09-27 09:55:06

Chrome瀏覽器Manifest V2

2012-04-24 18:10:56

華為E5

2017-05-08 15:00:20

H5代碼服務器

2020-07-03 10:21:48

Go框架Docker

2021-08-23 15:14:09

Linuxat命令任務

2023-02-26 01:37:57

goORM代碼

2023-03-01 09:39:40

調度系統
點贊
收藏

51CTO技術棧公眾號

国产区亚洲区欧美区| 中文字幕第21页| 亚洲人123区| 国产v综合v| 欧美日韩一区二| 五月激情六月综合| 欧美午夜寂寞| 日本特黄a级片| 欧美激情久久久久久| 国产91精品一区二区| 亚洲奶水xxxx哺乳期| 动漫精品视频| 欧美日韩亚洲综合| 精品二区视频| 日韩大胆人体| 国产在线一区二区三区| 亚洲成人777| 欧美区一区二| 97caopor国产在线视频| 亚洲春色在线视频| 国产一区二区av| 国产不卡在线播放| 成人短视频软件网站大全app| 国产老熟妇精品观看| 久久夜精品va视频免费观看| 久草精品在线观看| 日本欧美在线| 天堂中文字幕| 日本一区二区三区视频在线观看| 日韩精品在线观看网站| 亚洲免费激情| 日本www在线| 久久国产精品视频在线观看| 韩剧1988在线观看免费完整版| 亚洲天天做日日做天天谢日日欢 | 中文字幕av一区 二区| 成人在线视频国产| 日本高清好狼色视频| 亚洲在线一区二区| 亚洲国产日韩欧美在线99| 26uuu另类欧美亚洲曰本| 91成人网在线观看| 中文日本在线观看| 日韩视频在线视频| 久久久国产精品一区| 一区二区三区在线免费播放| 亚洲在线观看| 国产福利一区二区精品秒拍| 阿v免费在线观看| aa免费在线观看| 日本电影亚洲天堂| 自拍偷拍亚洲欧美| 日韩精品一区二区三区中文不卡| 成人毛片老司机大片| 午夜欧美视频| 久久久久高潮毛片免费全部播放| 邻居大乳一区二区三区| 少妇无码av无码专区在线观看| 成人av资源在线播放| 欧美成人激情视频| 亚洲精品狠狠操| 懂色av影视一区二区三区| 久久精品一区二区| 国产综合久久久久久鬼色| 婷婷亚洲综合| 免费av一区二区三区四区| 成人97精品毛片免费看| 欧美日韩大片| 国精产品一区二区三区有限公司| 亚乱亚乱亚洲乱妇| 中文在线www| 制服丝袜影音| 麻豆av免费在线| 国产精品久久久久不卡| 亚洲欧洲午夜一线一品| 91黄视频在线观看| 久久久久久9999| 亚洲一区中文| 亚洲乱码久久| 久久免费大视频| 要久久爱电视剧全集完整观看| 麻豆影院在线观看| 午夜在线视频播放| 在线免费观看视频黄| 精品第一国产综合精品aⅴ| 91麻豆精品视频| 久久久久国产精品免费免费搜索| 国产精品欧美一级免费| 亚洲男人天堂av网| 另类调教123区| 麻豆视频观看网址久久| 欧美日韩第一区| 亚洲一区二区三区高清| 日韩高清一区在线| 国产在线精品一区二区三区不卡| 丰满放荡岳乱妇91ww| 久久性感美女视频| 国内精品嫩模av私拍在线观看 | 91插插插插插插插插| 亚洲精品一区视频| www.久久久久.com| 日韩脚交footjobhd| 国产极品嫩模在线观看91精品| 成人污版视频| 成人精品久久| 久久精品男女| 成人综合在线网站| 亚洲人精品午夜| 精品国产髙清在线看国产毛片| 在线观看日韩欧美| 成人免费看吃奶视频网站| 欧美亚州一区二区三区| 鲁丝一区二区三区免费| 国产乱子夫妻xx黑人xyx真爽 | 国产精品久久久久久久久粉嫩av | 欧美精品在线观看| 亚洲精品欧美日韩专区| 日韩中文字幕在线免费| www.av在线| 高h视频在线观看| 麻豆tv入口在线看| 久久99国内| 91蜜桃在线免费视频| 日韩一级片网站| 亲爱的老师9免费观看全集电视剧| 国产一区在线免费| 久久久久免费精品| 国产对白在线正在播放| 99久久亚洲国产日韩美女| 国产欧美不卡| 久久久久久久一区| 亚洲天堂色网站| 亚洲一区二区三区精品在线观看| 调教视频vk| 深夜视频一区二区| 国产在线精品一区二区| 在线免费观看日本欧美| 久久综合国产精品| 欧美高清激情brazzers| 91人成网站www| 久久久精品有限公司| 导航艳情国产电影| 久久香蕉网站| 精品亚洲成a人在线观看| 欧美色视频日本高清在线观看| 欧美激情视频网址| 国产欧美一区二区三区四区| 欧美不卡在线播放| 男人av在线| 天天色综合色| aaa国产一区| 久久成人精品视频| av免费播放网址| 日韩欧美久久| 国产成人av一区二区三区在线 | 欧美性bbwbbwbbwhd| 牛牛澡牛牛爽一区二区| 久久久久久久久久久9不雅视频| 亚洲国产另类精品专区| 国产日韩欧美日韩大片| 人成网站免费观看| 99精品小视频| 欧美一区二区三区视频| 国产va免费精品高清在线| 怡红院av亚洲一区二区三区h| 黄色在线观看www| 91麻豆精品在线观看| 欧美孕妇性xx| 在线观看免费毛片| 欧美hd在线| 精品久久一二三区| 国产精品igao激情视频| www.国产精品| 九九视频精品免费| 欧美日韩aaaaa| 吴梦梦av在线| 欧美亚洲韩国| 亚洲国产美女搞黄色| 国产欧美一区二区三区不卡高清| 中文字幕在线观看日本| 国产精品videosex极品| 337p日本欧洲亚洲大胆精品| 日本一道在线观看| 精品影片在线观看的网站| 日韩久久免费av| 高清中文字幕在线| 国产欧美高清视频在线| 日本道色综合久久| 九九九九免费视频| 亚洲不卡av不卡一区二区| 欧美日韩国产影院| 精品欧美一区免费观看α√| 欧美一区自拍| 一个人www欧美| 宅男午夜在线| 成人资源在线| 亚洲一二三级电影| 国产精品久久久久久久免费大片| yellow91字幕网在线| 久久久久久久久一|