Fork me on GitHub

版权声明 本站原创文章 由 萌叔 发表
转载请注明 萌叔 | http://vearne.cc

前言

我们的工作中有一个服务,服务中有多中worker,它们的角色各不相同

  • workerA 从从上游系统接收任务(HTTP 请求),并将任务写入数据库
  • workerB 把没有处理的任务扫描出来放入消息队列
  • workerC 从消息队列中读取任务进行处理,处理完成把处理结果回写到数据库

在服务的入口,每种worker有不同的数量,且都需要优雅的启动停止。有没有好的编程方式?经过反复思考,我基于观察者模式给出了范例供大家探讨

要点

1. worker接口

每个worker都需要启动和停止,因此worker可以抽象的理解为必须实现Worker接口

type Worker interface{
    Start()
    Stop()
}

2. worker的优雅启动和退出

每1个worker都是1个协程,在所有worker退出之前,服务不能退出,要做到这一点只能使用”sync.WaitGroup”
由于worker(协程)较多,我的想法是把所有的worker统一存在一起,由一个类统一管理

type  WorkerManager struct {
    sync.WaitGroup
    // 保存所有worker
    WorkerSlice []Worker
}
WorkerManager作为观察目标,Worker作为观察者,当观察目标状态发生变化,所有的观察者都会得到通知。
- 当WorkerManager状态变化-start时,调用Worker的start方法,启动Worker
- 当WorkerManager状态变化-stop时,调用Worker的stop方法, 停止Worker



func NewWorkerManager() *WorkerManager {
    workerManager := WorkerManager{}
    workerManager.WorkerSlice = make([]Worker, 0, 10)
    return &workerManager
}

func (wm *WorkerManager) AddWorker(w Worker) {
    wm.WorkerSlice = append(wm.WorkerSlice, w)
}

func (wm *WorkerManager) Start() {
    wm.Add(len(wm.WorkerSlice))
    for _, worker := range wm.WorkerSlice {
        go func(w Worker) {
            defer func() {
                err := recover() // 注意需要recover
                if err != nil {
                    fmt.Printf("WorkerManager error, error:%v, stack:%v\n",
                        err, string(Stack()))
                }
            }()
            w.Start()
        }(worker)
    }
}

func (wm *WorkerManager) Stop() {
    for _, worker := range wm.WorkerSlice {
        go func(w Worker) {
            defer func() {
                err := recover()
                if err != nil {
                    fmt.Printf("WorkerManager error, error:%v, stack:%v\n",
                        err, string(Stack()))
                }
            }()

            w.Stop()
            wm.Done()
        }(worker)
    }
}

3. 完整代码示例

github 地址: https://github.com/vearne/example/blob/master/golang/worker_manage/main.go

package main

import (
    "context"
    "github.com/gin-gonic/gin"
    manager "github.com/vearne/worker_manager"
    "log"
    "net/http"
    "os"
    "os/signal"
    "syscall"
    "time"
)

func main() {
    // 1. init some worker
    wm := prepareAllWorker()

    // 2. start
    wm.Start()

    // 3. register grace exit
    GracefulExit(wm)

    // 4. block and wait
    wm.Wait()
}

func GracefulExit(wm *manager.WorkerManager) {
    ch := make(chan os.Signal, 1)
    signal.Notify(ch)
    for sig := range ch {
        switch sig {
        case syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGINT:
            log.Println("got a signal, execute stop", sig)
            close(ch)
            wm.Stop()
        case syscall.SIGPIPE:
            log.Println("got a signal, ignore", sig)
        default:
            log.Println("got a signal, default", sig)
        }
    }
}

func prepareAllWorker() *manager.WorkerManager {
    wm := manager.NewWorkerManager()
    // load worker
    WorkerCount := 2
    for i := 0; i < WorkerCount; i++ {
        wm.AddWorker(NewLoadWorker())
    }
    // web server
    wm.AddWorker(NewWebServer())

    return wm
}

执行

可以切换到工程目录下

go build main.go
# 启动服务
./main
# 服务退出, 发出SIGTERM信号,服务优雅退出
# 请求自行替换pid的值
kill -15 <pid>

控制台输出:

2019/08/23 14:28:41 [start]LoadWorker
2019/08/23 14:28:41 [start]LoadWorker
2019/08/23 14:28:41 [start]WebServer
[GIN-debug] [WARNING] Creating an Engine instance with the Logger and Recovery middleware already attached.

[GIN-debug] [WARNING] Running in "debug" mode. Switch to "release" mode in production.
 - using env:   export GIN_MODE=release
 - using code:  gin.SetMode(gin.ReleaseMode)

[GIN-debug] GET    /                         --> main.(*WebServer).Start.func1 (3 handlers)
2019/08/23 14:28:58 got signal
2019/08/23 14:28:58 WebServer exit...
2019/08/23 14:28:58 [end]WebServer exit
2019/08/23 14:28:58 LoadWorker exit...
2019/08/23 14:28:58 LoadWorker execute exit logic
2019/08/23 14:28:58 LoadWorker exit...
2019/08/23 14:28:58 LoadWorker execute exit logic
2019/08/23 14:28:58 [end]LoadWorker
2019/08/23 14:28:58 [end]LoadWorker

参考资料:

  1. 观察者模式

如果我的文章对你有帮助,你可以给我打赏以促使我拿出更多的时间和精力来分享我的经验和思考总结。

微信支付码