yield-bytes

沉淀、分享与无限进步

深入解析asyncio与协程

  在前面的文章,已经通过gevent实现高并发的协程,本文将详细讨论Python标准库异步IO——asyncio。在Python3.4中引入了协程的概念以及asyncio。asyncio底层调用yield from语法,将任务变成生成器后挂起,这种方式无法实现协程之间的自动切换,在Python3.5中正式确立引入了async和await 的语法,所有的这些工作都使得Python实现异步编程变得更容易上手。

1、asyncio的基本概念

  • event_loop 事件循环:每一个需要异步执行的任务都要注册到事件循环中,事件循环负责管理和调度这些任务之间的执行流程(遇到IO则自动切换协程等)。

  • coroutine 协程:协程对象,指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用。

  • task 任务:一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含任务的各种状态。

  • future: 代表将来执行或没有执行的任务的结果。它和task上没有本质的区别

  • async/await 关键字:在python3.5及以上,用于定义协程的关键字,async定义一个协程,await用于挂起阻塞的异步调用接口。

  在异步的模式里,所有代码逻辑都会运行在一个forever事件循环中(你可以把整个事件循环看成一个总控中心,它监听着当前线程创建的多个协程发发生的事件),它可以同时执行多个协程,这些协程异步地执行,直到遇到 await 关键字,事件循环将会挂起该协程,事件循环这个总控再把当前线程控制权分配给其他协程,直到其他的协程也挂起或者执行完毕,再进行下一个协程的执行。

2、使用asyncio

2.1 使用async关键字和await定义协程

在Python3.5之前,要实现协程方式的写法一般如下:

1
2
3
4
import asyncio
@asyncio.coroutine
async def mytask(task_id):
yield from asyncio.sleep(1)

在Python3.5以后,全面使用async关键字和await定义协程,代码显更直观。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import asyncio
import datetime

def get_time():
d=datetime.datetime.now()
return d.strftime('%H:%M:%S')

# async 定义了mytask为协程对象
async def mytask(task_id):
# 这里就像gevent的sleep方法模拟IO,而且该协程会被asyncio自动切换
print('task-{} started at:{}'.format(task_id,get_time()))
await asyncio.sleep(1) # await 要求该行语句的IO是有返回值的例如response=request.get(url),如果直接使用await time.sleep(2),则无法创建协程对象
print('task-{} done at:{}'.format(task_id,get_time()))

# 创建事件循环对象,该事件循环由当前主线程拥有
loop = asyncio.get_event_loop()
tasks=[mytask(i) for i in range(4)] # 这里mytask()是协程对象,不会离开运行。
loop.run_until_complete(asyncio.wait(tasks)) # 这里实行的逻辑就像gevent.joinall(tasks)一样,表示loop一直运行直到所有的协程tasks都完成

  代码中通过async关键字定义一个协程(coroutine),不过该协程不能直接运行,需将它注册到事件循环loop里面,由后者在协程内部发生IO时(asyncio.sleep(2))时候调用协程。asyncio.get_event_loop方法可以创建一个事件循环,然后使用run_until_complete将协程注册到事件循环,并启动事件循环。
输出,从结果可以看出,5个协程同一时刻并发运行。
1
2
3
4
5
6
7
8
9
10
task-1 started at:17:12:37
task-0 started at:17:12:37
task-3 started at:17:12:37
task-2 started at:17:12:37
task-4 started at:17:12:37
task-1 done at:17:12:38
task-0 done at:17:12:38
task-3 done at:17:12:38
task-2 done at:17:12:38
task-4 done at:17:12:38

关于await要求该句为返回值:
await asyncio.sleep(2),这里可以看看sleep返回什么
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@coroutine
def sleep(delay, result=None, *, loop=None):
"""Coroutine that completes after a given time (in seconds)."""
if delay == 0:
yield
return result

if loop is None:
loop = events.get_event_loop()
future = loop.create_future()
h = future._loop.call_later(delay,
futures._set_result_unless_cancelled,
future, result)
try:
return (yield from future)
finally:
h.cancel()

如果设为delay值,且loop事件循环已创建(即使代码未创建它也会自动创建),返回的是future对象(yield from future),而这里可以挂起当前协程,直到future完成

2.2 task对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
...同上
# async 定义了mytask为协程对象
async def mytask(task_id):
# 这里就像gevent的sleep方法模拟IO,而且该协程会被asyncio自动切换
print('task-{} started at:{}'.format(task_id,get_time()))
await asyncio.sleep(1)
print('task-{} done at:{}'.format(task_id,get_time()))
return 'ok'

coro=mytask(1)
loop = asyncio.get_event_loop()
task=loop.create_task(coro) # 将协程对象封装为task对象
print('before register to loop:',task)
loop.run_until_complete(future=task)
print('after loop completed,task return the result:',task.result())

查看打印结果:

1
2
3
4
before register to loop: <Task pending coro=<mytask() running at /opt/asyn.py:9>>
task-1 started at:17:39:06
task-1 done at:17:39:07
after loop completed,task return the result: ok

将协程封装为task对象后,task在注册到事件循环之前为pending状态,1秒后,task 结束,并且通过task.result()可以获取协程结果值。
task对象也可用asyncio.ensure_future(coro)创建(接收coro协程或者future对象),它内部封装了loop.create_task

2.2 future对象

前面定义说了future表示将来执行或没有执行的任务的结果,task是future的子类。
基本的方法有:
• cancel(): 取消future的执行,调度回调函数
• result(): 返回future代表的结果
• exception(): 返回future中的Exception
• add_done_callback(fn): 添加一个回调函数,当future执行的时候会调用这个回调函数。
• set_result(result): 将future标为运行完成,并且设置return值,该方法常用
使用future,可以在协程结束后自行回调函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import asyncio
...同上
async def coru_1(future_obj,N):
print('coru_1 started at:{}'.format(get_time()))
total=sum(range(N))
await asyncio.sleep(2)
future_obj.set_result('coru_1 returns:{}'.format(total))
print('coru_1 done at:{}'.format(get_time()))

async def coru_2(future_obj,N):
print('coru_2 started at:{}'.format(get_time()))
total=sum(range(N))
await asyncio.sleep(2)
future_obj.set_result('coru_2 returns:{}'.format(total))
print('coru_2 done at:{}'.format(get_time()))

def call_back(future_obj):
time.sleep(1)
print('saved to redis at :',get_time(),future_obj,future_obj.result())

if __name__=='__main__':
loop=asyncio.get_event_loop()
f1=asyncio.Future()
f2=asyncio.Future()
tasks=[coru_1(f1,10),coru_2(f2,20)]
f1.add_done_callback(call_back)
f2.add_done_callback(call_back)
loop.run_until_complete(asyncio.wait(tasks))
loop.close()


输出
1
2
3
4
5
6
coru_1 started at:16:52:07
coru_2 started at:16:52:07
coru_1 done at:16:52:09
coru_2 done at:16:52:09
saved to redis at : 16:52:10 <Future finished result='coru_1 returns:45'> coru_1 returns:45
saved to redis at : 16:52:11 <Future finished result='coru_2 returns:190'> coru_2 returns:190

两个协程同时启动且在同一时间结束运行。之后开始回调,可以看到协程1先回调,1秒完成后,再切换到协程2回调。

2.3 获取协程并发执行后的所有返回值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import asyncio
import datetime

def get_time():
d=datetime.datetime.now()
return d.strftime('%H:%M:%S')

async def read_file(task_id):
print('task-{} started at:{}'.format(task_id,get_time()))
await asyncio.sleep(2) # 模拟读取文件的耗时IO
return 'task-{} done at:{}'.format(task_id,get_time())

loop = asyncio.get_event_loop()
coros=[read_file(i) for i in range(5)] # 创建多个协程
tasks=[asyncio.ensure_future(coro) for coro in coros]# 将协程封装为task对象
loop.run_until_complete(asyncio.wait(tasks))
# 或者loop.run_until_complete(asyncio.gether(*tasks))

# 重点在这里,当所有的协程结束后,可批量获取所有协程的返回结果
get_all_result=[ t.result() for t in tasks]
print(get_all_result)

输出:

1
2
3
4
5
task-0 started at:15:53:08
task-1 started at:15:53:08
task-2 started at:15:53:08
task-3 started at:15:53:08
['task-0 done at:15:53:09', 'task-1 done at:15:53:09', 'task-2 done at:15:53:09', 'task-3 done at:15:53:09']

以上也无需使用future的回调机制获取协程返回值,直接在loop结束后,从task对象的result方法即可获得协程返回值。
需要注意的是:
用于等待所有协程完成的方法asyncio.wait和asyncio.gather,都是接受多个future或coro组成的列表,区别:asyncio.gather内边调用ensure_future方法将列表中不是task的coro封装为future对象,而wait则没有。

2.4 asyncio.gather vs asyncio.wait

这里再给两个例子说明这两者的区别以及应用场合:
asyncio.gather

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import asyncio
import datetime

def get_time():
d=datetime.datetime.now()
return d.strftime('%M:%S')

async def coro(group_id,coro_id):
print('group{}-task{} started at:{}'.format(group_id,coro_id,get_time()))
await asyncio.sleep(coro_id) # 模拟读取文件的耗时IO
return 'group{}-task{} done at:{}'.format(group_id,coro_id,get_time())

loop=asyncio.get_event_loop()

# 创建三组tasks
tasks1=[asyncio.ensure_future(coro(1,i))for i in range(1,5)]
tasks2=[asyncio.ensure_future(coro(2,i)) for i in range(6,8)]
tasks3=[asyncio.ensure_future(coro(3,i)) for i in range(8,10)]


group1=asyncio.gather(*tasks1) # 对第1组的协程进行分组,group1
group2=asyncio.gather(*tasks2) # 对第2组的协程进行分组,group2
group3=asyncio.gather(*tasks3) # 对第3组的协程进行分组,group3

all_groups=asyncio.gather(group1,group2,group3) # 把3个group再聚合成一个大组,也是就所有协程对象的被聚合到一个大组

loop=asyncio.get_event_loop()
all_group_result=loop.run_until_complete(all_groups)
for index,group in enumerate(all_group_result): # 获取每组协程的输出
print('group {} result:{}'.format(index+1,group))
loop.close()

输出:
1
2
3
4
5
6
7
8
9
10
11
group1-task1 started at:35:19
group1-task2 started at:35:19
group1-task3 started at:35:19
group1-task4 started at:35:19
group2-task6 started at:35:19
group2-task7 started at:35:19
group3-task8 started at:35:19
group3-task9 started at:35:19
group 1 result:['group1-task1 done at:35:21', 'group1-task2 done at:35:21', 'group1-task3 done at:35:21', 'group1-task4 done at:35:21']
group 2 result:['group2-task6 done at:35:21', 'group2-task7 done at:35:21']
group 3 result:['group3-task8 done at:35:21', 'group3-task9 done at:35:21']

从打印结果可知,每组协程都在同一时刻开始以及同一时刻结束,asyncio.gather就是用于在更高层面对task进行分组,以不同的组管理不同的协程,你可以看出gather是一个粗粒度组织协程,自动收集所有协程结束后的返回值。

asyncio.wait

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import asyncio
import datetime

def get_time():
d=datetime.datetime.now()
return d.strftime('%M:%S')

async def coro(task_id):
print('coro-{} started at:{}'.format(task_id,get_time()))
await asyncio.sleep(task_id) # 模拟读取文件的耗时IO
return 'coro-{} done at:{}'.format(task_id,get_time())

tasks=[coro(i) for i in range(1,10)]

loop=asyncio.get_event_loop()

first_complete,unfinished1=loop.run_until_complete(asyncio.wait(tasks,return_when=asyncio.FIRST_COMPLETED))
# 获取首个已结束的协程返回值,注意这里firt_complete是一个set()

first_done_task=first_complete.pop()
print('首个完成的协程返回值:',first_done_task.result())
print('还未结束的协程数量:',len(unfinished1))
# 将第一阶段未完成的协程注册到loop里面
finished2,unfinished2=loop.run_until_complete(asyncio.wait(unfinished1,timeout=3))

# 获取第二阶段已完成的协程返回值
for t in finished2:
print(t.result())
print('还未结束的协程数量:',len(unfinished2))

# 将第二阶段未完成的协程注册到loop里面
finished3,unfinished3=loop.run_until_complete(asyncio.wait(unfinished2))
# 获取第三阶段已完成的协程返回值
for t in finished3:
print(t.result())
print('还未结束的协程数量:',len(unfinished3))


输出:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
coro-5 started at:23:40
coro-1 started at:23:40
coro-6 started at:23:40
coro-7 started at:23:40
coro-2 started at:23:40
coro-8 started at:23:40
coro-3 started at:23:40
coro-9 started at:23:40
coro-4 started at:23:40
首个完成的协程返回值: coro-1 done at:23:41
还未结束的协程数量: 8
coro-2 done at:23:42
coro-3 done at:23:43
coro-4 done at:23:44
还未结束的协程数量: 5
coro-6 done at:23:46
coro-7 done at:23:47
coro-9 done at:23:49
coro-5 done at:23:45
coro-8 done at:23:48
还未结束的协程数量: 0

从输出结果可以很清看出asyncio.wait很精确的控制协程运行过程,通过wait(return_when=asyncio.FIRST_COMPLETED)可拿到运行完成的协程,通过wait(timeout)控制指定时间后放回已完成的协程。

2.5 嵌套协程的实现(协程内调用协程)
一种易于理解的调用异步函数的方式

  在介绍嵌套协程或者闭包协程、协程内调用协程的概念前,先看看普通函数内部调用普通函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import time

def func1(data):
time.sleep(1)
return data

def func2(data):
time.sleep(1)
return data*2

def func3(data):
time.sleep(1)
return data*3

def gel_all_data():
result1=func1('foo')
result2=func2(result1)
result3=func3(result2)
return (result1,result2,result3)


  在同步的编程思维下,大家很容易理解get_all_data函数内部调用func1等三个外部函数来获取相应返回值,其实将同步改为异步的过程很简单:
==在每个函数前面使用关键字async向python解释器声明这是异步函数,如果需要调用外部异步函数,需使用await关键字==,将上面的同步编程改成异步编程的模式如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import asyncio
async def func1(start_data):
await asyncio.sleep(1) # 要使用asyncio的异步sleep方法,它会让出线程控制权给其他协程,而内建的sleep为同步性质
return start_data

async def func2(data):
await asyncio.sleep(1)
return data*2

async def func3(data):
await asyncio.sleep(1)
return data*3

async def get_all_data():
result1=await func1('foo') # 在异步函数内部,使用await关键字调用其他异步函数,并获取该异步函数的返回值。执行流会在此将当前线程控权让出
result2=await func2(result1)# 同上
result3=await func3(result2) # 同上
return(result1,result2,result3)

该异步的get_all_data其实要实现的需求为:一个协程内部调用其他协程,而且可以将返回值放置在不同的协程上,可以实现链式的协程调度,这看起来就是一个协程任务流 。
当熟悉了这种异步的编程模式后,可以玩一些更为进阶的例子:

协程嵌套示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import asyncio
import datetime

def get_time():
d=datetime.datetime.now()
return d.strftime('%H:%M:%S')

async def inner_coro(task_id):
print('coro-{} started at:{}'.format(task_id,get_time()))
await asyncio.sleep(5) # 模拟读取文件的耗时IO
return 'coro-{} done at:{}'.format(task_id,get_time())

async def outter_coro():
print('outter_coro started at:{}'.format(get_time()))
coros=[inner_coro(i) for i in range(4)]
tasks=[asyncio.ensure_future(coro) for coro in coros]
inner_tasks,pendings=await asyncio.wait(tasks) # 这句实现了协程中再调用协程
print('outter_coro done at:{}'.format(get_time()))
# 使用asyncio.wait(tasks)可以在外层协程里面获取嵌套协程的运行返回值
for task in inner_tasks:
print(task.result())

loop = asyncio.get_event_loop()
try:
loop.run_until_complete(outter_coro())
except Exception as e:
loop.close()

输出:

1
2
3
4
5
6
7
8
9
10
11
outter_coro started at:14:52:59
coro-0 started at:14:52:59
coro-1 started at:14:52:59
coro-2 started at:14:52:59
coro-3 started at:14:52:59

outter_coro done at:14:53:04
coro-1 done at:14:53:04
coro-3 done at:14:53:04
coro-2 done at:14:53:04
coro-0 done at:14:53:04

可以看到外层协程和内层协程同时启动(当然外层协程函数最先执行),而且都在同一个时刻结束。
内层协程返回值只能在外程协程内部获取,能否在
loop.run_until_complete(outter_coro()) 之后,一次性获取协程返回值? 需要改用asyncio.gather方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
async def outter_coro():
print('outter_coro started at:{}'.format(get_time()))
coros=[inner_coro(i) for i in range(4)]
tasks=[asyncio.ensure_future(coro) for coro in coros]
print('outter_coro done at:{}'.format(get_time()))
return await asyncio.gather(*tasks)

loop = asyncio.get_event_loop()
all_coro_result=loop.run_until_complete(outter_coro())
for t in all_coro_result:
print(t)

loop.close()


输出:
1
2
3
4
5
6
7
8
9
10
outter_coro started at:58:44
outter_coro done at:58:44
coro-0 started at:58:44
coro-1 started at:58:44
coro-2 started at:58:44
coro-3 started at:58:44
coro-0 done at:58:46
coro-1 done at:58:46
coro-2 done at:58:46
coro-3 done at:58:46

从外层协程outter_coro的启动时刻和结束时刻都一样可以看出,outter_coro和return await asyncio.gather(*tasks)是异步执行的,且在outter_coro结束后,loop事件循环只需管理coro-0到coro-3这4个协程。

从以上两种嵌套协程返回值的写法,可以看到这样逻辑:

  • 外层协程直接返回 awaitable对象给loop,loop就可以在最后获取所有协程的返回值;
    1
    2
    3
    4
    5
    6
    def outter_coro()
    return await asyncio.wait(tasks) # 返回awaitable对象给下文loop,这里用asyncio.wait挂起所有协程

    done,pending=loop.run_until_complete(outter_coro())
    for task in done:
    print(task.result())
  • 外层协程没有返回 awaitable对象给loop,loop无法获取所有协程的返回值,只能在外程协程里面获取所有协程返回值
    1
    2
    3
    4
    5
    6
    def outter_coro()
    done,pending=await asyncio.wait(tasks) # 没有返回awaitable对象给下文loop
    for task in done:
    print(task.result())

    loop.run_until_complete(outter_coro())
    2.6 如何取消运行中协程
    future(task)对象主要有以下几个状态:
    pending、running、done、cancelled
    创建future(task)的时候,task为pending状态:
    tasks=[asyncio.ensure_future(coro) for coro in coros]
    事件循环调用执行的时候且协程未结束时对应tsak为running状态,
    loop.run_until_complete(outter_coro())
    事件循环运行结束后,所有的task为done状态,
    1
    2
    3
    done,pending=loop.run_until_complete(outter_coro())
    for task in done:
    print(task.result())
    那么最后一个cancelled状态如何实现呢?例如你想在某些协程未done之前将其cancel掉,如何处理?引用2.4章节的asyncio.wait例子说明:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    import asyncio
    import datetime

    def get_time():
    d=datetime.datetime.now()
    return d.strftime('%M:%S')

    async def coro(task_id):
    print('coro-{} started at:{}'.format(task_id,get_time()))
    await asyncio.sleep(task_id) # 模拟读取文件的耗时IO
    return 'coro-{} done at:{}'.format(task_id,get_time())

    tasks=[coro(i) for i in range(1,10)]

    loop=asyncio.get_event_loop()

    first_complete,unfinished1=loop.run_until_complete(asyncio.wait(tasks,return_when=asyncio.FIRST_COMPLETED))
    # 获取首个已结束的协程返回值,注意这里firt_complete是一个set()

    first_done_task=first_complete.pop()
    print('首个完成的协程返回值:',first_done_task.result())
    print('还未结束的协程数量:',len(unfinished1))
    # 将第一阶段未完成的协程注册到loop里面
    finished2,unfinished2=loop.run_until_complete(asyncio.wait(unfinished1,timeout=3))
    for t in finished2:
    print(t.result())


    print('还未结束的协程数量:',len(unfinished2))

    for task in unfinished2: # 取消剩余未运行的task
    print('cancell unfinished task:',task,'==>is canceled:',task.cancel())

    输出结果:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    coro-4 started at:17:47
    coro-5 started at:17:47
    coro-1 started at:17:47
    coro-6 started at:17:47
    coro-7 started at:17:47
    coro-2 started at:17:47
    coro-8 started at:17:47
    coro-3 started at:17:47
    coro-9 started at:17:47
    首个完成的协程返回值: coro-1 done at:17:48
    还未结束的协程数量: 8
    coro-2 done at:17:49
    coro-3 done at:17:50
    coro-4 done at:17:51
    还未结束的协程数量: 5
    cancell unfinished task: <Task pending coro=<coro() running at /opt/cancel_task.py:10> wait_for=<Future cancelled>> ==>is canceled: True
    cancell unfinished task: <Task pending coro=<coro() running at /opt/cancel_task.py:10> wait_for=<Future cancelled>> ==>is canceled: True
    cancell unfinished task: <Task pending coro=<coro() running at /opt/cancel_task.py:10> wait_for=<Future cancelled>> ==>is canceled: True
    cancell unfinished task: <Task pending coro=<coro() running at /opt/cancel_task.py10> wait_for=<Future cancelled>> ==>is canceled: True
    cancell unfinished task: <Task pending coro=<coro() running at /opt/cancel_task.py:10> wait_for=<Future cancelled>> ==>is canceled: True
    从输出结果可看到,8个协程task并发运行,最早结束的是coro-1,接着是coro-2、coro-3、coro-4,因为设定asyncio.wait(unfinished1,timeout=3) 3秒超时,只要超过3秒后,loop返回这些未运行的task,接着再逐个取消,可以看到5个协程被取消,True表示当前协程取消成功。
2.7 理解loop的相关方法
loop.run_until_complate vs loop.run_forever

  loop.run_until_complate可以在程序的不同位置多次调用,例如在2.4 asyncio.gather vs asyncio.wait 提到的asyncio.wait用法,同一程序中能出现多个loop.run_until_complate

1
2
3
4
5
6
7
# 将第一阶段未完成的协程注册到loop里面
finished2,unfinished2=loop.run_until_complete(asyncio.wait(unfinished1,timeout=3))
print('还未结束的协程数量:',len(unfinished2))

# 将第二阶段未完成的协程注册到loop里面
finished3,unfinished3=loop.run_until_complete(asyncio.wait(unfinished2))
print('还未结束的协程数量:',len(unfinished3))

而对于 loop.run_forever,在同一程序中,只能有一个,因为该事件是在当前线程后台永久运行:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import asyncio

async def coro():
print('start a coro')
await asyncio.sleep(1)
print('coro done')


future_obj=asyncio.ensure_future(coro())
loop=asyncio.get_event_loop()
loop.run_forever() # 程序不会退出,loop一直挂在这里,等待其他future对象

future_obj=asyncio.ensure_future(coro()) #程序没有报错,但执行流永远不会到达这里,该句永远不会运行
loop.run_forever() # 程序没有报错,但执行流永远不会到达这里,该语句永远不会运行

输出:
1
2
3
4
start a coro
coro done
,,,,
# 等待程序退出

loop.stop()

上面提到如果程序仅有loop.run_forever(),那么当future完成后,程序一直没有退出,若要求实现当future完成后,程序也需要正常退出,可以这样处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import asyncio
import datetime
def get_time():
d=datetime.datetime.now()
return d.strftime('%M:%S')

async def coro(n):
print('start a coro at ',get_time())
await asyncio.sleep(n)
print('coro done at ',get_time())

future_obj=asyncio.ensure_future(coro(2))
loop=asyncio.get_event_loop()
loop.stop() # 在run_forever()前,先stop
print('stop后,loop事件还在运行? at ',get_time())
loop.run_forever()

输出
1
2
stop后,loop事件还在运行? at  43:11
start a coro at 43:11

从输出可以看到,上面的示例代码都是异步并发运行。
print('stop后,loop事件还在运行? at ',get_time())语句跟future任务同时运行
==注意:如果把loop.stop()方法放在run_forever后面,可预见,程序不会退出==
1
2
3
4
...
loop.run_forever()
loop.stop() # 执行流永远不会到达这一句
print('stop后,loop事件还在运行? at ',get_time()) # 执行流永远不会到达这一句

loop.call_soon/loop.call_later

这两个方法用于在异步函数里面调用同步函数(普通函数),且可以实现立刻调用或者稍后调用:
loop.call_soon(callback, *args, context=None): 立刻调用,并返回

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import asyncio
import functools

def callback(name,stat=1):
print('args:',name,'keyword args:',stat)

async def run(loop):
loop.call_soon(callback,'get first callback')
wrapper_func=functools.partial(callback,stat=2)
loop.call_soon(wrapper_func,'get second call back')

loop=asyncio.get_event_loop()
try:
loop.run_until_complete(run(loop))
finally:
loop.close()

打印:
1
2
args: get first callback keyword args: 1
args: get second call back keyword args: 2

  异步调用同步函数其实已经破坏了异步的并发机制,因此很少使用这些非异步的方法。
  此外loop.call_soon不支持协程函数传入关键字,因此可以通过偏函数先把关键字参数”传入“callback的kwargs里面,之后在call_soon里面,就可以利用这个被”包装过的callback“再传入位置参数即可(loop.run_until_complete传入关键字参数也一样这么处理)

loop.call_later(delay, callback, *args, context=None): 再给定一个时间之后,再调用callback。context默认当前线程的上下文

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import asyncio
import functools

def callback(name,stat=1):
print('args:',name,'keyword args:',stat)

async def run(loop):
loop.call_later(2,callback,'get first callback')
loop.call_soon(callback,'callback soon')
wrapper_func=functools.partial(callback,stat=0)
loop.call_later(1,wrapper_func,'get second callback')
await asyncio.sleep(2) # 这里如果不设sleep,那么call_soon执行后loop马上退出,导致2个有延时运行的callback也退出了。这里要大于等于delay时间最长的call_later
loop=asyncio.get_event_loop()
try:
loop.run_until_complete(run(loop))
finally:
loop.close()

打印:可以看到call_soon最先完成回调,接着才是设为1秒后运行的回调,2秒的回调
1
2
3
args: callback soon keyword args: 1
args: get second callback keyword args: 0
args: get first callback keyword args: 1

同样,该loop.call_later不常用。

3、asyncio进阶用法

  在上面的例子中,一个主线程创建一个永久事件循环(该永久事件不会自动退出,而是run forever,除非主线程运行后没有阻塞或者手动中断程序运行),再把所有的协程注册到该永久事件循环,该方式较为基础用法的异步模式。在这一节,🔚asyncio高级用法,用线程1创建一个forever事件循环,线程2可以向事件循环中动态添加协程,而且不受主线程阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import asyncio
import datetime
import threading

def get_time():
d=datetime.datetime.now()
return d.strftime('%M:%S')

def start_another_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()


async def read_file(task_id):
print('coro-{} started at:{}'.format(task_id,get_time()))
await asyncio.sleep(task_id) # 模拟读取文件的耗时IO
print('coro-{} done at:{}'.format(task_id,get_time()))


new_loop = asyncio.new_event_loop()
# start一个新的线程1,用于启动一个永久事件循环
t = threading.Thread(target=start_another_loop,args=(new_loop,))
t.start()

# 当前线程向线程1的loop注册tasks
asyncio.run_coroutine_threadsafe(read_file(5),new_loop)
asyncio.run_coroutine_threadsafe(read_file(5),new_loop)


输出:
1
2
3
4
coro-5 started at:09:30
coro-2 started at:09:30
coro-2 done at:09:35
coro-5 done at:09:35

这个动态添加协程task对象有何用?如果task的参数是从redis队列实时取得,然后交由run_coroutine_threadsafe向loop注册协程,那么不就实现基于协程producer-consumer模式。

利用redis 队列实现loop循环事件动态添加协程

  这种方式,可以实现并发模式,producer:向redis 队列push 数据(这个数据是指协程task需要的参数,例如sleep的),consumer:使用asyncio.run_coroutine_threadsafe(read_file(msg),new_loop)不断消费producer的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import asyncio
import threading
import redis
import datetime,time

class MyCoro(object):
def __init__(self,host='127.0.0.1',port=6379,key='coro_queue',max_redis_conns=1000,semaphore=2):
self.r_pool=redis.ConnectionPool(host=host,port=port,max_connections=max_redis_conns)
self.r_conn=redis.Redis(connection_pool=self.r_pool)
self.r_queue_key=key
self.semaphore=semaphore
self.new_loop=asyncio.new_event_loop()
self.start()

@staticmethod
def get_time():
d=datetime.datetime.now()
return d.strftime('%M:%S')


async def coro(self,task_id):
""" 协程的worker,这里模拟IO耗时操作 """
print('coro-{} started at:{}'.format(task_id,self.get_time()))
await asyncio.sleep(task_id) # 模拟读取文件的耗时IO
print('coro-{} done at:{}'.format(task_id,self.get_time()))
#return 'coro-{} done at:{}'.format(task_id,get_time()

def forever_loop(self,loop_obj):
"""用于主线程启动一个永久事件循环,接收来自另外一个线程注册的协程对象"""
asyncio.set_event_loop(loop_obj)
loop_obj.run_forever()

def start_forever_loop(self):
"""用一个主线程去启动一个永久事件循环"""
t=threading.Thread(target=self.forever_loop,args=(self.new_loop,))
t.start()

def forever_consumer(self):
"""由另外一个子线层启动,该线程不断从redis队列获取数据,并用run_coroutine_threadsafe不断向new_loop注册task对象"""
while True:
task_id=self.r_conn.rpop(self.r_queue_key)
if not task_id:
time.sleep(1)
continue
task_id=task_id.decode('utf-8')
asyncio.run_coroutine_threadsafe(self.coro(int(task_id)),self.new_loop)

def start_forever_consumer(self):
"""用一个子线程用于向事件循环注册协程对象 """
t=threading.Thread(target=self.forever_consumer)
t.start()
t.join()# 这里要阻塞当前线程,否则就无法实现不但从redis队列获取任务了。若不阻塞,主线程start()后,子线程start()后,程序立即结束

def start(self):
"""在一个方法里面,同时启动两个线程,简化api"""
self.start_forever_loop()
self.forever_consumer()

if __name__=='__main__':
coro=MyCoro()
coro.start()

以上代码的逻辑:
创建两个线程,

  • 线程1负责启动一个forever事件循环,用于接收另外一个线程2注册的协程对象(task对象)
  • 线程2负责不断从redis队列获取任务数据后,再把协程注册到线程1启动的事件循环,从而实现loop循环事件动态添加协程。
    该例子只需要2个线程,即可实现高并发模式。

测试:在redis-cli里面,先lpush一个10,再lpush1个2秒,1个4秒,最终线程2按顺序创建3个线程,都会注册到loop里面,注意对比3个协程的完成时间:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# redis-cli 在coro_queue队列添加数据
127.0.0.1:6379> LPUSH coro_queue 10
(integer) 1
127.0.0.1:6379> LPUSH coro_queue 2
(integer) 1
127.0.0.1:6379> LPUSH coro_queue 4
(integer) 1

# 程序输出
coro-10 started at:56:48
coro-2 started at:56:50
coro-4 started at:56:51
coro-2 done at:56:52
coro-4 done at:56:55
coro-10 done at:56:58

从打印的时刻可以很清楚看到,3个协程并发执行,总共10秒完成。若同步模式,则需要2+4+10=16秒才能完成。

4、asyncio最适合的使用场景

  从redis、Nginx、node.js、Tornado、Twisted这些使用IO多路复用技术的中间件或者框架,可以很明确的推出结论:异步逻辑非常适合处理有Network IO且高并发的socket连接场景,因为这些场景往往需要等待IO,例如:访问web接口数据、访问网页、访问数据库,都是client向server发起网络IO。因此本节给出asyncio的3个场景:异步爬虫,高并发的socket服务、数据库连接。
  但是:asyncio的周边库似乎有不少坑,而且距离稳定生产环境有一定距离,参考知乎文章吐槽的asyncio,所有很多文章介绍asyncio基本使用场合,或者自行开发的小工具,很少文章能给出一个使用asyncio实现的复杂项目。当然,协程肯定不适合CPU计算场景。
  目前star较高的几个协程异步库有:aiohttpaioredisaiomysqlaiopg。aiopg:is a library for accessing a PostgreSQL database from the asyncio 。以上四个协程异步库底层通过封装asyncio实现。本节主要介绍aioredi和aiohttp,其他库可参考官方示例。

4.1 aioredis

  aioredis实现的api很丰富,支持sentinel连接,运行原生redis命令的接口,但是它不支持cluster集群的连接:Current release (1.3.0) of the library does not support Redis Cluster in a full manner.
单个连接示例:

1
2
3
4
5
6
7
8
9
10
import asyncio
import aioredis
async def coro(key,value):
redis = await aioredis.create_redis(
'redis://127.0.0.1')
await redis.set(key,value)
val = await redis.get(key)
redis.close()
await redis.wait_closed()
return val

下面使用单例模式和协程with协议,实现基本的async redis 类:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import aioredis
import asyncio
import datetime

class AsynRedis(object):
_instance = None

def __init__(self,redis_uri='redis://127.0.0.1',pool=False,max_conn=100,encoding='utf-8'):
self._redis_uri = redis_uri
self._encoding = encoding
self._pool=pool
self._max_conn=max_conn

async def __aenter__(self): # with协议入口
await self.get_conn()
return self

async def __aexit__(self, exc_type, exc, tb): #with 协议出口
await self.close()

async def get_conn(self): # 单例模式创建redis连接,可选pool或者单连接
if not self._instance:
if not self._pool:
self._instance = await aioredis.create_redis(self._redis_uri)
self._instance=await aioredis.create_redis_pool(self._redis_uri,maxsize=self._max_conn)
return self._instance

async def asyn_set(self,*args,**kwargs):
response=await self._instance.set(*args,**kwargs)
return response

async def asyn_get(self,key):
value=await self._instance.get(key,encoding=self._encoding)
return value

async def close(self):
if self._instance:
self._instance.close()
await self._instance.wait_closed()


def get_time():
d=datetime.datetime.now()
return d.strftime('%M:%S')

async def redis_coro(index):
async with AsynRedis() as f: # 异步的aenter和aexit实现with协议
print(f'coro-{index} start at {get_time()}')
await asyncio.sleep(1)
key='foo-{}'.format(index)
result=await f.asyn_set(key,get_time()) # 将时刻作为value,用于观察协程并发,获取set操作返回值,True表示set成功。
print(f'coro-{index} done at {get_time()},key is set? {result}')

if __name__=='__main__':
loop=asyncio.get_event_loop()
tasks=[redis_coro(i) for i in range(10)]
try:
loop.run_until_complete(asyncio.wait(tasks))
finally:
loop.close()


输出:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
coro-9 start at 57:15
coro-4 start at 57:15
coro-2 start at 57:15
coro-5 start at 57:15
coro-0 start at 57:15
coro-8 start at 57:15
coro-6 start at 57:15
coro-7 start at 57:15
coro-3 start at 57:15
coro-1 start at 57:15

coro-9 done at 57:16,key is set? True
coro-4 done at 57:16,key is set? True
coro-2 done at 57:16,key is set? True
coro-5 done at 57:16,key is set? True
coro-0 done at 57:16,key is set? True
coro-8 done at 57:16,key is set? True
coro-6 done at 57:16,key is set? True
coro-7 done at 57:16,key is set? True
coro-3 done at 57:16,key is set? True
coro-1 done at 57:16,key is set? True

可以看到10个redis协程同一时刻并发set key,并且同一时刻完成。
在redis-cli查看key,所有的key都value都是同一时刻,说明协程并发运行正确,而在多线程方式,则需要创建10个线程才可以实现单线程+10协程的效果,若当并发量高达1万+时,可以想象多线程将消耗大量系统资源以及线程切换,效率必然不高。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
127.0.0.1:6379> keys foo*
1) "foo-8"
2) "foo-3"
3) "foo-9"
4) "foo-0"
5) "foo-1"
6) "foo-7"
7) "foo-5"
8) "foo-6"
9) "foo-2"
10) "foo-4"
127.0.0.1:6379> get foo-8
"57:16"
127.0.0.1:6379> get foo-0
"57:16"

这里用两个魔法方法__aenter__,__aexit__实现with协议,但要注意的是,协程的with方法只能在协程函数内部使用,这个with的上下文就是该协程的上下文。如果写在主线程外部,则提示语法出错:
在这里插入图片描述这是因为协程上下文代表协程自己的栈等信息,肯定不是主线程的上下文,所以不能把async with写在程序的全局位置
注意:aioredis似乎有个bug,当MacOS系统的文件描述最大限制已设为10000,aioredis不管使用单连接或者pool方式,当并发数设为大值例如1000,部分协程完成后,剩余部分协程都会被阻塞(暂未找到原因)。

在项目中,如果要使用aioredis,可以用asyncio的semaphore信号量限制并发数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import aioredis
import asyncio
import datetime

def get_time():
d=datetime.datetime.now()
return d.strftime('%M:%S')

async def redis_coro(semaphore,index):
async with semaphore:
r=await aioredis.create_redis('redis://127.0.0.1',db=0)
print(f'coro-{index} start at {get_time()}')
await asyncio.sleep(0.05)
key='bar-{}'.format(index)
result=await r.set(key,get_time(),expire=100)
print(f'coro-{index} done at {get_time()},key is set? {result}')
r.close()
await r.wait_closed()

async def run(max_task=100):
sem=asyncio.Semaphore(10)
tasks=[redis_coro(sem,i) for i in range(max_task)]
await asyncio.wait(tasks)

if __name__=='__main__':
loop=asyncio.get_event_loop()
loop.run_until_complete(run())
loop.close()

4.1 aiohttp

  Asynchronous HTTP client/server framework for asyncio and Python
aiohttp应该是除了request库外最强大的HTTP库,而且是异步实现,三个主要功能:

  • Supports both Client and HTTP Server.
  • Supports both Server WebSockets and Client WebSockets out-of-the-box without the Callback Hell.
  • Web-server has Middlewares, Signals and pluggable routing.
    除了基本client和server端服务,还支持websocket、web-server模块还支持可插拔的中间件,官方也给了详细的aiohttp demo代码,链接

以下代码逻辑:使用协程并发get html页面,并使用协程方式存储html到文件或者存到redis

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import datetime,time
import asyncio,aiohttp,aioredis,aiofiles

def get_time():
d=datetime.datetime.now()
return d.strftime('%M:%S')

async def save_to_file(html,index):
"""将html文本存放到本地目录,使用异步aiofiles实现。"""
file_name=f'/opt/test_aiohttp/html-{index}.txt'
async with aiofiles.open(file_name,'wb') as f:
await f.write(html)
print(f'coro-{index} done at {get_time()}')

async def save_to_redis(html,index):
"""将html文本存放到redis,采用协程模式 """
r=await aioredis.create_redis('redis://127.0.0.1',db=0)
coro_key=f'coro-{index}'
result=await r.set(coro_key,html,expire=60)
print(f'coro-{index} done at {get_time()},is saved? {result}')
r.close()
await r.wait_closed()

async def get_html(semaphore,session,url,index,container='file'):
"""获取url对应的html文本,并调用存放文本的协程,这里就是前面章节提到的嵌套协层,用await调用外部协程 """
try:
async with semaphore:# 由外部传入的信号量,控制并发数
print(f'coro-{index} started at {get_time()}')
async with session.get(url,timeout=5) as response:
if response.status==200:
if container=='file':
container_=save_to_file
else:container_=save_to_redis
html_text=await response.read()
await container_(html_text,index)
return True
else:
return False
except Exception as e:
return False


async def run(semaphore,url,max_workers=10):
async with aiohttp.ClientSession() as session:
urls=[url]*max_workers
tasks=[asyncio.ensure_future(get_html(semaphore,session,each_url,index)) for index,each_url in enumerate(urls)]
await asyncio.gather(*tasks)

if __name__=='__main__':
loop=asyncio.get_event_loop()
semaphore=asyncio.Semaphore(5)
loop.run_until_complete(run(semaphore,'http://spark.apachecn.org/#/'))

# 官方推荐使用Zero-sleep保证底层的一些socket连接完成关闭
loop.run_until_complete(asyncio.sleep(0))
loop.close()

打印结果:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
coro-0 started at 49:26
coro-1 started at 49:26
coro-2 started at 49:26
coro-3 started at 49:26
coro-4 started at 49:26

coro-0 done at 49:26
coro-5 started at 49:26
coro-2 done at 49:26
coro-6 started at 49:26
coro-4 done at 49:26
coro-7 started at 49:26
coro-1 done at 49:26
coro-8 started at 49:26
coro-3 done at 49:26
coro-9 started at 49:26

coro-6 done at 49:26
coro-5 done at 49:27
coro-7 done at 49:27
coro-8 done at 49:27
coro-9 done at 49:27

这里使用asyncio.Semaphore(5)控制并发数,从输出可以看出,一开始有5个协程启动,最先5个协程运行结束后,另外5个协程同时启动。因为10个任务,分了2轮进行。

小结

  本文内容相对较多且杂,主要是asyncio协程库有较多api,这些api与同步编程的python库有较大的区别,结合asyncio实现的几个第三方协层库来看,可以看到协程在python生态位置较为小众,如果在项目(例如高性能web接口服务)中引入协程异步编程,可以考虑Tornado以及Twisted。(想想Go语言还是相当强大,协程生态成熟)