[SOLVED] Threaded QTcpServer, multiple QSslSocket clients with QDataStream: readyRead() sometimes not emitted
-
Hey guys,
I've got a huge network problem with my application. The application is built on a client-server architecture. The clients connect to the server via SSL encryption. The server has a MySQL connection: the clients send a query to the server, the server executes it and sends the answer back to the client. Furthermore, the server is used to keep the clients up to date with each other. Every client can send an update message to the server, and the server spreads it to the other clients.
Here comes the problem. Everything works fine until one client sends an update. The update is correctly spread to all the other clients, but when a client that received an update then wants to send its next data to the server, it never gets the readyRead() signal back, even though I can see that the server sends the answer back correctly. I suspect some strange behaviour caused by the multithreaded architecture with QSslSocket, but I am not sure at all. I have been trying to solve this issue for more than 3 days now. Please help me!
Here comes the code:
ClientThread on the server side:
ClientThread::ClientThread(int socketDescr, QString serverCert, QString DBHostName,
                           qint16 DBPort, QString DBName, QString DBUserName,
                           QString DBPassword, bool isMySqp, QObject *parent)
    : QThread(parent)
{
    qDebug() << "ClientThread::ClientThread(" << socketDescr << "," << serverCert << ","
             << DBPort << "," << DBHostName << "," << DBName << "," << DBUserName << ","
             << DBPassword << "," << isMySqp << "," << parent << "):" << parent;
    socketDescriptor = socketDescr;
    this->serverCert = serverCert;
    this->DBHostName = DBHostName;
    this->DBPort = DBPort;
    this->DBName = DBName;
    this->DBUserName = DBUserName;
    this->DBPassword = DBPassword;
    this->isMySql = isMySqp;
    connect(this, SIGNAL(startNewConnection()), this, SLOT(createNewSocket()));
    emit startNewConnection();
}

ClientThread::~ClientThread()
{
    qDebug() << "ClientThread::~ClientThread()";
    datab.close();
}

void ClientThread::run()
{
    qDebug() << "ClientThread::run()";
    exec();
}

void ClientThread::connectSocketSignals()
{
    qDebug() << "ClientThread::connectSocketSignals()";
    connect(socket, SIGNAL(readyRead()), this, SLOT(messageFromClient()), Qt::DirectConnection);
    connect(socket, SIGNAL(encryptedBytesWritten(qint64)), this, SLOT(encryptedBytesWritten(qint64)), Qt::DirectConnection);
    connect(socket, SIGNAL(modeChanged(QSslSocket::SslMode)), this, SLOT(modeChanged(QSslSocket::SslMode)), Qt::DirectConnection);
    connect(socket, SIGNAL(peerVerifyError(QSslError)), this, SLOT(peerVerifyError(QSslError)), Qt::DirectConnection);
    connect(socket, SIGNAL(sslErrors(QList<QSslError>)), this, SLOT(sslErrors(QList<QSslError>)), Qt::DirectConnection);
    connect(socket, SIGNAL(connected()), this, SLOT(connected()), Qt::DirectConnection);
    connect(socket, SIGNAL(disconnected()), this, SLOT(disconnected()), Qt::DirectConnection);
    connect(socket, SIGNAL(error(QAbstractSocket::SocketError)), this, SLOT(error(QAbstractSocket::SocketError)), Qt::DirectConnection);
    connect(socket, SIGNAL(hostFound()), this, SLOT(hostFound()), Qt::DirectConnection);
    connect(socket, SIGNAL(proxyAuthenticationRequired(QNetworkProxy,QAuthenticator*)),
            this, SLOT(proxyAuthenticationRequired(QNetworkProxy,QAuthenticator*)), Qt::DirectConnection);
    connect(socket, SIGNAL(stateChanged(QAbstractSocket::SocketState)), this, SLOT(stateChanged(QAbstractSocket::SocketState)), Qt::DirectConnection);
}

void ClientThread::sendSqlResultBack(QStringList resultList, QString type)
{
    QByteArray block;
    QDataStream out(&block, QIODevice::WriteOnly);
    out.setVersion(QDataStream::Qt_5_4);
    out << (quint32)0; //Blocksize
    out << type; //Type
    out << getClientID(); // ID
    out << resultList;
    out.device()->seek(0);
    out << (quint32)(block.size() - sizeof(quint32));
    qDebug() << block.size();
    socket->write(block);
    socket->flush();
}

void ClientThread::sendSqlResultBack(QString result, QString type)
{
    QByteArray block;
    QDataStream out(&block, QIODevice::WriteOnly);
    out.setVersion(QDataStream::Qt_5_4);
    out << (quint32)0; //Blocksize
    out << type; //Type
    out << getClientID(); // ID
    out << result;
    out.device()->seek(0);
    out << (quint32)(block.size() - sizeof(quint32));
    qDebug() << block.size();
    socket->write(block);
    socket->flush();
}

void ClientThread::createNewSocket()
{
    socket = new QSslSocket();
    //socket->moveToThread(this);
    if(!socket) {
        qWarning("not enough memory to create new QSslSocket!");
        emit error(socket->error());
        exit(1);
    }
    //New Connection
    qDebug() << "New Thread with socket-ID: " << socketDescriptor;
    socket->setProtocol(QSsl::SslV3);
    connectSocketSignals();
    socket->setSocketOption(QAbstractSocket::KeepAliveOption, true );
    if(!socket->setSocketDescriptor(socketDescriptor)) {
        qWarning("could not set socket descriptor!");
        socket->deleteLater();
        exit(2);
    }
    startServerEncryption();
    qDebug() << "Client connected" << socketDescriptor;
    addr = socket->localAddress();
    port = socket->localPort();
    emit clientInfo(socket);
}

void ClientThread::messageFromClient()
{
    qDebug("ClientThread::messageFromClient()");
    QDataStream in(socket);
    in.setVersion(QDataStream::Qt_5_4);
    while(socket->bytesAvailable()) {
        if(socket->bytesAvailable() < (int)sizeof(quint32))
            return;
        int objectSize;
        in >> objectSize;
        QByteArray buffer;
        char *temp = new char[objectSize];
        int bufferSize = in.readRawData(temp, objectSize);
        buffer.append(temp, bufferSize);
        bool readyForParsing = false;
        if(buffer.size() == objectSize) {
            readyForParsing = true;
        } else if (buffer.size() > objectSize) {
            qDebug() << "buffer size!!!";
            readyForParsing = true;
        } else {
            while (buffer.size() < objectSize) {
                socket->waitForReadyRead(5000);
                char *temp = new char[objectSize - buffer.size()];
                int bufferSize = in.readRawData(temp, objectSize - buffer.size());
                buffer.append(temp, bufferSize);
                readyForParsing = true;
            }
        }
        if(readyForParsing == true) {
            QDataStream toParse(&buffer, QIODevice::ReadOnly);
            toParse.setVersion(QDataStream::Qt_5_4);
            QString type;
            toParse >> type;
            QString id;
            toParse >> id;
            if(clientID.isEmpty()) {
                clientID = id;
                if (isMySql == false) {
                    datab = QSqlDatabase::addDatabase("QODBC", clientID);
                    datab.setDatabaseName("");
                    datab.setDatabaseName(DBName);
                    datab.setPassword(DBPassword);
                } else {
                    datab = QSqlDatabase::addDatabase("QMYSQL", clientID);
                    datab.setHostName(DBHostName);
                    datab.setPort(DBPort);
                    datab.setDatabaseName(DBName);
                    datab.setUserName(DBUserName);
                    datab.setPassword(DBPassword);
                }
            }
            if (type == "upd") {
                QString updateMessage;
                toParse >> updateMessage;
                qDebug() << "ClientThread::messageFromClient() -> upd ->" << updateMessage;
                spreadNewUpdate(updateMessage, id);
            } else if (type == "dbq") {
                QString sqlOrder;
                toParse >> sqlOrder;
                if (datab.open()) {
                    QSqlQuery DBquery(datab.database(clientID));
                    QStringList sqlResultList;
                    if (DBquery.exec(sqlOrder)) {
                        while (DBquery.next()) {
                            sqlResultList.append(DBquery.value(0).toString());
                        }
                        qDebug() << "ClientThread::messageFromClient() -> dbq ->" << sqlOrder;
                        qDebug() << "ClientThread::messageFromClient() -> dbq ->" << sqlResultList;
                        sendSqlResultBack(sqlResultList, type);
                        return;
                    } else {
                        sqlError(clientID+":\t"+DBquery.lastError().text());
                        sendSqlResultBack(DBquery.lastError().text(), "err");
                    }
                }
            } else if (type == "dbm") {
                QString strSize;
                toParse >> strSize;
                int size = strSize.toInt();
                QString sqlOrder;
                toParse >> sqlOrder;
                if (datab.open()) {
                    QSqlQuery DBquery(datab.database(clientID));
                    QStringList sqlResultList;
                    if (DBquery.exec(sqlOrder)) {
                        while (DBquery.next()) {
                            for(int i=0; i<size; i++) {
                                sqlResultList.append(DBquery.value(i).toString());
                            }
                        }
                        qDebug() << "ClientThread::messageFromClient() -> dbm ->" << sqlOrder;
                        qDebug() << "ClientThread::messageFromClient() -> dbm ->" << sqlResultList;
                        sendSqlResultBack(sqlResultList, type);
                        return;
                    } else {
                        sqlError(clientID+":\t"+DBquery.lastError().text());
                        sendSqlResultBack(DBquery.lastError().text(), "err");
                    }
                }
            } else if (type == "dbs") {
                QString sqlOrder;
                toParse >> sqlOrder;
                if (datab.open()) {
                    QSqlQuery DBquery(datab.database(clientID));
                    QString sqlResult;
                    if (DBquery.exec(sqlOrder)) {
                        if (DBquery.next()) {
                            sqlResult = DBquery.value(0).toString();
                            qDebug() << "ClientThread::messageFromClient() -> dbs ->" << sqlOrder;
                            qDebug() << "ClientThread::messageFromClient() -> dbs ->" << sqlResult;
                        }
                        sendSqlResultBack(sqlResult, type);
                        return;
                    } else {
                        sqlError(clientID+":\t"+DBquery.lastError().text());
                        sendSqlResultBack(DBquery.lastError().text(), "err");
                    }
                }
            } else if (type == "set") {
                QString sqlOrder;
                toParse >> sqlOrder;
                if (datab.open()) {
                    QSqlQuery DBquery(datab.database(clientID));
                    if(DBquery.exec(sqlOrder)==false) {
                        sqlError(clientID+":\t"+DBquery.lastError().text());
                        sendSqlResultBack(DBquery.lastError().text(), "err");
                    } else {
                        qDebug() << "ClientThread::messageFromClient() -> set ->" << sqlOrder;
                        sendSqlResultBack("true", "set");
                    }
                }
            } else {
                QString wtf;
                toParse >> wtf;
                qDebug() << "WTF: " << wtf;
            }
        }
    }
}

void ClientThread::sendUpdate(QString message, QString id)
{
    qDebug() << "ClientThread::sendUpdate("<<message<<")";
    if(id != clientID) {
        QByteArray block;
        QDataStream out(&block, QIODevice::WriteOnly);
        out.setVersion(QDataStream::Qt_5_4);
        out << (quint32)0; //Blocksize
        out << QString("upd"); //Type
        out << id; // ID
        out << message;
        out.device()->seek(0);
        out << (quint32)(block.size() - sizeof(quint32));
        socket->write(block);
        socket->flush();
        qDebug() << "ClientThread::sendUpdate("<<message<<")" << "GESENDET";
    }
}
The ClientThread::messageFromClient() method receives the client's database query, executes it and sends the result back via QSslSocket. ClientThread::sendUpdate(QString message, QString id) is connected to a signal in the server's main thread and is executed whenever any client sends an update to the server.
Now here comes the client side:
UpdateClient::UpdateClient(bool isMySQL, QWidget *parent)
    : QWidget(parent)
{
    qDebug() << "UpdateClient::UpdateClient("<< isMySQL << "," << parent << "):" << parent;
    //create ID
    clientID = QUuid::createUuid();
    //create SSL socket
    client_socket = new QSslSocket;
    client_socket->setProtocol(QSsl::SslV3);
    //connect socket signals
    connect(client_socket, SIGNAL(error(QAbstractSocket::SocketError)), this, SLOT(error(QAbstractSocket::SocketError)));
    connect(client_socket, SIGNAL(sslErrors(QList<QSslError>)), this, SLOT(sslErrors(QList<QSslError>)));
    connect(client_socket, SIGNAL(readyRead()), this, SLOT(tcpReady()));
    mySQL = isMySQL;
}

UpdateClient::~UpdateClient()
{
    qDebug() << "UpdateClient::~UpdateClient()";
    client_socket->disconnectFromHost();
    client_socket->close();
}

void UpdateClient::tcpReady()
{
    qDebug() << "UpdateClient::tcpReady()";
    QDataStream in(client_socket);
    in.setVersion(QDataStream::Qt_5_4);
    while(client_socket->bytesAvailable()) {
        if(client_socket->bytesAvailable() < (int)sizeof(quint32))
            return;
        int objectSize;
        in >> objectSize;
        QByteArray buffer;
        char *temp = new char[objectSize];
        int bufferSize = in.readRawData(temp, objectSize);
        buffer.append(temp, bufferSize);
        bool readyForParsing = false;
        if(buffer.size() == objectSize) {
            readyForParsing = true;
        } else if (buffer.size() > objectSize) {
            qDebug() << "buffer size!!!";
            readyForParsing = true;
        } else {
            while (buffer.size() < objectSize) {
                client_socket->waitForReadyRead(5000);
                char *temp = new char[objectSize - buffer.size()];
                int bufferSize = in.readRawData(temp, objectSize - buffer.size());
                buffer.append(temp, bufferSize);
                readyForParsing = true;
            }
        }
        if(readyForParsing == true) {
            QDataStream toParse(&buffer, QIODevice::ReadOnly);
            toParse.setVersion(QDataStream::Qt_5_4);
            QString type;
            toParse >> type;
            QString ID;
            toParse >> ID;
            if (type == "upd") {
                QString updateMessage;
                toParse >> updateMessage;
                updateData(updateMessage);
            } else if (type == "dbs") {
                toParse >> dbResultString;
                if (dbResultString == "empty_str") {
                    dbResultString = "";
                }
                dbStringReady();
            } else if (type == "dbq") {
                toParse >> dbResultList;
                dbQueryReady();
            } else if (type == "dbm") {
                toParse >> dbResultList;
                dbMoreQueryReady();
            } else if (type == "set") {
                toParse >> dbOrderExecuted;
                dbOrderWasExecuted();
            } else if (type == "err") {
                QString sqlError;
                toParse >> sqlError;
                QMessageBox *error = new QMessageBox();
                error->setText(sqlError);
                error->setStandardButtons(QMessageBox::Ok);
                error->exec();
                dbResultList.clear();
                dbResultString.clear();
                dbStringReady();
                dbQueryReady();
                dbMoreQueryReady();
                dbOrderWasExecuted();
            } else {
                QString input;
                qDebug() << input;
                dbStringReady();
                dbQueryReady();
                dbMoreQueryReady();
                dbOrderWasExecuted();
            }
        }
    }
}

void UpdateClient::sendUpdateToServer(QString updateMessage)
{
    qDebug() << "UpdateClient::sendUpdateToServer("<< updateMessage <<")";
    QString temp = updateMessage;
    temp += ";";
    temp += database;
    temp += "#";
    QByteArray block;
    QDataStream out(&block, QIODevice::WriteOnly);
    out.setVersion(QDataStream::Qt_5_4);
    out << (quint32)0; //Blocksize
    out << QString("upd"); //Type
    out << clientID.toString(); //ID
    out << temp;
    out.device()->seek(0);
    out << (quint32)(block.size() - sizeof(quint32));
    if(client_socket->isOpen()&&client_socket->isEncrypted()) {
        client_socket->write(block);
        client_socket->flush();
    }
}

QStringList UpdateClient::getDBquery(QString SQLorder)
{
    qDebug() << "UpdateClient::getDBquery("<<SQLorder<<")";
    if(mySQL==false) {
        SQLorder = transferMySQLToAccess(SQLorder);
    }
    QByteArray block;
    QDataStream out(&block, QIODevice::WriteOnly);
    out.setVersion(QDataStream::Qt_5_4);
    out << (quint32)0; //Blocksize
    out << QString("dbq"); //Type
    out << clientID.toString(); // ID
    out << SQLorder;
    out.device()->seek(0);
    out << (quint32)(block.size() - sizeof(quint32));
    if(client_socket->isOpen()&&client_socket->isEncrypted()) {
        qDebug() << block.size();
        client_socket->write(block);
        client_socket->flush();
    }
    QEventLoop *loop = new QEventLoop();
    connect(this, SIGNAL(dbQueryReady()), loop, SLOT(quit()));
    loop->exec();
    return dbResultList;
}

QStringList UpdateClient::getDBMorequery(QString SQLorder, int size)
{
    qDebug() << "UpdateClient::getDBMorequery("<<SQLorder<<","<<size<<")";
    if(mySQL==false) {
        SQLorder = transferMySQLToAccess(SQLorder);
    }
    //this function is used if more columns are queried, "size" is the number of columns
    QByteArray block;
    QDataStream out(&block, QIODevice::WriteOnly);
    out.setVersion(QDataStream::Qt_5_4);
    out << (quint32)0; //Blocksize
    out << QString("dbm"); //Type
    out << clientID.toString(); // ID
    out << QString::number(size); //Size
    out << SQLorder;
    out.device()->seek(0);
    out << (quint32)(block.size() - sizeof(quint32));
    if(client_socket->isOpen()&&client_socket->isEncrypted()) {
        qDebug() << block.size();
        client_socket->write(block);
        client_socket->flush();
    }
    QEventLoop *loop = new QEventLoop();
    connect(this, SIGNAL(dbMoreQueryReady()), loop, SLOT(quit()));
    loop->exec();
    return dbResultList;
}

QString UpdateClient::getDBstring(QString SQLorder)
{
    qDebug() << "UpdateClient::getDBstring("<<SQLorder<<")";
    if(mySQL==false) {
        SQLorder = transferMySQLToAccess(SQLorder);
    }
    //this function is used if just a single String is queried
    QByteArray block;
    QDataStream out(&block, QIODevice::WriteOnly);
    out.setVersion(QDataStream::Qt_5_4);
    out << (quint32)0; //Blocksize
    out << QString("dbs"); //Type
    out << clientID.toString(); // ID
    out << SQLorder;
    out.device()->seek(0);
    out << (quint32)(block.size() - sizeof(quint32));
    if(client_socket->isOpen()&&client_socket->isEncrypted()) {
        qDebug() << block.size();
        client_socket->write(block);
        client_socket->flush();
    }
    QEventLoop *loop = new QEventLoop();
    connect(this, SIGNAL(dbStringReady()), loop, SLOT(quit()));
    loop->exec();
    return dbResultString;
}

bool UpdateClient::setDBorder(QString SQLorder)
{
    qDebug() << "UpdateClient::setDBorder("<<SQLorder<<")";
    if(mySQL==false) {
        SQLorder = transferMySQLToAccess(SQLorder);
    }
    //this function is used for any other regular SQL order (no query!)
    QByteArray block;
    QDataStream out(&block, QIODevice::WriteOnly);
    out.setVersion(QDataStream::Qt_5_4);
    out << (quint32)0; //Blocksize
    out << QString("set"); //Type
    out << clientID.toString(); // ID
    out << SQLorder;
    out.device()->seek(0);
    out << (quint32)(block.size() - sizeof(quint32));
    if(client_socket->isOpen()&&client_socket->isEncrypted()) {
        qDebug() << block.size();
        client_socket->write(block);
        client_socket->flush();
    }
    QEventLoop *loop = new QEventLoop();
    connect(this, SIGNAL(dbOrderWasExecuted()), loop, SLOT(quit()));
    loop->exec();
    return dbOrderExecuted;
}
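Both sides of the code above frame every message as a quint32 size followed by the payload, and the reading slots call waitForReadyRead() when a frame has only partly arrived. As a side note for readers, here is a rough sketch of a non-blocking way to do the same framing: keep a persistent buffer and only hand out complete frames. This is plain C++ without Qt, it is not the fix reported in this thread, and the names FrameAssembler, feed(), takeFrames() and makeFrame() are made up for illustration. The size is written big-endian, which is QDataStream's default byte order.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical helper, not part of Qt or of the posted code.
constexpr std::size_t kHeaderSize = 4; // 32-bit size prefix, like the quint32 above

// Builds one frame: 4-byte big-endian payload size followed by the payload.
std::string makeFrame(const std::string& payload) {
    assert(payload.size() <= 0xFFFFFFFFull); // size must fit into 32 bits
    const std::uint32_t len = static_cast<std::uint32_t>(payload.size());
    std::string frame;
    for (int shift = 24; shift >= 0; shift -= 8)
        frame.push_back(static_cast<char>((len >> shift) & 0xFF));
    return frame + payload;
}

// Collects incoming bytes and returns only frames that have arrived completely.
class FrameAssembler {
public:
    // Append whatever arrived: a fragment, one frame, or several frames.
    void feed(const std::string& bytes) { buffer_ += bytes; }

    // Extract every complete frame; incomplete data stays buffered for later.
    std::vector<std::string> takeFrames() {
        std::vector<std::string> frames;
        for (;;) {
            if (buffer_.size() < kHeaderSize)
                break; // size prefix not complete yet
            std::uint32_t len = 0;
            for (std::size_t i = 0; i < kHeaderSize; ++i)
                len = (len << 8) | static_cast<unsigned char>(buffer_[i]);
            if (buffer_.size() - kHeaderSize < len)
                break; // payload not complete yet, wait for more bytes
            frames.push_back(buffer_.substr(kHeaderSize, len));
            buffer_.erase(0, kHeaderSize + len);
        }
        return frames;
    }

private:
    std::string buffer_;
};
```

In a Qt slot the equivalent would be to append socket->readAll() to a member buffer on every readyRead() and return when no complete frame is available yet, so the slot never has to wait inside itself.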
On the client side, the methods UpdateClient::getDBquery(QString SQLorder), UpdateClient::getDBMorequery(QString SQLorder, int size) and UpdateClient::getDBstring(QString SQLorder) are called to query the database via the server. They block via a QEventLoop until tcpReady() is called, the answer is successfully parsed, and the signal connected to the event loop's quit() slot is emitted. Furthermore, if a client has to send an update, it uses the UpdateClient::sendUpdateToServer(QString updateMessage) method, which is non-blocking.
If you need more information or code, please let me know. I am really stuck here and I don't know how to resolve this issue.
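To make the blocking pattern described above easier to follow, here is a toy model of it in plain C++ (no Qt). ToyLoop, ToyClient and deliver() are invented names, and the queue only stands in for the real event loop, so treat it as a sketch of the idea rather than of QEventLoop itself. The request function runs a local loop that keeps processing queued messages and only returns once the matching reply has called quit(); an "upd" message that arrives in between is handled but does not end the wait.

```cpp
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Toy stand-in for an event loop: a queue of callbacks plus a quit flag.
class ToyLoop {
public:
    void post(std::function<void()> ev) { queue_.push_back(std::move(ev)); }

    // Runs queued events until quit() is called or the queue is empty.
    // Returns true if the loop was stopped by quit().
    bool exec() {
        quit_ = false;
        while (!quit_ && !queue_.empty()) {
            std::function<void()> ev = std::move(queue_.front());
            queue_.pop_front();
            ev();
        }
        return quit_;
    }

    void quit() { quit_ = true; }

private:
    std::deque<std::function<void()>> queue_;
    bool quit_ = false;
};

// Toy client: "dbs" replies end the wait, "upd" messages are only recorded.
struct ToyClient {
    ToyLoop loop;
    std::string dbResult;
    std::vector<std::string> updates;

    // Simulates the socket handing over one already parsed message.
    void deliver(const std::string& type, const std::string& body) {
        loop.post([this, type, body] {
            if (type == "upd") {
                updates.push_back(body);
            } else if (type == "dbs") {
                dbResult = body;
                loop.quit(); // plays the role of the dbStringReady() -> quit() connection
            }
        });
    }

    // Blocks (by running the loop) until a "dbs" reply was processed.
    std::string getDBstring() {
        loop.exec();
        return dbResult;
    }
};
```

The point to notice is that everything delivered while getDBstring() is waiting runs inside that wait, which is why a slow or re-entrant handler on the update path can interfere with the pending query.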
-
OK guys, thank you for that many replies :P! I solved the issue by myself anyway. In case somebody has a similar problem, here comes the solution:
What I did wrong was that I connected a time-consuming function directly to the UpdateClient::tcpReady() slot. In this function I did some things which in turn led to UpdateClient::tcpReady() being triggered again. This broke the event queue. So what I had to change was to drop the direct signal-slot connection and do this in UpdateClient::tcpReady() instead:
QTimer::singleShot(0, this, SIGNAL(updateAvailable()));
Now everything is working fine ;).
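For anyone wondering why the zero-timeout single shot helps: it queues the signal so that the handler runs only after tcpReady() has returned, instead of running nested inside it. As far as I know, the Qt documentation for readyRead() also notes that the signal is not emitted recursively while a connected slot is still running, which fits the symptom. Below is a toy model of the difference in plain C++ (no Qt); ToyEventLoop, Client and the log strings are made up for illustration only.

```cpp
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Toy stand-in for the event queue: callbacks run later, in posting order.
class ToyEventLoop {
public:
    void post(std::function<void()> task) { queue_.push_back(std::move(task)); }

    void processAll() {
        while (!queue_.empty()) {
            std::function<void()> task = std::move(queue_.front());
            queue_.pop_front();
            task();
        }
    }

private:
    std::deque<std::function<void()>> queue_;
};

// Toy client whose tcpReady() either calls the handler directly or defers it.
struct Client {
    ToyEventLoop& loop;
    bool deferHandler;             // true models QTimer::singleShot(0, ...)
    std::vector<std::string> log;  // records the order in which things run

    void onUpdateAvailable() { log.push_back("handler runs"); }

    void tcpReady() {
        log.push_back("tcpReady enter");
        if (deferHandler)
            loop.post([this] { onUpdateAvailable(); }); // runs after the slot returned
        else
            onUpdateAvailable();                        // runs nested inside the slot
        log.push_back("tcpReady leave");
    }
};
```

With deferHandler set to false the handler is logged between "tcpReady enter" and "tcpReady leave"; with true it is logged only once processAll() runs, after the slot has finished.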