golang基于观察者模式管理多种worker的启停

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 前言 我们的工作中有一个服务,服务中有多中worker,它们的角色各不相同 workerA 从从上游系统接收任务(HTTP 请求),并将任务写入数据库 workerB 把没有处理的任务扫描出来放入消息队列 workerC 从消息队列中读取任务进行处理,处理完成把处理结果回写到数据库 在服务的入口,每种worker有不同的数量,且都需要优雅的启动停止。有没有好的编程方式?经过反复思考,我基于观察者模式给出了范例供大家探讨 要点 1. worker接口 每个worker都需要启动和停止,因此worker可以抽象的理解为必须实现Worker接口 type Worker interface{ Start() Stop() } 2. worker的优雅启动和退出 每1个worker都是1个协程,在所有worker退出之前,服务不能退出,要做到这一点只能使用"sync.WaitGroup" 由于worker(协程)较多,我的想法是把所有的worker统一存在一起,由一个类统一管理 type WorkerManager struct { sync.WaitGroup // 保存所有worker WorkerSlice []Worker } WorkerManager作为观察目标,Worker作为观察者,当观察目标状态发生变化,所有的观察者都会得到通知。 - 当WorkerManager状态变化-start时,调用Worker的start方法,启动Worker - 当WorkerManager状态变化-stop时,调用Worker的stop方法, 停止Worker func NewWorkerManager() *WorkerManager { workerManager := WorkerManager{} workerManager.WorkerSlice = make([]Worker, 0, 10) return &workerManager } func (wm *WorkerManager) AddWorker(w Worker) { wm.WorkerSlice = append(wm.WorkerSlice, w) } func (wm *WorkerManager) Start() { wm.Add(len(wm.WorkerSlice)) for _, worker := range wm.WorkerSlice { go func(w Worker) { defer func() { err := recover() // 注意需要recover if err != nil { fmt.Printf("WorkerManager error, error:%v, stack:%v\n", err, string(Stack())) } }() w.Start() }(worker) } } func (wm *WorkerManager) Stop() { for _, worker := range wm.WorkerSlice { go func(w Worker) { defer func() { err := recover() if err != nil { fmt.Printf("WorkerManager error, error:%v, stack:%v\n", err, string(Stack())) } }() w.Stop() wm.Done() }(worker) } } 3. 完整代码示例 github 地址: vearne/worker_manager ...

March 7, 2018 · 2 min