QT 线程池 + TCP 小试(二)实现通信功能

*免分资源链接点击打开链接

有了线程池,我们下一步就利用 QTcpServer 搭建一个服务器,接受客户端的连接,并把数据发送到线程池上。由于 QTcpServer 资料太多了,这里不在赘述。唯一值得注意的是,当客户端退出时,如果线程池队列中还有该客户的信息,这个信息还会被处理,只是无法再发送回去而已。其实,还可实现成客户端退出,就发一个信号到线程池,删除自己的所有任务。这个也很简单,但之所以没有做,因为这些数据的处理结果可能还会被其他消费者(而非生产者自己)使用,最典型的例子是从工业传感器上采集的数据,其生成的图像需要存储到设备中去。

QTcpSocket的 Write 方法默认是支持大体积数据的,即使一次发了500MB的数据,只要硬件资源可以承受,调用也会成功并立刻返回。接受者会以一定的载荷大小不停的触发readyRead,直到发送全部成功。但是,为了能够观察到并控制收发队列中的数据包的大小、体积,我们在外层实现了一个发送队列,每次以 payLoad为大小发送数据包。这是从MFC中带来的习惯,很难说好坏。

qghtcpserver.h

#ifndef QGHTCPSERVER_H#define QGHTCPSERVER_H#include <QTcpServer>#include <QMap>#include <QList>class QGHTcpServer : public QTcpServer{Q_OBJECTpublic:QGHTcpServer(QObject *parent,int nPayLoad = 4096);~QGHTcpServer();//踢出所有客户void KickAllClients();QList <QObject *> clientsList();void SetPayload(int nPayload);private:QMap<QObject *,QList<QByteArray> > m_buffer_sending;QMap<QObject *,QList<qint64> > m_buffer_sending_offset;QMap<QObject*,int> m_clientList;int m_nPayLoad;public slots://新的客户连接到来void new_client_recieved();//客户连接被关闭void client_closed();//新的数据到来void new_data_recieved();//一批数据发送完毕void some_data_sended(qint64);//客户端错误void displayError(QAbstractSocket::SocketError socketError);//向客户端发送数据void SendDataToClient(QObject * objClient,const QByteArray & dtarray);//向客户端广播数据,不包括 objFromClientvoid BroadcastData(QObject * objFromClient,const QByteArray & dtarray);signals://错误信息void evt_SocketError(QObject * senderSock ,QAbstractSocket::SocketError socketError);//新的客户端连接void evt_NewClientConnected(QObject * client);//客户端退出void evt_ClientDisconnected(QObject * client);//收到一批数据void evt_Data_recieved(QObject * ,const QByteArray & );//一批数据被发送void evt_Data_transferred(QObject * client,qint64);};#endif // QGHTCPSERVER_Hqghtcpserver.cpp#include "qghtcpserver.h"#include <assert.h>#include <QTcpSocket>QGHTcpServer::QGHTcpServer(QObject *parent,int nPayLoad ): QTcpServer(parent),m_nPayLoad(nPayLoad){assert(m_nPayLoad>=256 && m_nPayLoad<=16*1024*1024);connect(this, SIGNAL(newConnection()), this, SLOT(new_client_recieved()));}QGHTcpServer::~QGHTcpServer(){}QList <QObject *> QGHTcpServer::clientsList(){return m_clientList.keys();}void QGHTcpServer::SetPayload(int nPayload){m_nPayLoad = nPayload;assert(m_nPayLoad>=256 && m_nPayLoad<=16*1024*1024);}void QGHTcpServer::new_client_recieved(){QTcpSocket * sock_client = nextPendingConnection();while (sock_client){connect(sock_client, SIGNAL(readyRead()),this, SLOT(new_data_recieved()));connect(sock_client, SIGNAL(disconnected()),this,SLOT(client_closed()));connect(sock_client, SIGNAL(error(QAbstractSocket::SocketError)),this, SLOT(displayError(QAbstractSocket::SocketError)));connect(sock_client, SIGNAL(bytesWritten(qint64)), this, SLOT(some_data_sended(qint64)));m_clientList[sock_client] = 0;emit evt_NewClientConnected(sock_client);sock_client = nextPendingConnection();}}void QGHTcpServer::client_closed(){QTcpSocket * pSock = qobject_cast<QTcpSocket*>(sender());if (pSock){emit evt_ClientDisconnected(pSock);m_buffer_sending.remove(pSock);m_buffer_sending_offset.remove(pSock);m_clientList.remove(pSock);pSock->deleteLater();}}void QGHTcpServer::new_data_recieved(){QTcpSocket * pSock = qobject_cast<QTcpSocket*>(sender());if (pSock)emit evt_Data_recieved(pSock,pSock->readAll());}void QGHTcpServer::some_data_sended(qint64 wsended){QTcpSocket * pSock = qobject_cast<QTcpSocket*>(sender());if (pSock){emit evt_Data_transferred(pSock,wsended);QList<QByteArray> & list_sock_data = m_buffer_sending[pSock];QList<qint64> & list_offset = m_buffer_sending_offset[pSock];while (list_sock_data.empty()==false){QByteArray & arraySending = *list_sock_data.begin();qint64 & currentOffset = *list_offset.begin();qint64 nTotalBytes = arraySending.size();assert(nTotalBytes>=currentOffset);qint64 nBytesWritten = pSock->write(arraySending.constData()+currentOffset,qMin((int)(nTotalBytes-currentOffset),m_nPayLoad));currentOffset += nBytesWritten;if (currentOffset>=nTotalBytes){list_offset.pop_front();list_sock_data.pop_front();}elsebreak;}}}void QGHTcpServer::displayError(QAbstractSocket::SocketError socketError){QTcpSocket * pSock = qobject_cast<QTcpSocket*>(sender());if (pSock){emit evt_SocketError(pSock,socketError);pSock->disconnectFromHost();}}void QGHTcpServer::SendDataToClient(QObject * objClient,const QByteArray & dtarray){if (m_clientList.find(objClient)==m_clientList.end())return;QTcpSocket * pSock = qobject_cast<QTcpSocket*>(objClient);if (pSock&&dtarray.size()){QList<QByteArray> & list_sock_data = m_buffer_sending[pSock];QList<qint64> & list_offset = m_buffer_sending_offset[pSock];if (list_sock_data.empty()==true){qint64 bytesWritten = pSock->write(dtarray.constData(),qMin(dtarray.size(),m_nPayLoad));if (bytesWritten < dtarray.size()){list_sock_data.push_back(dtarray);list_offset.push_back(bytesWritten);}}else{list_sock_data.push_back(dtarray);list_offset.push_back(0);}}}void QGHTcpServer::BroadcastData(QObject * objClient,const QByteArray & dtarray){for(QMap<QObject *,int>::iterator p = m_clientList.begin();p!=m_clientList.end();p++){QTcpSocket * pSock = qobject_cast<QTcpSocket*>(p.key());if (pSock&&dtarray.size()&&pSock!=objClient){QList<QByteArray> & list_sock_data = m_buffer_sending[pSock];QList<qint64> & list_offset = m_buffer_sending_offset[pSock];if (list_sock_data.empty()==true){qint64 bytesWritten = pSock->write(dtarray.constData(),qMin(dtarray.size(),m_nPayLoad));if (bytesWritten < dtarray.size()){list_sock_data.push_back(dtarray);list_offset.push_back(bytesWritten);}else{list_sock_data.push_back(dtarray);list_offset.push_back(0);}}}}}void QGHTcpServer::KickAllClients(){QList<QObject *> clientList = m_clientList.keys();foreach(QObject * obj,clientList){QTcpSocket * pSock = qobject_cast<QTcpSocket*>(obj);if (pSock){pSock->disconnectFromHost();}}}下一次,,我会介绍最后的实现功能。

版权声明:本文为博主原创文章,未经博主允许不得转载。

力微休负重,言轻莫劝人。

QT 线程池 + TCP 小试(二)实现通信功能

相关文章:

你感兴趣的文章:

标签云: