golang基于观察者模式管理多种worker的启停
版权声明 本站原创文章 由 萌叔 发表
转载请注明 萌叔 | https://vearne.cc
前言
我们的工作中有一个服务,服务中有多中worker,它们的角色各不相同
- workerA 从从上游系统接收任务(HTTP 请求),并将任务写入数据库
- workerB 把没有处理的任务扫描出来放入消息队列
- workerC 从消息队列中读取任务进行处理,处理完成把处理结果回写到数据库
在服务的入口,每种worker有不同的数量,且都需要优雅的启动停止。有没有好的编程方式?经过反复思考,我基于观察者模式给出了范例供大家探讨
要点
1. worker接口
每个worker都需要启动和停止,因此worker可以抽象的理解为必须实现Worker接口
type Worker interface{
Start()
Stop()
}
2. worker的优雅启动和退出
每1个worker都是1个协程,在所有worker退出之前,服务不能退出,要做到这一点只能使用"sync.WaitGroup"
由于worker(协程)较多,我的想法是把所有的worker统一存在一起,由一个类统一管理
type WorkerManager struct {
sync.WaitGroup
// 保存所有worker
WorkerSlice []Worker
}
WorkerManager作为观察目标,Worker作为观察者,当观察目标状态发生变化,所有的观察者都会得到通知。
- 当WorkerManager状态变化-start时,调用Worker的start方法,启动Worker
- 当WorkerManager状态变化-stop时,调用Worker的stop方法, 停止Worker
func NewWorkerManager() *WorkerManager {
workerManager := WorkerManager{}
workerManager.WorkerSlice = make([]Worker, 0, 10)
return &workerManager
}
func (wm *WorkerManager) AddWorker(w Worker) {
wm.WorkerSlice = append(wm.WorkerSlice, w)
}
func (wm *WorkerManager) Start() {
wm.Add(len(wm.WorkerSlice))
for _, worker := range wm.WorkerSlice {
go func(w Worker) {
defer func() {
err := recover() // 注意需要recover
if err != nil {
fmt.Printf("WorkerManager error, error:%v, stack:%v\n",
err, string(Stack()))
}
}()
w.Start()
}(worker)
}
}
func (wm *WorkerManager) Stop() {
for _, worker := range wm.WorkerSlice {
go func(w Worker) {
defer func() {
err := recover()
if err != nil {
fmt.Printf("WorkerManager error, error:%v, stack:%v\n",
err, string(Stack()))
}
}()
w.Stop()
wm.Done()
}(worker)
}
}
3. 完整代码示例
github 地址: vearne/worker_manager
package main
import (
"context"
"github.com/gin-gonic/gin"
manager "github.com/vearne/worker_manager"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"time"
)
func main() {
// 1. init some worker
wm := prepareAllWorker()
// 2. start
wm.Start()
// 3. register grace exit
GracefulExit(wm)
// 4. block and wait
wm.Wait()
}
func GracefulExit(wm *manager.WorkerManager) {
ch := make(chan os.Signal, 1)
signal.Notify(ch)
for sig := range ch {
switch sig {
case syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGINT:
log.Println("got a signal, execute stop", sig)
close(ch)
wm.Stop()
case syscall.SIGPIPE:
log.Println("got a signal, ignore", sig)
default:
log.Println("got a signal, default", sig)
}
}
}
func prepareAllWorker() *manager.WorkerManager {
wm := manager.NewWorkerManager()
// load worker
WorkerCount := 2
for i := 0; i < WorkerCount; i++ {
wm.AddWorker(NewLoadWorker())
}
// web server
wm.AddWorker(NewWebServer())
return wm
}
执行
可以切换到工程目录下
go build main.go
# 启动服务
./main
# 服务退出, 发出SIGTERM信号,服务优雅退出
# 请求自行替换pid的值
kill -15 <pid>
控制台输出:
2019/08/23 14:28:41 [start]LoadWorker
2019/08/23 14:28:41 [start]LoadWorker
2019/08/23 14:28:41 [start]WebServer
[GIN-debug] [WARNING] Creating an Engine instance with the Logger and Recovery middleware already attached.
[GIN-debug] [WARNING] Running in "debug" mode. Switch to "release" mode in production.
- using env: export GIN_MODE=release
- using code: gin.SetMode(gin.ReleaseMode)
[GIN-debug] GET / --> main.(*WebServer).Start.func1 (3 handlers)
2019/08/23 14:28:58 got signal
2019/08/23 14:28:58 WebServer exit...
2019/08/23 14:28:58 [end]WebServer exit
2019/08/23 14:28:58 LoadWorker exit...
2019/08/23 14:28:58 LoadWorker execute exit logic
2019/08/23 14:28:58 LoadWorker exit...
2019/08/23 14:28:58 LoadWorker execute exit logic
2019/08/23 14:28:58 [end]LoadWorker
2019/08/23 14:28:58 [end]LoadWorker
参考资料:
后记
2022年09月06日 项目一直有改动,请以github为准