Python协程-基于生成器的协程

本篇紧接上篇  Python协程原理-IO多路复用

什么是协程

        在上篇中,我们已经学习了协程的原理-IO多路复用,即将程序分割成多个模块,通过  操作系统的事件驱动  来监控模块
        的可执行状态,当某个模块变为可执行状态时(相关事件发生),  则执行该模块,如:
                发生有新连接  事件,激活执行  "接收新连接模块"  (accept)
                发生数据准备  事件,激活执行  ”接收数据模块“  (read)

        这就是协程,在单线程下,当某个模块阻塞时,可以暂时挂起,交给事件驱动维护,并切换其它可执行的模块.  当发生
        相关事件时,再切回来继续完成模块的执行。

回调机制的缺陷

        在上篇中,模块的执行是通过  回调函数进行的,  如:

def accept(sock, mask):
    conn, addr = sock.accept()  # Should be ready
    print('accepted', conn, 'from', addr)
    conn.setblocking(False)
    sel.register(conn, selectors.EVENT_READ, read)

        在回调函数,又注册了一个  新的事件且绑定了新的回调函数,所以在回调函数内又设置了新的回调函数。
        但如果业务变复杂,  我们需要在新的回调函数内再设置新的回调函数,  那将变成  "俄罗斯套娃",这无疑是程序维护的噩梦

Python  生成器

        在  Python  迭代器和生成器
        已经介绍了生成器的基本用法,可以通过  yield  暂停生成器,并调用  next  恢复生成器的执行

        这套操作,其实很像  协程中的任务挂起  和  恢复执行,但仅仅这样还不足以模拟协程,为了支持协程,python官方为
        生成器设置了  send  方法,既能推进生成器执行,又能向生成器发送数据,
        还设置了  throw  方法,可以向生成器内抛异常,以此来终止生成器的运行

def gen():
    from_send = yield "value"
    print(from_send)


g = gen()
print(g.send(None))  # 得到 "value"
# 再次 send
try:
    g.send("传递值")
except StopIteration:
    g.throw(Exception, "Method throw called!")

Future  对象

        既然我们要用生成器来模拟协程,这就意味着,我们需要
                1、暂时挂起任务:  在这之前,肯定需要先把  fd  注册到  selector  中,这个只需要在  yield  前注册就行
                2、恢复执行:  我们推进生成器执行,如果需要数据,还需要把数据传进去。如何获得生成器对象呢?

        首先,我们是需要等  selector.select()  返回事件信息,然而事件信息只包含:  io的  fd  和  事件回调函数
        如何  操作  生成器对象呢?  显然,事件回调函数,是我们唯一的操作入口。

        容易发现,回调函数是在生成器内设置的,然而,生成器对象却只能在  生成器外初始化获得,那么回调函数怎么去执行
        生成器的相关操作呢?  此时生成器在这里就扮演了重要的角色。

        回调函数需要执行生成器的相关操作,然而生成器对象只能在生成器外获取,那就在生成器外,把  推进操作  绑定到
        一个对象上,这个对象是回调函数能都获取到的,而且可以使用  yield  将其抛到生成器外

        因此,将回调函数就设置在生成器内,这个对象也在生成器内初始化,并通过  yield  抛到生成器外

        这个对象就是  Future  对象,定义为当事件发生后才会使用到的对象。
                self.__callbacks    #  用于储存  推进生成器操作
                self.result              #  用于储存  需要  send  回生成器的数据

class Future:
    def __init__(self):
        self.result = None
        self.__callbacks = []

    def add_done_callbacks(self, func):
        self.__callbacks.append(func)

    def set_result(self, result):
        self.result = result
        for func in self.__callbacks:
            func(self)

Task  对象

        建立  Task  对象用于完成  生成器(协程)  的推进工作

class Task:
    def __init__(self, gene):
        self.generator = gene
        fut = next(self.generator)  # 预激生成器
        fut.add_done_callbacks(self.step)  # 绑定 推进方法

    def step(self, fut):
        #用来 推进 生成器的 执行
        #当监听事件发生后, 相关数据就会填充到 future.result 中,并且回调这个方法 推进 生成器的执行
        try:
            next_fut = self.generator.send(fut.result)
            if next_fut != fut:
                next_fut.add_done_callbacks(self.step)
        except StopIteration as e:
            return

完成代码

服务端

from socket import *
import selectors


class Future:
    def __init__(self):
        self.result = None
        self.__callbacks = []

    def add_done_callbacks(self, func):
        self.__callbacks.append(func)

    def set_result(self, result):
        self.result = result
        for func in self.__callbacks:
            func(self)


class Task:
    def __init__(self, gene):
        self.generator = gene
        fut = next(self.generator)  # 预激生成器
        fut.add_done_callbacks(self.step)  # 绑定 推进方法

    def step(self, fut):
        #用来 推进 生成器的 执行
        #当监听事件发生后, 相关数据就会填充到 future.result 中,并且回调这个方法 推进 生成器的执行
        try:
            next_fut = self.generator.send(fut.result)
            if next_fut != fut:
                next_fut.add_done_callbacks(self.step)
        except StopIteration as e:
            return


def build_listen_socket(port):
    s = socket(AF_INET, SOCK_STREAM)
    s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    s.bind(('127.0.0.1', port))
    s.listen(5)
    s.setblocking(False)  # 设置socket的接口为非阻塞
    return s


def handle_data_ready(conn):
    def read(conn, mask):
        data = conn.recv(1024)  # Should be ready
        future.set_result(data)

    future = Future()
    sel.register(conn, selectors.EVENT_READ, read)
    while True:
        data = yield future
        if data:
            print('echoing', repr(data), 'to', conn)
            conn.send(data.upper())  # Hope it won't block
        else:
            print('closing', conn)
            sel.unregister(conn)
            conn.close()
            break


def handle_incoming(port):
    s = build_listen_socket(port)

    def accept(sock, mask):
        conn, addr = sock.accept()  # Should be ready
        future.set_result((conn, addr))

    future = Future()
    sel.register(s, selectors.EVENT_READ, accept)  # 注册监听socket 并绑定 回调函数 accept

    while True:
        conn, addr = yield future
        print('accepted', conn, 'from', addr)
        conn.setblocking(False)
        Task(handle_data_ready(conn))


sel = selectors.DefaultSelector()

Task(handle_incoming(8080))
Task(handle_incoming(8081))

while True:  # 事件循环
    events = sel.select()
    for key, mask in events:  # 获取事件信息,执行回调
        callback = key.data
        callback(key.fileobj, mask)

客户端

import sys
from socket import *

port = 8080

if len(sys.argv) > 1:
    port = int(sys.argv[1])

c = socket(AF_INET, SOCK_STREAM)
c.connect(('127.0.0.1', port))

while True:
    msg = input('>>: ')
    if not msg: continue
    c.send(msg.encode('utf-8'))
    data = c.recv(1024)
    print(data.decode('utf-8'))

yield  from

        通过  yield  from    可让生成器中嵌套生成器
        如下面两种方式,是等效的

不使用  yield  from

def handle_incoming(port):
    s = build_listen_socket(port)

    def accept(sock, mask):
        conn, addr = sock.accept()  # Should be ready
        future.set_result((conn, addr))

    future = Future()
    sel.register(s, selectors.EVENT_READ, accept)  # 注册监听socket 并绑定 回调函数 accept

    while True:
        conn, addr = yield future
        print('accepted', conn, 'from', addr)
        conn.setblocking(False)
        Task(handle_data_ready(conn))

使用  yield  from

def handle_data_ready(conn):
    def read(conn, mask):
        data = conn.recv(1024)  # Should be ready
        future.set_result(data)

    future = Future()
    sel.register(conn, selectors.EVENT_READ, read)

    def continue_listen():
        while True:
            data = yield future
            if data:
                print('echoing', repr(data), 'to', conn)
                conn.send(data.upper())  # Hope it won't block
            else:
                print('closing', conn)
                sel.unregister(conn)
                conn.close()
                break

    yield from continue_listen()