【网络组件】接受连接Acceptor

本节介绍接受连接Acceptor,并给出实现;

接受连接

(1)Acceptor用于接受连接,,是TcpServer的成员,生命周期由TcpServer控制;

(2)当Acceptor使用accept接受连接时,回调TcpServer的函数,来通知TcpServer创建TcpConnection;

(3)Acceptor的socket fd是一个listening,也就是是一个服务器的socket,当Epoller会回调Acceptor的_handleRead,在_handleRead中会使用accept接收新连接;

总体类图如下:

接受连接时序示意图如下:

AcceptorAcceptor的声明

class Event;class EventLoop;class Acceptor final{public: Acceptor(const Acceptor&) = delete; Acceptor& operator=(const Acceptor&) = delete; Acceptor(const InetAddress& listenAddr, EventLoop* loop); ~Acceptor(); void setNewConnectionCallback(const NewConnectionCallback& cb) {_newConnectionCallback = cb; }private: void _handleRead(); void _bind(); void _listen(); static const int backlog = 10; int _acceptfd; InetAddress _listenAddr; EventLoop* _loop; std::unique_ptr<Event> _acceptEvent; NewConnectionCallback _newConnectionCallback;};

说明几点:

(1)_handleRead()为accept时的回调函数用来执行接受新连接;_acceptfd为监听的描述符;

(2)_newConnectionCallback为TcpServer注册给Acceptor的回调函数,当Acceptor的_handleRead函数执行时,将回调_newConnectionCallback来创建TcpConnection;

Acceptor的实现

Acceptor::Acceptor(const InetAddress& listenAddr, EventLoop* loop) :_acceptfd(sockets::createNonBlockingSocket()),_listenAddr(listenAddr),_loop(loop),_acceptEvent(new Event(_acceptfd, _loop)){ LOG_TRACE << "accept fd: " << _acceptfd; sockets::setReuseAddr(_acceptfd); _bind(); _listen(); _acceptEvent->setReadCallback(std::bind(&Acceptor::_handleRead, this)); _acceptEvent->enableReading();}Acceptor::~Acceptor(){ _acceptEvent->disableAll(); _acceptEvent->remove(); ::close(_acceptfd);}void Acceptor::_handleRead(){ InetAddress clientAddr; int connfd = sockets::accept(_acceptfd, &clientAddr.address());if (connfd < 0)return; if (_newConnectionCallback){_newConnectionCallback(connfd, clientAddr);} else{::close(connfd);}}void Acceptor::_bind(){ sockets::bind(_acceptfd, _listenAddr.address());}void Acceptor::_listen(){ sockets::listen(_acceptfd, backlog);}说明几点:

(1)在构造函数中,创建非阻塞的socket的_acceptfd描述符,绑定_acceptfd到服务器地址,以及监听_acceptfd;对于sockets::createNonBlockingSocket()、sockets::bind、sockets::listen下文介绍;

(2)_handleRead()用来处理接受连接,sockets::accept接受一个新连接,然后执行TcpServer的回调_newConnectionCallback;

Acceptor建立连接函数

void TcpServer::_newConnection(int connfd, const InetAddress& peerAddr){ EventLoop* loop; if ( _loopPool){loop = _loopPool->getNextLoop();} else{loop = _loop;} LOG_INFO << "TcpServer::_newConnection [" << connfd << "] from " << peerAddr.hostNameString(); TcpConnectionPtr conn(new TcpConnection(connfd, loop, peerAddr, _serverAddr)); assert(_connectionMaps.find(connfd) == _connectionMaps.end()); _connectionMaps[connfd] = conn; conn->setMessageCallback(_messageCallback); conn->setConnectionCallback(_connectionCallback); conn->setCloseConnectionCallback(std::bind(&TcpServer::_removeConnection, this, std::placeholders::_1)); conn->connectEstablished();}

说明几点:

(1)TcpServer的详细介绍,请见下一个博客;

(2)在_newConnection中,主要是创建TcpConnection,并在最后conn->connectEstablished();让管理TcpConnection的IO线程来处理TcpConnection上的回调函数;

InetAddress相关的SocketOps.cc函数

void sockets::setNonBlockingFd(int fd){ int flags; if ((flags = fcntl(fd, F_GETFL, 0)) < 0){LOG_SYSERR << "fcntl system error: " << strError();} flags |= O_NONBLOCK; if ((flags = fcntl(fd, F_SETFL, flags)) < 0){LOG_SYSERR << "fcntl system error: " << strError();}}int sockets::createNonBlockingSocket(void){ int sockfd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, IPPROTO_TCP); if (sockfd < 0){LOG_SYSERR << "socket system error: " << strError();abort();} return sockfd;}void sockets::bind(int sockfd, const struct sockaddr_in& addr){ socklen_t len = sizeof(addr); if (::bind(sockfd, reinterpret_cast<const sockaddr*>(&addr), len) < 0) //can not use C++ static_cast{LOG_SYSERR << "bind system error: " << strError();// abort();}}void sockets::listen(int listenfd, int backlog){ if (::listen(listenfd, backlog) < 0){LOG_SYSERR << "bind system error: " << strError();abort();}}int sockets::accept(int listenfd, struct sockaddr_in* clientAddr){ int connfd; socklen_t len;#if _GNU_SOURCE connfd = accept4(listenfd, reinterpret_cast<sockaddr*>(clientAddr), &len, SOCK_NONBLOCK | SOCK_CLOEXEC);#else confd = accept(listenfd, reinterpret_cast<sockaddr*>(clientAddr), &len); setNonBlockingFd(connfd);#endif if (connfd < 0){int savederrno = errno;switch (savederrno){case EAGAIN: //EWOULDBLOCK:case EINTR:case EBADF:case ECONNABORTED:errno = savederrno;LOG_SYSERR << "accept system expected error: " << strError();break;case ENOTSOCK:case EOPNOTSUPP:case ENOBUFS:case EINVAL:case EFAULT:LOG_SYSERR << "accept system unexpected error: " << strError();abort();break;default:LOG_SYSERR << "accept system unknown error: " << strError();abort();break;}} return connfd;}说明几点:

(1)accept函数中可以使用accept4一次性得到非阻塞性的连接描述符;

InetAddress旅游,放松心情,用眼睛享受风景。

【网络组件】接受连接Acceptor

相关文章:

你感兴趣的文章:

标签云: