muduo::Connector、TcpClient分析

Connector

Connector用来发起连接。在非阻塞网络中,主动发起连接比被动接收连接更为复杂,因为要考虑错误处理,还要考虑重试。 主要难点在于 1、socket是一次性的,一旦出错无法恢复;只能关闭重来。使用新的fd后,用新的channel。 2、错误代码与acce(2)不同。及时是socket可写,也不意味着已经成功建立连接,还需要用getsockopt(sockfd, SOL_SOCKET, SO_ERROR, ……)再次确认。 3、重试的间隔时间应该逐渐延长,直至back-off。重试使用了EventLoop::runAfter,防止Connector在定时器到时前析构,在Connector的析构函数中要注销定时器。 4、要防止自连接发生。

对于第2点,这里解释一下。非阻塞的socket调用connect后会立即返回,这时三次握手还在进行。这时可以用poll/epoll来检查socket。 当连接成功时,,socket变为可写。 当连接失败时,socket变为可读可写。 因此还需要再次确认一下。还有其他方法再次确认: 1、调用getpeername,如果调用失败,返回ENOTCONN,表示连接失败。 2、调用read,长度参数为0,如果read失败,表示connect失败。 3、再调用connect一次,其应该失败,如果错误是EISCONN,表示套接字已建立而且连接成功。

Connector.h

class Connector : boost::noncopyable,public boost::enable_shared_from_this<Connector>{ public: typedef boost::function<void (int sockfd)> NewConnectionCallback; Connector(EventLoop* loop, const InetAddress& serverAddr); ~Connector(); void setNewConnectionCallback(const NewConnectionCallback& cb) { newConnectionCallback_ = cb; } stop(); // can be called in any thread const InetAddress& serverAddress() const { return serverAddr_; } private: enum States { kDisconnected, kConnecting, kConnected }; kMaxRetryDelayMs = kInitRetryDelayMs = 500;//初始化重试延迟 void setState(States s) { state_ = s; } void startInLoop(); void stopInLoop(); void connect(); void connecting(int sockfd); void handleWrite(); void handleError(); void retry(int sockfd); int removeAndResetChannel(); void resetChannel(); EventLoop* loop_;//所属的EventLoop InetAddress serverAddr_;//server地址 bool connect_; // atomic States state_; // FIXME: use atomic variable boost::scoped_ptr<Channel> channel_; NewConnectionCallback newConnectionCallback_; int retryDelayMs_;};

Connector.cc

Connector::Connector(EventLoop* loop, const InetAddress& serverAddr) : loop_(loop),serverAddr_(serverAddr),connect_(false),state_(kDisconnected),retryDelayMs_(kInitRetryDelayMs){ LOG_DEBUG << “ctor[” << this << “]”;}Connector::~Connector(){ LOG_DEBUG << “dtor[” << this << “]”; assert(!channel_);}void Connector::start(){ connect_ = true; loop_->runInLoop(boost::bind(&Connector::startInLoop, this)); // FIXME: unsafe}void Connector::startInLoop(){ loop_->assertInLoopThread(); assert(state_ == kDisconnected); if (connect_) {connect();//开始建立连接 } else {LOG_DEBUG << “do not connect”; }}void Connector::stop(){ connect_ = false; loop_->queueInLoop(boost::bind(&Connector::stopInLoop, this)); // FIXME: unsafe // FIXME: cancel timer}void Connector::stopInLoop(){ loop_->assertInLoopThread(); if (state_ == kConnecting) {setState(kDisconnected);int sockfd = removeAndResetChannel();retry(sockfd); }}void Connector::connect()//建立连接{ savedErrno = (ret == 0) ? 0 : errno; switch (savedErrno)//错误处理 {case 0:case EINPROGRESS:case EINTR:case EISCONN:connecting(sockfd);break;case EAGAIN:case EADDRINUSE:case EADDRNOTAVAIL:case ECONNREFUSED:case ENETUNREACH:retry(sockfd);break;case EACCES:case EPERM:case EAFNOSUPPORT:case EALREADY:case EBADF:case EFAULT:case ENOTSOCK:LOG_SYSERR << “connect error in Connector::startInLoop ” << savedErrno;sockets::close(sockfd);break;default:LOG_SYSERR << “Unexpected error in Connector::startInLoop ” << savedErrno;sockets::close(sockfd);// connectErrorCallback_();break; }}void Connector::restart()//重启{ loop_->assertInLoopThread(); setState(kDisconnected); retryDelayMs_ = kInitRetryDelayMs; connect_ = true; startInLoop();}void Connector::connecting(int sockfd){ setState(kConnecting); assert(!channel_);//这里设置channel。因为有了sockfd后才可以设置channel channel_.reset(new Channel(loop_, sockfd)); channel_->setWriteCallback(boost::bind(&Connector::handleWrite, this)); // FIXME: unsafe channel_->setErrorCallback(boost::bind(&Connector::handleError, channel_->enableWriting();}int Connector::removeAndResetChannel(){ channel_->disableAll(); channel_->remove(); int sockfd = channel_->fd(); // Can’t reset channel_ here, because we are inside Channel::handleEvent loop_->queueInLoop(boost::bind(&Connector::resetChannel, this)); // FIXME: unsafe return sockfd;}void Connector::resetChannel()//reset后channel_为空{ channel_.reset();}void Connector::handleWrite()//可写不一定表示已经建立连接{ LOG_TRACE << “Connector::handleWrite ” << state_; if (state_ == kConnecting) {(err){LOG_WARN << “Connector::handleWrite – SO_ERROR = “<< err << ” ” << strerror_tl(err);retry(sockfd);}else if (sockets::isSelfConnect(sockfd))//判断是否时自连接{LOG_WARN << “Connector::handleWrite – Self connect”;retry(sockfd);}else{setState(kConnected);//设置状态为已经连接if (connect_){newConnectionCallback_(sockfd);}else{sockets::close(sockfd);}} } else {// what happened?assert(state_ == kDisconnected); }}void Connector::handleError(){ LOG_ERROR << “Connector::handleError state=” << state_; if (state_ == kConnecting) {int sockfd = removeAndResetChannel();int err = sockets::getSocketError(sockfd);LOG_TRACE << “SO_ERROR = ” << err << ” ” << strerror_tl(err);retry(sockfd); }}void Connector::retry(int sockfd)//重新尝试连接{ sockets::close(sockfd); setState(kDisconnected); if (connect_) {LOG_INFO << “Connector::retry – Retry connecting to ” << serverAddr_.toIpPort()<< ” in ” << retryDelayMs_ << ” milliseconds. “;loop_->runAfter(retryDelayMs_/1000.0,boost::bind(&Connector::startInLoop, shared_from_this()));retryDelayMs_ = std::min(retryDelayMs_ * 2, kMaxRetryDelayMs);//延迟加倍,但不超过最大延迟 } else {LOG_DEBUG << “do not connect”; }}TcpClient再长的路,一步步也能走完,再短的路,不迈开双脚也无法到达。

muduo::Connector、TcpClient分析

相关文章:

你感兴趣的文章:

标签云: