IO模型及多路复用

BIO:每个和客户端连接,用的同步阻塞IO fd+多线程,缺点:线程多了

问的最多的,阻塞IO和IO多路复用

Non-blocking IO:循环,效率底下

IO多路复用:OS帮你盯着

阻塞:2个阶段全程阻塞

IO多路复用:第一阶段交给操作系统由他处理,第二阶段调函数阻塞一会,数据从内核空间到用户空间,然后返回结果

fd:文件描述符从0开始,0:标准输入,1:标准输出,2:标准错误输出

select(fds, 1024个位 read, 1024写 标志位)

100个,其中1路IO好了,然后遍历所有fds

for fd in fds:

   判断fd是否好了

bitmap 位图

poll

使用数组 fds,每一个fd结构题,包含fd、读、写

for fd in fds:

   判断fd对应的结构体里面的读写标志是否ok

epoll

import datetime
import selectors
import socket
import logging
import threading

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(thread)d %(message)s")

class ChatServer:
    def __init__(self,ip='127.0.0.1',port=9999):
        self.socket = socket.socket()
        self.socket.setblocking(False)
        self.addr = ip,port
        self.event = threading.Event()
    def start(self):
        #监听的socket绑定地址和端口
        self.socket.bind(self.addr)
        #开始监听
        self.socket.listen()
        #生成selector对象,根据当前系统生成最优的模式
        self.selector = selectors.DefaultSelector()
        #将socket注册到selector对象中,返回的是selectorkey对象,key记录了fileobj(就是self.socket),fileobj的fd,events(读或写),data
        self.selector.register(self.socket ,selectors.EVENT_READ,self.accept)
        #开始循环
        while True:
            #监听注册的对象的事件,发生被关注事件则返回events(就是接收到数据就调用并返回events)
            event = self.selector.select()
            logging.info("event = {}".format(event))
            #表示那个关注的对象的某个事件发生了
            for key,mask in event:
                key.data(key.fileobj)
    #socket的accpet方法
    def accept(self,conn):
        #3次握手成功,生成新的通信用的socket对象和客户端连接地址和端口信息
        newsock ,client = conn.accept()
        #设置非阻塞
        newsock.setblocking(False)
        logging.info("{}".format(newsock))
        #将新生成的通信用的socket对象注册
        self.selector.register(newsock,selectors.EVENT_READ,self.recv)
    #socket收数据
    def recv(self,conn):
        data = conn.recv(1024)
        #判断当前客户端是否退出
        if data == b'' or data == b'quit':
            #取消注册
            self.selector.unregister(conn)
            logging.info("{} is quit".format(conn.getpeername()))
            conn.close()
            return
        msg = "{:%Y/%m/%d %H:%M:%S} {}:{} data = {}".format(datetime.datetime.now(),*conn.getpeername(),data)
        logging.info(msg)
        #查看当前selector的map,
        for map in self.selector.get_map().values():
            print(map)
            #判断map的函数是否是recv,如果是则将消息发送到所有的通信socket
            if map.data == self.recv:
                logging.info('into into into')
                map.fileobj.send(msg.encode())
    def stop(self):
        self.event.set()
        self.selector.close()

if __name__ == '__main__':
    ct = ChatServer()
    ct.start()

重要概念

同步、异步

函数或方法被调用的时候,调用者是否得到最终结果的。

直接得到最终结果结果的,就是同步调用;

不直接得到最终结果的,就是异步调用。

阻塞、非阻塞

函数或方法调用的时候,是否立刻返回。

立即返回就是非阻塞调用;

不立即返回就是阻塞调用。

区别

同步、异步,与阻塞、非阻塞不相关。

同步、异步强调的是,是否得到(最终的)结果;

阻塞、非阻塞强调是时间,是否等待。

同步与异步区别在于:调用者是否得到了想要的最终结果。

同步就是一直要执行到返回最终结果;

异步就是直接返回了,但是返回的不是最终结果。调用者不能通过这种调用得到结果,以后可以通过被

调用者提供的某种方式(被调用者通知调用者、调用者反复查询、回调),来取回最终结果。

阻塞与非阻塞的区别在于,调用者是否还能干其他事。

阻塞,调用者就只能干等;

非阻塞,调用者可以先去忙会别的,不用一直等。

联系

同步阻塞,我啥事不干,就等你打饭打给我。打到饭是结果,而且我啥事不干一直等,同步加阻塞。

同步非阻塞,我等着你打饭给我,饭没好,我不等,但是我无事可做,反复看饭好了没有。打饭是结果,但是我不一直等。

异步阻塞,我要打饭,你说等叫号,并没有返回饭给我,我啥事不干,就干等着饭好了你叫我。例如,取了号什么不干就等叫自己的号。

异步非阻塞,我要打饭,你给我号,你说等叫号,并没有返回饭给我,我去看电视、玩手机,饭打好了叫我。

同步IO、异步IO、IO多路复用

IO两个阶段

IO过程分两阶段

  1. 数据准备阶段。从设备读取数据到内核空间的缓冲区
  2. 内核空间复制回用户空间进程缓冲区阶段
    系统调用——read函数、recv函数等
IO模型
同步IO

同步IO模型包括阻塞IO、非阻塞IO、IO多路复用

阻塞IO:进程等待(阻塞),直到读写完成(全程等待)。

image

非阻塞IO:

进程调用recvfrom操作,如果IO设备没有准备好,立即返回ERROR,进程不阻塞。用户可以再次发起系统调用(可以轮询),如果内核已经准备好,就阻塞,然后复制数据到用户空间。

第一阶段数据没有准备好,可以先忙别的,等会再来看看。检查数据是否准备好了的过程是非阻塞的。

第二阶段是阻塞的,即内核空间和用户空间之间复制数据是阻塞的。

淘米、蒸饭我不阻塞等,反复来询问,一直没有拿到饭。盛饭过程我等着你装好饭,但是要等到盛好饭才算完事,这是同步的,结果就是盛好饭。

image

IO多路复用:

也称Event-driven IO。

所谓IO多路复用,就是同时监控多个IO,有一个准备好了,就不需要等了开始处理,提高了同时处理IO的能力。

以select为例,将关注的IO操作告诉select函数并调用,进程阻塞,内核“监视”select关注的文件描述符fd,被关注的任何一个fd对应的IO准备好了数据,select返回。再使用read将数据复制到用户进程。

select举例:

食堂供应很多菜(众多的IO fds),你需要吃某三菜一汤,大师傅(操作系统)说要现做,需要你等,好多人都在等菜,谁的菜先好不知道,你只好等待大师傅叫。有几样菜好了,大师傅叫大家,说菜有好的,你们得自己遍历找找看哪一样才好了,请服务员把做好的菜打给你。

image

信号驱动IO

进程在IO访问时,先通过sigaction系统调用,提交一个信号处理函数,立即返回。进程不阻塞。

当内核准备好数据后,产生一个SIGIO信号并投递给信号处理函数。可以在此函数中调用recvfrom函数操作数据从内核空间复制到用户空间,这段过程进程阻塞。

image

异步IO

进程发起异步IO请求,立即返回。内核完成IO的两个阶段,内核给进程发一个信号。

举例,来打饭,跟大师傅说饭好了叫你,饭菜准备好了,窗口服务员把饭盛好了打电话叫你。两阶段都是异步的。在整个过程中,进程都可以忙别的,等好了才过来。

举例,今天不想出去到饭店吃饭了,点外卖,饭菜在饭店做好了(第一阶段),快递员从饭店送到你家门口(第二阶段)。

Linux的aio的系统调用,内核从版本2.6开始支持

image

image

前4个都是同步IO,因为核心操作recv函数调用时,进程阻塞直到拿到最终结果为止。

而异步IO进程全程不阻塞。

Python中的IO多路复用

  • IO多路复用
    • 大多数操作系统都支持select和poll,poll是对select的升级
    • Linux系统内核2.5+支持epoll
    • BSD、MAC支持kqueue
    • Solaris实现了/dev/poll
    • Windows的IOCP

Python的select库实现了select、poll系统调用,这个基本上操作系统都支持。对Linux内核2.5+支持了epoll。

开发中的选择

1、完全跨平台,使用select、poll。但是性能较差

2、针对不同操作系统自行选择支持的技术,这样做会提高IO处理的性能

假设当前进程监控的很多IO的文件描述符为fds

  • select
    • 使用读、写2个位图标记fd对应的读写是否就绪,这个位图限制为1024个
    • 每一个都需要遍历fds,效率低
  • poll
    • 使用数组保存结构体,没有了最大限制
    • 依然遍历fds查看谁就绪了,效率低
  • epoll
    • 内核空间与用户空间共享一段内存,减少数据的复制
    • 事件驱动,每次只返回就绪的fds

selector库

3.4版本提供selectors库,高级IO复用库。

类层次结构︰
BaseSelector
+-- SelectSelector   实现select
+-- PollSelector     实现poll
+-- EpollSelector     实现epoll
+-- DevpollSelector   实现devpoll
+-- KqueueSelector   实现kqueue

selectors.DefaultSelector返回当前平台最有效、性能最高的实现。

但是,由于没有实现Windows下的IOCP,所以,Windows下只能退化为select

# 在selects模块源码最下面有如下代码
# Choose the best implementation, roughly:
#   epoll|kqueue|devpoll > poll > select.
# select() also can't accept a FD > FD_SETSIZE (usually around 1024)
if 'KqueueSelector' in globals():
    DefaultSelector = KqueueSelector
elif 'EpollSelector' in globals():
    DefaultSelector = EpollSelector
elif 'DevpollSelector' in globals():
    DefaultSelector = DevpollSelector
elif 'PollSelector' in globals():
    DefaultSelector = PollSelector
else:
    DefaultSelector = SelectSelector

事件注册

class SelectSelector(_BaseSelectorImpl):
    """Select-based selector."""
    def register(fileobj, events, data=None) -> SelectorKey: pass
  • 为selector注册一个文件对象,监视它的IO事件,返回selectKey对象
  • fileobj 被监视文件对象,例如socket对象
  • events事件,该文件对象必须等待的事件
  • data可选的与此文件对象相关联的不透明数据,例如,关联用来存储每个客户端的会话ID,关联方法。通过这个参数在关注的事件产生后让selector干什么事
Event常量 含义
EVENT_READ 可读0b01,内核已经准备好输入设备了,可以开始读了
EVENT_WAITE 可写ob10,内核已经准备好了,可以往里写了

selectors.SelectorKey 有4个属性

  1. fileobj 注册的文件对象
  2. fd文件描述符
  3. events等待上面的文件描述符的文件对象的事件
  4. data注册时关联的数据

练习:IO多路复用TCPServer群聊

import datetime
import selectors
import socket
import logging
import threading

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(thread)d %(message)s")

class ChatServer:
    def __init__(self,ip='127.0.0.1',port=9999):
        self.socket = socket.socket()
        self.socket.setblocking(False)
        self.addr = ip,port
        self.event = threading.Event()
    def start(self):
        #监听的socket绑定地址和端口
        self.socket.bind(self.addr)
        #开始监听
        self.socket.listen()
        #生成selector对象,根据当前系统生成最优的模式
        self.selector = selectors.DefaultSelector()
        #将socket注册到selector对象中,返回的是selectorkey对象,key记录了fileobj(就是self.socket),fileobj的fd,events(读或写),data
        self.selector.register(self.socket ,selectors.EVENT_READ,self.accept)
        #开始循环
        while True:
            #监听注册的对象的事件,发生被关注事件则返回events(就是接收到数据就调用并返回events)
            event = self.selector.select()
            logging.info("event = {}".format(event))
            #表示那个关注的对象的某个事件发生了
            for key,mask in event:
                key.data(key.fileobj)
    #socket的accpet方法
    def accept(self,conn):
        #3次握手成功,生成新的通信用的socket对象和客户端连接地址和端口信息
        newsock ,client = conn.accept()
        #设置非阻塞
        newsock.setblocking(False)
        logging.info("{}".format(newsock))
        #将新生成的通信用的socket对象注册
        self.selector.register(newsock,selectors.EVENT_READ,self.recv)
    #socket收数据
    def recv(self,conn):
        data = conn.recv(1024)
        #判断当前客户端是否退出
        if data == b'' or data == b'quit':
            #取消注册
            self.selector.unregister(conn)
            logging.info("{} is quit".format(conn.getpeername()))
            conn.close()
            return
        msg = "{:%Y/%m/%d %H:%M:%S} {}:{} data = {}".format(datetime.datetime.now(),*conn.getpeername(),data)
        logging.info(msg)
        #查看当前selector的map,
        for map in self.selector.get_map().values():
            print(map)
            #判断map的函数是否是recv,如果是则将消息发送到所有的通信socket
            if map.data == self.recv:
                logging.info('into into into')
                map.fileobj.send(msg.encode())
    def stop(self):
        self.event.set()
        self.selector.close()

if __name__ == '__main__':
    ct = ChatServer()
    ct.start()

实现HTTPServer

import threading
import selectors
import socket
import logging
import webob
FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)
html_content = """
<html>
<head><title></title></head>
<body>
   欢迎访问马哥教育
</body>
</html>
"""
class WebServer:
    def __init__(self, ip='0.0.0.0', port=80):
        self.addr = ip, port
        self.sock = socket.socket()
        self.sock.setblocking(False) # 非阻塞
        self.event = threading.Event()
        # 构建本系统最优Selector
        self.selector = selectors.DefaultSelector()
    def start(self):
        self.sock.bind(self.addr)
        self.sock.listen()
        key = self.selector.register(self.sock, selectors.EVENT_READ, 
self.accept)
        threading.Thread(target=self.select, name='select').start()
    def select(self):
        with self.selector:
            while not self.event.is_set():
                events = self.selector.select(1)# 超时返回[]
                # 监听注册的对象的事件,发生被关注事件则返回events
                print(events)
                for key, mask in events:
                    key.data(key.fileobj, mask)
    def accept(self, server:socket.socket, mask):
        conn, raddr = server.accept()
        conn.setblocking(False)
        logging.info("New client {} accepted. fd={}".format(raddr, 
conn.fileno()))
        key = self.selector.register(conn, selectors.EVENT_READ, self.recv)
    def recv(self, conn:socket.socket, mask):
        with conn: # 用完就断
            try:
                data = conn.recv(1024).strip()
                # 收到request报文,下面要做url映射等,此处都省略
                request = webob.Request.from_bytes(data)
                print(request.url)
                print('=' * 30)
                response = webob.Response(html_content, status=201)
                response.headers.add('Server', 'MageServer')
                firstline = 'HTTP/1.1 {}'.format(response.status)
                print(response.headerlist)
                headers = "\r\n".join(
                   [firstline] + ["{}: {}".format(k, v) for k, v inresponse.headerlist] + ['', '']
               ) # 响应头:第一行、头部字段、2个回车换行
                body = response.body
                print(type(headers), type(body))
                content = headers.encode() + body
                conn.send(content)
            finally:
                self.selector.unregister(conn)
    def stop(self):
        self.event.set()
if __name__ == '__main__':
    cs = WebServer()
    cs.start()
    while True:
        cmd = input('>>').strip()
        if cmd == 'quit':
            cs.stop()
            break