在前面的文章,已经通过gevent实现高并发的协程,本文将详细讨论Python标准库异步IO——asyncio。在Python3.4中引入了协程的概念以及asyncio。asyncio底层调用yield from语法,将任务变成生成器后挂起,这种方式无法实现协程之间的自动切换,在Python3.5中正式确立引入了async和await 的语法,所有的这些工作都使得Python实现异步编程变得更容易上手。
1、asyncio的基本概念
event_loop 事件循环:每一个需要异步执行的任务都要注册到事件循环中,事件循环负责管理和调度这些任务之间的执行流程(遇到IO则自动切换协程等)。
coroutine 协程:协程对象,指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用。
task 任务:一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含任务的各种状态。
future: 代表将来执行或没有执行的任务的结果。它和task上没有本质的区别
async/await 关键字:在python3.5及以上,用于定义协程的关键字,async定义一个协程,await用于挂起阻塞的异步调用接口。
在异步的模式里,所有代码逻辑都会运行在一个forever事件循环中(你可以把整个事件循环看成一个总控中心,它监听着当前线程创建的多个协程发发生的事件),它可以同时执行多个协程,这些协程异步地执行,直到遇到 await 关键字,事件循环将会挂起该协程,事件循环这个总控再把当前线程控制权分配给其他协程,直到其他的协程也挂起或者执行完毕,再进行下一个协程的执行。
2、使用asyncio
2.1 使用async关键字和await定义协程
在Python3.5之前,要实现协程方式的写法一般如下:1
2
3
4import asyncio
async def mytask(task_id):
yield from asyncio.sleep(1)
在Python3.5以后,全面使用async关键字和await定义协程,代码显更直观。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18import asyncio
import datetime
def get_time():
d=datetime.datetime.now()
return d.strftime('%H:%M:%S')
# async 定义了mytask为协程对象
async def mytask(task_id):
# 这里就像gevent的sleep方法模拟IO,而且该协程会被asyncio自动切换
print('task-{} started at:{}'.format(task_id,get_time()))
await asyncio.sleep(1) # await 要求该行语句的IO是有返回值的例如response=request.get(url),如果直接使用await time.sleep(2),则无法创建协程对象
print('task-{} done at:{}'.format(task_id,get_time()))
# 创建事件循环对象,该事件循环由当前主线程拥有
loop = asyncio.get_event_loop()
tasks=[mytask(i) for i in range(4)] # 这里mytask()是协程对象,不会离开运行。
loop.run_until_complete(asyncio.wait(tasks)) # 这里实行的逻辑就像gevent.joinall(tasks)一样,表示loop一直运行直到所有的协程tasks都完成
代码中通过async关键字定义一个协程(coroutine),不过该协程不能直接运行,需将它注册到事件循环loop里面,由后者在协程内部发生IO时(asyncio.sleep(2))时候调用协程。asyncio.get_event_loop方法可以创建一个事件循环,然后使用run_until_complete将协程注册到事件循环,并启动事件循环。
输出,从结果可以看出,5个协程同一时刻并发运行。1
2
3
4
5
6
7
8
9
10task-1 started at:17:12:37
task-0 started at:17:12:37
task-3 started at:17:12:37
task-2 started at:17:12:37
task-4 started at:17:12:37
task-1 done at:17:12:38
task-0 done at:17:12:38
task-3 done at:17:12:38
task-2 done at:17:12:38
task-4 done at:17:12:38
关于await要求该句为返回值:
await asyncio.sleep(2),这里可以看看sleep返回什么1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def sleep(delay, result=None, *, loop=None):
"""Coroutine that completes after a given time (in seconds)."""
if delay == 0:
yield
return result
if loop is None:
loop = events.get_event_loop()
future = loop.create_future()
h = future._loop.call_later(delay,
futures._set_result_unless_cancelled,
future, result)
try:
return (yield from future)
finally:
h.cancel()
如果设为delay值,且loop事件循环已创建(即使代码未创建它也会自动创建),返回的是future对象(yield from future),而这里可以挂起当前协程,直到future完成
2.2 task对象
1 | ...同上 |
查看打印结果:1
2
3
4before register to loop: <Task pending coro=<mytask() running at /opt/asyn.py:9>>
task-1 started at:17:39:06
task-1 done at:17:39:07
after loop completed,task return the result: ok
将协程封装为task对象后,task在注册到事件循环之前为pending状态,1秒后,task 结束,并且通过task.result()可以获取协程结果值。
task对象也可用asyncio.ensure_future(coro)创建(接收coro协程或者future对象),它内部封装了loop.create_task
2.2 future对象
前面定义说了future表示将来执行或没有执行的任务的结果,task是future的子类。
基本的方法有:
• cancel(): 取消future的执行,调度回调函数
• result(): 返回future代表的结果
• exception(): 返回future中的Exception
• add_done_callback(fn): 添加一个回调函数,当future执行的时候会调用这个回调函数。
• set_result(result): 将future标为运行完成,并且设置return值,该方法常用
使用future,可以在协程结束后自行回调函数:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30import asyncio
...同上
async def coru_1(future_obj,N):
print('coru_1 started at:{}'.format(get_time()))
total=sum(range(N))
await asyncio.sleep(2)
future_obj.set_result('coru_1 returns:{}'.format(total))
print('coru_1 done at:{}'.format(get_time()))
async def coru_2(future_obj,N):
print('coru_2 started at:{}'.format(get_time()))
total=sum(range(N))
await asyncio.sleep(2)
future_obj.set_result('coru_2 returns:{}'.format(total))
print('coru_2 done at:{}'.format(get_time()))
def call_back(future_obj):
time.sleep(1)
print('saved to redis at :',get_time(),future_obj,future_obj.result())
if __name__=='__main__':
loop=asyncio.get_event_loop()
f1=asyncio.Future()
f2=asyncio.Future()
tasks=[coru_1(f1,10),coru_2(f2,20)]
f1.add_done_callback(call_back)
f2.add_done_callback(call_back)
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
输出1
2
3
4
5
6coru_1 started at:16:52:07
coru_2 started at:16:52:07
coru_1 done at:16:52:09
coru_2 done at:16:52:09
saved to redis at : 16:52:10 <Future finished result='coru_1 returns:45'> coru_1 returns:45
saved to redis at : 16:52:11 <Future finished result='coru_2 returns:190'> coru_2 returns:190
两个协程同时启动且在同一时间结束运行。之后开始回调,可以看到协程1先回调,1秒完成后,再切换到协程2回调。
2.3 获取协程并发执行后的所有返回值
1 | import asyncio |
输出:1
2
3
4
5task-0 started at:15:53:08
task-1 started at:15:53:08
task-2 started at:15:53:08
task-3 started at:15:53:08
['task-0 done at:15:53:09', 'task-1 done at:15:53:09', 'task-2 done at:15:53:09', 'task-3 done at:15:53:09']
以上也无需使用future的回调机制获取协程返回值,直接在loop结束后,从task对象的result方法即可获得协程返回值。
需要注意的是:
用于等待所有协程完成的方法asyncio.wait和asyncio.gather,都是接受多个future或coro组成的列表,区别:asyncio.gather内边调用ensure_future方法将列表中不是task的coro封装为future对象,而wait则没有。
2.4 asyncio.gather vs asyncio.wait
这里再给两个例子说明这两者的区别以及应用场合:
asyncio.gather1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31import asyncio
import datetime
def get_time():
d=datetime.datetime.now()
return d.strftime('%M:%S')
async def coro(group_id,coro_id):
print('group{}-task{} started at:{}'.format(group_id,coro_id,get_time()))
await asyncio.sleep(coro_id) # 模拟读取文件的耗时IO
return 'group{}-task{} done at:{}'.format(group_id,coro_id,get_time())
loop=asyncio.get_event_loop()
# 创建三组tasks
tasks1=[asyncio.ensure_future(coro(1,i))for i in range(1,5)]
tasks2=[asyncio.ensure_future(coro(2,i)) for i in range(6,8)]
tasks3=[asyncio.ensure_future(coro(3,i)) for i in range(8,10)]
group1=asyncio.gather(*tasks1) # 对第1组的协程进行分组,group1
group2=asyncio.gather(*tasks2) # 对第2组的协程进行分组,group2
group3=asyncio.gather(*tasks3) # 对第3组的协程进行分组,group3
all_groups=asyncio.gather(group1,group2,group3) # 把3个group再聚合成一个大组,也是就所有协程对象的被聚合到一个大组
loop=asyncio.get_event_loop()
all_group_result=loop.run_until_complete(all_groups)
for index,group in enumerate(all_group_result): # 获取每组协程的输出
print('group {} result:{}'.format(index+1,group))
loop.close()
输出:1
2
3
4
5
6
7
8
9
10
11group1-task1 started at:35:19
group1-task2 started at:35:19
group1-task3 started at:35:19
group1-task4 started at:35:19
group2-task6 started at:35:19
group2-task7 started at:35:19
group3-task8 started at:35:19
group3-task9 started at:35:19
group 1 result:['group1-task1 done at:35:21', 'group1-task2 done at:35:21', 'group1-task3 done at:35:21', 'group1-task4 done at:35:21']
group 2 result:['group2-task6 done at:35:21', 'group2-task7 done at:35:21']
group 3 result:['group3-task8 done at:35:21', 'group3-task9 done at:35:21']
从打印结果可知,每组协程都在同一时刻开始以及同一时刻结束,asyncio.gather就是用于在更高层面对task进行分组,以不同的组管理不同的协程,你可以看出gather是一个粗粒度组织协程,自动收集所有协程结束后的返回值。
asyncio.wait1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37import asyncio
import datetime
def get_time():
d=datetime.datetime.now()
return d.strftime('%M:%S')
async def coro(task_id):
print('coro-{} started at:{}'.format(task_id,get_time()))
await asyncio.sleep(task_id) # 模拟读取文件的耗时IO
return 'coro-{} done at:{}'.format(task_id,get_time())
tasks=[coro(i) for i in range(1,10)]
loop=asyncio.get_event_loop()
first_complete,unfinished1=loop.run_until_complete(asyncio.wait(tasks,return_when=asyncio.FIRST_COMPLETED))
# 获取首个已结束的协程返回值,注意这里firt_complete是一个set()
first_done_task=first_complete.pop()
print('首个完成的协程返回值:',first_done_task.result())
print('还未结束的协程数量:',len(unfinished1))
# 将第一阶段未完成的协程注册到loop里面
finished2,unfinished2=loop.run_until_complete(asyncio.wait(unfinished1,timeout=3))
# 获取第二阶段已完成的协程返回值
for t in finished2:
print(t.result())
print('还未结束的协程数量:',len(unfinished2))
# 将第二阶段未完成的协程注册到loop里面
finished3,unfinished3=loop.run_until_complete(asyncio.wait(unfinished2))
# 获取第三阶段已完成的协程返回值
for t in finished3:
print(t.result())
print('还未结束的协程数量:',len(unfinished3))
输出:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21coro-5 started at:23:40
coro-1 started at:23:40
coro-6 started at:23:40
coro-7 started at:23:40
coro-2 started at:23:40
coro-8 started at:23:40
coro-3 started at:23:40
coro-9 started at:23:40
coro-4 started at:23:40
首个完成的协程返回值: coro-1 done at:23:41
还未结束的协程数量: 8
coro-2 done at:23:42
coro-3 done at:23:43
coro-4 done at:23:44
还未结束的协程数量: 5
coro-6 done at:23:46
coro-7 done at:23:47
coro-9 done at:23:49
coro-5 done at:23:45
coro-8 done at:23:48
还未结束的协程数量: 0
从输出结果可以很清看出asyncio.wait
很精确的控制协程运行过程,通过wait(return_when=asyncio.FIRST_COMPLETED)
可拿到运行完成的协程,通过wait(timeout)
控制指定时间后放回已完成的协程。
2.5 嵌套协程的实现(协程内调用协程)
一种易于理解的调用异步函数的方式
在介绍嵌套协程或者闭包协程、协程内调用协程的概念前,先看看普通函数内部调用普通函数1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20import time
def func1(data):
time.sleep(1)
return data
def func2(data):
time.sleep(1)
return data*2
def func3(data):
time.sleep(1)
return data*3
def gel_all_data():
result1=func1('foo')
result2=func2(result1)
result3=func3(result2)
return (result1,result2,result3)
在同步的编程思维下,大家很容易理解get_all_data函数内部调用func1等三个外部函数来获取相应返回值,其实将同步改为异步的过程很简单:
==在每个函数前面使用关键字async向python解释器声明这是异步函数,如果需要调用外部异步函数,需使用await关键字==,将上面的同步编程改成异步编程的模式如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18import asyncio
async def func1(start_data):
await asyncio.sleep(1) # 要使用asyncio的异步sleep方法,它会让出线程控制权给其他协程,而内建的sleep为同步性质
return start_data
async def func2(data):
await asyncio.sleep(1)
return data*2
async def func3(data):
await asyncio.sleep(1)
return data*3
async def get_all_data():
result1=await func1('foo') # 在异步函数内部,使用await关键字调用其他异步函数,并获取该异步函数的返回值。执行流会在此将当前线程控权让出
result2=await func2(result1)# 同上
result3=await func3(result2) # 同上
return(result1,result2,result3)
该异步的get_all_data其实要实现的需求为:一个协程内部调用其他协程,而且可以将返回值放置在不同的协程上,可以实现链式的协程调度,这看起来就是一个协程任务流 。
当熟悉了这种异步的编程模式后,可以玩一些更为进阶的例子:
协程嵌套示例
1 | import asyncio |
输出:1
2
3
4
5
6
7
8
9
10
11outter_coro started at:14:52:59
coro-0 started at:14:52:59
coro-1 started at:14:52:59
coro-2 started at:14:52:59
coro-3 started at:14:52:59
outter_coro done at:14:53:04
coro-1 done at:14:53:04
coro-3 done at:14:53:04
coro-2 done at:14:53:04
coro-0 done at:14:53:04
可以看到外层协程和内层协程同时启动(当然外层协程函数最先执行),而且都在同一个时刻结束。
内层协程返回值只能在外程协程内部获取,能否在
loop.run_until_complete(outter_coro()) 之后,一次性获取协程返回值? 需要改用asyncio.gather方法:1
2
3
4
5
6
7
8
9
10
11
12
13
14async def outter_coro():
print('outter_coro started at:{}'.format(get_time()))
coros=[inner_coro(i) for i in range(4)]
tasks=[asyncio.ensure_future(coro) for coro in coros]
print('outter_coro done at:{}'.format(get_time()))
return await asyncio.gather(*tasks)
loop = asyncio.get_event_loop()
all_coro_result=loop.run_until_complete(outter_coro())
for t in all_coro_result:
print(t)
loop.close()
输出:1
2
3
4
5
6
7
8
9
10outter_coro started at:58:44
outter_coro done at:58:44
coro-0 started at:58:44
coro-1 started at:58:44
coro-2 started at:58:44
coro-3 started at:58:44
coro-0 done at:58:46
coro-1 done at:58:46
coro-2 done at:58:46
coro-3 done at:58:46
从外层协程outter_coro的启动时刻和结束时刻都一样可以看出,outter_coro和return await asyncio.gather(*tasks)是异步执行的,且在outter_coro结束后,loop事件循环只需管理coro-0到coro-3这4个协程。
从以上两种嵌套协程返回值的写法,可以看到这样逻辑:
- 外层协程直接返回 awaitable对象给loop,loop就可以在最后获取所有协程的返回值;
1
2
3
4
5
6def outter_coro()
return await asyncio.wait(tasks) # 返回awaitable对象给下文loop,这里用asyncio.wait挂起所有协程
done,pending=loop.run_until_complete(outter_coro())
for task in done:
print(task.result()) - 外层协程没有返回 awaitable对象给loop,loop无法获取所有协程的返回值,只能在外程协程里面获取所有协程返回值
1
2
3
4
5
6def outter_coro()
done,pending=await asyncio.wait(tasks) # 没有返回awaitable对象给下文loop
for task in done:
print(task.result())
loop.run_until_complete(outter_coro())2.6 如何取消运行中协程
future(task)对象主要有以下几个状态:
pending、running、done、cancelled
创建future(task)的时候,task为pending状态:tasks=[asyncio.ensure_future(coro) for coro in coros]
事件循环调用执行的时候且协程未结束时对应tsak为running状态,loop.run_until_complete(outter_coro())
事件循环运行结束后,所有的task为done状态,那么最后一个cancelled状态如何实现呢?例如你想在某些协程未done之前将其cancel掉,如何处理?引用2.4章节的asyncio.wait例子说明:1
2
3done,pending=loop.run_until_complete(outter_coro())
for task in done:
print(task.result())输出结果:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33import asyncio
import datetime
def get_time():
d=datetime.datetime.now()
return d.strftime('%M:%S')
async def coro(task_id):
print('coro-{} started at:{}'.format(task_id,get_time()))
await asyncio.sleep(task_id) # 模拟读取文件的耗时IO
return 'coro-{} done at:{}'.format(task_id,get_time())
tasks=[coro(i) for i in range(1,10)]
loop=asyncio.get_event_loop()
first_complete,unfinished1=loop.run_until_complete(asyncio.wait(tasks,return_when=asyncio.FIRST_COMPLETED))
# 获取首个已结束的协程返回值,注意这里firt_complete是一个set()
first_done_task=first_complete.pop()
print('首个完成的协程返回值:',first_done_task.result())
print('还未结束的协程数量:',len(unfinished1))
# 将第一阶段未完成的协程注册到loop里面
finished2,unfinished2=loop.run_until_complete(asyncio.wait(unfinished1,timeout=3))
for t in finished2:
print(t.result())
print('还未结束的协程数量:',len(unfinished2))
for task in unfinished2: # 取消剩余未运行的task
print('cancell unfinished task:',task,'==>is canceled:',task.cancel())从输出结果可看到,8个协程task并发运行,最早结束的是coro-1,接着是coro-2、coro-3、coro-4,因为设定asyncio.wait(unfinished1,timeout=3) 3秒超时,只要超过3秒后,loop返回这些未运行的task,接着再逐个取消,可以看到5个协程被取消,True表示当前协程取消成功。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20coro-4 started at:17:47
coro-5 started at:17:47
coro-1 started at:17:47
coro-6 started at:17:47
coro-7 started at:17:47
coro-2 started at:17:47
coro-8 started at:17:47
coro-3 started at:17:47
coro-9 started at:17:47
首个完成的协程返回值: coro-1 done at:17:48
还未结束的协程数量: 8
coro-2 done at:17:49
coro-3 done at:17:50
coro-4 done at:17:51
还未结束的协程数量: 5
cancell unfinished task: <Task pending coro=<coro() running at /opt/cancel_task.py:10> wait_for=<Future cancelled>> ==>is canceled: True
cancell unfinished task: <Task pending coro=<coro() running at /opt/cancel_task.py:10> wait_for=<Future cancelled>> ==>is canceled: True
cancell unfinished task: <Task pending coro=<coro() running at /opt/cancel_task.py:10> wait_for=<Future cancelled>> ==>is canceled: True
cancell unfinished task: <Task pending coro=<coro() running at /opt/cancel_task.py10> wait_for=<Future cancelled>> ==>is canceled: True
cancell unfinished task: <Task pending coro=<coro() running at /opt/cancel_task.py:10> wait_for=<Future cancelled>> ==>is canceled: True
2.7 理解loop的相关方法
loop.run_until_complate vs loop.run_forever
loop.run_until_complate
可以在程序的不同位置多次调用,例如在2.4 asyncio.gather vs asyncio.wait
提到的asyncio.wait
用法,同一程序中能出现多个loop.run_until_complate
1
2
3
4
5
6
7# 将第一阶段未完成的协程注册到loop里面
finished2,unfinished2=loop.run_until_complete(asyncio.wait(unfinished1,timeout=3))
print('还未结束的协程数量:',len(unfinished2))
# 将第二阶段未完成的协程注册到loop里面
finished3,unfinished3=loop.run_until_complete(asyncio.wait(unfinished2))
print('还未结束的协程数量:',len(unfinished3))
而对于 loop.run_forever,在同一程序中,只能有一个,因为该事件是在当前线程后台永久运行:1
2
3
4
5
6
7
8
9
10
11
12
13
14import asyncio
async def coro():
print('start a coro')
await asyncio.sleep(1)
print('coro done')
future_obj=asyncio.ensure_future(coro())
loop=asyncio.get_event_loop()
loop.run_forever() # 程序不会退出,loop一直挂在这里,等待其他future对象
future_obj=asyncio.ensure_future(coro()) #程序没有报错,但执行流永远不会到达这里,该句永远不会运行
loop.run_forever() # 程序没有报错,但执行流永远不会到达这里,该语句永远不会运行
输出:1
2
3
4start a coro
coro done
,,,,
# 等待程序退出
loop.stop()
上面提到如果程序仅有loop.run_forever(),那么当future完成后,程序一直没有退出,若要求实现当future完成后,程序也需要正常退出,可以这样处理:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16import asyncio
import datetime
def get_time():
d=datetime.datetime.now()
return d.strftime('%M:%S')
async def coro(n):
print('start a coro at ',get_time())
await asyncio.sleep(n)
print('coro done at ',get_time())
future_obj=asyncio.ensure_future(coro(2))
loop=asyncio.get_event_loop()
loop.stop() # 在run_forever()前,先stop
print('stop后,loop事件还在运行? at ',get_time())
loop.run_forever()
输出1
2stop后,loop事件还在运行? at 43:11
start a coro at 43:11
从输出可以看到,上面的示例代码都是异步并发运行。print('stop后,loop事件还在运行? at ',get_time())
语句跟future任务同时运行
==注意:如果把loop.stop()方法放在run_forever后面,可预见,程序不会退出==1
2
3
4...
loop.run_forever()
loop.stop() # 执行流永远不会到达这一句
print('stop后,loop事件还在运行? at ',get_time()) # 执行流永远不会到达这一句
loop.call_soon/loop.call_later
这两个方法用于在异步函数里面调用同步函数(普通函数),且可以实现立刻调用或者稍后调用:loop.call_soon(callback, *args, context=None)
: 立刻调用,并返回1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16import asyncio
import functools
def callback(name,stat=1):
print('args:',name,'keyword args:',stat)
async def run(loop):
loop.call_soon(callback,'get first callback')
wrapper_func=functools.partial(callback,stat=2)
loop.call_soon(wrapper_func,'get second call back')
loop=asyncio.get_event_loop()
try:
loop.run_until_complete(run(loop))
finally:
loop.close()
打印:1
2args: get first callback keyword args: 1
args: get second call back keyword args: 2
异步调用同步函数其实已经破坏了异步的并发机制,因此很少使用这些非异步的方法。
此外loop.call_soon不支持协程函数传入关键字,因此可以通过偏函数先把关键字参数”传入“callback的kwargs里面,之后在call_soon里面,就可以利用这个被”包装过的callback“再传入位置参数即可(loop.run_until_complete传入关键字参数也一样这么处理)
loop.call_later(delay, callback, *args, context=None)
: 再给定一个时间之后,再调用callback。context默认当前线程的上下文1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17import asyncio
import functools
def callback(name,stat=1):
print('args:',name,'keyword args:',stat)
async def run(loop):
loop.call_later(2,callback,'get first callback')
loop.call_soon(callback,'callback soon')
wrapper_func=functools.partial(callback,stat=0)
loop.call_later(1,wrapper_func,'get second callback')
await asyncio.sleep(2) # 这里如果不设sleep,那么call_soon执行后loop马上退出,导致2个有延时运行的callback也退出了。这里要大于等于delay时间最长的call_later
loop=asyncio.get_event_loop()
try:
loop.run_until_complete(run(loop))
finally:
loop.close()
打印:可以看到call_soon最先完成回调,接着才是设为1秒后运行的回调,2秒的回调1
2
3args: callback soon keyword args: 1
args: get second callback keyword args: 0
args: get first callback keyword args: 1
同样,该loop.call_later不常用。
3、asyncio进阶用法
在上面的例子中,一个主线程创建一个永久事件循环(该永久事件不会自动退出,而是run forever,除非主线程运行后没有阻塞或者手动中断程序运行),再把所有的协程注册到该永久事件循环,该方式较为基础用法的异步模式。在这一节,🔚asyncio高级用法,用线程1创建一个forever事件循环,线程2可以向事件循环中动态添加协程,而且不受主线程阻塞。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28import asyncio
import datetime
import threading
def get_time():
d=datetime.datetime.now()
return d.strftime('%M:%S')
def start_another_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
async def read_file(task_id):
print('coro-{} started at:{}'.format(task_id,get_time()))
await asyncio.sleep(task_id) # 模拟读取文件的耗时IO
print('coro-{} done at:{}'.format(task_id,get_time()))
new_loop = asyncio.new_event_loop()
# start一个新的线程1,用于启动一个永久事件循环
t = threading.Thread(target=start_another_loop,args=(new_loop,))
t.start()
# 当前线程向线程1的loop注册tasks
asyncio.run_coroutine_threadsafe(read_file(5),new_loop)
asyncio.run_coroutine_threadsafe(read_file(5),new_loop)
输出:1
2
3
4coro-5 started at:09:30
coro-2 started at:09:30
coro-2 done at:09:35
coro-5 done at:09:35
这个动态添加协程task对象有何用?如果task的参数是从redis队列实时取得,然后交由run_coroutine_threadsafe向loop注册协程,那么不就实现基于协程producer-consumer模式。
利用redis 队列实现loop循环事件动态添加协程
这种方式,可以实现并发模式,producer:向redis 队列push 数据(这个数据是指协程task需要的参数,例如sleep的),consumer:使用asyncio.run_coroutine_threadsafe(read_file(msg),new_loop)不断消费producer的数据。
1 | import asyncio |
以上代码的逻辑:
创建两个线程,
- 线程1负责启动一个forever事件循环,用于接收另外一个线程2注册的协程对象(task对象)
- 线程2负责不断从redis队列获取任务数据后,再把协程注册到线程1启动的事件循环,从而实现loop循环事件动态添加协程。
该例子只需要2个线程,即可实现高并发模式。
测试:在redis-cli里面,先lpush一个10,再lpush1个2秒,1个4秒,最终线程2按顺序创建3个线程,都会注册到loop里面,注意对比3个协程的完成时间:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15# redis-cli 在coro_queue队列添加数据
127.0.0.1:6379> LPUSH coro_queue 10
(integer) 1
127.0.0.1:6379> LPUSH coro_queue 2
(integer) 1
127.0.0.1:6379> LPUSH coro_queue 4
(integer) 1
# 程序输出
coro-10 started at:56:48
coro-2 started at:56:50
coro-4 started at:56:51
coro-2 done at:56:52
coro-4 done at:56:55
coro-10 done at:56:58
从打印的时刻可以很清楚看到,3个协程并发执行,总共10秒完成。若同步模式,则需要2+4+10=16秒才能完成。
4、asyncio最适合的使用场景
从redis、Nginx、node.js、Tornado、Twisted这些使用IO多路复用技术的中间件或者框架,可以很明确的推出结论:异步逻辑非常适合处理有Network IO且高并发的socket连接场景,因为这些场景往往需要等待IO,例如:访问web接口数据、访问网页、访问数据库,都是client向server发起网络IO。因此本节给出asyncio的3个场景:异步爬虫,高并发的socket服务、数据库连接。
但是:asyncio的周边库似乎有不少坑,而且距离稳定生产环境有一定距离,参考知乎文章吐槽的asyncio,所有很多文章介绍asyncio基本使用场合,或者自行开发的小工具,很少文章能给出一个使用asyncio实现的复杂项目。当然,协程肯定不适合CPU计算场景。
目前star较高的几个协程异步库有:aiohttp、aioredis、aiomysql、aiopg。aiopg:is a library for accessing a PostgreSQL database from the asyncio 。以上四个协程异步库底层通过封装asyncio实现。本节主要介绍aioredi和aiohttp,其他库可参考官方示例。
4.1 aioredis
aioredis实现的api很丰富,支持sentinel连接,运行原生redis命令的接口,但是它不支持cluster集群的连接:Current release (1.3.0) of the library does not support Redis Cluster in a full manner.
单个连接示例:1
2
3
4
5
6
7
8
9
10import asyncio
import aioredis
async def coro(key,value):
redis = await aioredis.create_redis(
'redis://127.0.0.1')
await redis.set(key,value)
val = await redis.get(key)
redis.close()
await redis.wait_closed()
return val
下面使用单例模式和协程with协议,实现基本的async redis 类:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61import aioredis
import asyncio
import datetime
class AsynRedis(object):
_instance = None
def __init__(self,redis_uri='redis://127.0.0.1',pool=False,max_conn=100,encoding='utf-8'):
self._redis_uri = redis_uri
self._encoding = encoding
self._pool=pool
self._max_conn=max_conn
async def __aenter__(self): # with协议入口
await self.get_conn()
return self
async def __aexit__(self, exc_type, exc, tb): #with 协议出口
await self.close()
async def get_conn(self): # 单例模式创建redis连接,可选pool或者单连接
if not self._instance:
if not self._pool:
self._instance = await aioredis.create_redis(self._redis_uri)
self._instance=await aioredis.create_redis_pool(self._redis_uri,maxsize=self._max_conn)
return self._instance
async def asyn_set(self,*args,**kwargs):
response=await self._instance.set(*args,**kwargs)
return response
async def asyn_get(self,key):
value=await self._instance.get(key,encoding=self._encoding)
return value
async def close(self):
if self._instance:
self._instance.close()
await self._instance.wait_closed()
def get_time():
d=datetime.datetime.now()
return d.strftime('%M:%S')
async def redis_coro(index):
async with AsynRedis() as f: # 异步的aenter和aexit实现with协议
print(f'coro-{index} start at {get_time()}')
await asyncio.sleep(1)
key='foo-{}'.format(index)
result=await f.asyn_set(key,get_time()) # 将时刻作为value,用于观察协程并发,获取set操作返回值,True表示set成功。
print(f'coro-{index} done at {get_time()},key is set? {result}')
if __name__=='__main__':
loop=asyncio.get_event_loop()
tasks=[redis_coro(i) for i in range(10)]
try:
loop.run_until_complete(asyncio.wait(tasks))
finally:
loop.close()
输出:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21coro-9 start at 57:15
coro-4 start at 57:15
coro-2 start at 57:15
coro-5 start at 57:15
coro-0 start at 57:15
coro-8 start at 57:15
coro-6 start at 57:15
coro-7 start at 57:15
coro-3 start at 57:15
coro-1 start at 57:15
coro-9 done at 57:16,key is set? True
coro-4 done at 57:16,key is set? True
coro-2 done at 57:16,key is set? True
coro-5 done at 57:16,key is set? True
coro-0 done at 57:16,key is set? True
coro-8 done at 57:16,key is set? True
coro-6 done at 57:16,key is set? True
coro-7 done at 57:16,key is set? True
coro-3 done at 57:16,key is set? True
coro-1 done at 57:16,key is set? True
可以看到10个redis协程同一时刻并发set key,并且同一时刻完成。
在redis-cli查看key,所有的key都value都是同一时刻,说明协程并发运行正确,而在多线程方式,则需要创建10个线程
才可以实现单线程+10协程
的效果,若当并发量高达1万+时,可以想象多线程将消耗大量系统资源以及线程切换,效率必然不高。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15127.0.0.1:6379> keys foo*
1) "foo-8"
2) "foo-3"
3) "foo-9"
4) "foo-0"
5) "foo-1"
6) "foo-7"
7) "foo-5"
8) "foo-6"
9) "foo-2"
10) "foo-4"
127.0.0.1:6379> get foo-8
"57:16"
127.0.0.1:6379> get foo-0
"57:16"
这里用两个魔法方法__aenter__
,__aexit__
实现with协议,但要注意的是,协程的with方法只能在协程函数内部使用,这个with的上下文就是该协程的上下文。如果写在主线程外部,则提示语法出错:
这是因为协程上下文代表协程自己的栈等信息,肯定不是主线程的上下文,所以不能把async with
写在程序的全局位置
注意:aioredis似乎有个bug,当MacOS系统的文件描述最大限制已设为10000,aioredis不管使用单连接或者pool方式,当并发数设为大值例如1000,部分协程完成后,剩余部分协程都会被阻塞(暂未找到原因)。
在项目中,如果要使用aioredis,可以用asyncio的semaphore信号量限制并发数1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28import aioredis
import asyncio
import datetime
def get_time():
d=datetime.datetime.now()
return d.strftime('%M:%S')
async def redis_coro(semaphore,index):
async with semaphore:
r=await aioredis.create_redis('redis://127.0.0.1',db=0)
print(f'coro-{index} start at {get_time()}')
await asyncio.sleep(0.05)
key='bar-{}'.format(index)
result=await r.set(key,get_time(),expire=100)
print(f'coro-{index} done at {get_time()},key is set? {result}')
r.close()
await r.wait_closed()
async def run(max_task=100):
sem=asyncio.Semaphore(10)
tasks=[redis_coro(sem,i) for i in range(max_task)]
await asyncio.wait(tasks)
if __name__=='__main__':
loop=asyncio.get_event_loop()
loop.run_until_complete(run())
loop.close()
4.1 aiohttp
Asynchronous HTTP client/server framework for asyncio and Python
aiohttp应该是除了request库外最强大的HTTP库,而且是异步实现,三个主要功能:
- Supports both Client and HTTP Server.
- Supports both Server WebSockets and Client WebSockets out-of-the-box without the Callback Hell.
- Web-server has Middlewares, Signals and pluggable routing.
除了基本client和server端服务,还支持websocket、web-server模块还支持可插拔的中间件,官方也给了详细的aiohttp demo代码,链接
以下代码逻辑:使用协程并发get html页面,并使用协程方式存储html到文件或者存到redis1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56import datetime,time
import asyncio,aiohttp,aioredis,aiofiles
def get_time():
d=datetime.datetime.now()
return d.strftime('%M:%S')
async def save_to_file(html,index):
"""将html文本存放到本地目录,使用异步aiofiles实现。"""
file_name=f'/opt/test_aiohttp/html-{index}.txt'
async with aiofiles.open(file_name,'wb') as f:
await f.write(html)
print(f'coro-{index} done at {get_time()}')
async def save_to_redis(html,index):
"""将html文本存放到redis,采用协程模式 """
r=await aioredis.create_redis('redis://127.0.0.1',db=0)
coro_key=f'coro-{index}'
result=await r.set(coro_key,html,expire=60)
print(f'coro-{index} done at {get_time()},is saved? {result}')
r.close()
await r.wait_closed()
async def get_html(semaphore,session,url,index,container='file'):
"""获取url对应的html文本,并调用存放文本的协程,这里就是前面章节提到的嵌套协层,用await调用外部协程 """
try:
async with semaphore:# 由外部传入的信号量,控制并发数
print(f'coro-{index} started at {get_time()}')
async with session.get(url,timeout=5) as response:
if response.status==200:
if container=='file':
container_=save_to_file
else:container_=save_to_redis
html_text=await response.read()
await container_(html_text,index)
return True
else:
return False
except Exception as e:
return False
async def run(semaphore,url,max_workers=10):
async with aiohttp.ClientSession() as session:
urls=[url]*max_workers
tasks=[asyncio.ensure_future(get_html(semaphore,session,each_url,index)) for index,each_url in enumerate(urls)]
await asyncio.gather(*tasks)
if __name__=='__main__':
loop=asyncio.get_event_loop()
semaphore=asyncio.Semaphore(5)
loop.run_until_complete(run(semaphore,'http://spark.apachecn.org/#/'))
# 官方推荐使用Zero-sleep保证底层的一些socket连接完成关闭
loop.run_until_complete(asyncio.sleep(0))
loop.close()
打印结果:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22coro-0 started at 49:26
coro-1 started at 49:26
coro-2 started at 49:26
coro-3 started at 49:26
coro-4 started at 49:26
coro-0 done at 49:26
coro-5 started at 49:26
coro-2 done at 49:26
coro-6 started at 49:26
coro-4 done at 49:26
coro-7 started at 49:26
coro-1 done at 49:26
coro-8 started at 49:26
coro-3 done at 49:26
coro-9 started at 49:26
coro-6 done at 49:26
coro-5 done at 49:27
coro-7 done at 49:27
coro-8 done at 49:27
coro-9 done at 49:27
这里使用asyncio.Semaphore(5)控制并发数,从输出可以看出,一开始有5个协程启动,最先5个协程运行结束后,另外5个协程同时启动。因为10个任务,分了2轮进行。
小结
本文内容相对较多且杂,主要是asyncio协程库有较多api,这些api与同步编程的python库有较大的区别,结合asyncio实现的几个第三方协层库来看,可以看到协程在python生态位置较为小众,如果在项目(例如高性能web接口服务)中引入协程异步编程,可以考虑Tornado以及Twisted。(想想Go语言还是相当强大,协程生态成熟)