yield-bytes

沉淀、分享与无限进步

gevent与协程

1、 yield 实现协程

1.1 yield 同步执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import time
def consumer():
send_msg=''
while 1:
# 3、consumer通过yield拿到producer发来的消息,又通过yield把结果send_msg返回给producer
output=yield send_msg
print('[consumer] consuming {}'.format(output))
send_msg='ok'

def producer(consumer_obj,num):
# 1、启动consumer()生成器
next(consumer_obj)
for i in range(1,num+1):
print('[producer] producing {}'.format(i))

# 2、通过send()切换到consumer()执行
receive_msg=consumer_obj.send(i)
print('[producer] received a message {}'.format(receive_msg))
consumer_obj.close()

if __name__=='__main__':
c=consumer()
producer(c,5)

输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
[producer] producing 1
[consumer] consuming 1
[producer] received a message ok
[producer] producing 2
[consumer] consuming 2
[producer] received a message ok
[producer] producing 3
[consumer] consuming 3
[producer] received a message ok
[producer] producing 4
[consumer] consuming 4
[producer] received a message ok
[producer] producing 5
[consumer] consuming 5
[producer] received a message ok

整个过程无锁,由一个线程执行,producer和consumer协作完成任务,但是以上无法实现并发,生产1个,消费1个,也即1个生产者对应1个消费者

1.2 启动多个yield模拟consumer并发
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import time,datetime

def get_time():
d=datetime.datetime.now()
return '%s:%s:%s'%(d.hour,d.minute,d.second)

def consumer(consumer_index):
print('consumer-{} started at {}'.format(consumer_index,get_time()))
send_msg=''
# 消费者保持监听producer的发来的信息
while True:
# 3、consumer通过yield拿到producer发来的消息,又通过yield把结果send_msg返回给producer
output=yield send_msg
print('[consumer-{}] consuming {} at {}'.format(consumer_index,output,get_time()))
time.sleep(1) # 模拟IO耗时操作
send_msg='ack'


def producer(consumer_obj,consumer_num,count):
# 1、启动n个consumer()生成器,相当于用协程方式模拟并发
consumers=[consumer_obj(i) for i in range(consumer_num) ]
for each_cons in consumers:
next(each_cons)


for i in range(count):
print('[producer] producing {} at {}'.format(i,get_time()))
# 2、对每个consumer_obj使用send()切换到consumer()执行
for index,each_cons in enumerate(consumers):
receive_msg=each_cons.send(i)
print('[producer] received {} from consumer-{}'.format(receive_msg,index,get_time()))

for each_cons in consumers:
each_cons.close()

if __name__=='__main__':
producer(consumer,5,2)

1个producer,5个consumer

输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
consumer-0 started at 21:12:45
consumer-1 started at 21:12:45
consumer-2 started at 21:12:45
consumer-3 started at 21:12:45
consumer-4 started at 21:12:45
[producer] producing 0 at 21:12:45
[consumer-0] consuming 0 at 21:12:45
[producer] received ack from consumer-0
[consumer-1] consuming 0 at 21:12:46
[producer] received ack from consumer-1
[consumer-2] consuming 0 at 21:12:47
[producer] received ack from consumer-2
[consumer-3] consuming 0 at 21:12:48
[producer] received ack from consumer-3
[consumer-4] consuming 0 at 21:12:49
[producer] received ack from consumer-4
[producer] producing 1 at 21:12:50
[consumer-0] consuming 1 at 21:12:50
[producer] received ack from consumer-0
[consumer-1] consuming 1 at 21:12:51
[producer] received ack from consumer-1
[consumer-2] consuming 1 at 21:12:52
[producer] received ack from consumer-2
[consumer-3] consuming 1 at 21:12:53
[producer] received ack from consumer-3
[consumer-4] consuming 1 at 21:12:54
[producer] received ack from consumer-4

以上运行过程确实是协程运行,但yield无法自动切换协程,上面的运行过程打印出的实际可以发现代码同步执行:
5个consumer同时启动,当producer生产1个数据,consumer-0消费数据,而consumer内部有IO耗时操作(time.sleep(1)模拟IO),此时代码逻辑没有把线程当前控制权从consumer-0自动切换到consumer-1,consumer-1等待前面1秒后,才能接着干活。

2 、greenlet实现的协程

2.1 简单gevent协程例子

  greenlet是一个用C实现的协程模块,相比于上面使用python的yield实现协程,greenlet可以无需将函数声明为generator的前提下,用手动方式在任意函数之间切换。但在实际使用,往往不会直接使用greenlet,因为它遇到有IO地方是不会自动切换,而Gevent库可以实现这个需求,gevent是对greenlet的封装,实现自动切换,大体的设计逻辑如下:
  当一个greenlet(你可以认为这个greenlet是一个协程对象,类比于线程对象thread)遇到IO操作时,比如访问读取文件或者网络socket连接,它会自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换原来位置继续执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import gevent,datetime
def f1():
print('f1 started at',get_time())
# gevent模拟IO耗时操作,并且gevent会在此保留现场后自动切换到其它函数
gevent.sleep(4)
print('f1 done at',get_time())

def f2():
print('f2 started at',get_time())
gevent.sleep(2)
print('f2 done at',get_time())

def f3():
print('f3 started at',get_time())
gevent.sleep(3)
print('f3 done at',get_time())
if __name__=='__main__':
gevent.joinall([gevent.spawn(f1),gevent.spawn(f2),gevent.spawn(f3)])

打印结果

1
2
3
4
5
6
f1 started at  09:21:17.066251
f2 started at 09:21:17.066355
f3 started at 09:21:17.066388
f2 done at 09:21:19.067741
f3 done at 09:21:20.067747
f1 done at 09:21:21.067812

可以看到,gevent在同一时刻运行3个函数,并且,f2先完成,接着f3完成,最后IO耗时最长的f1完成,三个函数共同完成耗时为4秒,说明三个函数并发执行了。如果是同步运行,整个过程耗时为4+2+3=9秒耗时,协程优势凸显。

2.2 gevent 高并发测试
1
2
3
4
5
6
7
8
9
10
11
def task(task_index):
gevent.sleep(1)
print('task-{} done at {} '.format(task_index,datetime.datetime.now()))

def syn(n):
for i in range(n):
task(i)

if __name__=='__main__':
syn(4)

同步情况下,耗时4秒

1
2
3
4
task-0 done at  09:41:14.075001 
task-1 done at 09:41:15.076049
task-2 done at 09:41:16.077101
task-3 done at 09:41:17.078055

gevent实现的协程异步

1
2
3
4
5
def asyn(n):
coroutines=[gevent.spawn(task,i) for i in range(n)]
gevent.joinall(coroutines)
if __name__=='__main__':
asyn(4)

原本需要4秒的执行流,现在只需1秒完成所有任务。

1
2
3
4
task-0 done at  09:44:30.535495 
task-1 done at 09:44:30.535749
task-2 done at 09:44:30.535801
task-3 done at 09:44:30.535833

尝试启动10万个任务,用line_profiler 查看函数中耗时操作(line_profiler 目前不兼容3.7,最好用pyenv 切换到3.6进行测试)。只需要在asyn函数上加@profile装饰器即可

创建asyn.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import gevent,datetime,time
def task(task_index):
gevent.sleep(1)
#print('task-{} done at {} '.format(task_index,datetime.datetime.now()))

@profile
def asyn(n):
threads=[gevent.spawn(task,i) for i in range(n)]
gevent.joinall(threads)

if __name__=='__main__':
start=time.time()
asyn(100000)
cost=time.time()-start
print(cost)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
(spk) [root@nn spv]# kernprof -l -v asyn.py 
5.805598974227905
Wrote profile results to asyn.py.lprof
Timer unit: 1e-06 s

Total time: 5.73735 s
File: asyn.py
Function: asyn at line 6

Line # Hits Time Per Hit % Time Line Contents
==============================================================
6 @profile
7 def asyn(n):
8 1 1204525.0 1204525.0 21.0 threads=[gevent.spawn(task,i) for i in range(n)]
9 1 4532823.0 4532823.0 79.0 gevent.joinall(threads)

非常清晰看到,理论上:10万个任务使用协程实现并发运行,总耗时1秒,但实际上,因为需要创建大量greenlet对象,列表创建10万个项耗时1.2秒,gevent joinall 10万个greenlet对象耗时4.5秒,所以整个程序完成总耗时实际为5.7秒左右。

使用memory_profiler库查看asyn.py内存使用情况,使用也简单与line_profiler相似,使用@profile装饰器来标识需要追踪的函数即可。使用协程,10万个对象消耗300多M,鉴于其并发效率高,而且所有的执行都只在一个线程实现 了,因此内存消耗可接受。

1
2
3
4
5
6
7
8
9
10
(spk) [root@nn spv]# python -m memory_profiler asyn.py
Filename: asyn.py

Line # Mem usage Increment Line Contents
================================================
6 36.137 MiB 36.137 MiB @profile
7 def asyn(n):
8 229.531 MiB 0.773 MiB threads=[gevent.spawn(task,i) for i in range(n)]
9 366.270 MiB 136.738 MiB gevent.joinall(threads)

注意:内存的单位MiB,表示的mebibyte,

MB/s的意思是每秒中传输10^6 byte的数据,以10为底数的指数

MiB/s的意思是每秒中传输2^20 byte的数据,以2为底数的指数

1 MiB =0.9765625 MB

创建100万个task,再看看kernprof -l -v asyn.py ,内存方面使用top可以直观看到asyn.py 占用了2G*0.816=1632 MiB

1
2
3
4
5
KiB Swap:  2097148 total,  1039272 free,  1057876 used.    14348 avail Mem 

PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
30 root 20 0 0 0 0 S 45.1 0.0 0:47.13 kswapd0
29380 root 20 0 2319772 1.4g 56 R 42.8 81.6 1:04.53 kernprof

将协程并发数设为100万,总共耗时为534秒,时间略长,使用gevent在单台服务器上,并发数不要设太离谱,1000个并发足以应付普通项目的需求,例如爬虫,例如做服务端接收客户端发来的socket流数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
(spk) [root@nn spv]# kernprof -l -v asyn.py           
534.7752296924591
Wrote profile results to asyn.py.lprof
Timer unit: 1e-06 s

Total time: 532.165 s
File: asyn.py
Function: asyn at line 6

Line # Hits Time Per Hit % Time Line Contents
==============================================================
6 @profile
7 def asyn(n):
8 1 82927089.0 82927089.0 15.6 threads=[gevent.spawn(task,i) for i in range(n)]
9 1 449238172.0 449238172.0 84.4 gevent.joinall(threads)
2.3 理解gevent的monkey.patch_all()

  在接下有关gevent的实际项目中,py程序都会引用monkey.patch_all()这个方法,它的作用是用非阻塞模块替换python自带的阻塞模块,这就是所谓”猴子补丁”,原理是运行时用非阻塞的对象属性替换对应阻塞对象的属性,或者用自己实现的同名非阻塞模块,替换对应的阻塞模块。
  注意:这里说的模块就是“有完整功能的一个.py文件“或者”由多个py文件组成的一个完整功能的模块“
  例如下面要实现这么一个需求:server.py运行时,将thread.py模块的synfoo函数替换为自定义的mythread.py模块里面asynfoo函数

thread.py 模块

1
2
def foo():
print('builtin method synfoo of thread.py')

自定义的mythread.py模块

1
2
def foo():
print('builtin method asynfoo of mythread.py')

server.py程序

1
2
3
4
5
6
7
8
9
10
11
12
import sys
import thread
# 在本程序的modules字典里面删除原thread模块
del sys.modules['thread']

# 使用异步的模块替换当前thread模块
sys.modules['thread'] = __import__('mythread')

# 重新加载thread
import thread
thread.foo() #这里的thread已经是mythread模块
print(thread)

输出:

1
2
3
4
# server.py运行时已经成功将内建同步模块替换为异步的模块
builtin method asynfoo of mythread.py
# thread的指向自定义模块mythread
<module 'mythread' from '/opt/spv/mythread.py'>

以上就是monkey.patch_all()大致逻辑,gevent可以把python内建的多个模块在程序运行时替换为它写的异步模块,默认是把内建的socket、thread、queue等模块替换为非阻塞同名模块

(site-packages/gevent/monkey.py)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def patch_all(socket=True, dns=True, time=True, select=True, thread=True, os=True, ssl=True, httplib=False
subprocess=True, sys=False, aggressive=True, Event=False,
builtins=True, signal=True):
_warnings, first_time = _check_repatching(**locals())
if not _warnings and not first_time:
return
if os:
patch_os()
if time:
patch_time()
if thread:
patch_thread(Event=Event, _warnings=_warnings)
# sys must be patched after thread. in other cases threading._shutdown will be
# initiated to _MainThread with real thread ident
if sys:
patch_sys()
if socket:
patch_socket(dns=dns, aggressive=aggressive)
if select:
patch_select(aggressive=aggressive)
if ssl:
patch_ssl()
if httplib:
raise ValueError('gevent.httplib is no longer provided, httplib must be False')
if subprocess:
patch_subprocess()
if builtins:
patch_builtins()

如果不想gevent对某个内建模块覆盖为非阻塞,可以将该模块设为False:monkey.patch_all(thread=False)
或者在monkey.patch_all(thread=False) 语句后面追加import threading
目的是内建同步模块再次覆盖前面的gevent异步模块,例如server.py文件

1
2
3
4
5
6
7
8
9
10
11
12
from gevent import monkey
monkey.patch_all(thread=False)
# 或者 import threading
def task():
pass
def syn():
t=threading.thread(target=task,args=())
t.start()
...

if __name__=='__main__'
gevent.joinall...

由此可知,gevent的patch_all针对模块的覆盖是有顺序的,因为当使用gevent时,import的模块顺序很重要,内建模块在patch_all前面或者在patch_all后面,对应是同步还是异步模块导入。

2.2.1 locals()方法

  locals()返回一个字典,它可以获取当前模块所有的局部变量以及当前模块引入的其他模块,例如myFoo.py模块

1
2
3
4
5
6
7
8
9
10
11
import threading
import socket
local_module_dict=locals()
class Foo():
pass
print(local_module_dict['threading'])
print(local_module_dict['socket'])
输出
# <module 'threading' from '/root/.pyenv/versions/3.7.5/lib/python3.7/threading.py'>
# <module 'socket' from '/root/.pyenv/versions/3.7.5/lib/python3.7/socket.py'>

gevent通过locals()获取这些模块后,将需要替换的模块都替换gevent自己实现的非阻塞模块

2.2.2 gevent 替换非阻塞的模块的思路

  在前面的例子中,使用gevent.sleep()可以让协程自动切换实现异步方式执行,如果使用内建的time.sleep(),则变成同步执行,下面看看gevent如何使用patch_time()方法为sleep打补丁:

1
2
3
4
5
6
7
8
9
10
11
12
def patch_time():
"""Replace :func:`time.sleep` with :func:`gevent.sleep`."""
from gevent.hub import sleep
import time
# 用gevent.hub.sleep方法替换内建的sleep方法
patch_item(time, 'sleep', sleep)

def patch_item(module, attr, newitem):
olditem = getattr(module, attr, _NONE)
if olditem is not _NONE:
saved.setdefault(module.__name__, {}).setdefault(attr, olditem)
setattr(module, attr, newitem)

实现原理跟2.2提到monke.patch_all()一样

1
2
3
4
5
6
7
from gevent.hub import sleep # 先导入gevent的sleep
import time # 再导入内建time模块
print(getattr(time,'sleep')) # 获取原内建sleep
# <function time.sleep>
setattr(time,'sleep',sleep) # 将gevent的异步sleep方法替换原sleep方法
print(getattr(time,'sleep')) # 打印运行是sleep方法看看是内建的还是gevent实现的
# <function gevent.hub.sleep(seconds=0, ref=True)>

对于模块的导入,则需要使用getattr自省模式创建一个对象,然后通过__import__引入

以gevent替换os为例:

1
2
def patch_os():
patch_module('os')

具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 用gevent的os替换内建的os模块,这里的name就是'os'
def patch_module(name, items=None):
# 通过__import__方法导入用gevent的os
gevent_module = getattr(__import__('gevent.' + name), name)
module_name = getattr(gevent_module, '__target__', name)
module = __import__(module_name)
if items is None:
# 获取gevent_module里面跟os相关的方法
items = getattr(gevent_module, '__implements__', None)
if items is None:
raise AttributeError('%r does not have __implements__' % gevent_module)
for attr in items:
# 用 gevent自己实现的os里面方法替换内建os指定方法
patch_item(module, attr, getattr(gevent_module, attr))
return module

相信到了这里,已经可以理解 monkey.patch_all()为何要在gevent的程序头部引入,常见“模板”如下:

1
2
3
import threading
from gevent import monkey
monkey.patch_all()

很多文章在讨论gevent的协程时,基本都是一句话“这是打补丁,将阻塞模块替换为非阻塞模块”简单带过,对于大部分人来说,这种说明一般会感到疑惑。

3、gevent examples

  本章主要结合一些场景给出gevent用法,参考了gevent官网给出的examples:地址

3.1 使用协程高并发爬网页
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from __future__ import print_function
import random,datetime
import gevent
from gevent import monkey

# patches stdlib (including socket and ssl modules) to cooperate with other greenlets
# 将标准lib打补丁,例如下面的https请求需要用到ssl模块,将该内建的ssl模块替换为gevent的ssl
monkey.patch_all()

import requests
import time

# 这里给出的https协议来说明gevent可进行SSL的相关任务处理
def run (workers=1000):
start=time.time()
url_pool = [
'https://www.baidu.com/',
'https://www.apple.com/',
'https://www.qq.com/'
]

urls=[ random.choice(url_pool) for _ in range(workers)]
def print_head(url):
print('Starting {} at {}'.format(url,datetime.datetime.now()))
data = requests.get(url).text # gevent会在发生IO的位置实现协程自动切换
#print('%s: %s bytes: %r' % (url, len(data), data[:2]))
jobs = [gevent.spawn(print_head, _url) for _url in urls]
gevent.wait(jobs) # 阻塞主线程,让所有协程得以持续运行
cost=time.time()-start
print('cost:',cost)

if __name__=='__main__':
run()

以上1000个请求,只需运行一个线程,非常轻量且“低功耗”,而多线程方式,则需创建1000个线程,这就是协程的优势。

3.2 gevent实现的socket高并发

server.py端逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import socket,datetime
import gevent
from gevent import socket,monkey
monkey.patch_all()
import sys

class DecodeErr(Exception):
pass


class Server(object):
def __init__(self,host='0.0.0.0',port=8090,conns=100):
self._s=socket.socket()
self._s.bind((host,port))
self._s.listen(conns)

def parse_request(self,conn):
while True:
# 接收客户端发送的是比特字节,需要decode为str类型
msg=conn.recv(1024).decode('utf-8')
if not msg:
break
print('got the msg:{} at {}'.format(msg,self.recv_time()))

# 发送给client需要byte类型
conn.send(bytes(msg,encoding='utf-8'))
if msg =='quit':
conn.shutdown(socket.SHUT_RDWR)
conn.close()
break

@staticmethod
def recv_time():
d=datetime.datetime.now()
return '%s:%s:%s'%(d.hour,d.minute,d.second)

def serve_forever(self):
while True:
try:
client_conn,client_ip=self._s.accept()
# 创建一个新的Greenlet服务新的请求
g=gevent.spawn(self.parse_request,client_conn)
print('new client connected:{} {} serving...'.format(client_ip,g.name))
except Exception as e:
pass

if __name__=='__main__':
conns=int(sys.argv[1])
server=Server(conns=conns)
server.serve_forever()

另外打开2个终端使用 telnet 188.0.0.10 8090

输出:
可以看到每个client请求都是由新的greenlet来服务,这个Greenlet就是协程对象<Greenlet at 0x7f7dc87efa70: parse_request(<gevent._socket3.socket object, fd=7, family=2, ty)>,而非多线程对象。

1
2
3
4
5
6
7
[root@nn spv]# python server.py 100
new client connected:('188.0.0.10', 21042) Greenlet-0 serving...
got the msg:foo
at 10:8:3
new client connected:('188.0.0.10', 21044) Greenlet-1 serving...
got the msg:bar
at 10:8:37

这里需要注意:
在parse_request里面关闭client的连接用conn.shutdown(socket.SHUT_WR)
shutdown 方法的 how 参数接受如下参数值:

  • SHUT_RD:关闭 socket 的输入部分,程序还可通过该 socket 输出数据。(tcp半开状态)
  • SHUT_WR: 关闭该 socket 的输出部分,程序还可通过该 socket 读取数据。((tcp半开)状态)
  • SHUT_RDWR:全关闭。该 socket 既不能读取数据,也不能写入数据。

conn.close():关闭完整tcp连接通道
close方法不是立即释放,如果想立即释放,需在close之前使用shutdown方法
server.py 使用gevent实现可接受高并发连接,下面给出gevent版的client,高并发socket请求

client.py端代码*

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import datetime
import gevent
from gevent import socket,monkey
monkey.patch_all()
import sys

class Client(object):
def __init__(self,server_ip,port,workers=10):
self.server_ip=server_ip
self.port=port
self.workers=workers

@staticmethod
def recv_time():
d=datetime.datetime.now()
return '%s:%s:%s'%(d.hour,d.minute,d.second)

def asyn_sock(self,msg):
client=socket.socket()
client.connect((self.server_ip,self.port))
bmsg=bytes(msg,encoding='utf-8')
client.sendall(bmsg)
recv_data=client.recv(1024).decode('utf-8')
print('gevent object:{} data:{} at:{}'.format(gevent.getcurrent(),recv_data,self.recv_time()))
client.close()


def start(self):
threads=[gevent.spawn(self.asyn_sock,str(i)) for i in range(self.workers)]
gevent.joinall(threads)

if __name__=='__main__':
workers=int(sys.argv[1])
c=Client(server_ip='188.0.0.10',port=8090,workers=workers)
c.start()

服务器端启动100个连接数,客户端并发10个请求。

服务端打印如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
[root@nn spv]# python server.py 100
new client connected:('188.0.0.10', 21246) Greenlet-0 serving...
new client connected:('188.0.0.10', 21248) Greenlet-1 serving...
new client connected:('188.0.0.10', 21250) Greenlet-2 serving...
new client connected:('188.0.0.10', 21252) Greenlet-3 serving...
new client connected:('188.0.0.10', 21254) Greenlet-4 serving...
new client connected:('188.0.0.10', 21256) Greenlet-5 serving...
new client connected:('188.0.0.10', 21258) Greenlet-6 serving...
new client connected:('188.0.0.10', 21260) Greenlet-7 serving...
new client connected:('188.0.0.10', 21262) Greenlet-8 serving...
new client connected:('188.0.0.10', 21264) Greenlet-9 serving...
got the msg:0 at 10:11:5
got the msg:1 at 10:11:5
got the msg:2 at 10:11:5
got the msg:3 at 10:11:5
got the msg:4 at 10:11:5
got the msg:5 at 10:11:5
got the msg:6 at 10:11:5
got the msg:7 at 10:11:5
got the msg:8 at 10:11:5
got the msg:9 at 10:11:5

客户端打印如下:

1
2
3
4
5
6
7
8
9
10
11
[root@nn spv]# python asyn.py 10
gevent object:<Greenlet at 0x7ff41a9a85f0: <bound method Client.asyn_sock of <__main__.Client object at 0x7ff41a9da210>>('9')> data:9 at:10:11:5
gevent object:<Greenlet at 0x7ff41a9a84d0: <bound method Client.asyn_sock of <__main__.Client object at 0x7ff41a9da210>>('8')> data:8 at:10:11:5
gevent object:<Greenlet at 0x7ff41a9a83b0: <bound method Client.asyn_sock of <__main__.Client object at 0x7ff41a9da210>>('7')> data:7 at:10:11:5
gevent object:<Greenlet at 0x7ff41a9a8290: <bound method Client.asyn_sock of <__main__.Client object at 0x7ff41a9da210>>('6')> data:6 at:10:11:5
gevent object:<Greenlet at 0x7ff41a9a8170: <bound method Client.asyn_sock of <__main__.Client object at 0x7ff41a9da210>>('5')> data:5 at:10:11:5
gevent object:<Greenlet at 0x7ff41a9a8050: <bound method Client.asyn_sock of <__main__.Client object at 0x7ff41a9da210>>('4')> data:4 at:10:11:5
gevent object:<Greenlet at 0x7ff41b586ef0: <bound method Client.asyn_sock of <__main__.Client object at 0x7ff41a9da210>>('3')> data:3 at:10:11:5
gevent object:<Greenlet at 0x7ff41b586b90: <bound method Client.asyn_sock of <__main__.Client object at 0x7ff41a9da210>>('2')> data:2 at:10:11:5
gevent object:<Greenlet at 0x7ff41b586cb0: <bound method Client.asyn_sock of <__main__.Client object at 0x7ff41a9da210>>('1')> data:1 at:10:11:5
gevent object:<Greenlet at 0x7ff41b586a70: <bound method Client.asyn_sock of <__main__.Client object at 0x7ff41a9da210>>('0')> data:0 at:10:11:5

可以看到不管是服务器和客户端,都是由多个greenlet协程对象负责请求或者负责服务。
如果server.py端并发数设为10000,client.py并发也设为10000,那么会出现以下情况:

1
2
3
init__
OSError: [Errno 24] Too many open files
During handling of the above exception, another exception occurred:

这里因为centos限制用户级别在打开文件描述符的数量,可查看默认值:限制至多打开1024个文件

1
2
[root@nn spv]# ulimit -n
1024

linux 一般会在以下几个文件对系统资源做限制,例如用户级别(此外还有系统级别)的限制: /etc/security/limits.conf,和/etc/security/limits.d/目录,/etc/security/limits.d/里面配置会覆盖/etc/security/limits.conf的配置:

系统限制用户的资源有:所创建的内核文件的大小、进程数据块的大小、Shell
进程创建文件的大小、内存锁住的大小、常驻内存集的大小、打开文件描述符的数量、分配堆栈的最大大小、CPU 时间、单个用户的最大线程数、Shell
进程所能使用的最大虚拟内存。同时,它支持硬资源和软资源的限制。

提升并发性能:临时修改:ulimit -n 100000;永久性修改:root权限下,在/etc/security/limits.conf中添加如下两行,*表示所有用户,重启/或者注销重登陆生效

1
2
* soft nofile 102400
* hard nofile 104800

注意hard limit必须大于soft limit

这里将linux设为ulimit -n 100000,10万个描述符! python server.py 20000个并发,python client.py 10000并发请求打过去,3秒内完成,而且这是因为程序加入print打印语句影响性能,去掉所有print语句,2万个客户端并发不到2秒内完成,gevent或者说底层Greenlet的并发性能非常强。

3.3 gevent数据库操作

  这里将给出协程方式、多线程方式连接mysql数据库某实际项目备份表,15个字段,2万多条数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import pymysql
import gevent
from gevent import socket,monkey
monkey.patch_all()
import time

def timeit(func):
def wrapper(*args,**kwargs):
start=time.time()
func(*args,**kwargs)
cost=time.time()-start
print('{} cost:{}'.format(func.__name__,cost))
return wrapper


def read_db(index):
"""负责读数据"""
#print('start:',index)
db = pymysql.connect(host = '****', user = '****', passwd = '****', db= '****')
cursor = db.cursor()
sql='select count(1) from `article '
cursor.execute(sql)
nums = cursor.fetchall()
#print('total itmes:',nums)
cursor.close()
db.close()
#print('end:',index)

@timeit
def gevent_read(workers):
# 创建多个greenlets协程对象
greenlets = [gevent.spawn(read_db,i) for i in range(workers)]
gevent.joinall(greenlets)

if __name__=='__main__':
# 5次测试。这里每次间隔1秒,让客户端连接mysql的connections及时关闭,避免释放不及时导致超过数据库端的允许连接数
for _ in range(5):
time.sleep(1)
gevent_read(100)

从代码逻辑可以看出,gevent使用协程非常简单,在头部引入相关模块,再使用gevent.spawn创建多个greenlets对象,最后joinall。以下是测试结果

1
2
3
4
5
6
[root@nn spv]# python asyn_mysql.py 
gevent_read cost:2.702486276626587
gevent_read cost:2.120276689529419
gevent_read cost:2.1487138271331787
gevent_read cost:2.61714243888855
gevent_read cost:2.1180896759033203
3.4 gevent 多线程

gevent也有自己线程池,使用的python的thread,两者没区别,如果用了多线程,那么gevent其实就没多大意义了,因为不是协程模式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from __future__ import print_function
import time
import gevent
from gevent.threadpool import ThreadPool

def timeit(func):
def wrapper(*args,**kwargs):
start=time.time()
func(*args,**kwargs)
cost=time.time()-start
print('{} cost:{}'.format(func.__name__,cost))
return wrapper

@timeit
def gpool(workers):
pool = ThreadPool(workers)
for _ in range(10):
pool.spawn(time.sleep, 1)
gevent.wait()

if __name__=='__main__':
gpool(4)

输出3秒,10个任务,线程池只有4个worker,因此需分三轮工作,因为耗时3秒

1
2
[root@nn spv]# python gpool.py 
gpool cost:3.006455183029175
3.5 gevent 其他examples

这里不再一一列出,可以参考gevent github的example目录

不过建议看看geventsendfile.pywsgiserver_ssl.py
第一是零拷贝技术的协程,第二个是基于https的协程webserver

4、greenlet/eventlet/gevent的关系

  Greelent实现了一个比较易用(相比yeild)的协程切换的库。但是greenlet没有自己的调度过程,所以一般不会直接使用。
  Eventlet在Greenlet的基础上实现了自己的GreenThread,实际上就是greenlet类的扩展封装,而与Greenlet的不同是,Eventlet实现了自己调度器称为Hub,Hub类似于Tornado的IOLoop,是单实例的。在Hub中有一个event loop,根据不同的事件来切换到对应的GreenThread。同时Eventlet还实现了一系列的补丁来使Python标准库中的socket等等module来支持GreenThread的切换。Eventlet的Hub可以被定制来实现自己调度过程。
  Gevent基于libev和Greenlet。不同于Eventlet的用python实现的hub调度,Gevent通过Cython调用libev来实现一个高效的event loop调度循环。同时类似于Eventlet,Gevent也有自己的monkey_patch,在打了补丁后,完全可以使用python线程的方式来无感知的使用协程,减少了开发成本。
  这里也顺便给出greenlet/eventlet/gevent和其他可以实现协程模式库的对比表格,该表来自Gruvi作者的项目介绍页。Gruvi是一个轻量且特别的协程库,项目作者因为不太认同常见python协程库的实现方式,而且也不认同不推荐使用monkey patch方式,所有他写了Gruvi,专注green thread:项目地址

Feature Gruvi Asyncio Gevent Eventlet
IO library libuv stdlib libev stdlib / libevent
IO abstraction Transports / Protocols Transports / Protocols Green sockets Green sockets
Threading fibers yield from greenlet greenlet
Resolver threadpool threadpool threadpool / c-ares blocking / dnspython
Python: 2.x YES (2.7) YES (2.6+, via Trollius) YES YES
Python: 3.x YES (3.3+) YES YES NO
Python: PyPy NO NO YES YES
Platform: Linux FAST FAST FAST FAST
Platform: Mac OSX FAST FAST FAST FAST
Platform: Windows FAST (IOCP) FAST (IOCP) SLOW (select) SLOW (select)
SSL: Posix FAST FAST FAST FAST
SSL: Windows FAST (IOCP) FAST (IOCP 3.5+) SLOW (select) SLOW (select)
SSL: Contexts YES (also Py2.7) YES (also Py2.6+) NO NO
HTTP FAST (via http-parser) NO (external) SLOW (stdlib) SLOW (stdlib)
Monkey Patching NO NO YES YES

本博客也会为Gruvi写一篇文章,主要是欣赏作者阐述的设计理念。从对比表格来看,Asyncio各方面都出色,而且完全由Python标准库实现,后面也有关于Asyncio深入讨论的文章。

5、gevent 不适用的场合

这里参考Stack Overflow的文章《Asyncio vs. Gevent 》

it wasn’t perfect:

  • Back then, it didn’t work well on Windows (and it still has some limitations today). gevent在Windows 表现不佳
  • It couldn’t monkey-patch C extensions, so we coudn’t use MySQLdb, for example. Luckily, there were many pure Python alternatives, like PyMySQL. 由于gevent的 monkey-patch替换原理,参考上面2.2,它只支持对存python库打补丁,对于C语言实现的python库,例如MySQLdb,则不支持。

这里篇文章大致意思是建议用asyncio,因为它是标准库,有着非常详细的文档以及稳定的python官方维护。gevent也可以用,但是自己要清楚项目演进的后续维护情况。

Supported Platforms

gevent 1.3 runs on Python 2.7 and Python 3. Releases 3.4, 3.5 and
3.6 of Python 3 are supported. (Users of older versions of Python 2
need to install gevent 1.0.x (2.5), 1.1.x (2.6) or 1.2.x (<=2.7.8);
gevent 1.2 can be installed on Python 3.3.) gevent requires the
greenlet library and will install
the cffi library by default on Windows.

6、协程原理解析

  前面具体的gevent代码示例,对深入理解协程有一定帮助,因为在本文中,把原理性的讨论放在最后一节显得更为合理。谈到协程又不得不把进程、线程以及堆、栈相关概念抛出,以便从全局把握协程、线程和进程。

6.1 进程与内存分配

  进程是系统资源分配的最小单位,Linux系统由一个个在后台运行process提供所有功能的组成,你可以用ll /proc |wc -l或者ps aus|less查看系统运行的进程。进程自己是需要占用系统资源的,例如cpu、内存、网络,这里我们关注其
程序的内存分配。
这里以一个由C /C++编译的程序占用的内存分为以下几个部分为例说明,这段内容参考文章《堆栈的区别》

  • 栈区(stack): 由编译器自动分配释放 ,存放函数的参数值,局部变量的值等。其操作方式类似于数据结构中的栈。

  • 堆区(heap):一般由程序员(在代码里面自行申请内存)分配释放, 若程序员不释放,程序结束时可能由OS回收 。注意它与数据结构中的堆是两回事,分配方式倒是类似于链表。

  • 全局区(静态区)(static):全局变量和静态变量的存储是放在一块的,初始化的全局变量和静态变量在一块区域, 未初始化的全局变量和未初始化的静态变量在相邻的另一块区域。 - 程序结束后有系统释放

  • 文字常量区 :常量字符串就是放在这里的。 程序结束后由系统释放

  • 程序代码区:存放函数体的二进制代码

  可以想象,系统创建一个新的进程都进行以上的复杂内存分配工作,而进程结束后系统还得进行大量内存回收清理工作,如果系统有成千上万个进程创建、切换以及销毁,可想而知,非常消耗资源,”疲于奔命,顾不上其他重要请求“(这就是Apache服务器的并发性的劣势,看看Nginx有多强大)。所以多进程做并发业务,显然不是一个理想方案。

6.2 线程

  关于进程的描述,其实很多文章可以找到相关讨论,这里以线程和进程的区别作为说明:

  • 本质区别:进程是操作系统资源分配(分配CPU、内存、网络)的基本单位,而线程是任务(进行某种代码逻辑)调度和执行的基本单位

  • 资源占用区别:每个进程都有独立的代码和程序上下文环境,进程之间的切换消耗较大系统资源(投入大,代价较高);这里顺便说明为何代价高?因为进程之间切换涉及到用户空间(用户态)和内核空间(内核态)的切换。一个进程里面可以有多个线程运行,同一类线程共享代码和数据空间,每个线程都有自己独立的运行栈和程序计数器(PC),线程之间切换的消耗的是当前进程占有的资源,代价较小,但也不低。

  • 内存分配方面:系统在运行的时候会为每个进程分配不同的内存空间;而对线程而言,除了CPU外,系统不会为线程分配内存(线程所使用的资源来自其所属进程的资源),线程组之间只能共享资源。

  • 所处环境:在操作系统中能同时运行多个进程(程序);而在同一个进程中有多个线程同时执行(通过CPU调度,在每个时间片中只有一个线程执行)

6.3 协程

  终于谈到本章的主角:协程,英文coroutine,它比线程更加轻量,你可以这样认为:一个进程可以拥有多个线程一样,而一个线程也可以拥有多个协程。
==协程与进程的区别==:

  • 执行流的调度者不同,进程是内核调度,而协程是在用户态调度,也就是说进程的上下文是在内核态保存恢复的,而协程是在用户态保存恢复的,很显然用户态的代价更低
  • 进程会被强占,而协程不会,也就是说协程如果不主动让出CPU,那么其他的协程,就没有执行的机会。
  • 对内存的占用不同,实际上协程可以只需要4K的栈就足够了,而进程占用的内存要大的多
    • 从操作系统的角度讲,多协程的程序是单进程,单协程

==协程与线程的区别==
  一个线程里面可以包含多个协程,线程之间需要上下文切换成本相对协程来说是比较高的,尤其在开启线程较多时,线程的切换更多的是靠操作系统来控制,而协程之间的切换和运行由用户程序代码自行控制或者类似gevent这种自动切换,因此协程不是被操作系统内核所管理,而完全是由程序所控制(也就是在用户态执行),这将为用户可以设计出非常高性能的并发编程模式。如下图所示一个主线程负责使用gevent自动调度(自动切换运行)2个协程,大致逻辑如下:

  • 主线程(MainThread,也是根协程或者当前线程)创建(spawn)两个协程,只有有协程遇到IO event时候就把控制权交给当前线程,直到这个协程的IO event已经完成,主线程将控制权给这个协程。
    在这里插入图片描述

    7、小结

      本文开启了Python的异步编程文章讨论篇章,算是比较进阶的内容,因为异步模式可让实际项目确实受益不少,在本博客之后有关异步的内容有:asyncio、文件描述符与IO多路复用。