Fork me on GitHub

版权声明 本站原创文章 由 萌叔 发表
转载请注明 萌叔 | https://vearne.cc

前言

以前一直有使用celery的优先级机制(基于redis的任务队列),一直很好奇它的实现机制,在查阅了部分资料后,决定写这篇文章,作为总结。

1. 利用Sorted Set 实现

使用Sorted Set 做优先级队列最大的优点是直观明了。

ZADD key score member [[score member] [score member] ...]

score 作为优先级,member 作为相应的任务
在Sorted Set 中,score 小的,位于优先级队列的头部,即优先级较高
由于score 就是menber的优先级,因此非常直观
可以使用

MULTI
ZRANGE key 0 0 WITHSCORES
ZREMRANGEBYRANK task_list 0 0
EXEC

来获取任务队列中优先级最高的元素
ZRANGE 用于获取任务,ZREMRANGEBYRANK 用于从消息队列中移除

注意:由于Sorted Set本身是一个set,因此消息队列中的消息不能重复,否则新加入的消息会覆盖以前加入的消息
注意:对于score 相同的消息,Sorted Set 会按照字典序进行排序

2. 利用List实现

应该一下就能想到,list 是作为消息队列的最理想的选择,但这里使用list 实现带优先级的消息队列也可以有好几种不同的实现方式。

2.1 准备

首先,如果我们假定消息队列中的消息,从消息队列的右侧推入(RPUSH),从左侧取出(LPOP)
那么单个list 很容易构造成一个FIFO 队列。但是如果优先级只有两级,高和低,那么我们可以把高优先级的消息,使用LPUSH 推入队列左侧,把低优先级的消息,使用RPUSH推入到队列右侧, 这样单个list就可以实现2级的带优先级的消息队列。

2.2 使用BLPOP

redis 提供了列表的阻塞式(blocking)弹出原语。

BLPOP key [key ...] timeout

当给定多个 key 参数时,按参数 key 的先后顺序依次检查各个列表,弹出第一个非空列表的头元素。
这样我们可以创建三个队列,high,normal, low ,分别代表高优先级,普通优先级,低优先级

BLPOP high normal low

2.3 基于多个key 的LPOP

有时候我们并不想要阻塞式的原语,那么在业务层,我们可以在多个队列中遍历,查找来获取消息

queue_list = ["high", "normal", "low"]
def get_msg():
    for queue in queue_list:
        msg = redis_db.lpop(queue)
        if msg is not None:
            return msg
    return None

RQ -- 异步任务系统,从任务队列中获取任务的代码如下。它封装了redis的LPOP 既支持阻塞式,也支持非阻塞式的获取优先级最高的任务。

@classmethod
def lpop(cls, queue_keys, timeout, connection=None):
    connection = resolve_connection(connection)
    if timeout is not None:  # blocking variant
        if timeout == 0:
            raise ValueError('RQ does not support indefinite timeouts. Please pick a timeout value > 0.')
            result = connection.blpop(queue_keys, timeout)
            if result is None:
                raise DequeueTimeout(timeout, queue_keys)
            queue_key, job_id = result
            return queue_key, job_id
        else:  # non-blocking variant
            for queue_key in queue_keys:
                blob = connection.lpop(queue_key)
                if blob is not None:
                    return queue_key, blob
            return None

2.4 扩展

如果我们需要10个优先级的消息队列,可以想到我们需要至少5个队列(参考2.1)
这时候我们的消息队列的命名可能就需要采取某种规则
比如,原打算命名的消息队列的名称为 msg_queue
那么这5个消息队列就可以被命名为
msg_queue-0
msg_queue-1
msg_queue-2
msg_queue-3
msg_queue-4

如果再结合

KEYS pattern

我们就可以得到对任意多个优先级支持的消息队列

# priority 1 ~ 10
# push message into list
def push_message(queue, priority, message):
    num = (priority - 1) / 2
    target_queue = queue + "-" + str(num)
    # direct
    if priority % 2 == 1:
        redis_db.lpush(target_queue, message)
    else:
        redis_db.rpush(target_queue, message)
# fetch  a message
def fetch_message(queue):
    queue_list = redis_db.keys(queue + "-?")
    queue_list = sorted(queue_list)
    for queue in queue_list:
        msg = redis_db.lpop(queue)
        if msg is not None:
            return msg
    return None

注意:采用这种做法,同一优先级的消息,并不满足FIFO

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注