Delayqueue (python 实现)
版权声明 本站原创文章 由 萌叔 发表
转载请注明 萌叔 | https://vearne.cc
起因:几年前为了开发一个监控系统,需要周期性的对系统状态进行检查,因此需要对检查任务进行添加,排队(按时间),移除等操作,无意中发现java jdk 中有DelayQueue,因此实现了一个python版本
源码如下:
# -*- coding:utf-8 -*-
# python 版的 DelayQueue 类 和 Delayed 接口
#
from Queue import PriorityQueue
from datetime import datetime
import threading
class Delayed(object):
# 返回:计划执行时间
# 单位: datetime
def plan_time(self):
pass
def total_seconds(td):
return (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6) / 10**6
class DelayQueue(PriorityQueue):
def __init__(self, maxsize):
self.queue = []
# 如果任务没有到达执行时间,则消费者必须等待在此condition上
self.lock = threading.Lock()
self.can_done = threading.Condition(self.lock)
def put_task(self, task):
self.put((task.plan_time, task))
# 检索并移除此队列的头部,如果此队列不存在未到期延迟的元素,则等待它
def take_task(self):
self.can_done.acquire()
try:
task = self.peek()
delta = total_seconds(task.plan_time - datetime.now())
while delta > 0:
self.can_done.wait(delta)
task = self.peek()
delta = total_seconds(task.plan_time - datetime.now())
item = self.get()
self.can_done.notify_all()
return item[1]
finally:
self.can_done.release()
def peek(self):
self.not_empty.acquire()
try:
while not self._qsize():
self.not_empty.wait()
return self.queue[0][1]
finally:
self.not_empty.release()
PS: python 中的 PriorityQueue基于 最小堆 算法的,添加和移除一个元素的耗时都是log2(n)