Fork me on GitHub

版权声明 本站原创文章 由 萌叔 发表
转载请注明 萌叔 | https://vearne.cc

前言

我们的工作中有一个服务,服务中有多中worker,它们的角色各不相同

  • workerA 从从上游系统接收任务(HTTP 请求),并将任务写入数据库
  • workerB 把没有处理的任务扫描出来放入消息队列
  • workerC 从消息队列中读取任务进行处理,处理完成把处理结果回写到数据库

在服务的入口,每种worker有不同的数量,且都需要优雅的启动停止。有没有好的编程方式?经过反复思考,我基于观察者模式给出了范例供大家探讨

要点

1. worker接口

每个worker都需要启动和停止,因此worker可以抽象的理解为必须实现Worker接口

type Worker interface{
    Start()
    Stop()
}

2. worker的优雅启动和退出

每1个worker都是1个协程,在所有worker退出之前,服务不能退出,要做到这一点只能使用"sync.WaitGroup"
由于worker(协程)较多,我的想法是把所有的worker统一存在一起,由一个类统一管理

type  WorkerManager struct {
    sync.WaitGroup
    // 保存所有worker
    WorkerSlice []Worker
}
WorkerManager作为观察目标,Worker作为观察者,当观察目标状态发生变化,所有的观察者都会得到通知。
- 当WorkerManager状态变化-start时,调用Worker的start方法,启动Worker
- 当WorkerManager状态变化-stop时,调用Worker的stop方法, 停止Worker



func NewWorkerManager() *WorkerManager {
    workerManager := WorkerManager{}
    workerManager.WorkerSlice = make([]Worker, 0, 10)
    return &workerManager
}

func (wm *WorkerManager) AddWorker(w Worker) {
    wm.WorkerSlice = append(wm.WorkerSlice, w)
}

func (wm *WorkerManager) Start() {
    wm.Add(len(wm.WorkerSlice))
    for _, worker := range wm.WorkerSlice {
        go func(w Worker) {
            defer func() {
                err := recover() // 注意需要recover
                if err != nil {
                    fmt.Printf("WorkerManager error, error:%v, stack:%v\n",
                        err, string(Stack()))
                }
            }()
            w.Start()
        }(worker)
    }
}

func (wm *WorkerManager) Stop() {
    for _, worker := range wm.WorkerSlice {
        go func(w Worker) {
            defer func() {
                err := recover()
                if err != nil {
                    fmt.Printf("WorkerManager error, error:%v, stack:%v\n",
                        err, string(Stack()))
                }
            }()

            w.Stop()
            wm.Done()
        }(worker)
    }
}

3. 完整代码示例

github 地址: vearne/worker_manager

package main

import (
    "context"
    "github.com/gin-gonic/gin"
    manager "github.com/vearne/worker_manager"
    "log"
    "net/http"
    "os"
    "os/signal"
    "syscall"
    "time"
)

func main() {
    // 1. init some worker
    wm := prepareAllWorker()

    // 2. start
    wm.Start()

    // 3. register grace exit
    GracefulExit(wm)

    // 4. block and wait
    wm.Wait()
}

func GracefulExit(wm *manager.WorkerManager) {
    ch := make(chan os.Signal, 1)
    signal.Notify(ch)
    for sig := range ch {
        switch sig {
        case syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGINT:
            log.Println("got a signal, execute stop", sig)
            close(ch)
            wm.Stop()
        case syscall.SIGPIPE:
            log.Println("got a signal, ignore", sig)
        default:
            log.Println("got a signal, default", sig)
        }
    }
}

func prepareAllWorker() *manager.WorkerManager {
    wm := manager.NewWorkerManager()
    // load worker
    WorkerCount := 2
    for i := 0; i < WorkerCount; i++ {
        wm.AddWorker(NewLoadWorker())
    }
    // web server
    wm.AddWorker(NewWebServer())

    return wm
}

执行

可以切换到工程目录下

go build main.go
# 启动服务
./main
# 服务退出, 发出SIGTERM信号,服务优雅退出
# 请求自行替换pid的值
kill -15 <pid>

控制台输出:

2019/08/23 14:28:41 [start]LoadWorker
2019/08/23 14:28:41 [start]LoadWorker
2019/08/23 14:28:41 [start]WebServer
[GIN-debug] [WARNING] Creating an Engine instance with the Logger and Recovery middleware already attached.

[GIN-debug] [WARNING] Running in "debug" mode. Switch to "release" mode in production.
 - using env:   export GIN_MODE=release
 - using code:  gin.SetMode(gin.ReleaseMode)

[GIN-debug] GET    /                         --> main.(*WebServer).Start.func1 (3 handlers)
2019/08/23 14:28:58 got signal
2019/08/23 14:28:58 WebServer exit...
2019/08/23 14:28:58 [end]WebServer exit
2019/08/23 14:28:58 LoadWorker exit...
2019/08/23 14:28:58 LoadWorker execute exit logic
2019/08/23 14:28:58 LoadWorker exit...
2019/08/23 14:28:58 LoadWorker execute exit logic
2019/08/23 14:28:58 [end]LoadWorker
2019/08/23 14:28:58 [end]LoadWorker

参考资料:

  1. 观察者模式

后记

2022年09月06日 项目一直有改动,请以github为准


请我喝瓶饮料

微信支付码