python 日志收集服务器

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 起因 python 的日志收集服务是线程安全的(对同一个文件的写入,使用了锁),但是对于多进程的情况,它是无法处理的。python 官方文档推荐的做法是,使用tcp 服务器专门用于日志的收集,以确保对的文件的写入是安全的。这里提供了日志收集服务器基于twisted的实现,可供参考,程序在centos上进行了测试,并可用于生产环境 当client 和server 在一台机器上时,每秒可以处理6000条日志记录 #!/usr/bin/env python # -*- coding:utf-8 -*- ################################## # logserver # 通过socket接收并存储日志信息 # 注:测试程序为test_logserver.py # 修改时间:2013-11-22 ################################### # standard lib import os, sys import time import twisted import logging import traceback import struct import cPickle # use cPickle for speed from logging.handlers import TimedRotatingFileHandler from twisted.internet.protocol import Protocol, Factory # our own code import settings from log_record import initlog # 根据logger name生成一个 def gen_log_name(name): name = name.replace('.', '_') return name + '.log' class ReceiveProtocol(Protocol): HEADER_LENGTH = 4 def __init__(self): self.buf = '' self.state = 'init' self.require_length = ReceiveProtocol.HEADER_LENGTH # 记录当前完成初始化的logger名称 self.logger_dict = {} def connectionMade(self): logger = logging.getLogger('log_server') logger.info('[makeConnection]与logger_client建立socket连接。') def dataReceived(self, data): logger = logging.getLogger('log_server') try: if len(self.buf + data) >= self.require_length: self.buf = self.buf + data # 数据就绪进行相应动作 data = self.buf[0:self.require_length] # 把数据从缓冲区中取走 self.buf = self.buf[self.require_length:] self.solve(data) # 可能一次读到了多条日志记录 if self.buf: self.dataReceived('') else: self.buf += data except BaseException, e: logger.error('处理记录失败' + str(e) + '\n' + traceback.format_exc()) def solve(self, data): logger = logging.getLogger('log_server') statehandler = None try: pto = 'proto_' + self.state statehandler = getattr(self,pto) except AttributeError: logger.error('callback',self.state,'not found') self.transport.loseConnection() statehandler(data) if self.state == 'done': self.transport.loseConnection() def connectionLost(self, reason): logger = logging.getLogger('log_server') logger.info('[connectionLost]与logger_client的socket连接关闭。') # 记录日志 def proto_record(self, data): logRecord = logging.makeLogRecord(cPickle.loads(data)) if logRecord.name not in self.logger_dict: logger = initlog(logRecord.name, settings.PATH, gen_log_name(logRecord.name), logLevel=logging.DEBUG) self.logger_dict[logRecord.name] = logger self.logger_dict[logRecord.name].handle(logRecord) # 修改下一步动作以及所需长度 self.state = 'init' self.require_length = ReceiveProtocol.HEADER_LENGTH # 处理头部信息 def proto_init(self, data): length = struct.unpack('!I', data[0:ReceiveProtocol.HEADER_LENGTH])[0] # 修改下一步动作以及所需长度 self.state = 'record' self.require_length = length if len(self.buf) >= self.require_length: data = self.buf[0:self.require_length] # 把数据从缓冲区中取走 self.buf = self.buf[self.require_length:] self.solve(data) class ReceiveFactory(Factory): def buildProtocol(self, addr): return ReceiveProtocol() def main(): print 'logserver has started.' logger = logging.getLogger('log_server') logger.info('logserver has started.') from twisted.internet import epollreactor epollreactor.install() reactor = twisted.internet.reactor reactor.listenTCP(settings.PORT, ReceiveFactory()) reactor.run() if __name__ == '__main__': main() 完整程序: https://github.com/vearne/log_server/tree/master ...

January 2, 2018 · 2 min

练习题(3) -- 另类的动态规划问题

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 题目如下: 已知 对于数字1 可以表达为 (1) 对于数字2 可以表达为 (1,1) (2) 解释 1 + 1 = 2 对于数字3 可以表达为 (1,1,1) (1, 2) (2, 1) (3) 1 + 1 + 1 = 3 1 + 2 = 3 2 + 1 = 3 求对于数字N 所有表达项 解法提示: 这是一道比较简单的动态规划问题 对于数字3如果我们把它的所有表达想这样书写 以1为开头的序列的后半部分的和是2 动态规划有两个要素,一个是备忘录,一个是递推公式 我们可以得到如下递推公式 注意:这里的加号和乘号不表示数学意义的加发和乘法 准确的说* 号表示连接,+ 号表示f(n) 的表达项是这些表达项的集合 f (n) = 1 * f(n -1) + 2 * f(n -2) … … + (n-1) * f(0) ...

January 2, 2018 · 2 min

一道穷举法算法题

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 题目是在topcoder看到的,但是不小心关了,不记得是题号了。 题目如下: 给定一组数,求哪这组数字能够组成的最长连续子序列,但其中比较特殊的是0可以代替任何数 注意: 其中的数字可以重复 限制条件: 数字的返回是从 0 ~ 1000000 包含0, 1000000 这组数的数量不超过 50 0, 6, 5, 10, 3, 0, 11 返回 5 解释: 如果把两个0,一个当做4, 一个当做7 那么可以得到最长连续子序列是 3, 4, 5, 6, 7 如果把两个0, 一个当做2, 一个当做4 那么可以得到最长连续子序列是 2, 3, 4, 5, 6 100,100,100,101,100,99,97,103 返回 3 解释: 最长连续子序列是 99, 100, 101 0, 0, 0, 1 , 2, 6, 8, 1000 返回 6 解释: 最长连续子序列是 1, 2, 3, 4, 5, 6 ...

January 2, 2018 · 2 min

多码加密 vigenere算法 python 实现

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 基于我自己对 vigenere 的理解,另外vigenere 属于非常弱的一种加密,用于生产环境不是非常安全请注意 # -*- coding:utf-8 -*- ################################## # Vigenere 是一种多码加密法 # author vearne # ***注意***: # 1) 字母表中必须要包含明文中出现的字母 # 2) 密钥不能为空 # ################################## class Vigenere(object): def __init__(self, table='0123456789', key='apple'): # 字母表 self.table = table # 密钥 self.key = key def genNum(self, curr): if curr + 1 >= len(self.key): return 0 else: return curr + 1 def dict(self, chr, move): index = self.table.index(chr) return self.table[(index + move) % len(self.table)] def encrypt(self, cleartext): # i 指向明文, j 指向密钥 j = 0 ll = [] for i in range(len(cleartext)): move = ord(self.key[j]) % len(self.table) # print 'move', move new_chr = self.dict(cleartext[i], move) ll.append(new_chr) j = self.genNum(j) return ''.join(ll) def decrypt(self, ciphertext): # i 指向密文, j 指向密钥 j = 0 ll = [] for i in range(len(ciphertext)): move = ord(self.key[j]) % len(self.table) move = move * (-1) # print 'move', move new_chr = self.dict(ciphertext[i], move) ll.append(new_chr) j = self.genNum(j) return ''.join(ll) if __name__ == '__main__': v = Vigenere(key='apple077226') cleartext = '000000668' print cleartext ciphertext = v.encrypt(cleartext) print ciphertext print '----------------------------------' cleartext = v.decrypt(ciphertext) print cleartext 如果字母表中的字母出现不重复,则可以保证明文跟密文的一一映射,如果出现重复,则会出现明文跟密文的多对一映射。 ...

January 2, 2018 · 1 min

shell 分割文件

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 帮同事写的小程序 140822,1406181801491716879,221.203.75.168,20140822000014 140823,1408051715321587060,101.28.174.242,20140822000127 140823,1408051715321587060,101.28.174.242,20140822000129 140824,1408051715321587060,101.28.174.242,20140822000139 140824,1406031640261808247,110.254.245.82,20140822000205 140825,1305230023521467300,210.73.6.180,20140822000216 140825,1408181402431171048,110.243.255.56,20140822000216 140825,1408131900341325654,110.248.233.239,20140822000216 140825,1407071756131811923,27.213.51.178,20140822000228 140826,1408171201311863011,124.67.26.134,20140822000238 某个数据文件的内容如上,每行的第一部分是时间140822 2014年8月22日 要按不同的日期分割文件, shell脚本如下: cat data.txt | while read line do if [ -n "$line" ] then echo $line echo ${line%%,*} echo $line >> "${line%%,*}.txt" echo "${line##*,}" fi done 实际运行中,速度非常差,无法容忍,由于时间的起止是从140822 至 140826 所以程序改为 date for((i=140818;i<=140826;i++));do echo $i; awk '{if(/^'$i'/)print $0;}' data.txt >$i.data.csv done; date 这样速度有了明显的提升,推断速度提升的原因是减少了IO操作次数,子进程创建和销毁的次数也减少了 尝试使用grep替代awk命令发现速度更快 date for((i=140818;i<=140826;i++));do echo $i; grep "$i," data.txt > "${i}.txt" done; date 使用awk命令耗时为6分钟左右,而使用grep 命令耗时大约为3分多钟

January 2, 2018 · 1 min

谈谈我对python sys.path的理解

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 前段时间在配置apache服务器时用到了这个参数,所以就特别查了一下 >>> import sys >>> print sys.path ['', '/home/pig', '/usr/lib/python2.6', '/usr/lib/python2.6/plat-linux2', '/usr/lib/python2.6/lib-tk', '/usr/lib/python2.6/lib-old', '/usr/lib/python2.6/lib-dynload', '/usr/lib/python2.6/dist-packages', '/usr/lib/python2.6/dist-packages/PIL', '/usr/lib/python2.6/dist-packages/gst-0.10', '/usr/lib/pymodules/python2.6', '/usr/lib/python2.6/dist-packages/gtk-2.0', '/usr/lib/pymodules/python2.6/gtk-2.0', '/usr/local/lib/python2.6/dist-packages'] >>> 在python 调试模式中导入sys模块,就可以打印出它的值。 python 的官方文档关于此值是这样说的。 A list of strings that specifies the search path for modules. Initialized from the environment variable PYTHONPATH, plus an installation-dependent default. 也就是说python解释器使用此值来,搜索模块,同时这个列表是由两部分组成的,一部分是从环境变量PYTHONPATH中取出的,另一部分中是安装时的默认值。 我做了测试,默认情况下,是没有这个环境变量的,因此我在环境变量中加入了PYTHONPATH 在~/.bashrc 中加入以下内容 PYTHONPATH="/home/pig" export PYTHONPATH 大家可以看到在上面的运行结果中,已经生效了。除了这个,大家还能看到 1)此列表的第一个元素是一个空字符串,它表示python解释器运行的当前目录 2) /usr/lib/python2.6 像这种都是python相应库的安装目录 最后再来看一个实验,打开两个窗口,各启动一个python 解释器,修改其中其中一个sys.path,看对另一个解释器是否有影响 1) >>> import sys >>> print len(sys.path) 14 >>> sys.path.append('/home/aotian/test') >>> print len(sys.path) 15 >>> >>> import sys >>> print len(sys.path) 14 >>> print len(sys.path) 14 >>> 从对比结果可以看出,刚开始两个解释器的len(sys.path)都等于14,给第一个解释器增加元素,第二个解释器并没有收到影响。 ...

January 2, 2018 · 1 min

关于happybase中 row_prefix 参数

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 起因 使用happybase 访问hbase 时 def scan(self, row_start=None, row_stop=None, row_prefix=None, columns=None, filter=None, timestamp=None, include_timestamp=False, batch_size=1000, scan_batching=None, limit=None, sorted_columns=False): scan 函数中有一个row_prefix 参数,而这个参数在java client 对应函数并没有出现,它到底有什么作用呢 查看源码,我们能看到 if row_prefix is not None: if row_start is not None or row_stop is not None: raise TypeError( "'row_prefix' cannot be combined with 'row_start' " "or 'row_stop'") row_start = row_prefix row_stop = str_increment(row_prefix) str_increment 的具体代码 def str_increment(s): """Increment and truncate a byte string (for sorting purposes) This functions returns the shortest string that sorts after the given string when compared using regular string comparison semantics. This function increments the last byte that is smaller than ``0xFF``, and drops everything after it. If the string only contains ``0xFF`` bytes, `None` is returned. """ for i in xrange(len(s) - 1, -1, -1): if s[i] != '\xff': return s[:i] + chr(ord(s[i]) + 1) return None 看完代码大家应该很明白了,row_prefix 被转换成了row_start 和row_stop。 当有如下场景 ...

January 2, 2018 · 1 min

使用logging模块来记录异常

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 在使用Java的时候,用log4j记录异常很简单,只要把Exception对象传递给log.error方法就可以了,但是在Python中就不行了,如果直接传递异常对象给log.error,那么只会在log里面出现一行异常对象的值。 在Python中正确的记录Log方式应该是这样的: logging.exception(ex) # 指名输出栈踪迹, logging.exception的内部也是包了一层此做法 logging.error(ex, exc_info=1) # 更加严重的错误级别 logging.critical(ex, exc_info=1)

January 2, 2018 · 1 min

利用redis实现带优先级的消息队列

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 前言 以前一直有使用celery的优先级机制(基于redis的任务队列),一直很好奇它的实现机制,在查阅了部分资料后,决定写这篇文章,作为总结。 1. 利用Sorted Set 实现 使用Sorted Set 做优先级队列最大的优点是直观明了。 ZADD key score member [[score member] [score member] ...] score 作为优先级,member 作为相应的任务 在Sorted Set 中,score 小的,位于优先级队列的头部,即优先级较高 由于score 就是menber的优先级,因此非常直观 可以使用 MULTI ZRANGE key 0 0 WITHSCORES ZREMRANGEBYRANK task_list 0 0 EXEC 来获取任务队列中优先级最高的元素 ZRANGE 用于获取任务,ZREMRANGEBYRANK 用于从消息队列中移除 注意:由于Sorted Set本身是一个set,因此消息队列中的消息不能重复,否则新加入的消息会覆盖以前加入的消息 注意:对于score 相同的消息,Sorted Set 会按照字典序进行排序 2. 利用List实现 应该一下就能想到,list 是作为消息队列的最理想的选择,但这里使用list 实现带优先级的消息队列也可以有好几种不同的实现方式。 2.1 准备 首先,如果我们假定消息队列中的消息,从消息队列的右侧推入(RPUSH),从左侧取出(LPOP) 那么单个list 很容易构造成一个FIFO 队列。但是如果优先级只有两级,高和低,那么我们可以把高优先级的消息,使用LPUSH 推入队列左侧,把低优先级的消息,使用RPUSH推入到队列右侧, 这样单个list就可以实现2级的带优先级的消息队列。 2.2 使用BLPOP redis 提供了列表的阻塞式(blocking)弹出原语。 ...

January 2, 2018 · 2 min

python logging 最佳实践

版权声明 本站原创文章 由 萌叔 发表 转载请注明 萌叔 | http://vearne.cc 起因 我经常跟同事开玩笑,我说在一家公司里面,能把日志这个功能搞清楚的都没有几个。所以写篇文章把我知道的部分知识分享一下。 在我目前看到的日志的文档中,Python的官方的文章是最清晰明了,推荐大家都来阅读下 https://docs.python.org/2/howto/logging.html 这个流程图非常重要,希望朋友们能仔细看看。 1. 适用场景 1.1 一般场景 在一般情况下,我们最常用的的handler有两个: RotatingFileHandler 按设定的文件大小切分日志 TimedRotatingFileHandler 按时间切分日志 以上2个handler都是线程安全的,可以用于多线程的场景,对于多进程则需要考虑其它方法 1.2 多进程 对于多进程的场景,python官方文档推荐我们使用 SocketHandler 使用TCP数据传输日志 可以参考我的文章 python 日志收集服务器 DatagramHandler 使用UDP数据传输日志 在单机上,即日志发送client 和 日志收集server 在同一台机器上,压测结果 type record/second TCP 6000 UDP 9000 除此之外还可以考虑一下两种方法: python-logstash 使用logstash,打开logstash的UDP或者TCP的服务端口直接接受数据,收到的数据可以入ElasticSearch 或者直接输出到文件中 自定义新的handler 将日志记录存入redis 的某个队列中,再额外启动一个进程,从redis中把数据读出入到文件中 2. 日志的级别 Level Numeric value CRITICAL 50 ERROR 40 WARNING 30 INFO 20 DEBUG 10 NOTSET 0 一般情况下日志文件只需要分成两个即可 all.log 存储 >= INFO 级别的日志 测试环境可以开到DEBUG 级别 可以跟踪业务流程等等 error.log 存储 >= ERROR 级别的日志 便于快速排查故障 一个常见的日志初始化模块可以这样书写 logger_helper.py ...

January 2, 2018 · 1 min