本篇紧接上篇 Python协程原理-IO多路复用
什么是协程
在上篇中,我们已经学习了协程的原理-IO多路复用,即将程序分割成多个模块,通过 操作系统的事件驱动 来监控模块
的可执行状态,当某个模块变为可执行状态时(相关事件发生), 则执行该模块,如:
发生有新连接 事件,激活执行 "接收新连接模块" (accept)
发生数据准备 事件,激活执行 ”接收数据模块“ (read)
这就是协程,在单线程下,当某个模块阻塞时,可以暂时挂起,交给事件驱动维护,并切换其它可执行的模块. 当发生
相关事件时,再切回来继续完成模块的执行。
回调机制的缺陷
在上篇中,模块的执行是通过 回调函数进行的, 如:
def accept(sock, mask):
conn, addr = sock.accept() # Should be ready
print('accepted', conn, 'from', addr)
conn.setblocking(False)
sel.register(conn, selectors.EVENT_READ, read)
在回调函数,又注册了一个 新的事件且绑定了新的回调函数,所以在回调函数内又设置了新的回调函数。
但如果业务变复杂, 我们需要在新的回调函数内再设置新的回调函数, 那将变成 "俄罗斯套娃",这无疑是程序维护的噩梦
Python 生成器
在 Python 迭代器和生成器中
已经介绍了生成器的基本用法,可以通过 yield
暂停生成器,并调用 next
恢复生成器的执行
这套操作,其实很像 协程中的任务挂起 和 恢复执行,但仅仅这样还不足以模拟协程,为了支持协程,python官方为
生成器设置了 send
方法,既能推进生成器执行,又能向生成器发送数据,
还设置了 throw
方法,可以向生成器内抛异常,以此来终止生成器的运行
def gen():
from_send = yield "value"
print(from_send)
g = gen()
print(g.send(None)) # 得到 "value"
# 再次 send
try:
g.send("传递值")
except StopIteration:
g.throw(Exception, "Method throw called!")
Future 对象
既然我们要用生成器来模拟协程,这就意味着,我们需要
1、暂时挂起任务: 在这之前,肯定需要先把 fd 注册到 selector 中,这个只需要在 yield 前注册就行
2、恢复执行: 我们推进生成器执行,如果需要数据,还需要把数据传进去。如何获得生成器对象呢?
首先,我们是需要等 selector.select() 返回事件信息,然而事件信息只包含: io的 fd 和 事件回调函数
如何 操作 生成器对象呢? 显然,事件回调函数,是我们唯一的操作入口。
容易发现,回调函数是在生成器内设置的,然而,生成器对象却只能在 生成器外初始化获得,那么回调函数怎么去执行
生成器的相关操作呢? 此时生成器在这里就扮演了重要的角色。
回调函数需要执行生成器的相关操作,然而生成器对象只能在生成器外获取,那就在生成器外,把 推进操作 绑定到
一个对象上,这个对象是回调函数能都获取到的,而且可以使用 yield 将其抛到生成器外
因此,将回调函数就设置在生成器内,这个对象也在生成器内初始化,并通过 yield 抛到生成器外
这个对象就是 Future 对象,定义为当事件发生后才会使用到的对象。
self.__callbacks # 用于储存 推进生成器操作
self.result # 用于储存 需要 send 回生成器的数据
class Future:
def __init__(self):
self.result = None
self.__callbacks = []
def add_done_callbacks(self, func):
self.__callbacks.append(func)
def set_result(self, result):
self.result = result
for func in self.__callbacks:
func(self)
Task 对象
建立 Task 对象用于完成 生成器(协程) 的推进工作
class Task:
def __init__(self, gene):
self.generator = gene
fut = next(self.generator) # 预激生成器
fut.add_done_callbacks(self.step) # 绑定 推进方法
def step(self, fut):
#用来 推进 生成器的 执行
#当监听事件发生后, 相关数据就会填充到 future.result 中,并且回调这个方法 推进 生成器的执行
try:
next_fut = self.generator.send(fut.result)
if next_fut != fut:
next_fut.add_done_callbacks(self.step)
except StopIteration as e:
return
完成代码
服务端
from socket import *
import selectors
class Future:
def __init__(self):
self.result = None
self.__callbacks = []
def add_done_callbacks(self, func):
self.__callbacks.append(func)
def set_result(self, result):
self.result = result
for func in self.__callbacks:
func(self)
class Task:
def __init__(self, gene):
self.generator = gene
fut = next(self.generator) # 预激生成器
fut.add_done_callbacks(self.step) # 绑定 推进方法
def step(self, fut):
#用来 推进 生成器的 执行
#当监听事件发生后, 相关数据就会填充到 future.result 中,并且回调这个方法 推进 生成器的执行
try:
next_fut = self.generator.send(fut.result)
if next_fut != fut:
next_fut.add_done_callbacks(self.step)
except StopIteration as e:
return
def build_listen_socket(port):
s = socket(AF_INET, SOCK_STREAM)
s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
s.bind(('127.0.0.1', port))
s.listen(5)
s.setblocking(False) # 设置socket的接口为非阻塞
return s
def handle_data_ready(conn):
def read(conn, mask):
data = conn.recv(1024) # Should be ready
future.set_result(data)
future = Future()
sel.register(conn, selectors.EVENT_READ, read)
while True:
data = yield future
if data:
print('echoing', repr(data), 'to', conn)
conn.send(data.upper()) # Hope it won't block
else:
print('closing', conn)
sel.unregister(conn)
conn.close()
break
def handle_incoming(port):
s = build_listen_socket(port)
def accept(sock, mask):
conn, addr = sock.accept() # Should be ready
future.set_result((conn, addr))
future = Future()
sel.register(s, selectors.EVENT_READ, accept) # 注册监听socket 并绑定 回调函数 accept
while True:
conn, addr = yield future
print('accepted', conn, 'from', addr)
conn.setblocking(False)
Task(handle_data_ready(conn))
sel = selectors.DefaultSelector()
Task(handle_incoming(8080))
Task(handle_incoming(8081))
while True: # 事件循环
events = sel.select()
for key, mask in events: # 获取事件信息,执行回调
callback = key.data
callback(key.fileobj, mask)
客户端
import sys
from socket import *
port = 8080
if len(sys.argv) > 1:
port = int(sys.argv[1])
c = socket(AF_INET, SOCK_STREAM)
c.connect(('127.0.0.1', port))
while True:
msg = input('>>: ')
if not msg: continue
c.send(msg.encode('utf-8'))
data = c.recv(1024)
print(data.decode('utf-8'))
yield from
通过 yield from
如下面两种方式,是等效的
不使用 yield from
def handle_incoming(port):
s = build_listen_socket(port)
def accept(sock, mask):
conn, addr = sock.accept() # Should be ready
future.set_result((conn, addr))
future = Future()
sel.register(s, selectors.EVENT_READ, accept) # 注册监听socket 并绑定 回调函数 accept
while True:
conn, addr = yield future
print('accepted', conn, 'from', addr)
conn.setblocking(False)
Task(handle_data_ready(conn))
使用 yield from
def handle_data_ready(conn):
def read(conn, mask):
data = conn.recv(1024) # Should be ready
future.set_result(data)
future = Future()
sel.register(conn, selectors.EVENT_READ, read)
def continue_listen():
while True:
data = yield future
if data:
print('echoing', repr(data), 'to', conn)
conn.send(data.upper()) # Hope it won't block
else:
print('closing', conn)
sel.unregister(conn)
conn.close()
break
yield from continue_listen()