简单的GOLANG 协程池2 (带Cancel功能)
版权声明 本站原创文章 由 萌叔 发表
转载请注明 萌叔 | https://vearne.cc
前言
有些朋友可能看我的这篇文章
简单的GOLANG 协程池
这是我做的简单协程池,功能还算完备,但是有些任务执行的时间很长,如果想在协程池运行起来以后就退出,就只能死等。这是非常不友好的,因此我又写了一个新的协程池GContextPool
,利用context
支持在任务运行过程中,结束任务。原来的GPool
仍然可以继续使用。
下面我们来看看用法。
1. 安装
go get github.com/vearne/golib/utils
2. 使用
2.1 创建协程池
// 设定协程池的中协程的数量是30个
cxt, cancel := context.WithTimeout(context.Background(), 1 * time.Second)
defer cancel()
var p *utils.GContextPool = utils.NewGContextPool(ctx, 30)
2.2 定义任务处理函数
任务处理函数形如
type JobContextFunc func(ctx context.Context, key interface{}) *GPResult
执行任务
GContextPool.ApplyAsync(f JobContextFunc, slice []interface{})
f JobContextFunc
是目标函数
slice []interface{}
任务参数列表
示例
package main
import (
"context"
"fmt"
"github.com/vearne/golib/utils"
"log"
"strconv"
"time"
)
func JudgeStrWithContext2(ctx context.Context, key interface{}) *utils.GPResult {
num, _ := strconv.Atoi(key.(string))
result := &utils.GPResult{}
var canceled bool = false
for i := 0; i < 60; i++ {
select {
case <-ctx.Done():
canceled = true
result.Value = false
result.Err = fmt.Errorf("normal termination")
default:
time.Sleep(time.Millisecond * 50)
}
}
if !canceled {
if num < 450 {
result.Value = true
} else {
result.Value = false
}
}
return result
}
func main() {
cxt, cancel := context.WithTimeout(context.Background(), 1 * time.Second)
defer cancel()
p := utils.NewGContextPool(cxt,30)
slice := make([]interface{}, 0)
for i := 0; i < 1000; i++ {
slice = append(slice, strconv.Itoa(i))
}
result := make([]*utils.GPResult, 0, 10)
trueCount := 0
falseCount := 0
start := time.Now()
for item := range p.ApplyAsync(JudgeStrWithContext2, slice) {
result = append(result, item)
if item.Err!= nil{
//log.Println("cancel", item.Err)
continue
}
if item.Value.(bool) {
trueCount++
} else {
falseCount++
}
}
log.Printf("cancel, %v, true:%v, false:%v, cost:%v\n", len(result),
trueCount, falseCount, time.Since(start))
}