手撸了一个Golang协程池

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 传送门: vearne/executor 0. 引言 本项目受到Java Executors的启发 Executors,并参考了ExecutorService的接口设计。 支持取消单个任务,或者取消整个集群的任务。 并提供了一个可以动态扩容和缩容的实现DynamicGPool。 欢迎提PR。下面是更详细的说明: 1. 特性 支持取消单个任务,也可以取消协程池上的所有任务 Future.Cancel() ExecutorService.Cancel() 你也可以使用context.Context取消task或者pool。 可以创建多种不同类型的协程池(SingleGPool|FixedGPool|DynamicGPool) 2. 多种类型的协程池 类别 说明 备注 SingleGPool 单个worker协程池 FixedGPool 固定数量worker协程池 DynamicGPool worker数量可以动态变化的协程池 min:最少协程数量 max:最大协程数量 2.1 SingleGPool NewSingleGPool(ctx context.Context, opts ...option) ExecutorService 2.2 FixedGPool NewFixedGPool(ctx context.Context, size int, opts ...option) ExecutorService 2.3 DynamicGPool NewDynamicGPool(ctx context.Context, min int, max int, opts ...dynamicOption) ExecutorService 2.3.1 扩容规则 提交任务时,如果任务队列已经满了,则尝试增加worker去执行任务。 2.3.2 缩容规则: 条件: 如果处于忙碌状态的worker少于worker总数的1/4,则认为满足条件 执行meetCondNum次连续检测,每次间隔detectInterval。如果每次都满足条件,触发缩容。 缩容动作尝试减少一半的worker 3. 注意 由于executor使用了channel作为作为任务队列,所以提交任务时,可能会发生阻塞。 ...

September 26, 2022 · 2 min

简单的GOLANG 协程池2 (带Cancel功能)

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 前言 有些朋友可能看我的这篇文章 简单的GOLANG 协程池 这是我做的简单协程池,功能还算完备,但是有些任务执行的时间很长,如果想在协程池运行起来以后就退出,就只能死等。这是非常不友好的,因此我又写了一个新的协程池GContextPool,利用context支持在任务运行过程中,结束任务。原来的GPool仍然可以继续使用。 下面我们来看看用法。 1. 安装 go get github.com/vearne/golib/utils 2. 使用 2.1 创建协程池 // 设定协程池的中协程的数量是30个 cxt, cancel := context.WithTimeout(context.Background(), 1 * time.Second) defer cancel() var p *utils.GContextPool = utils.NewGContextPool(ctx, 30) 2.2 定义任务处理函数 任务处理函数形如 type JobContextFunc func(ctx context.Context, key interface{}) *GPResult 执行任务 GContextPool.ApplyAsync(f JobContextFunc, slice []interface{}) f JobContextFunc 是目标函数 slice []interface{} 任务参数列表 示例 ctx_pool.go package main import ( "context" "fmt" "github.com/vearne/golib/utils" "log" "strconv" "time" ) func JudgeStrWithContext2(ctx context.Context, key interface{}) *utils.GPResult { num, _ := strconv.Atoi(key.(string)) result := &utils.GPResult{} var canceled bool = false for i := 0; i < 60; i++ { select { case <-ctx.Done(): canceled = true result.Value = false result.Err = fmt.Errorf("normal termination") default: time.Sleep(time.Millisecond * 50) } } if !canceled { if num < 450 { result.Value = true } else { result.Value = false } } return result } func main() { cxt, cancel := context.WithTimeout(context.Background(), 1 * time.Second) defer cancel() p := utils.NewGContextPool(cxt,30) slice := make([]interface{}, 0) for i := 0; i < 1000; i++ { slice = append(slice, strconv.Itoa(i)) } result := make([]*utils.GPResult, 0, 10) trueCount := 0 falseCount := 0 start := time.Now() for item := range p.ApplyAsync(JudgeStrWithContext2, slice) { result = append(result, item) if item.Err!= nil{ //log.Println("cancel", item.Err) continue } if item.Value.(bool) { trueCount++ } else { falseCount++ } } log.Printf("cancel, %v, true:%v, false:%v, cost:%v\n", len(result), trueCount, falseCount, time.Since(start)) } 请我喝瓶饮料

November 19, 2018 · 2 min

简单的Golang 协程池

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 引言 很多时候,还是需要用到协程池的,简单撸了一个,方便工作中的需要,有bug请反馈 1. 安装 go get github.com/vearne/golib/utils 2. 使用 2.1 创建协程池 // 协程池的大小是30 var p *utils.GPool = utils.NewGPool(30) 2.2 定义任务处理函数 任务处理函数形如 type JobFunc func(param interface{}) *GPResult GPResult type GPResult struct { Value interface{} Err error } 执行任务 ApplyAsync(f JobFunc, slice []interface{}) 示例 pool.go package main import ( "github.com/vearne/golib/utils" "log" "strconv" ) func Judge(key interface{}) *utils.GPResult { result := &utils.GPResult{} num, _ := strconv.Atoi(key.(string)) if num < 450 { result.Value = true } else { result.Value = false } return result } func main() { p := utils.NewGPool(30) slice := make([]interface{}, 0) for i := 0; i < 1000; i++ { slice = append(slice, strconv.Itoa(i)) } result := make([]bool, 0, 10) trueCount := 0 falseCount := 0 for item := range p.ApplyAsync(Judge, slice) { value := item.Value.(bool) result = append(result, value) if value { trueCount++ } else { falseCount++ } } log.Printf("cancel, %v, true:%v, false:%v\n", len(result), trueCount, falseCount) } 致谢 程序的API形式以及思路参考了python的multiprocessing模块,再此表示感谢 ...

September 5, 2018 · 1 min