Linux 上实现双向进程间通信管道

级别: 中级 吴咏炜 ( 本文阐述了一个使用 socketpair 系统调用在 Linux 上实现双向进程通讯管道的方法,并提供了一个实现。 问题和常见方法 Linux 提供了 popen 和 pclose 函数(1) ,用于创建和关闭管道与另外一个进程进行通信。其接口如下: FILE *popen(const char *command, const char *mode); int pclose(FILE *stream);遗憾的是,popen 创建的管道只能是单向的 — mode 只能是 "r" 或 "w" 而不能是某种组合–用户只能选择要么往里写,要么从中读,而不能同时在一个管道中进行读写。实际应用中,经常会有同时进行读写的要求,比如,我们可能希望把文本数据送往 sort 工具排序后再取回结果。此时 popen 就无法用上了。我们需要寻找其它的解决方案。 有一种解决方案是使用 pipe 函数(2)创建两个单向管道。没有错误检测的代码示意如下: int pipe_in[2], pipe_out[2]; pid_t pid; pipe(&pipe_in); // 创建父进程中用于读取数据的管道 pipe(&pipe_out); // 创建父进程中用于写入数据的管道 if ( (pid = fork()) == 0) { // 子进程 close(pipe_in[0]); // 关闭父进程的读管道的子进程读端 close(pipe_out[1]); // 关闭父进程的写管道的子进程写端 dup2(pipe_in[1], STDOUT_FILENO);// 复制父进程的读管道到子进程的标准输出 dup2(pipe_out[0], STDIN_FILENO); // 复制父进程的写管道到子进程的标准输入 close(pipe_in[1]); // 关闭已复制的读管道 close(pipe_out[0]); // 关闭已复制的写管道 /* 使用exec执行命令 */ } else { // 父进程 close(pipe_in[1]); // 关闭读管道的写端 close(pipe_out[0]); // 关闭写管道的读端 /* 现在可向pipe_out[1]中写数据,并从pipe_in[0]中读结果 */ close(pipe_out[1]); // 关闭写管道 /* 读取pipe_in[0]中的剩余数据 */ close(pipe_in[0]); // 关闭读管道 /* 使用wait系列函数等待子进程退出并取得退出代码 */ }当然,这样的代码的可读性(特别是加上错误处理代码之后)比较差,也不容易封装成类似于 popen/pclose 的函数,方便高层代码使用。究其原因,是 pipe 函数返回的一对文件描述符只能从第一个中读、第二个中写(至少对于 Linux 是如此)。为了同时读写,就只能采取这么累赘的两个 pipe 调用、两个文件描述符的形式了。 一个更好的方案 使用pipe就只能如此了。不过,Linux 实现了一个源自 BSD 的 socketpair 调用 (3),可以实现上述在同一个文件描述符中进行读写的功能(该调用目前也是 POSIX 规范的一部分(4) )。该系统调用能创建一对已连接的(UNIX 族)无名 socket。在 Linux 中,完全可以把这一对 socket 当成 pipe 返回的文件描述符一样使用,唯一的区别就是这一对文件描述符中的任何一个都可读和可写。 这似乎可以是一个用来实现进程间通信管道的好方法。不过,要注意的是,为了解决我前面的提出的使用 sort 的应用问题,我们需要关闭子进程的标准输入通知子进程数据已经发送完毕,而后从子进程的标准输出中读取数据直到遇到 EOF。使用两个单向管道的话每个管道可以单独关闭,因而不存在任何问题;而在使用双向管道时,如果不关闭管道就无法通知对端数据已经发送完毕,但关闭了管道又无法从中读取结果数据。——这一问题不解决的话,使用 socketpair 的设想就变得毫无意义。 令人高兴的是,shutdown 调用 (5) 可解决此问题。毕竟 socketpair 产生的文件描述符是一对 socket,socket 上的标准操作都可以使用,其中也包括 shutdown。——利用 shutdown,可以实现一个半关闭操作,通知对端本进程不再发送数据,同时仍可以利用该文件描述符接收来自对端的数据。没有错误检测的代码示意如下: int fd[2]; pid_t pid; socketpair(AF_UNIX, SOCKET_STREAM, 0, fd); // 创建管道 if ( (pid = fork()) == 0) { // 子进程 close(fd[0]);// 关闭管道的父进程端 dup2(fd[1], STDOUT_FILENO); // 复制管道的子进程端到标准输出 dup2(fd[1], STDIN_FILENO); // 复制管道的子进程端到标准输入 close(fd[1]);// 关闭已复制的读管道 /* 使用exec执行命令 */ } else { // 父进程 close(fd[1]);// 关闭管道的子进程端 /* 现在可在fd[0]中读写数据 */ shutdown(fd[0], SHUT_WR);// 通知对端数据发送完毕 /* 读取剩余数据 */ close(fd[0]);// 关闭管道 /* 使用wait系列函数等待子进程退出并取得退出代码 */ }很清楚,这比使用两个单向管道的方案要简洁不少。我将在此基础上作进一步的封装和改进。 封装和实现 直接使用上面的方法,无论怎么看,至少也是丑陋和不方便的。程序的维护者想看到的是程序的逻辑,而不是完成一件任务的各种各样的繁琐细节。我们需要一个好的封装。 封装可以使用 C 或者 C++。此处,我按照 UNIX 的传统,提供一个类似于 POSIX 标准中 popen/pclose 函数调用的 C 封装,以保证最大程度的可用性。接口如下: FILE *dpopen(const char *command); int dpclose(FILE *stream); int dphalfclose(FILE *stream);关于接口,以下几点需要注意一下: – 与 pipe 函数类似,dpopen 返回的是文件结构的指针,而不是文件描述符。这意味着,我们可以直接使用 fprintf 之类的函数,文件缓冲区会缓存写入管道的数据(除非使用 setbuf 函数关闭文件缓冲区),要保证数据确实写入到管道中需要使用 fflush 函数。 – 由于 dpopen 返回的是可读写的管道,所以 popen 的第二个表示读/写的参数不再需要。 – 在双向管道中我们需要通知对端写数据已经结束,此项操作由dphalfclose函数来完成。 具体的实现请直接查看程序源代码,其中有详细的注释和 doxygen 文档注释(6) 。我只略作几点说明: – 本实现使用了一个链表来记录所有 dpopen 打开的文件指针和子进程 ID 的对应关系,因此,在同时用 dpopen 打开的管道的多的时候,dpclose(需要搜索链表)的速度会稍慢一点。我认为在通常使用过程中这不会产生什么问题。如果在某些特殊情况下这会是一个问题的话,可考虑更改 dpopen 的返回值类型和 dpclose 的传入参数类型(不太方便使用,但实现简单),或者使用哈希表/平衡树来代替目前使用的链表以加速查找(接口不变,但实现较复杂)。 – 当编译时在 gcc 中使用了 "-pthread" 命令行参数时,本实现会启用 POSIX 线程支持,使用互斥量保护对链表的访问。因此本实现可以安全地用于 POSIX 多线程环境之中。 – 与 popen 类似(7) ,dpopen 会在 fork 产生的子进程中关闭以前用 dpopen 打开的管道。 – 如果传给 dpclose 的参数不是以前用 dpopen 返回的非 NULL 值,当前实现除返回 -1 表示错误外,还会把 errno 设为 EBADF。对于 pclose 而言,这种情况在 POSIX 规范中被视为不确定(unspecified)行为 (8)。 – 实现中没有使用任何平台相关特性,以方便移植到其它 POSIX 平台上。 下面的代码展示了一个简单例子,将多行文本送到 sort 中,然后取回结果、显示出来: #include <stdio.h> #include <stdlib.h> #include "dpopen.h" #define MAXLINE 80 int main() { char line[MAXLINE]; FILE *fp; fp = dpopen("sort"); if (fp == NULL) { perror("dpopen error"); exit(1); } fprintf(fp, "orange/n"); fprintf(fp, "apple/n"); fprintf(fp, "pear/n"); if (dphalfclose(fp) < 0) { perror("dphalfclose error"); exit(1); } for (;;) { if (fgets(line, MAXLINE, fp) == NULL) break; fputs(line, stdout); } dpclose(fp); return 0; }输出结果为: apple orange pear总结 本文阐述了一个使用 socketpair 系统调用在 Linux 上实现双向进程通讯管道的方法,并提供了一个实现。该实现提供的接口与 POSIX 规范中的 popen/pclose 函数较为接近,因而非常易于使用。该实现没有使用平台相关的特性,因而可以不加修改或只进行少量修改即可移植到支持 socketpair 调用的 POSIX 系统中去。 参考资料 1. 相应的man (3)页。在线查看: 2. 相应的man (2)页。在线查看: 3. 相应的man (2)页。在线查看: 4. POSIX规范: 5. 相应的man (2)页。在线查看: 6. Doxygen主页: 7. POSIX规范: 8. POSIX规范: 关于作者 吴咏炜,目前在 Linux 上从事高性能入侵检测系统的研发。对于开发跨平台、高性能、可重用的 C++ 代码有着浓厚的兴趣。 可以跟他联系。全文出自 : IBM developerWorks 中国// -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-// vim:tabstop=4:shiftwidth=4:expandtab:/*** @file dpopen.h** Header file of a duplex pipe stream.** @version 1.3, 2004/07/26* @authorWu Yongwei**/#ifndef _DPOPEN_H#define _DPOPEN_H#include <stdio.h>#ifdef __cplusplusextern "C" {#endifFILE *dpopen(const char *command);int dpclose(FILE *stream);int dphalfclose(FILE *stream);#ifdef __cplusplus}#endif#endif /* _DPOPEN_H */// -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-// vim:tabstop=4:shiftwidth=4:expandtab:/*** @file dpopen.c** Implementation of a duplex pipe stream.** @version 1.10, 2004/08/31* @authorWu Yongwei**/#include <errno.h>#include <fcntl.h>#include <stdio.h>#include <stdlib.h>#include <unistd.h>#include <sys/types.h>#include <sys/socket.h>#include <sys/wait.h>#ifdef _REENTRANT#include <pthread.h>static pthread_mutex_t chain_mtx = PTHREAD_MUTEX_INITIALIZER;#endif#include "dpopen.h"/** Struct to store duplex-pipe-specific information. */struct dpipe_chain { FILE *stream; ///< Pointer to the duplex pipe stream pid_t pid; ///< Process ID of the command struct dpipe_chain *next; ///< Pointer to the next one in chain};/** Typedef to make the struct easier to use. */typedef struct dpipe_chain dpipe_t;/** Header of the chain of opened duplex pipe streams. */static dpipe_t *chain_hdr;/*** Initiates a duplex pipe stream from/to a process.** Like /e popen, all previously #dpopen’d pipe streams will be closed* in the child process.** @param command the command to execute on /c sh* @return a pointer to an open stream on successful* completion; /c NULL otherwise*/FILE *dpopen(const char *command){ int fd[2], parent, child; pid_t pid; FILE *stream; dpipe_t *chain; /* Create a duplex pipe using the BSD socketpair call */ if (socketpair(AF_UNIX, SOCK_STREAM, 0, fd) < 0) return NULL; parent = fd[0]; child= fd[1]; /* Fork the process and check whether it is successful */ if ( (pid = fork()) < 0) { close(parent); close(child); return NULL; } if (pid == 0) { /* child */ /* Close the other end */ close(parent); /* Duplicate to stdin and stdout */ if (child != STDIN_FILENO) if (dup2(child, STDIN_FILENO) < 0) { close(child); return NULL; } if (child != STDOUT_FILENO) if (dup2(child, STDOUT_FILENO) < 0) { close(child); return NULL; } /* Close this end too after it is duplicated to standard I/O */ close(child); /* Close all previously opened pipe streams, as popen does */ for (chain = chain_hdr; chain != NULL; chain = chain->next) close(fileno(chain_hdr->stream)); /* Execute the command via sh */ execl("/bin/sh", "sh", "-c", command, NULL); /* Exit the child process if execl fails */ _exit(127); } else { /* parent */ /* Close the other end */ close(child); /* Open a new stream with the file descriptor of the pipe */ stream = fdopen(parent, "r+"); if (stream == NULL) { close(parent); return NULL; } /* Allocate memory for the dpipe_t struct */ chain = (dpipe_t *)malloc(sizeof(dpipe_t)); if (chain == NULL) { fclose(stream); return NULL; } /* Store necessary info for dpclose, and adjust chain header */ chain->stream = stream; chain->pid = pid;#ifdef _REENTRANT pthread_mutex_lock(&chain_mtx);#endif chain->next = chain_hdr; chain_hdr = chain;#ifdef _REENTRANT pthread_mutex_unlock(&chain_mtx);#endif /* Successfully return here */ return stream; }}/*** Closes a duplex pipe stream from/to a process.** @param stream pointer to a pipe stream returned from a previous* #dpopen call* @return the exit status of the command if successful; /c -1* if an error occurs*/int dpclose(FILE *stream){ int status; pid_t pid, wait_res; dpipe_t *cur; dpipe_t **ptr; /* Search for the stream starting from chain header */#ifdef _REENTRANT pthread_mutex_lock(&chain_mtx);#endif ptr = &chain_hdr; while ( (cur = *ptr) != NULL) { /* Not end of chain */ if (cur->stream == stream) { /* Stream found */ pid = cur->pid; *ptr = cur->next;#ifdef _REENTRANT pthread_mutex_unlock(&chain_mtx);#endif free(cur); if (fclose(stream) != 0) return -1; do { wait_res = waitpid(pid, &status, 0); } while (wait_res == -1 && errno == EINTR); if (wait_res == -1) return -1; return status; } ptr = &cur->next; /* Check next */ }#ifdef _REENTRANT pthread_mutex_unlock(&chain_mtx);#endif errno = EBADF; /* If the given stream is not found */ return -1;}/*** Flushes the buffer and sends /c EOF to the process at the other end* of the duplex pipe stream.** @param stream pointer to a pipe stream returned from a previous* #dpopen call* @return /c 0 if successful; /c -1 if an error occurs*/int dphalfclose(FILE *stream){ /* Ensure all data are flushed */ if (fflush(stream) == EOF) return -1; /* Close pipe for writing */ return shutdown(fileno(stream), SHUT_WR);}要愈合不能,要忘却不能,要再次拥抱,却鼓不起足够的勇气,

Linux 上实现双向进程间通信管道


