Fork me on GitHub

版权声明 本站原创文章 由 萌叔 发表
转载请注明 萌叔 | https://vearne.cc

起因

最近打算实现异步任务,回想起当年看celery的场景,重新整理下celery的机制

1. 任务入队列

假定一个函数定义如下

def add(a, b,  c=0):
    print a + b + c

任务被序列化后,以字符串的形式入队列

{"body": "gAJ9cQEoVQdleHBpcmVzcQJOVQN1dGNxA4lVBGFyZ3NxBF1xBShLD0sUZVUFY2hvcmRxBk5VCWNhbGxiYWNrc3EHTlUIZXJyYmFja3NxCE5VB3Rhc2tzZXRxCU5VAmlkcQpVJDA2ZjMzMWQ1LWFhZTktNGVmNy05NDVmLTNhNzM3NThlNmI2MnELVQdyZXRyaWVzcQxLAFUEdGFza3ENWAkAAAB0YXNrcy5hZGRxDlUDZXRhcQ9OVQZrd2FyZ3NxEH1xEVgBAAAAY0sKc3Uu", "headers": {}, "content-type": "application/x-python-serialize", "properties": {"body_encoding": "base64", "delivery_info": {"priority": 0, "routing_key": "xxxxxxxx", "exchange": "xxxxxxxx"}, "delivery_mode": 2, "delivery_tag": "2e1bc567-980d-46a0-94d4-d9ad030973d3"}, "content-encoding": "binary"}

展开

{
    "body": "gAJ9cQEoVQdleHBpcmVzcQJOVQN1dGNxA4lVBGFyZ3NxBF1xBShLD0sUZVUFY2hvcmRxBk5VCWNhbGxiYWNrc3EHTlUIZXJyYmFja3NxCE5VB3Rhc2tzZXRxCU5VAmlkcQpVJDA2ZjMzMWQ1LWFhZTktNGVmNy05NDVmLTNhNzM3NThlNmI2MnELVQdyZXRyaWVzcQxLAFUEdGFza3ENWAkAAAB0YXNrcy5hZGRxDlUDZXRhcQ9OVQZrd2FyZ3NxEH1xEVgBAAAAY0sKc3Uu",
    "headers": {},
    "content-type": "application/x-python-serialize",
    "properties": {
        "body_encoding": "base64",
        "delivery_info": {
            "priority": 0,
            "routing_key": "xxxxxxxx",
            "exchange": "xxxxxxxx"
        },
        "delivery_mode": 2,
        "delivery_tag": "2e1bc567-980d-46a0-94d4-d9ad030973d3"
    },
    "content-encoding": "binary"
}

body 中存储有task需要执行的所有信息,默认情况下, 它的编码方式是
dict --> pickle 编码 --> base64编码 --> 字符串

解码后的body 形如

{
    'utc': False,
    'chord': None,
    'args': [
        15,
        20
    ],
    'retries': 0,
    'expires': None,
    'task': u'tasks.add',
    'callbacks': None,
    'errbacks': None,
    'taskset': None,
    'kwargs': {
        u'c': 10
    },
    'eta': None,
    'id': '06f331d5-aae9-4ef7-945f-3a73758e6b62'
}

其中重要的字段是
task u'tasks.add' 是函数的全路径,包含包路径
args [15, 20] 函数参数
kwargs { u'c': 10} 函数参数

2. 任务执行

函数的执行可以直接简化成

def add(a, b,  c=0):
    print a + b + c

globals()['add'](*[15, 20], **{'c':10})

3. 后记

当然celery本身比这复杂的多,任务编码的方式可以自己指定,worker要能执行某个任务,任务的信息是需要提前注册的。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注