用golang实现的定时器

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 起因 我们有一个业务场景(高并发),收到的每个任务都必须在很短的时间内完成,然后回报给调用方,因此每个任务都需要设定定时器。显然golang 默认的timer是不能支持这种场景的,因此我在delayqueue的基础上自己实现了一个库gtimer GitHub地址 timer 用golang实现的定时器,基于delayqueue 实现 实现受到了Java DelayQueue.java的启发 源码地址 DelayQueue.java 依赖的几个结构依次为为 timer -> delayqueue -> priorityqueue -> heap 由于golang的Condition不支持wait一段时间,所以使用golang原生的Timer来替代了Condition在delayqueue中的作用 Installation Install: go get -u github.com/vearne/gtimer Import: import "github.com/vearne/gtimer" Quick Start package main import ( "fmt" log "github.com/sirupsen/logrus" "github.com/vearne/gtimer" "math/rand" "strconv" "sync/atomic" "time" ) const ( PRODUCER_COUNT = 10 CONSUMER_COUNT = 10 TARGET_COUNT = 1000000 ) var ops int64 = 0 func main() { st := gtimer.NewSuperTimer(CONSUMER_COUNT) t1 := time.Now() for i := 0; i < PRODUCER_COUNT; i++ { go push(st, "worker"+strconv.Itoa(i)) } time.Sleep(100 * time.Millisecond) for { v := atomic.LoadInt64(&ops) if v >= TARGET_COUNT { st.Stop() break } else { time.Sleep(100 * time.Millisecond) } } t2 := time.Now() log.Infof("cost:%v\n", t2.Sub(t1)) } func DefaultAction(t time.Time, value string) { // fmt.Printf("trigger_time:%v, value:%v\n", t, value) atomic.AddInt64(&ops, 1) } func push(timer *gtimer.SuperTimer, name string) { r := rand.New(rand.NewSource(time.Now().UnixNano())) for i := 0; i < 1000000; i++ { now := time.Now() t := now.Add(time.Millisecond * time.Duration(r.Int63n(300))) value := fmt.Sprintf("%v:value:%v", name, strconv.Itoa(i)) // create a delayed task item := gtimer.NewDelayedItemFunc(t, value, DefaultAction) timer.Add(item) } } use NewDelayedItemFunc, we can create a task ...

February 13, 2018 · 2 min

Delayqueue (python 实现)

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 起因:几年前为了开发一个监控系统,需要周期性的对系统状态进行检查,因此需要对检查任务进行添加,排队(按时间),移除等操作,无意中发现java jdk 中有DelayQueue,因此实现了一个python版本 源码如下: # -*- coding:utf-8 -*- # python 版的 DelayQueue 类 和 Delayed 接口 # from Queue import PriorityQueue from datetime import datetime import threading class Delayed(object): # 返回:计划执行时间 # 单位: datetime def plan_time(self): pass def total_seconds(td): return (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6) / 10**6 class DelayQueue(PriorityQueue): def __init__(self, maxsize): self.queue = [] # 如果任务没有到达执行时间,则消费者必须等待在此condition上 self.lock = threading.Lock() self.can_done = threading.Condition(self.lock) def put_task(self, task): self.put((task.plan_time, task)) # 检索并移除此队列的头部,如果此队列不存在未到期延迟的元素,则等待它 def take_task(self): self.can_done.acquire() try: task = self.peek() delta = total_seconds(task.plan_time - datetime.now()) while delta > 0: self.can_done.wait(delta) task = self.peek() delta = total_seconds(task.plan_time - datetime.now()) item = self.get() self.can_done.notify_all() return item[1] finally: self.can_done.release() def peek(self): self.not_empty.acquire() try: while not self._qsize(): self.not_empty.wait() return self.queue[0][1] finally: self.not_empty.release() PS: python 中的 PriorityQueue基于 最小堆 算法的,添加和移除一个元素的耗时都是log2(n)

January 2, 2018 · 1 min