半同步/半异步并发模式进程池实现

半同步/半异步并发模式:父进程监听到新的客户端连接请求后,以通信管道通知进程池中的某一子进程:“嘿,有新的客户连接来了,你去accept,然后处理下!”,从而避免在进程间传递文件描述符。这种模式中,一个客户连接上的所有任务始终有同一个进程来处理。

具体细节,尽在代码中:

process{public:process() : m_pid( -1 ){}public:pid_t m_pid; //子进程pidint m_pipefd[2];//子进程与父进程通信管道(父进程通过管道通知选定的子进程:“有新的连接到来,你去accept”)};//进程池类:将需要处理的逻辑任务封装为任务类,作为模板参数,以提高代码复用template< typename T >class processpool{private://构造函数,creat函数调用processpool( int listenfd, int process_number = 8 );public://给定服务器监听的socket,和进程数,创建子进程。(注:单例模式)。 为静态函数,因此可以直接以类名调用static processpool< T >* create( int listenfd, int process_number = 8 ){if( !m_instance )//确保只创建唯一的一个进程池实例{m_instance = new processpool< T >( listenfd, process_number );}return m_instance;}~processpool(){delete [] m_sub_process;}void run(); //启动进程池,在其中根据当前进程的标号来区分为父进程或子进程,并分别调用其run_***函数来处理逻辑任务private:void setup_sig_pipe();:MAX_PROCESS_NUMBER = USER_PER_PROCESS = MAX_EVENT_NUMBER = m_idx;m_listenfd; //监听socket:父进程与所有子进程的监听socket文件描述符指向内核中的同一文件表项int m_stop;//子进程是否要停止运行process* m_sub_process; //保存所有子进程的数组static processpool< T >* m_instance; //进程池静态实例:标识全局唯一的进程池};template< typename T >processpool< T >* processpool< T >::m_instance = NULL;sig_pipefd[setnonblocking( int fd ){int old_option = fcntl( fd, F_GETFL );int new_option = old_option | O_NONBLOCK;fcntl( fd, F_SETFL, new_option );return old_option;}addfd( int epollfd, int fd ){epoll_event event;event.data.fd = fd;event.events = EPOLLIN | EPOLLET;epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event );setnonblocking( fd );//”只有当事件就绪时,非阻塞才是高校的“}removefd( int epollfd, int fd ){epoll_ctl( epollfd, EPOLL_CTL_DEL, fd, 0 );close( fd );}sig_handler( int sig ){int save_errno = errno;int msg = sig;send( sig_pipefd[1], ( char* )&msg, 1, 0 );errno = save_errno;}static void addsig( int sig, void( handler )(int), bool restart = true ){struct sigaction sa;memset( &sa, ‘\0’, sizeof( sa ) );sa.sa_handler = handler;if( restart ){sa.sa_flags |= SA_RESTART;}sigfillset( &sa.sa_mask );assert( sigaction( sig, &sa, NULL ) != -1 );}//进程池构造函数template< typename T >processpool< T >::processpool( int listenfd, int process_number ): m_listenfd( listenfd ), m_process_number( process_number ), m_idx( -1 ), m_stop( false ){assert( ( process_number > 0 ) && ( process_number <= MAX_PROCESS_NUMBER ) );m_sub_process = new process[ process_number ];assert( m_sub_process );for( int i = 0; i < process_number; ++i ){int ret = socketpair( PF_UNIX, SOCK_STREAM, 0, m_sub_process[i].m_pipefd );assert( ret == 0 );m_sub_process[i].m_pid = fork();assert( m_sub_process[i].m_pid >= 0 );if( m_sub_process[i].m_pid > 0 )//为父进程{close( m_sub_process[i].m_pipefd[1] );continue;//父进程继续创建好所有的子进程后,才退出该函数}else{close( m_sub_process[i].m_pipefd[0] );//子进程m_idx = i;//初始化子进程在进程池中的编号(最小为0,,而父进程标号为-1)break;//子进程一旦创建好,就退出该函数,进入逻辑任务处理(pool->run)}}}//信号管道设置函数template< typename T >void processpool< T >::setup_sig_pipe(){m_epollfd = epoll_create( 5 );assert( m_epollfd != -1 );int ret = socketpair( PF_UNIX, SOCK_STREAM, 0, sig_pipefd );assert( ret != -1 );setnonblocking( sig_pipefd[1] );//信号处理函数向写端写入消息addfd( m_epollfd, sig_pipefd[0] );//在while中监听读端addsig( SIGCHLD, sig_handler );//子进程退出会暂停addsig( SIGTERM, sig_handler );//终止进程,kill函数默认即发送该信号addsig( SIGINT, sig_handler );//键盘输入以终止该进程ctrl+Caddsig( SIGPIPE, SIG_IGN );//忽略往管道读端被关闭的管道写数据的信号}//进程池启动函数template< typename T >void processpool< T >::run(){if( m_idx != -1 ){run_child();//子进程return;//子进程退出后,记得return}run_parent();//父进程}//子进程逻辑函数template< typename T >void processpool< T >::run_child(){setup_sig_pipe();int pipefd = m_sub_process[m_idx].m_pipefd[ 1 ];//与父进程的通信管道addfd( m_epollfd, pipefd );epoll_event events[ MAX_EVENT_NUMBER ];T* users = new T [ USER_PER_PROCESS ];//一次性创建所有的客户端任务对象数组(也用到池的思想,当某个客户任务处理完后,不用释放该对象资源,而是放回池中再利用)assert( users );int number = 0;int ret = -1;//监听信号管道、通信管道、连接socket,处理逻辑任务while( ! m_stop ){number = epoll_wait( m_epollfd, events, MAX_EVENT_NUMBER, -1 );if ( ( number < 0 ) && ( errno != EINTR ) ){printf( “epoll failure\n” );break;}for ( int i = 0; i < number; i++ ){int sockfd = events[i].data.fd;if( ( sockfd == pipefd ) && ( events[i].events & EPOLLIN ) ) //为父进程的通信事件{int client = 0;ret = recv( sockfd, ( char* )&client, sizeof( client ), 0 );if( ( ( ret < 0 ) && ( errno != EAGAIN ) ) || ret == 0 ){continue;}else{struct sockaddr_in client_address;socklen_t client_addrlength = sizeof( client_address );int connfd = accept( m_listenfd, ( struct sockaddr* )&client_address, &client_addrlength );if ( connfd < 0 ){printf( “errno is: %d\n”, errno );continue;}addfd( m_epollfd, connfd );users[connfd].init( m_epollfd, connfd, client_address );}}else if( ( sockfd == sig_pipefd[0] ) && ( events[i].events & EPOLLIN ) )//为信号管道事件{int sig;char signals[1024];ret = recv( sig_pipefd[0], signals, sizeof( signals ), 0 );if( ret <= 0 ){continue;}else{for( int i = 0; i < ret; ++i ){switch( signals[i] ){case SIGCHLD:{pid_t pid;int stat;while ( ( pid = waitpid( -1, &stat, WNOHANG ) ) > 0 ){continue;}break;}case SIGTERM:case SIGINT:{m_stop = true;break;}default:{break;}}}}}else if( events[i].events & EPOLLIN ) //为连接socket事件{users[sockfd].process();//客户任务对象的逻辑处理函数}else{continue;}}}delete [] users;users = NULL;close( pipefd );//关闭与父进程的通信管道的读端//close( m_listenfd );//”对象由那个函数创建,就由那个函数销毁“close( m_epollfd );}//父进程逻辑函数template< typename T >void processpool< T >::run_parent(){setup_sig_pipe();addfd( m_epollfd, m_listenfd );epoll_event events[ MAX_EVENT_NUMBER ];int sub_process_counter = 0;int new_conn = 1;int number = 0;int ret = -1;//监听listen socket、和信号管道while( ! m_stop ){number = epoll_wait( m_epollfd, events, MAX_EVENT_NUMBER, -1 );if ( ( number < 0 ) && ( errno != EINTR ) ){printf( “epoll failure\n” );break;}for ( int i = 0; i < number; i++ ){int sockfd = events[i].data.fd;if( sockfd == m_listenfd ) //监听到新连接到来,论选出一个子进程,通知该子进程”嘿,有新的连接到了,你接受下!“{int i = sub_process_counter;do{if( m_sub_process[i].m_pid != -1 ){break;}i = (i+1)%m_process_number;}while( i != sub_process_counter );if( m_sub_process[i].m_pid == -1 )//所有子进程都都已经推出{m_stop = true;break;}sub_process_counter = (i+1)%m_process_number;//send( m_sub_process[sub_process_counter++].m_pipefd[0], ( char* )&new_conn, sizeof( new_conn ), 0 );send( m_sub_process[i].m_pipefd[0], ( char* )&new_conn, sizeof( new_conn ), 0 );//通知子进程printf( “send request to child %d\n”, i );//sub_process_counter %= m_process_number;}else if( ( sockfd == sig_pipefd[0] ) && ( events[i].events & EPOLLIN ) ){int sig;char signals[1024];ret = recv( sig_pipefd[0], signals, sizeof( signals ), 0 );if( ret <= 0 ){continue;}else{for( int i = 0; i < ret; ++i ){switch( signals[i] ){case SIGCHLD://子进程退出信号{pid_t pid;int stat;while ( ( pid = waitpid( -1, &stat, WNOHANG ) ) > 0 ){for( int i = 0; i < m_process_number; ++i ){if( m_sub_process[i].m_pid == pid ){printf( “child %d join\n”, i );close( m_sub_process[i].m_pipefd[0] );m_sub_process[i].m_pid = -1; //设置已经推出的子进程的PID为-1}}}m_stop = true;for( int i = 0; i < m_process_number; ++i ){if( m_sub_process[i].m_pid != -1 ) //只要还有一个子进程没有退出,则父进程继续{m_stop = false;}}break;}case SIGTERM:case SIGINT:{printf( “kill all the clild now\n” );for( int i = 0; i < m_process_number; ++i ){int pid = m_sub_process[i].m_pid;if( pid != -1 ){kill( pid, SIGTERM );}}break;}default:{break;}}}}}else{continue;}}}//close( m_listenfd );close( m_epollfd );}#endif

带着感恩的心启程,学会爱,爱父母,爱自己,爱朋友,爱他人。

半同步/半异步并发模式进程池实现

相关文章:

你感兴趣的文章:

标签云: