【网络组件】事件和IO复用

本节研究事件和IO复用的实现,主要的研究是Event和Epoller,并给出C++实现;

线程模型

本网络组件的线程模型是一个master线程和多个IO线程模型;master线程负责accept连接,然后会将该连接派发到IO线程,可以按照轮转的方式来派发到各IO线程;每一个IO线程有一个EventLoop(事件循环),一个TCP连接必须归这个EventLoop来管理,所有的IO都会转移到这个线程,其他线程是无法读写该TCP连接信息的,也就是说TCP连接的fd也只能由这个IO线程去读写;其中EventLoop是要求线程安全的;具体示意图如下:

事件循环

(1)每一个IO线程一个EventLoop对象,并拥有它,其主要功能就是loop循环;

(2)Event对象始终负责一个fd的IO事件分发,但是它不拥有该fd,析构时也不会关闭这个fd,Event会根据时间类型的不同分发为不同的回调,如ReadCallback、WriteCallback等;回调对象为C++中的function表示;

(3)Epoller是IO复用(epoll机制)的实现,它是EventLoop对象的成员,只供EventLoop调用;

事件循环时序图如下:

事件操作

(1)_update主要包括两个操作,添加和更新事件;最终有epoll机制的epoll_ctl来操作,对应的类型EPOLL_CTL_ADD,,EPOLL_CTL_MOD;

(2)_remove主要删除事件操作;最终有epoll机制的epoll_ctl来操作,对应的类型EPOLL_CTL_DEL;

事件操作时序图如下:

EventEvent声明class EventLoop;class Event final{public: Event(const Event&) = delete; Event& operator=(const Event&) = delete; Event(int fd, EventLoop *loop); void setReadCallback(const ReadCallback& cb) {_readCallback = cb; } void setWriteCallback(const WriteCallback& cb) {_writeCallback = cb; } void setErrorCallback(const ErrorCallback& cb) {_errorCallback = cb; } void setCloseCallback(const CloseCallback& cb) {_closeCallback = cb; } bool isWriting() const {return _events & _cWriteEvent; } bool isReading() const {return _events & _cReadEvent; } void disableWriting() {_events &= ~_cWriteEvent;_update(); } void enableWriting() {_events |= _cWriteEvent;_update(); } void disableAll() {_events = 0;_update(); } void enableReading() {_events |= _cReadEvent;_update(); } int fd() const {return _fd; } uint32_t events() const {return _events; } void setRevents(uint32_t revents) {_revents = revents; } void handleEvent(); void remove(); private: void _update(); uint32_t _events; uint32_t _revents; int _fd; EventLoop* _loop; ReadCallback _readCallback; WriteCallback _writeCallback; ErrorCallback _errorCallback; CloseCallback _closeCallback; static uint32_t _cReadEvent; static uint32_t _cWriteEvent;};

说明几点

(1)_events为需要关心的事件类型,而_revents为epoll_wait返回后发生的事件类型,需要根据_revents进行相应的分发操作;

(2)具体的分发操作为 ReadCallback _readCallback、WriteCallback _writeCallback、ErrorCallback _errorCallback、CloseCallback _closeCallback;

Event实现uint32_t Event::_cReadEvent = EPOLLIN;uint32_t Event::_cWriteEvent = EPOLLOUT;Event::Event(int efd, EventLoop *loop):_events(0),//impotant_fd(efd),_loop(loop){}void Event::remove(){ _loop->removeEvent(this);}void Event::_update(){ _loop->updateEvent(this);}void Event::handleEvent(){if ((_revents & EPOLLHUP) && !(_revents & EPOLLIN)) //do not handle HUP about socket, we should handle before EPOLLIN,because if readCallback handclose ,the event will destructor, the _revents will random{LOG_TRACE << "_closeCallback, fd: " << fd();if (_closeCallback)_closeCallback();}if (_revents & (EPOLLIN | EPOLLRDHUP)){LOG_TRACE << "_readCallback, fd: " << fd();if (_readCallback)_readCallback(TimeStamp::now());} if (_revents & EPOLLOUT){LOG_TRACE << "_writeCallback, fd: " << fd();if (_writeCallback)_writeCallback();} if (_revents & EPOLLERR){LOG_TRACE << "_errorCallback, fd: " << fd();if (_errorCallback)_errorCallback();}}说明几点:

(1)handleEvent()就是根据_revents不同的事件类型进行相应不同的回调操作;

(2)_update()和remove即为相应的事件操作;

EpollerEpoller声明class Event;class EventLoop;class Epoller final{public: Epoller(const Epoller&) = delete; Epoller& operator=(const Epoller&) = delete; Epoller(EventLoop* loop);~Epoller(); void removeEvent(Event* event); void updateEvent(Event* event); void pollAndHandleEvent(int seconds);private: int _epollfd; EventLoop* _loop; std::vector<struct epoll_event> _activeEvents; std::map<int, Event*> _eventMap; static const int cMaxEventNumber = 1000;};}

说明几点:

(1)pollAndHandleEvent主要为调用epoll_wait和事件处理函数;

(2)removeEvent,updateEvent()分别为删除事件和更新事件;

(3)_eventMap为建立fd和Event建立的map,红黑树高效查找;_activeEvents为内核epoll_wait填写的活动事件列表;

躲在某一时间想念一段时光的掌纹,

【网络组件】事件和IO复用

相关文章:

你感兴趣的文章:

标签云: