tornado

tornado是异步IO服务框架,当然并不是严格的异步IO。IOCP才是真正的异步IO,主要在windows系统里面。
tornado的异步在于通过Python select模块的epoll/kqueue实现了非阻塞,默认是水平触发(Level Triggered),在承载一定的访问压力下保证性能和稳定。nginx常用来做反向代理,要承担大量的外部访问请求,默认是边缘出发的(Edge Triggered)。

具体实现在于IOLoop,IOStream,asynchronous和gen.coroutine。

IOLoop

IOLoop初始化时创建一个Waker对象,将Waker对象fd的读端注册到事件循环中并设定相应的回调函数,当事件循环阻塞而没有响应描述符出现,会在最大timeout时间之前返回,向管道写入字符’x’唤醒阻塞,同时在IOLoop的stop函数中调用self._waker.wake(),停止事件循环。

add_handler方法使用stack_context提供的wrap方法,返回一个可以直接调用的对象并且保存传入之前的堆栈信息,在执行时恢复,保证了函数的异步调用在正确的运行环境。

IOLoop的核心调度start方法中,IOLoop实例对象调用start后开始epoll事件循环机制,一直运行直到 IOLoop对象调用stop函数。
start方法中主要分三个部分:对超时的相关处理;epoll事件通知阻塞、接收;对epoll返回I/O事件的处理。

  • 为了实现异步,将回调函数延迟到下一轮事件循环中执行。
  • 超时的处理使用heapq,记录每个回调函数的超时时间(deadline),每次弹出deadline最小的回调函数,如果callback标志位为True且已超时,通过_run_callback调用函数;如果没有超时重新设定poll_timeout的值。
  • 通过self._impl.poll(poll_timeout)进行事件阻塞,当有事件通知或超时,poll返回特定的event_pairs。
  • epoll返回通知事件后将新事件加入待处理队列,将就绪事件逐个弹出,通过stack_context.wrap(handler)保存的可执行对象以及执行上下文调用事件处理。

IOStream

IOStream使用deque容器和_merge_prefix(deque, size)函数封装socket的非阻塞IO的读写操作,包括数据的缓存和重组,read_util()接口设置一个标志字符串和回调函数,当IOStream读到标志字符串时自动调用该回调函数,做数据的分片处理。

tornado的tcpserver使用IOStream,httpserver通过tcpserver实现,同理httpclient也是如此。

asynchronous

被@asynchronous装饰的handler,会使http请求成为长连接(tcp长连接要注意粘包问题,http长连接可能自带解决方案),一直处于等待状态,直到调用self.finish()结束请求,优势在于不会阻塞其他的http请求的访问。

1
2
3
4
5
6
7
8
9
10
class DebugHandler(RequestHandler):
@asynchronous
def get(self):
http_client = AsyncHTTPClient()
http_client.fetch("http://example.com", callback=self.on_fetch)
def on_fetch(self, response):
do_something_with_response(response)
self.render("template.html")

注意例子中,self.render方法的实现最后调用了self.finish。

gen.coroutine

被@asynchronous装饰的handler是同步的代码异步执行,tornado3以后,强化coroutine,替代了gen.engine+asynchronous,简化在handler异步编程,避免写回调函数。

1
2
3
4
5
6
7
8
class DebugHandler(RequestHandler):
@gen.coroutine
def get(self):
http_client = AsyncHTTPClient()
response = yield http_client.fetch("http://example.com")
do_something_with_response(response)
self.render("template.html")

注意上面两个例子是等价的。
tornado的coroutine靠python中generator的send和yield挂起实现。第一次next()返回yield结果,并将程序挂起,等待send或者next方法唤醒,继续执行,send可以在function的执行过程中做外部输入同是触发yield,next只能触发yield取返回值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def Generator():
i = 0
while True:
m = yield i
print 'receive m', m
i += m
if i > 3:
break
g = Generator()
result = g.next()
print 'yield i', result
result = g.send(1)
print 'yield i', result
result = g.send(2)
print 'yield i', result
result = g.send(3)
print 'yield i', result

上面的例子体现yield的神奇。

asynchronous和gen.coroutine混用要注意Callback和Wait/WaitAll里面的key,可以通过Callback在不同的时候启动多个并行的异步操作,并且执行key关联的等待操作。如果Callback没有参数,对应的Wait就没有返回值;如果Callback有固定的参数,对应的Wait的返回值就是参数,类似C#的out参数;如果Callback传入变长参数,返回值就是元组(args, kwargs)。

asynchronous和gen.coroutine混用还可以给IOLoop里的callback安排优先级。

flask

flask是同步IO服务框架,通过werkzeug调用Python SocketServer,SocketServer调用Python select模块的select实现,因此是阻塞的。

MethodView

flask一般情况直接用function组织handler就可以,但是flask也支持类handler来组织,在flask里面叫 MethodView。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class UserAPI(MethodView):
def get(self, user_id):
if user_id is None:
# 返回一个包含所有用户的列表
pass
else:
# 显示一个用户
pass
def post(self):
# 创建一个新用户
pass
def delete(self, user_id):
# 删除一个用户
pass
def put(self, user_id):
# update a single user
pass
user_view = UserAPI.as_view('user_api')
app.add_url_rule('/users/', defaults={'user_id': None},
view_func=user_view, methods=['GET',])
app.add_url_rule('/users/', view_func=user_view, methods=['POST',])
app.add_url_rule('/users/<int:user_id>', view_func=user_view,
methods=['GET', 'PUT', 'DELETE'])

blinker

blinker在执行环境中,提供快捷的object-to-object和broadcast信号

  • 全局信号,通知相关的对象操作
  • 匿名信号
  • 自定义信号
  • 永久或者临时地连接接收者
  • 通过弱引用weak referencing,自动断开连接
  • 因为是内存的数据通信,支持任意数据格式
  • 从接收者手机数据
  • 线程安全

gunicorn

Gunicorn是一个Python WSGI UNIX的HTTP服务器,是pre-fork worker的模型,从Ruby的unicorn项目移植,与各种Web框架兼容,配用简单,资源消耗轻量级,抗压能力强。

  • 本身支持WSGI、Django、Paster
  • 自动辅助进程管理
  • 简单的 Python配置
  • 允许配置多个工作环境
  • 各种服务器的可扩展钩子

诛仙

诛仙之两小无猜

mistake

青梅竹马,语出唐李白《长干行》之一:“郎骑竹马来,绕床弄青梅。同居长干里 ,两小无嫌猜。”后以“青梅竹马”形容男女儿童之间两小无猜的情状,一般长大后都是牵着别人的手,或者偎在别人的怀里,偶尔想念竹马,偶尔想念青梅。

张小凡与田灵儿的青梅竹马情是很平淡有味的。

张小凡的悲惨遭遇从小时候开始,平淡幸福的村庄生活被毁,和林惊羽被带上青云山后,完全处于两种不同的待遇。林惊羽是山上师傅争相抢夺的香饽饽,而张小凡是没人要的蠢蛋,最后被道玄强塞到大竹峰田不易门下,这也算是张小凡幸运的开始。

都说时间能治愈一切,只说对了一半,还要看时间带来了什么。如果时间带来的依然是伤痛,却是会抹掉你过去的伤痛,那是因为你越来越麻木了,新的伤痛让你忘却旧的伤痛,而且是新的伤痛大于旧的伤痛。大竹峰上,田不易虽然恨铁不成钢,但是对徒弟还是关爱的,除了骂骂叨叨,也不会强迫弟子成才。张小凡挨得骂,估计是最多的,这有什么关系,至少还能感受到长辈的关怀,替代了双亲的一种生活存在。一群师兄弟,是不争气,天天都乐呵呵的,也挺好,多年以后的张小凡牛逼哄哄的,又怎么样呢,他还是怀念过往的平淡。

我不喜欢李易峰,唐艺昕挺不错的。《青云志》里面唐艺昕饰演的田灵儿,在张小凡上山后给予了及时的照顾,她的热情,她的活泼,她的烂漫,轻松地将张小凡的注意力从悲伤里面转移出来,使张小凡经历不正常的遭遇后,能正常地成长。她和张小凡练剑的情景,让我想起了《笑傲江湖》的令狐冲和岳灵珊,不一样的是田灵儿是张小凡的师姐,是骄纵的田灵儿在关心张小凡,虽然未必到位,令狐冲是岳灵珊的师兄,令狐冲多半是关心容纳师妹的骄纵,令狐冲当然不用师妹照顾。所以张小凡对田灵儿的依赖要大于令狐冲对岳灵珊的依赖。令狐冲旷达灵动,有一帮酒肉朋友可以解闷。张小凡比较呆笨,没什么朋友,只能自说自话。

李易峰饰演的张小凡,在与田灵儿这一段情的演绎上,还是挺到位的,对田灵儿的默默关怀,对田灵儿蠢蠢欲动的表白,对田灵儿每每诉说关于齐昊的心事的无奈,不需要太多的言语,全在眼神动作里,全在虹桥的寂寞夜色里。

田灵儿虽然没心没肺的直率,但是对于感情,还是比较认真,认准。最后田灵儿也收获了满满的幸福,不像岳灵珊最后惨兮兮的坚贞。

张小凡一直将这段感情埋在心底,望着田灵儿奔向齐昊的背影说:“青梅是酸涩的,就由她去吧,只要她好。”

go二分法和牛顿迭代法求平方根

二分法

  • 求根号5
  • 折半:5/2=2.5
  • 平方校验: 2.5*2.5=6.25>5,并且得到当前上限2.5
  • 再次向下折半:2.5/2=1.25
  • 平方校验:1.25*1.25=1.5625<5,得到当前下限1.25
  • 再次折半:2.5-(2.5-1.25)/2=1.875
  • 平方校验:1.875*1.875=3.515625<5,得到当前下限1.875

牛顿迭代法

newton
可以理解函数f(x) = x²,使f(x) = num的近似解,即x² - num = 0的近似解。

Code

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package main
import (
"fmt"
"math"
)
func NewtonSqrt(num float64) float64 {
x := num / 2.0
var y float64 = 0
count := 1
for math.Abs(x-y) > 0.00000001{
// fmt.Println(count, x)
count += 1
y = x
x = (1.0/2.0)*x+(num*1.0)/(x*2.0)
}
return x
}
func BinarySqrt(num float64) float64 {
y := num/2.0
low := 0.0
up := num
count := 1
for math.Abs(y * y - num) > 0.00000001{
// fmt.Println(count, y)
count += 1
if y*y > num{
up = y
y = low+(y-low)/2
}else{
low = y
y = up-(up-y)/2
}
}
return y
}
func main() {
fmt.Println("math sqrt", math.Sqrt(5))
fmt.Println("Newton sqrt", NewtonSqrt(5))
fmt.Println("binary sqrt", BinarySqrt(5))
}

精度0.00000001,在亿分之一;
二分法经过27次迭代;
牛顿法只迭代了3次,是二分法的十倍;
系统sqrt是最快最准的,不知道采用了什么原理实现的?

celery之Chains, Groups, Chords, Map & Starmap, Chunks

celery是一个独立的server进程,可以用redis、rabbitmq、mongo作为queue,flower是可视化的管理后台。
如下调用task装饰的func.apply_async就是异步执行,或者调用send_task。

1
2
3
4
5
6
7
8
9
[lavenderuni@~]# celery multi start w1 -A asynctask -l info -c 1
celery multi v3.1.23 (Cipater)
> Starting nodes...
<class 'asynctask.lib.DefaultRegistry'>
> w1@LavenderUnideMacBook-Pro.local: OK
[lavenderuni@~]# celery multi stop w1 -A asynctask -l info
celery multi v3.1.23 (Cipater)
> w1@LavenderUnideMacBook-Pro.local: DOWN

以上是启动停用命令,项目实例参照asynctask,-l参数是日志输出等级控制(debug是调试模式,会输出更多执行详情信息,生产环境不要轻易开启debug模式),-c是并发进程数目控制。

通过celery的Chains, Groups, Chords, Map & Starmap, Chunks,可以组合出各种复杂的任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
app = Celery("task", backend="redis://localhost:6379/1", broker="redis://localhost:6379/0")
@app.task
def add(x, y):
return x + y
@app.task
def tsum(numbers):
return sum(numbers)
@app.task
def trange(limit):
return range(limit)
result = add.apply_async((2, 2), link=add.s(16))

Chains

串行调用,可以将signature任务按照顺序执行,前一个任务的输出是后一个任务的输入,结果是最后一个signature任务的输出。

1
2
3
4
5
6
7
8
9
10
11
from celery import chain
result = chain(add.s(2, 2), add.s(4), add.s(8))() # 2 + 2 + 4 + 8
result.get() # 16
result.parent.parent.graph
# cd959635-47d2-4368-bdf1-ab969f9ce0e4(1)
# 6681c6b6-bc34-44b4-8c9f-7ad132ffa5f3(0)
# 6681c6b6-bc34-44b4-8c9f-7ad132ffa5f3(0)
# 87eac0e5-1f1b-4c0d-a27a-dbf7e7ccd925(2)
# cd959635-47d2-4368-bdf1-ab969f9ce0e4(1)
# 6681c6b6-bc34-44b4-8c9f-7ad132ffa5f3(0)

Groups

并行调用,可以让signature并行执行,返回的结果是所有signature任务返回结果组成的数组。

1
2
3
4
from celery import group
result = group(add.s(2, 2), add.s(4, 4), add.s(8, 8))() # 2 + 2, 4 + 4, 8 + 8
result.get() # [14, 8, 16]

Chords

先并行调用,再串行调用,把并行signature任务的结果列表输入到串行调用,进行汇总,是reduce过程。

1
2
3
4
5
6
7
from celery import group, chain, chord
result = chain(group(add.s(2, 2), add.s(4, 4), add.s(8, 8)), tsum.s())() # sum([2 + 2, 4 + 4, 8 + 8])
result.get() # 28
result = chord((add.s(2, 2), add.s(4, 4), add.s(8, 8)), tsum.s())()
result.get() # 28

Map & Starmap

  • Map,对并行调用的结果各自汇总

    1
    2
    3
    ~tsum.map([range(2), range(2), range(2)]) # [1, 1, 1]
    result = chain(trange.s(2), group(tsum.s(), tsum.s(), tsum.s()))()
    result.get() # [1, 1, 1]
  • Starmap,对并行调用的结果各自汇总,汇总参数是tuple,相当于*args

    1
    ~add.starmap(zip(range(10), range(10))) # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

Chunks

对大任务进行分割,分成小块执行,提高性能,将结果收集成列表。

1
2
3
4
5
6
7
8
9
10
11
12
result = add.chunks(zip(range(100), range(100)), 10)()
result.get()
#[[0, 2, 4, 6, 8, 10, 12, 14, 16, 18],
# [20, 22, 24, 26, 28, 30, 32, 34, 36, 38],
# [40, 42, 44, 46, 48, 50, 52, 54, 56, 58],
# [60, 62, 64, 66, 68, 70, 72, 74, 76, 78],
# [80, 82, 84, 86, 88, 90, 92, 94, 96, 98],
# [100, 102, 104, 106, 108, 110, 112, 114, 116, 118],
# [120, 122, 124, 126, 128, 130, 132, 134, 136, 138],
# [140, 142, 144, 146, 148, 150, 152, 154, 156, 158],
# [160, 162, 164, 166, 168, 170, 172, 174, 176, 178],
# [180, 182, 184, 186, 188, 190, 192, 194, 196, 198]]

爬虫之路(3)

任务去重

  • 少量数据,比如几万或者十几万条的情况,使用Map或Set
  • 中量数据,比如几百万或者上千万,使用BloomFilter
  • 大量数据,上亿或者几十亿,Redis+BloomFilter可以解决,getbit、setbit天生支持bitmap,注意redis的bitmap只支持2^32大小,对应到内存也就是512MB,数组的下标最大只能是2^32-1,这个限制可以通过构建多个redis的bitmap突破

bitmap

bitmap用一个bit位来标记某个元素对应的value,而key即是这个元素。由于采用bit存储数据,因此大大节省存储空间,利用少量的时间来换取大量的空间,从而完成大量数据的排序、查询和逻辑运算。

Bloom Filter
将原数据通过随机映射函数映射到一个很长的二进制向量,布隆过滤器可用于检索一个元素是否在一个集合中,时间复杂度和空间复杂度的综合指标远远优于一般算法,但由于哈希冲突和数据量的增加,导致一定的误识别率,如果Bloom Filter报告某个元素不在集合中,则以一定不在;如果报告该元素在集合中,有可能在或者不在;可以增加多个bitmap,关联不同的hash函数,取否定的交集来提高bloom filter的准确率。
python版本BloomFilter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
>>> from pybloom import BloomFilter
>>> f = BloomFilter(capacity=1000, error_rate=0.001)
>>> [f.add(x) for x in range(10)]
[False, False, False, False, False, False, False, False, False, False]
>>> all([(x in f) for x in range(10)])
True
>>> 10 in f
False
>>> 5 in f
True
>>> f = BloomFilter(capacity=1000, error_rate=0.001)
>>> for i in xrange(0, f.capacity):
... _ = f.add(i)
>>> (1.0 - (len(f) / float(f.capacity))) <= f.error_rate + 2e-18
True
>>> from pybloom import ScalableBloomFilter
>>> sbf = ScalableBloomFilter(mode=ScalableBloomFilter.SMALL_SET_GROWTH)
>>> count = 10000
>>> for i in xrange(0, count):
... _ = sbf.add(i)
...
>>> (1.0 - (len(sbf) / float(count))) <= sbf.error_rate + 2e-18
True

将抓取流程的数据的特征串散列到bit map上面,根据bit map检测去重。

primary key(id)

利用数据库本身支持的唯一性字段,将抓取流程的数据的特征串转成md5,作为mongo的主键_id即可去重,或者其他数据库unique index.

爬虫之路(2)

爬虫队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#!/usr/bin/env python
# coding=utf-8
class Queue(object):
def __init__(self, .....):
......
def _init(self, ......):
......
def _put(self, ......):
......
def _get(self, ......):
......
def task_done(self, ......):
......
def join(self):
......
if __name__ == '__main__':
pass

队列主要为了支持多线程间的通信,set、get同步,有task_done,有join,能支持查询和热操作就最好。

queue

优先级,单进程阻塞完成,不支持分布式,无状态,不支持条件查询

Python的标准队列支持Queue(FIFO),LifoQueue(LIFO),PriorityQueue(Priority) 三种队列,其中Queue和LifoQueue支持join。PriorityQueue是通过heapq实现的大顶堆、小顶堆来支持优先级。

因此如果程序需要priority和join功能,就通过继承Queue,自定义_put和_get的方式实现。

beanstalkd

beanstalkd支持0到2**32的优先级,值越小,优先级越高,默认优先级为1024;通过binlog将job及其状态记录到文件里面,在Beanstalkd下次启动时可以通过读取binlog来恢复之前的job及状态;服务端的原子操作同步支持客户端分布式。

redis-queue

通过多list的原子操作blpop、brpop实现优先级有限的队列。

mongo-queue

通过原子操作find_and_modify和状态字段实现队列,有排序字段支持优先级。

redis队列

redis的list支持brpop、blpop,所以能实现队列,fifo、lifo、priority队列可以直接由lpush/rpush和brpop/blpop的组合实现。

  1. fifo和lifo队列,用一个key的list就行了,只做参考。更复杂的功能如get的block、timeout可以照着redis api实现就行了,队列的block需要用到lock,event之类的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import redis
class FifoRedisQ(object):
def __init__(self, host='localhost', port=6379, db=0, label='test'):
self.r = redis.Redis(host=host, port=port, db=db)
self.label = label
def get(self):
return self.r.brpop(self.label)[-1]
def put(self, item):
self.r.lpush(self.label, item)
class LifoRedisQ(object):
def __init__(self, host='localhost', port=6379, db=0, label='test'):
self.r = redis.Redis(host=host, port=port, db=db)
self.label = label
def get(self):
return self.r.blpop(self.label)[-1]
def put(self, item):
self.r.lpush(self.label, item)
if __name__ == '__main__':
fifoq = FifoRedisQ()
print '----first in first out'
for k in range(0, 10):
fifoq.put(k)
for k in range(0, 10):
print fifoq.get()
print '----last in first out'
lifoq = LifoRedisQ()
for k in range(0, 10):
lifoq.put(k)
for k in range(0, 10):
print lifoq.get()
  1. priority队列,每一个优先级需要用一个key对应一个list,每一个list就是存储该等级的数据channel。有多少种优先级是提前设定好的,不能动态添加。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import redis, random
class PriorityRedisQ(object):
def __init__(self, host='localhost', port=6379, db=0, label='test', weight=[]):
self.r = redis.Redis(host=host, port=port, db=db)
self.label = label
if not weight:
raise Exception("Please set weight of queue.")
self.weight = weight
def get(self):
return self.r.brpop(['_'.join([self.label, str(one)]) for one in self.weight])[-1]
def put(self, priority, item):
self.r.lpush('_'.join([self.label, str(priority)]), (priority, item))
if __name__ == '__main__':
weight = [1, 2, 3, 4]
prq = PriorityRedisQ(weight=weight)
print '----priority'
for k in range(0, 10):
prq.put(random.choice(weight), k)
for k in range(0, 10):
print prq.get()

市面上有那么多好用的队列,你可以不选redis,看具体的业务需要吧,参考webcrawl的redis priority queue

优先级队列 --- 等级队列 vs 权重队列

队列是一种安排数据辅助程序处理的工具,常见的有fifo队列,lifo队列和priority队列。

priority队列里面根据容器实现的局限,可以分为权重队列和等级队列,其中权重队列>=等级队列。权重队列是指每一条入队列的数据的priority都有可能不同,并且priority很随机,不同的情况很多;等级队列是指所有的数据的priority可以分为有限的几类,每一类对应一个数据channel,逻辑层面个人觉得最好是不超过5吧,具体情况还得看工具支持的情况。实际上等级队列更像数据的priority分类取,权重队列则是数据的priority排序取。

权重队列

权重队列的priority基本不受约束,使用起来比较自由,只要容器支持数据的排序和数据同步,就能基于该容器实现权重队列。
例如:

  • 大顶堆和小顶堆支持堆排序,然后通过锁实现数据同步,因此程序语言实现的标准队列的priority队列都是权重队列
  • mongo是数据库,支持字段排序,并且有find_and_modify可以实现数据同步,也可以实现权重队列。
  • beanstalkd的内部协议支持排序,所以是权重队列。

等级队列

等级队列的priority在程序员编码的时候就固定了,是像常量一样执行前设定的,数的清的,不能动态添加设定之外priority数据,一般实现等级队列的容器需要支持数据channel,当然必须是数据同步。
例如:

  • redis的list可以作为数据channel使用,然后通过blpop、brpop实现数据同步,如果sorted set不支持数据同步,就可以实现权重队列。
  • rabbitmq,多么强大的队列,是通过数据channel实现等级队列,3.5.0版本以后有rabbitmq_priority_queue插件,注意设置x-max-priority属性,否则rabbitmq-server服务会crash。

权重队列比等级级队列自如,而且权重队列是可以当等级队列用的,反过来是不行的。在特殊情况权重队列无法替代等级队列,例如priority值一定要用high、low或者其他有意义的字符串来标识,而这类字符串的通用比较又不是正确的priority,除非基础环境支持良好的hack。能不能实现权重队列,就看容器里面的数据是否方便排序了。