利用redis实现带优先级的消息队列

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 前言 以前一直有使用celery的优先级机制(基于redis的任务队列),一直很好奇它的实现机制,在查阅了部分资料后,决定写这篇文章,作为总结。 1. 利用Sorted Set 实现 使用Sorted Set 做优先级队列最大的优点是直观明了。 ZADD key score member [[score member] [score member] ...] score 作为优先级,member 作为相应的任务 在Sorted Set 中,score 小的,位于优先级队列的头部,即优先级较高 由于score 就是menber的优先级,因此非常直观 可以使用 MULTI ZRANGE key 0 0 WITHSCORES ZREMRANGEBYRANK task_list 0 0 EXEC 来获取任务队列中优先级最高的元素 ZRANGE 用于获取任务,ZREMRANGEBYRANK 用于从消息队列中移除 注意:由于Sorted Set本身是一个set,因此消息队列中的消息不能重复,否则新加入的消息会覆盖以前加入的消息 注意:对于score 相同的消息,Sorted Set 会按照字典序进行排序 2. 利用List实现 应该一下就能想到,list 是作为消息队列的最理想的选择,但这里使用list 实现带优先级的消息队列也可以有好几种不同的实现方式。 2.1 准备 首先,如果我们假定消息队列中的消息,从消息队列的右侧推入(RPUSH),从左侧取出(LPOP) 那么单个list 很容易构造成一个FIFO 队列。但是如果优先级只有两级,高和低,那么我们可以把高优先级的消息,使用LPUSH 推入队列左侧,把低优先级的消息,使用RPUSH推入到队列右侧, 这样单个list就可以实现2级的带优先级的消息队列。 2.2 使用BLPOP redis 提供了列表的阻塞式(blocking)弹出原语。 ...

January 2, 2018 · 2 min

python logging 最佳实践

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 起因 我经常跟同事开玩笑,我说在一家公司里面,能把日志这个功能搞清楚的都没有几个。所以写篇文章把我知道的部分知识分享一下。 在我目前看到的日志的文档中,Python的官方的文章是最清晰明了,推荐大家都来阅读下 https://docs.python.org/2/howto/logging.html 这个流程图非常重要,希望朋友们能仔细看看。 1. 适用场景 1.1 一般场景 在一般情况下,我们最常用的的handler有两个: RotatingFileHandler 按设定的文件大小切分日志 TimedRotatingFileHandler 按时间切分日志 以上2个handler都是线程安全的,可以用于多线程的场景,对于多进程则需要考虑其它方法 1.2 多进程 对于多进程的场景,python官方文档推荐我们使用 SocketHandler 使用TCP数据传输日志 可以参考我的文章 python 日志收集服务器 DatagramHandler 使用UDP数据传输日志 在单机上,即日志发送client 和 日志收集server 在同一台机器上,压测结果 type record/second TCP 6000 UDP 9000 除此之外还可以考虑一下两种方法: python-logstash 使用logstash,打开logstash的UDP或者TCP的服务端口直接接受数据,收到的数据可以入ElasticSearch 或者直接输出到文件中 自定义新的handler 将日志记录存入redis 的某个队列中,再额外启动一个进程,从redis中把数据读出入到文件中 2. 日志的级别 Level Numeric value CRITICAL 50 ERROR 40 WARNING 30 INFO 20 DEBUG 10 NOTSET 0 一般情况下日志文件只需要分成两个即可 all.log 存储 >= INFO 级别的日志 测试环境可以开到DEBUG 级别 可以跟踪业务流程等等 error.log 存储 >= ERROR 级别的日志 便于快速排查故障 一个常见的日志初始化模块可以这样书写 logger_helper.py ...

January 2, 2018 · 1 min

Python中执行外部命令并捕获双向输出

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc import subprocess # print ’popen3:’ def external_cmd(cmd, msg_in=''): try: proc = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE,) stdout_value, stderr_value = proc.communicate(msg_in) return stdout_value, stderr_value except ValueError, err: # log("IOError: %s" % err) return None, None if __name__ == '__main__': stdout_val, stderr_val = external_cmd('ls -l') print 'Standard Output: %s' % stdout_val print 'Standard Error: %s' % stderr_val

January 2, 2018 · 1 min

celery 中任务的结构以及执行

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 起因 最近打算实现异步任务,回想起当年看celery的场景,重新整理下celery的机制 1. 任务入队列 假定一个函数定义如下 def add(a, b, c=0): print a + b + c 任务被序列化后,以字符串的形式入队列 {"body": "gAJ9cQEoVQdleHBpcmVzcQJOVQN1dGNxA4lVBGFyZ3NxBF1xBShLD0sUZVUFY2hvcmRxBk5VCWNhbGxiYWNrc3EHTlUIZXJyYmFja3NxCE5VB3Rhc2tzZXRxCU5VAmlkcQpVJDA2ZjMzMWQ1LWFhZTktNGVmNy05NDVmLTNhNzM3NThlNmI2MnELVQdyZXRyaWVzcQxLAFUEdGFza3ENWAkAAAB0YXNrcy5hZGRxDlUDZXRhcQ9OVQZrd2FyZ3NxEH1xEVgBAAAAY0sKc3Uu", "headers": {}, "content-type": "application/x-python-serialize", "properties": {"body_encoding": "base64", "delivery_info": {"priority": 0, "routing_key": "xxxxxxxx", "exchange": "xxxxxxxx"}, "delivery_mode": 2, "delivery_tag": "2e1bc567-980d-46a0-94d4-d9ad030973d3"}, "content-encoding": "binary"} 展开 { "body": "gAJ9cQEoVQdleHBpcmVzcQJOVQN1dGNxA4lVBGFyZ3NxBF1xBShLD0sUZVUFY2hvcmRxBk5VCWNhbGxiYWNrc3EHTlUIZXJyYmFja3NxCE5VB3Rhc2tzZXRxCU5VAmlkcQpVJDA2ZjMzMWQ1LWFhZTktNGVmNy05NDVmLTNhNzM3NThlNmI2MnELVQdyZXRyaWVzcQxLAFUEdGFza3ENWAkAAAB0YXNrcy5hZGRxDlUDZXRhcQ9OVQZrd2FyZ3NxEH1xEVgBAAAAY0sKc3Uu", "headers": {}, "content-type": "application/x-python-serialize", "properties": { "body_encoding": "base64", "delivery_info": { "priority": 0, "routing_key": "xxxxxxxx", "exchange": "xxxxxxxx" }, "delivery_mode": 2, "delivery_tag": "2e1bc567-980d-46a0-94d4-d9ad030973d3" }, "content-encoding": "binary" } body 中存储有task需要执行的所有信息,默认情况下, 它的编码方式是 dict –> pickle 编码 –> base64编码 –> 字符串 ...

January 2, 2018 · 1 min

DFA 算法(字典树)实现关键词匹配

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 起因: 从网页中爬去的页面,需要判断是否跟预设的关键词匹配(是否包含预设的关键词),并返回所有匹配到的关键词 。 目前pypi 上两个实现 ahocorasick https://pypi.python.org/pypi/ahocorasick/0.9 esmre https://pypi.python.org/pypi/esmre/0.3.1 但是其实包都是基于DFA 实现的 这里提供源码如下: #!/usr/bin/python2.6 # -*- coding: utf-8 -*- import time class Node(object): def __init__(self): self.children = None # 标记匹配到了关键词 self.flag = False # The encode of word is UTF-8 def add_word(root,word): if len(word) <= 0: return node = root for i in range(len(word)): if node.children == None: node.children = {} node.children[word[i]] = Node() elif word[i] not in node.children: node.children[word[i]] = Node() node = node.children[word[i]] node.flag = True def init(word_list): root = Node() for line in word_list: add_word(root,line) return root # The encode of word is UTF-8 # The encode of message is UTF-8 def key_contain(message, root): res = set() for i in range(len(message)): p = root j = i while (j<len(message) and p.children!=None and message[j] in p.children): if p.flag == True: res.add(message[i:j]) p = p.children[message[j]] j = j + 1 if p.children==None: res.add(message[i:j]) #print '---word---',message[i:j] return res def dfa(): print '----------------dfa-----------' word_list = ['hello', '民警', '朋友','女儿','派出所', '派出所民警'] root = init(word_list) message = '四处乱咬乱吠,吓得家中11岁的女儿躲在屋里不敢出来,直到辖区派出所民警赶到后,才将孩子从屋中救出。最后在征得主人同意后,民警和村民合力将这只发疯的狗打死' x = key_contain(message, root) for item in x: print item if __name__ == '__main__': dfa() 测试结果: ...

January 2, 2018 · 2 min

python datetime类型和time类型互转

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 起因:写过好几次了,每次都记不住 # -*- coding: utf-8 -*- import time from datetime import datetime def datetime2secs(mydate): ''' datetime.datetime 类型 到 自epoch 的秒数 ''' return time.mktime(mydate.timetuple()) def secs2datetime(ts): return datetime.fromtimestamp(ts) d = datetime(2014,1,1) print d ts = datetime2secs(d) print ts print secs2datetime(ts) 参考资料:http://blog.sina.com.cn/s/blog_b09d460201018o0v.html PS: 非常好的资料,大家看看

January 2, 2018 · 1 min

Delayqueue (python 实现)

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 起因:几年前为了开发一个监控系统,需要周期性的对系统状态进行检查,因此需要对检查任务进行添加,排队(按时间),移除等操作,无意中发现java jdk 中有DelayQueue,因此实现了一个python版本 源码如下: # -*- coding:utf-8 -*- # python 版的 DelayQueue 类 和 Delayed 接口 # from Queue import PriorityQueue from datetime import datetime import threading class Delayed(object): # 返回:计划执行时间 # 单位: datetime def plan_time(self): pass def total_seconds(td): return (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6) / 10**6 class DelayQueue(PriorityQueue): def __init__(self, maxsize): self.queue = [] # 如果任务没有到达执行时间,则消费者必须等待在此condition上 self.lock = threading.Lock() self.can_done = threading.Condition(self.lock) def put_task(self, task): self.put((task.plan_time, task)) # 检索并移除此队列的头部,如果此队列不存在未到期延迟的元素,则等待它 def take_task(self): self.can_done.acquire() try: task = self.peek() delta = total_seconds(task.plan_time - datetime.now()) while delta > 0: self.can_done.wait(delta) task = self.peek() delta = total_seconds(task.plan_time - datetime.now()) item = self.get() self.can_done.notify_all() return item[1] finally: self.can_done.release() def peek(self): self.not_empty.acquire() try: while not self._qsize(): self.not_empty.wait() return self.queue[0][1] finally: self.not_empty.release() PS: python 中的 PriorityQueue基于 最小堆 算法的,添加和移除一个元素的耗时都是log2(n)

January 2, 2018 · 1 min

python线程池

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 起因: python的进程池,大家都用过,但是线程池很多人估计都用错了 错误的库 估计不少人都用的是这个库 https://pypi.python.org/pypi/threadpool/1.3.2 但是这个库其实是有问题的,程序有时候会异常退出。其实作者明确指出,这个库已经废弃了 This module is OBSOLETE and is only provided on PyPI to support old projects that still use it. Please DO NOT USE IT FOR NEW PROJECTS! Use modern alternatives like the multiprocessing module in the standard library or even an asynchroneous approach with asyncio. 简单翻译下: 这个模块已经废弃了,请不要再在新项目中使用它 推荐使用标准库的线程池 from multiprocessing.dummy import Pool as ThreadPool def square(x): print x * x worker_count = 10 pool = ThreadPool(worker_count) task_list = range(10) pool.map(square, task_list) pool.close() pool.join() 运行结果 ...

January 2, 2018 · 1 min

golang批量Ping的库

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 起因:公司有个项目需要批量检查多个IP的网络质量,其中一个指标是丢包率,当然用ping是最合适的方法,但是使用ping命令性能太差,Golang又没有批量ping的包 开发: 目前github只有sparrc写的ping库,不过他的库不支持批量探测多个IP https://github.com/sparrc/go-ping 并且sparrc/go-ping在使用时,如果对多个IP同时探测会出现串包的问题,请谨慎使用。 再他的启发下,我做了扩展,支持批量请求 安装 go get github.com/vearne/go-ping 使用 ipSlice := []string{} ipSlice = append(ipSlice, "123.126.157.222") ipSlice = append(ipSlice, "wwww.baidu.com") ipSlice = append(ipSlice, "github.com") // 对每个地址,共发出4个探测包,每隔1秒发一个, 总超时6秒 bp, err := ping.NewBatchPinger(ipSlice, 4, time.Second*1, time.Second*6) if err != nil { fmt.Println(err) } // 收到ICMP answer时的回调函数 bp.OnRecv = func(pkt *icmp.Echo) { // fmt.Printf("recv icmp_id=%d, icmp_seq=%d\n", pkt.ID, pkt.Seq) } // 所有Ping结束时的回调函数 bp.OnFinish = func(stSlice []*ping.Statistics) { for _, st := range stSlice{ fmt.Printf("\n--- %s ping statistics ---\n", st.Addr) fmt.Printf("%d packets transmitted, %d packets received, %v%% packet loss\n", st.PacketsSent, st.PacketsRecv, st.PacketLoss) fmt.Printf("round-trip min/avg/max/stddev = %v/%v/%v/%v\n", st.MinRtt, st.AvgRtt, st.MaxRtt, st.StdDevRtt) } } bp.Run() 完整的例子 https://github.com/vearne/go-ping/blob/master/cmd/ping/ping2.go ...

January 1, 2018 · 1 min

python 模块==命名空间?

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 起因: 想利用模块传递某个变量,修改某个变量的值,且在其它模块中也可见 于是我做了这样一个实验: git@github.com:vearne/test_scope.git base.py value = 10 b.py import base def hello(): print 'scope base', base.value, id(base.value) main.py from base import value from b import hello print 'scope base', value, id(value) value = 20 print 'scope local', value, id(value) hello() 运行python main.py 输出结果如下: ['__builtins__', '__doc__', '__file__', '__name__', '__package__', 'hello', 'value'] scope base 10 140195531889072 ['__builtins__', '__doc__', '__file__', '__name__', '__package__', 'hello', 'value'] scope local 20 140195531888832 scope base 10 140195531889072 大家可以看出,value 的值并没有被修改,并且id值(对象的内存地址) 不一致,因此我们得出结论, value 和 base.value 存在在不同位置,是两个不同的对象。 阅读python官方文档 https://docs.python.org/2/tutorial/modules.html 我找到这样一段话 ...

January 1, 2018 · 2 min