多路复用是在单线程下的一种高效io模型, 多路指的是多个io操作,复用 指的是 “复用一个线程"。
常见的 IO 模型
常见的 三种 io模型 有 阻塞io,非阻塞io, 多路复用
以 io 操作 read 举例,参与操作的对象有两个,调用操作的线程 和 系统内核, 操作分为两个阶段:
1、等待数据准备 (wait)
2、将数据从系统内核拷贝到用户空间 (copy)
阻塞式 IO
这是最常见的一种io模型,一个io操作完成才能够进行下一个io操作
非阻塞式 IO
可以让多个io操作进入 wait 状态,然后线程不断循环检查是否有io操作已准备好数据,一旦有,则系统会发出 error,
通过捕获这个 error, 对已准备好数据的 io 进行 copy 操作,完成后再次进入 轮询检查。 所以,虽然等待的过程
是非阻塞的,但 copy 的过程是阻塞。 虽然实现了”非阻塞“,但是,主动轮询会占用cpu大量的时间,因此这个模型是
不被推荐的。 以下是实现代码
# 服务端
from socket import *
import time
s = socket(AF_INET, SOCK_STREAM)
s.bind(('127.0.0.1', 8080))
s.listen(5)
s.setblocking(False) # 设置socket的接口为非阻塞
conn_l = []
del_l = []
while True:
try:
conn, addr = s.accept()
print("append:conn")
conn_l.append(conn)
except BlockingIOError:
if conn_l:
print(f"conn_l, num:{len(conn_l)}")
for conn in conn_l:
try:
data = conn.recv(1024)
if not data:
del_l.append(conn)
continue
conn.send(data.upper())
except BlockingIOError:
pass
except ConnectionResetError:
del_l.append(conn)
for conn in del_l:
conn_l.remove(conn)
conn.close()
del_l = []
# 客户端
from socket import *
c = socket(AF_INET, SOCK_STREAM)
c.connect(('127.0.0.1', 8080))
while True:
msg = input('>>: ')
if not msg: continue
c.send(msg.encode('utf-8'))
data = c.recv(1024)
print(data.decode('utf-8'))
多路复用
基于 非阻塞io,多路复用解决了 非阻塞io对 cpu 高占用的问题, 使用"操作系统的事件驱动"来代替程序的主动轮询,
即让操作系统 来通知我们 数据准备好了, 不用主动去询问。基于此的实现有 select, poll, epoll,
其中,效率最高的是 epoll, 但在 windows 不能使用,可在 linux 上使用。
select: 时间复杂度是 O(n), 只知道有 io 事件发生,具体哪个/哪几个需要通过轮询确认
poll: O(n), 和select类似,不一样的是使用 链表 代替了 set, 没有最大数量限制
epoll: O(1), 精确知道是触发了哪个事件
select 引用例子
# 服务端
from socket import *
import select
s = socket(AF_INET, SOCK_STREAM)
s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
s.bind(('127.0.0.1', 8081))
s.listen(5)
s.setblocking(False) # 设置socket的接口为非阻塞
read_l = [s, ]
while True:
print(f"registered socket:{len(read_l)}")
r_l, w_l, x_l = select.select(read_l, [], []) # 当没有事件发生时,会阻塞在这里,有事件则返回
print(f"ready socket:{len(r_l)}")
for ready_obj in r_l:
if ready_obj == s: # 有新连接 接入
conn, addr = ready_obj.accept() # 把 新连接 添加到 read_l, 在下一个循环注册到 select
read_l.append(conn)
else: # 有连接 准备好了数据
try:
data = ready_obj.recv(1024) # 读取数据
if not data:
ready_obj.close()
read_l.remove(ready_obj)
continue
ready_obj.send(data.upper())
except ConnectionResetError:
ready_obj.close()
read_l.remove(ready_obj)
# 客户端
from socket import *
c = socket(AF_INET, SOCK_STREAM)
c.connect(('127.0.0.1', 8081)) # 请求连接
while True:
msg = input('>>: ') # 此处输入操作,服务端的新连接将会因此等待数据
if not msg: continue
c.send(msg.encode('utf-8'))
data = c.recv(1024)
print(data.decode('utf-8'))
上面的例子中,服务端只建立了 1 个socket, 让 select 代理监听 “发生新连接”事件, 当有新连接发生时,
执行 accept 得到 获取的数据的 conn, 然后再将其交给 select 代理监听 "数据准备完成" 事件, 当发生该
事件时,再 使用 conn 接收读取数据。
这个例子很简单,只有 1 个 监听 "新连接接入" 的 socket 和 一些需要 监听 "数据准备完成" socket,
select 捕获到的事件中,可能是其中任一个 socket, 可以很简单地通过 ready_j == s 判断是哪种 socket,
从而决定进行哪种操作,然而,在实际开发中,需要监听的 socket数量,事件类型,操作类型 都比这要复杂的多,
因此为了缩减代码,引入了 回调机制,即在把 socket 交给 select 代理前,将 socket 需要监听的事件和发生事件
后需要执行的操作都绑定好,绑定好的操作就叫做 “回调函数”, 这样当 select 捕获到事件时,就可以直接执行对应
的回调函数。
这个功能 python 已经有现成的库,selectors --- 高级 I/O 复用库
应用 selectors, 加入回调机制
可以将服务端改造成这样
# 服务端
from socket import *
import selectors
def build_listen_socket(port):
s = socket(AF_INET, SOCK_STREAM)
s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
s.bind(('127.0.0.1', port))
s.listen(5)
s.setblocking(False) # 设置socket的接口为非阻塞
return s
def accept(sock, mask):
conn, addr = sock.accept() # Should be ready
print('accepted', conn, 'from', addr)
conn.setblocking(False)
sel.register(conn, selectors.EVENT_READ, read)
def read(conn, mask):
data = conn.recv(1000) # Should be ready
if data:
print('echoing', repr(data), 'to', conn)
conn.send(data.upper()) # Hope it won't block
else:
print('closing', conn)
sel.unregister(conn)
conn.close()
# 这里建立 两个 监听 socket
s1 = build_listen_socket(8080)
s2 = build_listen_socket(8081)
sel = selectors.DefaultSelector()
sel.register(s1, selectors.EVENT_READ, accept) # 注册监听socket 并绑定 回调函数 accept
sel.register(s2, selectors.EVENT_READ, accept)
while True:
events = sel.select()
for key, mask in events: # 获取事件信息,执行回调
callback = key.data
callback(key.fileobj, mask)
参考: