本篇紧接上篇博文,建议先阅读 Python协程-基于生成器的协程
原生协程 async/await
在上一篇中,我们学习了 “基于生成器的协程”,
我们创建 Future 用来储存结果数据,创建了 Task,用来推进生成器(协程任务)进行:
当相关事件未发生时,任务则会暂停,事件循环转而去执行其他可执行任务,
当相关事件发生时, 事件回调则会往 Future 写入数据, 然后再执行 Task 的推进任务推进生成器,
把 future.result 送入生成器。
从 Python3.4 开始,官方实现了一个新的 异步IO库-asyncio, 以此来实现并简化开发者使用协程。
asyncio 帮助封装的功能:
1、我们不再需要自己建立 Future/Task,asyncio 已经内置了
2、我们不再需要自己写循环处理事件,asyncio 已经提供了 io_loop 来完成操作
3、协程任务不再是生成器,而是协程对象(coruntine), 使用 async 声明函数/方法 为一个协程对象,
使用 "await 暂没有结果的Future" 将任务挂起
事件循环驱动 Event_loop
在前面,我们的的 event_loop 是直接在一个死循环中进行的,很简单地执行 “捕获事件 + 处理事件回调”:
while True: # 事件循环
events = sel.select()
for key, mask in events: # 获取事件信息,执行回调
callback = key.data
callback(key.fileobj, mask)
asyncio 把 event_loop 封装成了一个对象,同样也是在一个死循环 loop.run_forever()中进行,不一样的是,
它每次循环不仅仅处理发生事件的回调,还处理协程推进任务以及其他事务。 event_loop 内置了 队列 _ready
,
当 Future 获取数据需要执行协程推进回调时,并不是直接执行函数 “协程推进回调”,而是把回调丢进 _ready,
等到下一次循环时,由 loop 代为执行。
不仅如此,还提供了一个日程列表 _scheduled
,将一些定时任务添加至此,时间到了,则会把任务添加进 _ready.
如 asyncio.sleep(3),会添加一个 “在 3 秒后把 future 状态设置为 FINISHED” 的任务,由此让协程挂起 3秒.
再如 Tornado 服务的自动重加载功能: 就是添加了一个 “每 0.5 秒 检查是否有文件更新” 的任务
Event_loop 核心代码展示
获取 event_loop: asyncio.get_event_loop()
默认通过 thread_local 获得当前线程的 event_loop 对象,若不存在,则会初始化
启动 事件循环: base_events.py: loop.run_forever()
def run_forever(self):
...
events._set_running_loop(self)
while True:
self._run_once()
if self._stopping:
break
...
捕获并处理ready 任务: base_events.py: self._run_once()
def _run_once(self):
...
event_list = self._selector.select(timeout) # 捕获事件
self._process_events(event_list) # 把事件回调加入 _ready
...
while self._scheduled: # 处理定时任务,如果到了执行时间,则把任务 放进 _ready
handle = self._scheduled[0]
if handle._when >= end_time:
break
handle = heapq.heappop(self._scheduled)
handle._scheduled = False
self._ready.append(handle)
...
ntodo = len(self._ready)
for i in range(ntodo): # 在此处 处理 ready 的任务
handle = self._ready.popleft()
handle._run()
创建协程对象和 Task 任务
前面提到了,使用 async 修饰的 函数/方法就是个协程对象,可以使用 asyncio.iscoroutine 或 type 验证
import asyncio
async def test_co():
await asyncio.sleep(3)
return ""
print(type(test_co())) # <class 'coroutine'>
print(asyncio.iscoroutine(test_co())) # True
创建Task任务: loop.create_task(cor)
loop = asyncio.get_event_loop()
loop.create_task(test_co())
创建Task任务中,调用 call_soon, 向 loop._ready 添加 <Handle
和生成器类似,用于推进协程
def call_soon(self, callback, *args, context=None):
self._check_closed()
if self._debug:
self._check_thread()
self._check_callback(callback, 'call_soon')
handle = self._call_soon(callback, args, context)
if handle._source_traceback:
del handle._source_traceback[-1]
return handle
然后执行 loop.run_forever() 就可以启动了
loop = asyncio.get_event_loop()
loop.create_task(test_co())
loop.run_forever()
await 与 Future 对象
当 Event_loop 执行一个协程时,什么情况下会将协程挂起呢?
我们知道,在 “基于生成器的协程” 中, 我们是通过 yield 进行的,一般都是 yield future对象。
在这里,其实也是类似的做法,await future对象, 但仅当 future的状态是 pending 时,才会在此处暂停,
并跳出切换到其他可执行协程;如果是 finish 状态,则认为不需要等待,直接通过。
asyncio 内置的 Future 有个属性 _state, 其有三种状态:
_PENDING = base_futures._PENDING
_CANCELLED = base_futures._CANCELLED
_FINISHED = base_futures._FINISHED
当且仅当 await 后跟的是 一个状态是 PENDING 的 Future 对象时,才会在此处暂停并跳出
很好理解,意思就是数据还没有准备好
那么,什么时候才会切回来呢?
很显然,当 状态是 FINISHED 时,那什么时候 future 才会变成 FINISHED 呢?
当相关事件发生,执行的事件回调将会调用 future.set_result, 在这里,就会把 _state 设置为 FINISHED
def set_result(self, result):
if self._state != _PENDING:
raise InvalidStateError('{}: {!r}'.format(self._state, self))
self._result = result
self._state = _FINISHED
self.__schedule_callbacks()
与 “基于生成器的协程” 种类似,在设置完 result 后,就需要 唤醒 协程,这个 “唤醒推进协程的操作” 已经事先在
添加到 _callbacks 中了 在判断 future._state 为 pending 挂起协程时,就把唤醒操作添加上去了,
我通过 Debug,看到了它 :
[(<TaskWakeupMethWrapper object at ...>, <Context object at ...>)]
set_result 的最后一行是 self.__schedule_callbacks(), 它会把 唤醒操作添加到 loop._ready 中去,
进而会马上在下次的 _run_once() 中处理
def __schedule_callbacks(self):
...
for callback, ctx in self._callbacks[:]:
self._loop.call_soon(callback, self, context=ctx)
唤醒协程,获得 future 的 result
与 “基于生成器的协程” 类似:
result = yield future 可以通过唤醒函数获得 future.result, -> self.gen.send(fut.result)
result = await future 也可以通过 TaskWakeupMethWrapper 获得 future 中的 result
await 后还可接协程对象
await <coruntine>, 用来实现协程嵌套,与 “基于生成器的协程” 中 使用 yield from 嵌套生成器是一样的。
同样的,当 await 后跟一个协程对象时,执行过程会直接进入函数,直到遇到 pending 状态的 Future 才会切出来。
asyncio.sleep(sec) 就是这样的一个协程对象
使用原生协程构建服务端
服务端
from socket import *
import selectors
import asyncio
def build_listen_socket(port):
s = socket(AF_INET, SOCK_STREAM)
s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
s.bind(('127.0.0.1', port))
s.listen(5)
s.setblocking(False) # 设置socket的接口为非阻塞
return s
async def handle_data_ready(conn):
def read(fut, conn, mask):
data = conn.recv(1024) # Should be ready
fut.set_result(data)
future = loop.create_future()
loop.add_reader(conn.fileno(), read, future, conn, selectors.EVENT_READ)
async def continue_listen(fut):
while True:
data = await fut
if data:
print('echoing', repr(data), 'to', conn)
conn.send(data.upper()) # Hope it won't block
fut = loop.create_future()
loop.add_reader(conn.fileno(), read, fut, conn, selectors.EVENT_READ)
else:
print('closing', conn)
loop.remove_reader(conn.fileno())
conn.close()
break
await continue_listen(future)
async def handle_incoming(port):
s = build_listen_socket(port)
def accept(fut, sock, mask):
conn, addr = sock.accept() # Should be ready
fut.set_result((conn, addr))
async def continue_listen(fut):
while True:
conn, addr = await fut
print('accepted', conn, 'from', addr)
conn.setblocking(False)
loop.create_task(handle_data_ready(conn))
fut = loop.create_future()
loop.add_reader(s.fileno(), accept, fut, s, selectors.EVENT_READ)
future = loop.create_future()
loop.add_reader(s.fileno(), accept, future, s,
selectors.EVENT_READ) # 注册监听socket fd 并绑定 回调函数 accept
await continue_listen(future)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.create_task(handle_incoming(8080))
loop.create_task(handle_incoming(8081))
loop.run_forever()
客户端
import sys
from socket import *
port = 8080
if len(sys.argv) > 1:
port = int(sys.argv[1])
c = socket(AF_INET, SOCK_STREAM)
c.connect(('127.0.0.1', port))
while True:
msg = input('>>: ')
if not msg: continue
c.send(msg.encode('utf-8'))
data = c.recv(1024)
print(data.decode('utf-8'))