Fork me on GitHub

版权声明 本站原创文章 由 萌叔 发表
转载请注明 萌叔 | https://vearne.cc

起因:几年前为了开发一个监控系统,需要周期性的对系统状态进行检查,因此需要对检查任务进行添加,排队(按时间),移除等操作,无意中发现java jdk 中有DelayQueue,因此实现了一个python版本

源码如下:

# -*- coding:utf-8 -*-
# python 版的 DelayQueue 类 和 Delayed 接口 
# 
from Queue import PriorityQueue
from datetime import datetime
import threading

class Delayed(object):
    # 返回:计划执行时间
    # 单位: datetime
    def plan_time(self):
        pass

def total_seconds(td):
    return (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6) / 10**6


class DelayQueue(PriorityQueue):
    def __init__(self, maxsize):
        self.queue = []
        # 如果任务没有到达执行时间,则消费者必须等待在此condition上
        self.lock = threading.Lock()
        self.can_done = threading.Condition(self.lock)

    def put_task(self, task):
        self.put((task.plan_time, task))

    # 检索并移除此队列的头部,如果此队列不存在未到期延迟的元素,则等待它 
    def take_task(self):
        self.can_done.acquire()
        try:
            task = self.peek()
            delta = total_seconds(task.plan_time - datetime.now())
            while delta > 0:
                self.can_done.wait(delta)

                task = self.peek()
                delta = total_seconds(task.plan_time - datetime.now())

            item = self.get()
            self.can_done.notify_all()
            return item[1]
        finally:
            self.can_done.release()

    def peek(self):
        self.not_empty.acquire()
        try:
            while not self._qsize():
                self.not_empty.wait()

            return self.queue[0][1]
        finally:
            self.not_empty.release()

PS: python 中的 PriorityQueue基于 最小堆 算法的,添加和移除一个元素的耗时都是log2(n)

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据