python logging 最佳实践

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 起因 我经常跟同事开玩笑,我说在一家公司里面,能把日志这个功能搞清楚的都没有几个。所以写篇文章把我知道的部分知识分享一下。 在我目前看到的日志的文档中,Python的官方的文章是最清晰明了,推荐大家都来阅读下 https://docs.python.org/2/howto/logging.html 这个流程图非常重要,希望朋友们能仔细看看。 1. 适用场景 1.1 一般场景 在一般情况下,我们最常用的的handler有两个: RotatingFileHandler 按设定的文件大小切分日志 TimedRotatingFileHandler 按时间切分日志 以上2个handler都是线程安全的,可以用于多线程的场景,对于多进程则需要考虑其它方法 1.2 多进程 对于多进程的场景,python官方文档推荐我们使用 SocketHandler 使用TCP数据传输日志 可以参考我的文章 python 日志收集服务器 DatagramHandler 使用UDP数据传输日志 在单机上,即日志发送client 和 日志收集server 在同一台机器上,压测结果 type record/second TCP 6000 UDP 9000 除此之外还可以考虑一下两种方法: python-logstash 使用logstash,打开logstash的UDP或者TCP的服务端口直接接受数据,收到的数据可以入ElasticSearch 或者直接输出到文件中 自定义新的handler 将日志记录存入redis 的某个队列中,再额外启动一个进程,从redis中把数据读出入到文件中 2. 日志的级别 Level Numeric value CRITICAL 50 ERROR 40 WARNING 30 INFO 20 DEBUG 10 NOTSET 0 一般情况下日志文件只需要分成两个即可 all.log 存储 >= INFO 级别的日志 测试环境可以开到DEBUG 级别 可以跟踪业务流程等等 error.log 存储 >= ERROR 级别的日志 便于快速排查故障 一个常见的日志初始化模块可以这样书写 logger_helper.py ...

January 2, 2018 · 1 min

Python中执行外部命令并捕获双向输出

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc import subprocess # print ’popen3:’ def external_cmd(cmd, msg_in=''): try: proc = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE,) stdout_value, stderr_value = proc.communicate(msg_in) return stdout_value, stderr_value except ValueError, err: # log("IOError: %s" % err) return None, None if __name__ == '__main__': stdout_val, stderr_val = external_cmd('ls -l') print 'Standard Output: %s' % stdout_val print 'Standard Error: %s' % stderr_val

January 2, 2018 · 1 min

celery 中任务的结构以及执行

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 起因 最近打算实现异步任务,回想起当年看celery的场景,重新整理下celery的机制 1. 任务入队列 假定一个函数定义如下 def add(a, b, c=0): print a + b + c 任务被序列化后,以字符串的形式入队列 {"body": "gAJ9cQEoVQdleHBpcmVzcQJOVQN1dGNxA4lVBGFyZ3NxBF1xBShLD0sUZVUFY2hvcmRxBk5VCWNhbGxiYWNrc3EHTlUIZXJyYmFja3NxCE5VB3Rhc2tzZXRxCU5VAmlkcQpVJDA2ZjMzMWQ1LWFhZTktNGVmNy05NDVmLTNhNzM3NThlNmI2MnELVQdyZXRyaWVzcQxLAFUEdGFza3ENWAkAAAB0YXNrcy5hZGRxDlUDZXRhcQ9OVQZrd2FyZ3NxEH1xEVgBAAAAY0sKc3Uu", "headers": {}, "content-type": "application/x-python-serialize", "properties": {"body_encoding": "base64", "delivery_info": {"priority": 0, "routing_key": "xxxxxxxx", "exchange": "xxxxxxxx"}, "delivery_mode": 2, "delivery_tag": "2e1bc567-980d-46a0-94d4-d9ad030973d3"}, "content-encoding": "binary"} 展开 { "body": "gAJ9cQEoVQdleHBpcmVzcQJOVQN1dGNxA4lVBGFyZ3NxBF1xBShLD0sUZVUFY2hvcmRxBk5VCWNhbGxiYWNrc3EHTlUIZXJyYmFja3NxCE5VB3Rhc2tzZXRxCU5VAmlkcQpVJDA2ZjMzMWQ1LWFhZTktNGVmNy05NDVmLTNhNzM3NThlNmI2MnELVQdyZXRyaWVzcQxLAFUEdGFza3ENWAkAAAB0YXNrcy5hZGRxDlUDZXRhcQ9OVQZrd2FyZ3NxEH1xEVgBAAAAY0sKc3Uu", "headers": {}, "content-type": "application/x-python-serialize", "properties": { "body_encoding": "base64", "delivery_info": { "priority": 0, "routing_key": "xxxxxxxx", "exchange": "xxxxxxxx" }, "delivery_mode": 2, "delivery_tag": "2e1bc567-980d-46a0-94d4-d9ad030973d3" }, "content-encoding": "binary" } body 中存储有task需要执行的所有信息,默认情况下, 它的编码方式是 dict –> pickle 编码 –> base64编码 –> 字符串 ...

January 2, 2018 · 1 min

DFA 算法(字典树)实现关键词匹配

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 起因: 从网页中爬去的页面,需要判断是否跟预设的关键词匹配(是否包含预设的关键词),并返回所有匹配到的关键词 。 目前pypi 上两个实现 ahocorasick https://pypi.python.org/pypi/ahocorasick/0.9 esmre https://pypi.python.org/pypi/esmre/0.3.1 但是其实包都是基于DFA 实现的 这里提供源码如下: #!/usr/bin/python2.6 # -*- coding: utf-8 -*- import time class Node(object): def __init__(self): self.children = None # 标记匹配到了关键词 self.flag = False # The encode of word is UTF-8 def add_word(root,word): if len(word) <= 0: return node = root for i in range(len(word)): if node.children == None: node.children = {} node.children[word[i]] = Node() elif word[i] not in node.children: node.children[word[i]] = Node() node = node.children[word[i]] node.flag = True def init(word_list): root = Node() for line in word_list: add_word(root,line) return root # The encode of word is UTF-8 # The encode of message is UTF-8 def key_contain(message, root): res = set() for i in range(len(message)): p = root j = i while (j<len(message) and p.children!=None and message[j] in p.children): if p.flag == True: res.add(message[i:j]) p = p.children[message[j]] j = j + 1 if p.children==None: res.add(message[i:j]) #print '---word---',message[i:j] return res def dfa(): print '----------------dfa-----------' word_list = ['hello', '民警', '朋友','女儿','派出所', '派出所民警'] root = init(word_list) message = '四处乱咬乱吠,吓得家中11岁的女儿躲在屋里不敢出来,直到辖区派出所民警赶到后,才将孩子从屋中救出。最后在征得主人同意后,民警和村民合力将这只发疯的狗打死' x = key_contain(message, root) for item in x: print item if __name__ == '__main__': dfa() 测试结果: ...

January 2, 2018 · 2 min

python datetime类型和time类型互转

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 起因:写过好几次了,每次都记不住 # -*- coding: utf-8 -*- import time from datetime import datetime def datetime2secs(mydate): ''' datetime.datetime 类型 到 自epoch 的秒数 ''' return time.mktime(mydate.timetuple()) def secs2datetime(ts): return datetime.fromtimestamp(ts) d = datetime(2014,1,1) print d ts = datetime2secs(d) print ts print secs2datetime(ts) 参考资料:http://blog.sina.com.cn/s/blog_b09d460201018o0v.html PS: 非常好的资料,大家看看

January 2, 2018 · 1 min

Delayqueue (python 实现)

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 起因:几年前为了开发一个监控系统,需要周期性的对系统状态进行检查,因此需要对检查任务进行添加,排队(按时间),移除等操作,无意中发现java jdk 中有DelayQueue,因此实现了一个python版本 源码如下: # -*- coding:utf-8 -*- # python 版的 DelayQueue 类 和 Delayed 接口 # from Queue import PriorityQueue from datetime import datetime import threading class Delayed(object): # 返回:计划执行时间 # 单位: datetime def plan_time(self): pass def total_seconds(td): return (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6) / 10**6 class DelayQueue(PriorityQueue): def __init__(self, maxsize): self.queue = [] # 如果任务没有到达执行时间,则消费者必须等待在此condition上 self.lock = threading.Lock() self.can_done = threading.Condition(self.lock) def put_task(self, task): self.put((task.plan_time, task)) # 检索并移除此队列的头部,如果此队列不存在未到期延迟的元素,则等待它 def take_task(self): self.can_done.acquire() try: task = self.peek() delta = total_seconds(task.plan_time - datetime.now()) while delta > 0: self.can_done.wait(delta) task = self.peek() delta = total_seconds(task.plan_time - datetime.now()) item = self.get() self.can_done.notify_all() return item[1] finally: self.can_done.release() def peek(self): self.not_empty.acquire() try: while not self._qsize(): self.not_empty.wait() return self.queue[0][1] finally: self.not_empty.release() PS: python 中的 PriorityQueue基于 最小堆 算法的,添加和移除一个元素的耗时都是log2(n)

January 2, 2018 · 1 min

python线程池

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 起因: python的进程池,大家都用过,但是线程池很多人估计都用错了 错误的库 估计不少人都用的是这个库 https://pypi.python.org/pypi/threadpool/1.3.2 但是这个库其实是有问题的,程序有时候会异常退出。其实作者明确指出,这个库已经废弃了 This module is OBSOLETE and is only provided on PyPI to support old projects that still use it. Please DO NOT USE IT FOR NEW PROJECTS! Use modern alternatives like the multiprocessing module in the standard library or even an asynchroneous approach with asyncio. 简单翻译下: 这个模块已经废弃了,请不要再在新项目中使用它 推荐使用标准库的线程池 from multiprocessing.dummy import Pool as ThreadPool def square(x): print x * x worker_count = 10 pool = ThreadPool(worker_count) task_list = range(10) pool.map(square, task_list) pool.close() pool.join() 运行结果 ...

January 2, 2018 · 1 min

golang批量Ping的库

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 起因:公司有个项目需要批量检查多个IP的网络质量,其中一个指标是丢包率,当然用ping是最合适的方法,但是使用ping命令性能太差,Golang又没有批量ping的包 开发: 目前github只有sparrc写的ping库,不过他的库不支持批量探测多个IP https://github.com/sparrc/go-ping 并且sparrc/go-ping在使用时,如果对多个IP同时探测会出现串包的问题,请谨慎使用。 再他的启发下,我做了扩展,支持批量请求 安装 go get github.com/vearne/go-ping 使用 ipSlice := []string{} ipSlice = append(ipSlice, "123.126.157.222") ipSlice = append(ipSlice, "wwww.baidu.com") ipSlice = append(ipSlice, "github.com") // 对每个地址,共发出4个探测包,每隔1秒发一个, 总超时6秒 bp, err := ping.NewBatchPinger(ipSlice, 4, time.Second*1, time.Second*6) if err != nil { fmt.Println(err) } // 收到ICMP answer时的回调函数 bp.OnRecv = func(pkt *icmp.Echo) { // fmt.Printf("recv icmp_id=%d, icmp_seq=%d\n", pkt.ID, pkt.Seq) } // 所有Ping结束时的回调函数 bp.OnFinish = func(stSlice []*ping.Statistics) { for _, st := range stSlice{ fmt.Printf("\n--- %s ping statistics ---\n", st.Addr) fmt.Printf("%d packets transmitted, %d packets received, %v%% packet loss\n", st.PacketsSent, st.PacketsRecv, st.PacketLoss) fmt.Printf("round-trip min/avg/max/stddev = %v/%v/%v/%v\n", st.MinRtt, st.AvgRtt, st.MaxRtt, st.StdDevRtt) } } bp.Run() 完整的例子 https://github.com/vearne/go-ping/blob/master/cmd/ping/ping2.go ...

January 1, 2018 · 1 min

python 模块==命名空间?

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 起因: 想利用模块传递某个变量,修改某个变量的值,且在其它模块中也可见 于是我做了这样一个实验: git@github.com:vearne/test_scope.git base.py value = 10 b.py import base def hello(): print 'scope base', base.value, id(base.value) main.py from base import value from b import hello print 'scope base', value, id(value) value = 20 print 'scope local', value, id(value) hello() 运行python main.py 输出结果如下: ['__builtins__', '__doc__', '__file__', '__name__', '__package__', 'hello', 'value'] scope base 10 140195531889072 ['__builtins__', '__doc__', '__file__', '__name__', '__package__', 'hello', 'value'] scope local 20 140195531888832 scope base 10 140195531889072 大家可以看出,value 的值并没有被修改,并且id值(对象的内存地址) 不一致,因此我们得出结论, value 和 base.value 存在在不同位置,是两个不同的对象。 阅读python官方文档 https://docs.python.org/2/tutorial/modules.html 我找到这样一段话 ...

January 1, 2018 · 2 min

python 使用multiprocessing需要注意的问题

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 我们在编写程序的时候经常喜欢这样写代码 import MySQLdb import time from multiprocessing import Process conn = MySQLdb.connect('localhost', 'vearne', 'xx', 'test') def f(name): for i in xrange(10): cursor = conn.cursor() sql = "insert into car(name) values(%s)" param = [(name)] print param #time.sleep(1) n = cursor.execute(sql,param) cursor.close() conn.commit() if __name__ == '__main__': for i in xrange(10): p = Process(target=f, args=('bob',)) p.start() 上面的程序有问题吗? 以上的程序在单进程的情况下,应该是没有问题,但是在多进程的情况下,它是有错误的。 首先看看下面的源码 class Process(object): ''' Process objects represent activity that is run in a separate process The class is analagous to `threading.Thread` ''' _Popen = None def __init__(self, group=None, target=None, name=None, args=(), kwargs={}): assert group is None, 'group argument must be None for now' count = _current_process._counter.next() self._identity = _current_process._identity + (count,) self._authkey = _current_process._authkey self._daemonic = _current_process._daemonic self._tempdir = _current_process._tempdir self._parent_pid = os.getpid() self._popen = None self._target = target self._args = tuple(args) self._kwargs = dict(kwargs) self._name = name or type(self).__name__ + '-' + \ ':'.join(str(i) for i in self._identity) def run(self): ''' Method to be run in sub-process; can be overridden in sub-class ''' if self._target: self._target(*self._args, **self._kwargs) def start(self): ''' Start child process ''' assert self._popen is None, 'cannot start a process twice' assert self._parent_pid == os.getpid(), \ 'can only start a process object created by current process' assert not _current_process._daemonic, \ 'daemonic processes are not allowed to have children' _cleanup() if self._Popen is not None: Popen = self._Popen else: from .forking import Popen self._popen = Popen(self) # -- 创建 Popen 对象 -- _current_process._children.add(self) # 省略部分代码 ... ... def _bootstrap(self): # -- _bootstrap 函数 -- from . import util global _current_process try: self._children = set() self._counter = itertools.count(1) try: sys.stdin.close() sys.stdin = open(os.devnull) except (OSError, ValueError): pass _current_process = self util._finalizer_registry.clear() util._run_after_forkers() util.info('child process calling self.run()') try: self.run() # -- 调用run函数 -- exitcode = 0 finally: util._exit_function() except SystemExit, e: if not e.args: exitcode = 1 elif isinstance(e.args[0], int): exitcode = e.args[0] else: sys.stderr.write(str(e.args[0]) + '\n') sys.stderr.flush() exitcode = 0 if isinstance(e.args[0], str) else 1 except: exitcode = 1 import traceback sys.stderr.write('Process %s:\n' % self.name) sys.stderr.flush() traceback.print_exc() util.info('process exiting with exitcode %d' % exitcode) return exitcode from .forking import Popen 定义 ...

January 1, 2018 · 4 min