Python协程原理-IO多路复用

    多路复用是在单线程下的一种高效io模型,  多路指的是多个io操作,复用  指的是  “复用一个线程"。

常见的  IO  模型

        常见的  三种  io模型  有  阻塞io非阻塞io,  多路复用

        以  io  操作  read  举例,参与操作的对象有两个,调用操作的线程  和  系统内核,  操作分为两个阶段:
                1、等待数据准备  (wait)
                2、将数据从系统内核拷贝到用户空间  (copy)

阻塞式  IO

        blocking_io.png
        这是最常见的一种io模型,一个io操作完成才能够进行下一个io操作

非阻塞式  IO

        nonblocking_io.png
        可以让多个io操作进入  wait  状态,然后线程不断循环检查是否有io操作已准备好数据,一旦有,则系统会发出  error,
        通过捕获这个  error,  对已准备好数据的  io  进行  copy  操作,完成后再次进入  轮询检查。  所以,虽然等待的过程
        是非阻塞的,但  copy  的过程是阻塞。  虽然实现了”非阻塞“,但是,主动轮询会占用cpu大量的时间,因此这个模型是
        不被推荐的。  以下是实现代码

# 服务端
from socket import *
import time

s = socket(AF_INET, SOCK_STREAM)
s.bind(('127.0.0.1', 8080))
s.listen(5)
s.setblocking(False)  # 设置socket的接口为非阻塞
conn_l = []
del_l = []
while True:
    try:
        conn, addr = s.accept()
        print("append:conn")
        conn_l.append(conn)
    except BlockingIOError:
        if conn_l:
            print(f"conn_l, num:{len(conn_l)}")
        for conn in conn_l:
            try:
                data = conn.recv(1024)
                if not data:
                    del_l.append(conn)
                    continue
                conn.send(data.upper())
            except BlockingIOError:
                pass
            except ConnectionResetError:
                del_l.append(conn)

        for conn in del_l:
            conn_l.remove(conn)
            conn.close()
        del_l = []

# 客户端
from socket import *

c = socket(AF_INET, SOCK_STREAM)
c.connect(('127.0.0.1', 8080))

while True:
    msg = input('>>: ')
    if not msg: continue
    c.send(msg.encode('utf-8'))
    data = c.recv(1024)
    print(data.decode('utf-8'))

多路复用

        multi_io.png
        基于  非阻塞io,多路复用解决了  非阻塞io对  cpu  高占用的问题,  使用"操作系统的事件驱动"来代替程序的主动轮询,
        即让操作系统  来通知我们  数据准备好了,  不用主动去询问。基于此的实现有  select,  poll,  epoll,
        其中,效率最高的是  epoll,  但在  windows  不能使用,可在  linux  上使用。

        select:  时间复杂度是  O(n),  只知道有  io  事件发生,具体哪个/哪几个需要通过轮询确认
        poll:  O(n),  和select类似,不一样的是使用  链表  代替了  set,  没有最大数量限制
        epoll:  O(1),  精确知道是触发了哪个事件

select  引用例子

# 服务端
from socket import *
import select

s = socket(AF_INET, SOCK_STREAM)
s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
s.bind(('127.0.0.1', 8081))
s.listen(5)
s.setblocking(False)  # 设置socket的接口为非阻塞
read_l = [s, ]
while True:
    print(f"registered socket:{len(read_l)}")
    r_l, w_l, x_l = select.select(read_l, [], [])  # 当没有事件发生时,会阻塞在这里,有事件则返回
    print(f"ready socket:{len(r_l)}")
    for ready_obj in r_l:
        if ready_obj == s: # 有新连接 接入
            conn, addr = ready_obj.accept()  # 把 新连接 添加到 read_l, 在下一个循环注册到 select
            read_l.append(conn)
        else: # 有连接 准备好了数据
            try:
                data = ready_obj.recv(1024)  # 读取数据
                if not data:
                    ready_obj.close()
                    read_l.remove(ready_obj)
                    continue
                ready_obj.send(data.upper())
            except ConnectionResetError:
                ready_obj.close()
                read_l.remove(ready_obj)


# 客户端
from socket import *

c = socket(AF_INET, SOCK_STREAM)
c.connect(('127.0.0.1', 8081))  # 请求连接

while True:
    msg = input('>>: ')  # 此处输入操作,服务端的新连接将会因此等待数据
    if not msg: continue
    c.send(msg.encode('utf-8'))
    data = c.recv(1024)
    print(data.decode('utf-8'))

        上面的例子中,服务端只建立了  1  个socket,  让  select  代理监听  “发生新连接”事件,  当有新连接发生时,
        执行  accept  得到  获取的数据的  conn,  然后再将其交给  select  代理监听  "数据准备完成"  事件,  当发生该
        事件时,再  使用  conn  接收读取数据。

        这个例子很简单,只有  1  个  监听  "新连接接入"  的  socket  和  一些需要  监听  "数据准备完成"  socket,
        select  捕获到的事件中,可能是其中任一个  socket,  可以很简单地通过  ready_j  ==  s  判断是哪种  socket,
        从而决定进行哪种操作,然而,在实际开发中,需要监听的  socket数量,事件类型,操作类型  都比这要复杂的多,
        因此为了缩减代码,引入了  回调机制,即在把  socket  交给  select  代理前,将  socket  需要监听的事件和发生事件
        后需要执行的操作都绑定好,绑定好的操作就叫做  “回调函数”,  这样当  select  捕获到事件时,就可以直接执行对应
        的回调函数。

        这个功能  python  已经有现成的库,selectors  ---  高级  I/O  复用库

应用  selectors,  加入回调机制

可以将服务端改造成这样

# 服务端
from socket import *
import selectors


def build_listen_socket(port):
    s = socket(AF_INET, SOCK_STREAM)
    s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    s.bind(('127.0.0.1', port))
    s.listen(5)
    s.setblocking(False)  # 设置socket的接口为非阻塞
    return s


def accept(sock, mask):
    conn, addr = sock.accept()  # Should be ready
    print('accepted', conn, 'from', addr)
    conn.setblocking(False)
    sel.register(conn, selectors.EVENT_READ, read)


def read(conn, mask):
    data = conn.recv(1000)  # Should be ready
    if data:
        print('echoing', repr(data), 'to', conn)
        conn.send(data.upper())  # Hope it won't block
    else:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()


# 这里建立 两个 监听 socket
s1 = build_listen_socket(8080)
s2 = build_listen_socket(8081)

sel = selectors.DefaultSelector()

sel.register(s1, selectors.EVENT_READ, accept)  # 注册监听socket 并绑定 回调函数 accept
sel.register(s2, selectors.EVENT_READ, accept)

while True:
    events = sel.select()
    for key, mask in events:  # 获取事件信息,执行回调
        callback = key.data
        callback(key.fileobj, mask)

参考:

  IO多路复用
  官方文档-selectors  ---  高级  I/O  复用库
  深入理解Python异步编程(上)