博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
python-高级编程-05-异步IO
阅读量:5296 次
发布时间:2019-06-14

本文共 9151 字,大约阅读时间需要 30 分钟。

 【异步非阻塞IO】

------------------------------------------------------------------------------------------------------------------- 

小明和小强是笔友,他们通过有邮件的方式联系,小明发一封,小强回一封

邮差有点时候天气好,早上发出的信件,晚上就能收的到,然后有的时候遇到意外,

可能好几天都不能收到,小强就在邮箱前面等啊等,一直等到天荒地老

 cont =1

mailbox = xxxxx

while 1:

  mail = mailbox.cleck()

  #look mail

  sleep(cont)

但是这样有个问题

cont的大小 如果小了就非常占用cpu 设置太大就效率低

我们能如果节约呢?

小强在邮箱上放了个旗子,让邮递员放好了信就把上面的旗子立起来,然后小强知道了 就去拿信

这样小强就能在等信的时候做其他的事情了。

这就是异步io的思想。

 

【select pool   epoll 】

--------------------------------------------------------------------------------------------------------------- 

随时小强的人脉越来越广,他就搞了很多邮箱,一个邮箱对应一个人,然后上面都放好旗子

select 就相当于循环的检查邮箱上面的旗子,一有旗子立起来的话就通知小强,但是他的限制只有1024个(这里的1024 在操作系统里面值得并不是同时打开文件描述符的个数,而是号,当文件描述符超过了这个号,即便是前头的都关闭了,还是不能够增加新的)

pool 跟select的区别就是去掉了1024的限制,但是如果连接增多那么他还是会面的很慢。因为没次循环都要对所有邮箱进行检查。

[epoll] -- epoll 是linux内核的可扩展I/O事件通知机制,特点就是让需要大量操作文件描述符的程序得以 更优异的性能。

select和poll的时间复杂度是O(n) 而epoll 是O(1)

这里的e 值得event 事件

说白了就是 给每个邮箱编号然后改造成电子邮箱,一但有邮件过来了,立马会在手机app上面显示 xxx号邮箱 有邮件啦!!!

 【epoll的ET和LT】

--------------------------------------------------------------------------------------------------------------- 

ET:边缘触发  来了信,手机上面只响一下

LT:水平触发 来了信,手机一直响,一直到你打开app处理

ET在处理的时候,如果处理方式不够谨慎 ,很容易造成消息丢失

epoll 默认是 LT

######################################################################

 

#!/usr/bin/env python#coding:utf-8####################from:Rico.xia    ##time:2017-08-19  ####################import socketimport timeimport selectimport pdb#############################################class STATE:    def __init__(self):        self.state = 'accept'        self.have_read = 0        self.need_read = 5        self.have_write = 0        self.need_write = 5        self.buff_write = ""        self.buff_read = ""        self.sock_obj = ""##############################################class nbNetBase:    def setFd(self,sock):        #实例化STATE() 并初始化参数        tmp_state = STATE()        #将socket对象存到实例化的对象的字段中        tmp_state.sock_obj = sock        #获取这个socket的文件描述符,并作为key,value 为实例化的状态(STATE)对象。        self.conn_state[sock.fileno()]=tmp_state    def accept(self,fd):        #dbpPrint("\n --accept start")        #获取sock对象        sock = self.conn_state[fd].sock_obj        #获取客户端的sock对象,和ip        conn,addr  = sock.accept()        #设置为非阻塞状态        conn.setblocking(0)        #返回        return conn    def close(self,fd):        print 'closeing'        try:            sock = self.conn_state[fd].sock_obj            sock.close()        finally:            self.epoll_sock.unregister(fd)            self.conn_state.pop(fd)        def read(self,fd):        print 'readng'        try:            #获取客户端的sock信息            sock_state = self.conn_state[fd]            conn  = sock_state.sock_obj            #防止宇宙射线导致的字符反转            if sock_state.need_read <=0:                raise socket.error            #第一读取,读取之前定义好的需要读取的字节数            one_read = conn.recv(sock_state.need_read)            #如果读取的信息为0就抛出异常            if len(one_read) == 0:                raise socket.error            #判断开头是不是回车 在telnet里面敲击一次回车会发送数据如果没有输入就是 \r\n            if one_read[0:2] == "\r\n":                one_read = one_read[2:]            #buff里面缓存住读取内容            sock_state.buff_read += one_read            #已经读了的加上去读取的字节数            sock_state.have_read += len(one_read)            #需要读取的剪掉读取的字节数            sock_state.need_read -= len(one_read)            #sock_state.printState()            #这里如果我们已经读了5个字节的头部之后 我们需要去读取需要出的处理的数据            #我们的协议是 00003abc -->00003cbc 我们通过头部知道需要读多少处理数据            if sock_state.have_read == 5:                header_said_need_read = int(sock_state.buff_read)                if header_said_need_read <= 0:                    raise socket.error                sock_state.need_read += header_said_need_read                sock_state.buff_read = ""                #如果满足条件 返回 readcontent  取阅读内容                return "readcontent"            elif sock_state.need_read == 0:                #如果需要读取的已经读完了 那么我们处理数据                return "process"            else:                return "readmore"        except (socket.error,ValueError),msg:            try:                if msg.error == 11:                    return "retry"            except:                pass            return "closing"    def write(self,fd):        #跟read方法类似        sock_state = self.conn_state[fd]        conn = sock_state.sock_obj        last_have_send = sock_state.have_write        try:            have_send  = conn.send(sock_state.buff_write[last_have_send:])            sock_state.have_write += have_send            sock_state.need_write -= have_send            if sock_state.need_write == 0 and sock_state.have_write != 0:                return "writecomplete"            else:                return "writemore"        except socket.error,msg:            return "closing"    def run(self):        while True:            # poll()返回的epoll_list就是有事件发生的fd的list            # 需要在循环中按照event的类型分别处理,一般分为以下几种类型            #  EPOLLIN :表示对应的文件描述符可以读;            #  EPOLLOUT:表示对应的文件描述符可以写;            #  EPOLLPRI:表示对应的文件描述符有紧急的数据可读;一般不需要特殊处理            #  EPOLLERR:表示对应的文件描述符发生错误;后面这两种需要关闭socket            #  EPOLLHUP:表示对应的文件描述符被挂断            #我们这里出现 EPOLLERR 和 EPOLLERR 情况将改变成'closing' 然后将fd扔到状态机中            epoll_list = self.epoll_sock.poll()            print epoll_list            for fd,events in epoll_list:                #dbgPrint("\n --epoll return fd:%d.event:%s"%(fd,events))                sock_state = self.conn_state[fd]                if select.EPOLLHUP & events:                    sock_state.state='closing'                elif select.EPOLLERR & events:                    sock_state.state = 'closing'                self.state_machine(fd)    def state_machine(self,fd):        #dbgPrint('\n - state machine:fd:%d,status:%s'%(fd,self.conn_state[fd].state))        print 'machine is use'        ##根据连接状态做对应处理        #获取状态对象        sock_state =self.conn_state[fd]        #根据字典sm里的处理方法处理其对应的状态        self.sm[sock_state.state](fd)class nbNet(nbNetBase):    def __init__(self,addr,port,logic):        dbgPrint("\n __init__:start !")        #定义一个空字典        self.conn_state = {}        # 初始化监听socket socket.AF_INET指的是以太网 socket.SOCK_STREAM指的是TCP        self.listen_sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)        #设置socket 开启SO_REUSEADDR,这样当监听端口处于各种xxx_WAIT的状态的时候 也可以listen、bind        self.listen_sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)        #绑定端口        self.listen_sock.bind((addr,port))        # 指定backlog数        self.listen_sock.listen(10)        ##################################        #设置文件描述符的相关信息        self.setFd(self.listen_sock)        #实例化epool对象        self.epoll_sock = select.epoll()        #将文件描述符传给epoll 并告诉它只只关注EPOLLIN,即connect过来的连接        self.epoll_sock.register(self.listen_sock.fileno(),select.EPOLLIN)        #初始化方法数据        self.logic = logic        #初始化逻辑处理数据,告诉状态机如果程序在某一状态应该用什么方法处理        self.sm = {"accept":self.accept2read,                   'read':self.read2process,                   "write":self.write2read,                   "process":self.process,                   'closing':self.close}    def accept2read(self,fd):        #获取客户端socket对象        conn = self.accept(fd)        #将客户端也注册一次epoll,监听EPOLLIN状态,也就当客户端有数据的时候        self.epoll_sock.register(conn.fileno(),select.EPOLLIN)        #注册客户端fd        self.setFd(conn)        #初始化设置为状态为read        self.conn_state[conn.fileno()].state = 'read'    def read2process(self,fd):        read_ret = ""        try:            #去读取            read_ret = self.read(fd)        except(Exception),msg:            #dbgPrint(msg)            read_ret = 'closing'        #我们已经读取头部了根据read方法的返回做各种处理        if read_ret == 'process':            self.process(fd)        elif read_ret == "readcontent":pass        elif read_ret == 'readmore':pass        elif read_ret == 'retry':pass        elif read_ret == 'closing':            self.conn_state[fd].state = 'closing'            self.state_machine(fd)        else:            raise Exception('impossible state returned by self.read')    def process(self,fd):        #获取socket 对象        sock_state = self.conn_state[fd]        #通过传入的logic方法得到要返回给客户端的值        response = self.logic(sock_state.buff_read)        sock_state.buff_write = "%05d%s" %(len(response),response)        sock_state.need_write = len(sock_state.buff_write)        sock_state.state = 'write'        self.epoll_sock.modify(fd,select.EPOLLOUT)    def write2read(self,fd):        try:            write_ret = self.write(fd)        except socket.error,msg:            write_ret = 'closing'        if write_ret == 'writemore':            pass        elif write_ret == 'writecomplete':            sock_state = self.conn_state[fd]            conn = sock_state.sock_obj            self.setFd(conn)            self.conn_state[fd].state = 'read'            self.epoll_sock.modify(fd,select.EPOLLIN)        elif write_ret == 'cldsing':            self.conn_state[fd].state = 'closing'            self.state_machine(fd)if __name__ == '__main__':    def logic(d_in):        return (d_in[::-1])    #将参数传到nbNet类并且实例化,监听本地 6789 端口    reverseD = nbNet('0.0.0.0',6789,logic)    #执行run函数    reverseD.run()

  

转载于:https://www.cnblogs.com/nerdlerss/p/7359584.html

你可能感兴趣的文章
Leetcode 268 Missing Number
查看>>
辅导日
查看>>
vue 组件小例子 this.$parent
查看>>
00-自测1. 打印沙漏
查看>>
Spring Boot 项目实战(二)集成 Logback
查看>>
Thread(线程)四
查看>>
使用Jmeter自带的 Http 代理服务器录制脚本
查看>>
UNITY在VS中调试
查看>>
福建省第八届 Triangles
查看>>
P1182 数列分段`Section II` P1316 丢瓶盖 二分答案
查看>>
更新下载库update绝对详解
查看>>
SDUTOJ3754_黑白棋(纯模拟)
查看>>
Scala入门(1)Linux下Scala(2.12.1)安装
查看>>
laravel
查看>>
installing the matplotlib via pip in the enviroment dos
查看>>
bzoj3312: [Usaco2013 Nov]No Change
查看>>
如何改善下面的代码 领导说了很耗资源
查看>>
Quartus II 中常见Warning 原因及解决方法
查看>>
数据库系统之1.使用gridview动态增加行的实现
查看>>
C# TimeSpan 计算时间差(时间间隔)
查看>>