版权声明 本站原创文章 由 萌叔 发表
转载请注明 萌叔 | http://vearne.cc

前言

我们的工作中有一个服务,服务中有多中worker,它们的角色各不相同

  • workerA 从从上游系统接收任务(HTTP 请求),并将任务写入数据库
  • workerB 把没有处理的任务扫描出来放入消息队列
  • workerC 从消息队列中读取任务进行处理,处理完成把处理结果回写到数据库

在服务的入口,每种worker有不同的数量,且都需要优雅的启动停止。有没有好的编程方式?经过反复思考,我基于观察者模式给出了范例供大家探讨

要点

1. worker接口

每个worker都需要启动和停止,因此worker可以抽象的理解为必须实现Worker接口

type Worker interface{
	Start()
	Stop()
}

2. worker的优雅启动和退出

每1个worker都是1个协程,在所有worker退出之前,服务不能退出,要做到这一点只能使用"sync.WaitGroup" 由于worker(协程)较多,我的想法是把所有的worker统一存在一起,由一个类统一管理

type  WorkerManager struct {
	sync.WaitGroup
	// 保存所有worker
	WorkerSlice []Worker
}
WorkerManager作为观察目标,Worker作为观察者,当观察目标状态发生变化,所有的观察者都会得到通知。
- 当WorkerManager状态变化-start时,调用Worker的start方法,启动Worker
- 当WorkerManager状态变化-stop时,调用Worker的stop方法, 停止Worker



func NewWorkerManager() *WorkerManager {
	workerManager := WorkerManager{}
	workerManager.WorkerSlice = make([]Worker, 0, 10)
	return &workerManager
}

func (wm *WorkerManager) AddWorker(w Worker) {
	wm.WorkerSlice = append(wm.WorkerSlice, w)
}

func (wm *WorkerManager) Start() {
	wm.Add(len(wm.WorkerSlice))
	for _, worker := range wm.WorkerSlice {
		go func(w Worker) {
			defer func() {
				err := recover() // 注意需要recover
				if err != nil {
					fmt.Printf("WorkerManager error, error:%v, stack:%v\n",
						err, string(Stack()))
				}
			}()
			w.Start()
		}(worker)
	}
}

func (wm *WorkerManager) Stop() {
	for _, worker := range wm.WorkerSlice {
		go func(w Worker) {
			defer func() {
				err := recover()
				if err != nil {
					fmt.Printf("WorkerManager error, error:%v, stack:%v\n",
						err, string(Stack()))
				}
			}()

			w.Stop()
			wm.Done()
		}(worker)
	}
}

3. 完整代码示例

github 地址: vearne/worker_manager

package main

import (
	"context"
	"github.com/gin-gonic/gin"
	manager "github.com/vearne/worker_manager"
	"log"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"
)

func main() {
	// 1. init some worker
	wm := prepareAllWorker()

	// 2. start
	wm.Start()

	// 3. register grace exit
	GracefulExit(wm)

	// 4. block and wait
	wm.Wait()
}

func GracefulExit(wm *manager.WorkerManager) {
	ch := make(chan os.Signal, 1)
	signal.Notify(ch)
	for sig := range ch {
		switch sig {
		case syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGINT:
			log.Println("got a signal, execute stop", sig)
			close(ch)
			wm.Stop()
		case syscall.SIGPIPE:
			log.Println("got a signal, ignore", sig)
		default:
			log.Println("got a signal, default", sig)
		}
	}
}

func prepareAllWorker() *manager.WorkerManager {
	wm := manager.NewWorkerManager()
	// load worker
	WorkerCount := 2
	for i := 0; i < WorkerCount; i++ {
		wm.AddWorker(NewLoadWorker())
	}
	// web server
	wm.AddWorker(NewWebServer())

	return wm
}

执行

可以切换到工程目录下

go build main.go
# 启动服务
./main
# 服务退出, 发出SIGTERM信号,服务优雅退出
# 请求自行替换pid的值
kill -15 <pid>

控制台输出:

2019/08/23 14:28:41 [start]LoadWorker
2019/08/23 14:28:41 [start]LoadWorker
2019/08/23 14:28:41 [start]WebServer
[GIN-debug] [WARNING] Creating an Engine instance with the Logger and Recovery middleware already attached.

[GIN-debug] [WARNING] Running in "debug" mode. Switch to "release" mode in production.
 - using env:	export GIN_MODE=release
 - using code:	gin.SetMode(gin.ReleaseMode)

[GIN-debug] GET    /                         --> main.(*WebServer).Start.func1 (3 handlers)
2019/08/23 14:28:58 got signal
2019/08/23 14:28:58 WebServer exit...
2019/08/23 14:28:58 [end]WebServer exit
2019/08/23 14:28:58 LoadWorker exit...
2019/08/23 14:28:58 LoadWorker execute exit logic
2019/08/23 14:28:58 LoadWorker exit...
2019/08/23 14:28:58 LoadWorker execute exit logic
2019/08/23 14:28:58 [end]LoadWorker
2019/08/23 14:28:58 [end]LoadWorker

参考资料:

  1. 观察者模式

后记

2022年09月06日 项目一直有改动,请以github为准


请我喝瓶饮料

微信支付码