Python协程-async原生协程

  
本篇紧接上篇博文,建议先阅读  Python协程-基于生成器的协程
  

原生协程  async/await

        在上一篇中,我们学习了  “基于生成器的协程”,
        我们创建  Future  用来储存结果数据,创建了  Task,用来推进生成器(协程任务)进行:
                当相关事件未发生时,任务则会暂停,事件循环转而去执行其他可执行任务,
                当相关事件发生时,  事件回调则会往  Future  写入数据,  然后再执行  Task  的推进任务推进生成器,
                把  future.result  送入生成器。
  
        从  Python3.4  开始,官方实现了一个新的  异步IO库-asyncio,  以此来实现并简化开发者使用协程。
  
        asyncio  帮助封装的功能:
                1、我们不再需要自己建立  Future/Task,asyncio  已经内置了
                2、我们不再需要自己写循环处理事件,asyncio  已经提供了  io_loop  来完成操作
                3、协程任务不再是生成器,而是协程对象(coruntine),  使用  async  声明函数/方法  为一个协程对象,
                使用  "await  暂没有结果的Future"  将任务挂起
  
  

事件循环驱动  Event_loop

        在前面,我们的的  event_loop  是直接在一个死循环中进行的,很简单地执行  “捕获事件  +  处理事件回调”:

while True:  # 事件循环
    events = sel.select()
    for key, mask in events:  # 获取事件信息,执行回调
        callback = key.data
        callback(key.fileobj, mask)

  
        asyncio  把  event_loop  封装成了一个对象,同样也是在一个死循环  loop.run_forever()中进行,不一样的是,
        它每次循环不仅仅处理发生事件的回调,还处理协程推进任务以及其他事务。  event_loop  内置了  队列  _ready,
        当  Future  获取数据需要执行协程推进回调时,并不是直接执行函数  “协程推进回调”,而是把回调丢进  _ready,
        等到下一次循环时,由  loop  代为执行。
  
        不仅如此,还提供了一个日程列表  _scheduled,将一些定时任务添加至此,时间到了,则会把任务添加进  _ready.
  
        如  asyncio.sleep(3),会添加一个  “在  3  秒后把  future  状态设置为  FINISHED”  的任务,由此让协程挂起  3秒.
  
        再如  Tornado  服务的自动重加载功能:  就是添加了一个  “每  0.5  秒  检查是否有文件更新”  的任务
  
  

Event_loop  核心代码展示

  
        获取  event_loop:  asyncio.get_event_loop()  
                默认通过  thread_local  获得当前线程的  event_loop  对象,若不存在,则会初始化
  
  
        启动  事件循环:  base_events.py:  loop.run_forever()  

    def run_forever(self):
        ...
        events._set_running_loop(self)
        while True:
            self._run_once()
            if self._stopping:
                break
        ...

  
        捕获并处理ready  任务:  base_events.py:  self._run_once()  

    def _run_once(self):
        ...
        event_list = self._selector.select(timeout)  # 捕获事件
        self._process_events(event_list)  # 把事件回调加入 _ready
        ...
        while self._scheduled:  # 处理定时任务,如果到了执行时间,则把任务 放进 _ready
            handle = self._scheduled[0]
            if handle._when >= end_time:
                break
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False
            self._ready.append(handle)
        ...
        ntodo = len(self._ready)
        for i in range(ntodo): # 在此处 处理 ready 的任务
            handle = self._ready.popleft()
            handle._run()

  

创建协程对象和  Task  任务

  
        前面提到了,使用  async  修饰的  函数/方法就是个协程对象,可以使用  asyncio.iscoroutine  或  type  验证

import asyncio
async def test_co():
    await asyncio.sleep(3)
    return ""
 
print(type(test_co()))  # <class 'coroutine'>
print(asyncio.iscoroutine(test_co()))  # True

  
        创建Task任务:  loop.create_task(cor)  

    loop = asyncio.get_event_loop()
    loop.create_task(test_co())

  
          创建Task任务中,调用  call_soon,  向  loop._ready  添加  <Handle  >
          和生成器类似,用于推进协程  

    def call_soon(self, callback, *args, context=None):
        self._check_closed()
        if self._debug:
            self._check_thread()
            self._check_callback(callback, 'call_soon')
        handle = self._call_soon(callback, args, context)
        if handle._source_traceback:
            del handle._source_traceback[-1]
        return handle

  
        然后执行  loop.run_forever()  就可以启动了

    loop = asyncio.get_event_loop()
    loop.create_task(test_co())
    loop.run_forever()

  

await  与  Future  对象

        当  Event_loop  执行一个协程时,什么情况下会将协程挂起呢?
  
        我们知道,在  “基于生成器的协程”  中,  我们是通过  yield  进行的,一般都是  yield  future对象。
        在这里,其实也是类似的做法,await  future对象,  但仅当  future的状态是  pending  时,才会在此处暂停,
        并跳出切换到其他可执行协程;如果是  finish  状态,则认为不需要等待,直接通过。
  
        asyncio  内置的  Future  有个属性  _state,  其有三种状态:
                _PENDING  =  base_futures._PENDING
                _CANCELLED  =  base_futures._CANCELLED
                _FINISHED  =  base_futures._FINISHED
  
        当且仅当  await  后跟的是  一个状态是  PENDING  的  Future  对象时,才会在此处暂停并跳出
        很好理解,意思就是数据还没有准备好
  
        那么,什么时候才会切回来呢?
  
        很显然,当  状态是  FINISHED  时,那什么时候  future  才会变成  FINISHED  呢?
        当相关事件发生,执行的事件回调将会调用  future.set_result,  在这里,就会把  _state  设置为  FINISHED
  

    def set_result(self, result):
        if self._state != _PENDING:
            raise InvalidStateError('{}: {!r}'.format(self._state, self))
        self._result = result
        self._state = _FINISHED
        self.__schedule_callbacks()

  
        与  “基于生成器的协程”  种类似,在设置完  result  后,就需要  唤醒  协程,这个  “唤醒推进协程的操作”  已经事先在
        添加到  _callbacks  中了  在判断  future._state  为  pending  挂起协程时,就把唤醒操作添加上去了,
        我通过  Debug,看到了它  :
                [(<TaskWakeupMethWrapper  object  at  ...>,  <Context  object  at  ...>)]
  
        set_result  的最后一行是    self.__schedule_callbacks(),  它会把  唤醒操作添加到  loop._ready  中去,
        进而会马上在下次的  _run_once()  中处理

    def __schedule_callbacks(self):
        ...
        for callback, ctx in self._callbacks[:]:
            self._loop.call_soon(callback, self, context=ctx)

  
        唤醒协程,获得  future  的  result
  
        与  “基于生成器的协程”  类似:
                result  =  yield  future    可以通过唤醒函数获得  future.result,  ->  self.gen.send(fut.result)
  
                result  =  await  future  也可以通过  TaskWakeupMethWrapper  获得  future  中的  result
  

await  后还可接协程对象

        await  <coruntine>,  用来实现协程嵌套,与  “基于生成器的协程”  中  使用  yield  from  嵌套生成器是一样的。
  
        同样的,当  await  后跟一个协程对象时,执行过程会直接进入函数,直到遇到  pending  状态的  Future  才会切出来。
        asyncio.sleep(sec)  就是这样的一个协程对象
  
  

使用原生协程构建服务端

  
服务端

from socket import *
import selectors
import asyncio
 
 
def build_listen_socket(port):
    s = socket(AF_INET, SOCK_STREAM)
    s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    s.bind(('127.0.0.1', port))
    s.listen(5)
    s.setblocking(False)  # 设置socket的接口为非阻塞
    return s
 
 
async def handle_data_ready(conn):
    def read(fut, conn, mask):
        data = conn.recv(1024)  # Should be ready
        fut.set_result(data)
 
    future = loop.create_future()
    loop.add_reader(conn.fileno(), read, future, conn, selectors.EVENT_READ)
 
    async def continue_listen(fut):
        while True:
            data = await fut
            if data:
                print('echoing', repr(data), 'to', conn)
                conn.send(data.upper())  # Hope it won't block
                fut = loop.create_future()
                loop.add_reader(conn.fileno(), read, fut, conn, selectors.EVENT_READ)
            else:
                print('closing', conn)
                loop.remove_reader(conn.fileno())
                conn.close()
                break
 
    await continue_listen(future)
 
 
async def handle_incoming(port):
    s = build_listen_socket(port)
 
    def accept(fut, sock, mask):
        conn, addr = sock.accept()  # Should be ready
        fut.set_result((conn, addr))
 
    async def continue_listen(fut):
        while True:
            conn, addr = await fut
            print('accepted', conn, 'from', addr)
            conn.setblocking(False)
            loop.create_task(handle_data_ready(conn))
            fut = loop.create_future()
            loop.add_reader(s.fileno(), accept, fut, s, selectors.EVENT_READ)
 
    future = loop.create_future()
    loop.add_reader(s.fileno(), accept, future, s,
                    selectors.EVENT_READ)  # 注册监听socket fd 并绑定 回调函数 accept
 
    await continue_listen(future)
 
 
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.create_task(handle_incoming(8080))
    loop.create_task(handle_incoming(8081))
    loop.run_forever()

  
客户端

import sys
from socket import *
 
port = 8080
if len(sys.argv) > 1:
    port = int(sys.argv[1])
 
c = socket(AF_INET, SOCK_STREAM)
c.connect(('127.0.0.1', port))
 
while True:
    msg = input('>>: ')
    if not msg: continue
    c.send(msg.encode('utf-8'))
    data = c.recv(1024)
    print(data.decode('utf-8'))