[SOLVED] Threaded QTcpServer Multiple QSslSocket clients with QDataStream readyread() sometimes not emitted



  • Hey guys,

    I've got a huge network problem with my application. The base of this application is a client server based architecture. The clients connect to the server via SSL encryption. The server has a MySQL connection and the clients send the query to the server, it is executed and the answer is sent back to client. Furthermore the server is used to update the clients in between. Every client can send an update message to the server and the server spreads it to the other clients.

    Here comes the problem. Everything is doing fine untill I'll come to point where one client sents an update. It is correctly spread to all the other clients, but when a client who received an update want's to send the next data to the server it never gets the readyread() signal back, even if I see that the server sends the answer back correctly. I assume some strange behaviour due to multithreaded architecture with QSslSocket, but I am not sure at all. I'll try to solve this issue for more than 3 days now. Please help me!

    Here comes the code:

    ClientThread on the server side:

    ClientThread::ClientThread(int socketDescr, QString serverCert, QString DBHostName, qint16 DBPort, QString DBName, QString DBUserName, QString DBPassword, bool isMySqp, QObject *parent):
        QThread(parent)
    {
        qDebug() << "ClientThread::ClientThread("<<socketDescr<<","<<serverCert<<","<<DBPort<<","<< DBHostName<<","<<DBName<<","<<DBUserName<<","<<DBPassword<<","<<isMySqp<<","<<parent<<"):"<<parent;
    
        socketDescriptor = socketDescr;
        this->serverCert = serverCert;
    
        this->DBHostName = DBHostName;
        this->DBPort = DBPort;
        this->DBName = DBName;
        this->DBUserName = DBUserName;
        this->DBPassword = DBPassword;
        this->isMySql = isMySqp;
    
        connect(this, SIGNAL(startNewConnection()), this, SLOT(createNewSocket()));
        emit startNewConnection();
    }
    
    ClientThread::~ClientThread()
    {
        qDebug() << "ClientThread::~ClientThread()";
    
        datab.close();
    }
    
    void ClientThread::run()
    {
        qDebug() << "ClientThread::run()";
    
        exec();
    }
    
    void ClientThread::connectSocketSignals()
    {
        qDebug() << "ClientThread::connectSocketSignals()";
    
        connect(socket, SIGNAL(readyRead()), this, SLOT(messageFromClient()), Qt::DirectConnection);
        connect(socket, SIGNAL(encryptedBytesWritten(qint64)), this, SLOT(encryptedBytesWritten(qint64)), Qt::DirectConnection);
        connect(socket, SIGNAL(modeChanged(QSslSocket::SslMode)), this, SLOT(modeChanged(QSslSocket::SslMode)), Qt::DirectConnection);
        connect(socket, SIGNAL(peerVerifyError(QSslError)), this, SLOT(peerVerifyError(QSslError)), Qt::DirectConnection);
        connect(socket, SIGNAL(sslErrors(QList<QSslError>)), this, SLOT(sslErrors(QList<QSslError>)), Qt::DirectConnection);
        connect(socket, SIGNAL(connected()), this, SLOT(connected()), Qt::DirectConnection);
        connect(socket, SIGNAL(disconnected()), this, SLOT(disconnected()), Qt::DirectConnection);
        connect(socket, SIGNAL(error(QAbstractSocket::SocketError)), this, SLOT(error(QAbstractSocket::SocketError)), Qt::DirectConnection);
        connect(socket, SIGNAL(hostFound()), this, SLOT(hostFound()), Qt::DirectConnection);
        connect(socket, SIGNAL(proxyAuthenticationRequired(QNetworkProxy,QAuthenticator*)), this, SLOT(proxyAuthenticationRequired(QNetworkProxy,QAuthenticator*)), Qt::DirectConnection);
        connect(socket, SIGNAL(stateChanged(QAbstractSocket::SocketState)), this, SLOT(stateChanged(QAbstractSocket::SocketState)), Qt::DirectConnection);
    }
    
    void ClientThread::sendSqlResultBack(QStringList resultList, QString type)
    {
        QByteArray block;
        QDataStream out(&block, QIODevice::WriteOnly);
        out.setVersion(QDataStream::Qt_5_4);
        out << (quint32)0; //Blocksize
        out << type; //Type
        out << getClientID(); // ID
        out << resultList;
        out.device()->seek(0);
        out << (quint32)(block.size() - sizeof(quint32));
    
        qDebug() << block.size();
    
        socket->write(block);
        socket->flush();
    }
    
    void ClientThread::sendSqlResultBack(QString result, QString type)
    {
        QByteArray block;
        QDataStream out(&block, QIODevice::WriteOnly);
        out.setVersion(QDataStream::Qt_5_4);
        out << (quint32)0; //Blocksize
        out << type; //Type
        out << getClientID(); // ID
        out << result;
        out.device()->seek(0);
        out << (quint32)(block.size() - sizeof(quint32));
    
        qDebug() << block.size();
    
        socket->write(block);
        socket->flush();
    }
    
    void ClientThread::createNewSocket()
    {
        socket = new QSslSocket();
        //socket->moveToThread(this);
    
        if(!socket)
        {
            qWarning("not enough memory to create new QSslSocket!");
            emit error(socket->error());
            exit(1);
        }
    
        //New Connection
        qDebug() << "New Thread with socket-ID: " << socketDescriptor;
    
        socket->setProtocol(QSsl::SslV3);
    
        connectSocketSignals();
    
        socket->setSocketOption(QAbstractSocket::KeepAliveOption, true );
    
        if(!socket->setSocketDescriptor(socketDescriptor))
        {
            qWarning("could not set socket descriptor!");
            socket->deleteLater();
            exit(2);
        }
    
        startServerEncryption();
    
        qDebug() << "Client connected" << socketDescriptor;
    
        addr = socket->localAddress();
        port = socket->localPort();
        emit clientInfo(socket);
    }
    
    void ClientThread::messageFromClient()
    {
        qDebug("ClientThread::messageFromClient()");
    
        QDataStream in(socket);
        in.setVersion(QDataStream::Qt_5_4);
    
        while(socket->bytesAvailable())
        {
            if(socket->bytesAvailable() < (int)sizeof(quint32))
                return;
    
            int objectSize;
            in >> objectSize;
    
            QByteArray buffer;
            char *temp = new char[objectSize];
            int bufferSize = in.readRawData(temp, objectSize);
            buffer.append(temp, bufferSize);
    
            bool readyForParsing = false;
    
            if(buffer.size() == objectSize)
            {
                readyForParsing = true;
            }
            else if (buffer.size() > objectSize)
            {
                qDebug() << "buffer size!!!";
                readyForParsing = true;
            }
            else
            {
                while (buffer.size() < objectSize) {
                    socket->waitForReadyRead(5000);
                    char *temp = new char[objectSize - buffer.size()];
                    int bufferSize = in.readRawData(temp, objectSize - buffer.size());
                    buffer.append(temp, bufferSize);
                    readyForParsing = true;
                }
            }
    
            if(readyForParsing == true)
            {
                QDataStream toParse(&buffer, QIODevice::ReadOnly);
                toParse.setVersion(QDataStream::Qt_5_4);
    
                QString type;
                toParse >> type;
    
                QString id;
                toParse >> id;
    
                if(clientID.isEmpty())
                {
                    clientID = id;
    
                    if (isMySql == false)
                    {
                        datab = QSqlDatabase::addDatabase("QODBC", clientID);
                        datab.setDatabaseName("");
                        datab.setDatabaseName(DBName);
                        datab.setPassword(DBPassword);
                    }
                    else
                    {
                        datab = QSqlDatabase::addDatabase("QMYSQL", clientID);
                        datab.setHostName(DBHostName);
                        datab.setPort(DBPort);
                        datab.setDatabaseName(DBName);
                        datab.setUserName(DBUserName);
                        datab.setPassword(DBPassword);
                    }
                }
    
                if (type == "upd")
                {
                    QString updateMessage;
                    toParse >> updateMessage;
    
                    qDebug() << "ClientThread::messageFromClient() -> upd ->" << updateMessage;
                    spreadNewUpdate(updateMessage, id);
                }
                else if (type == "dbq")
                {
                    QString sqlOrder;
                    toParse >> sqlOrder;
    
                    if (datab.open()) {
                        QSqlQuery DBquery(datab.database(clientID));
                        QStringList sqlResultList;
    
                        if (DBquery.exec(sqlOrder)) {
                            while (DBquery.next()) {
                                sqlResultList.append(DBquery.value(0).toString());
                            }
                            qDebug() << "ClientThread::messageFromClient() -> dbq ->" << sqlOrder;
                            qDebug() << "ClientThread::messageFromClient() -> dbq ->" << sqlResultList;
                            sendSqlResultBack(sqlResultList, type);
                            return;
                        }
                        else {
                            sqlError(clientID+":\t"+DBquery.lastError().text());
                            sendSqlResultBack(DBquery.lastError().text(), "err");
                        }
                    }
                }
                else if (type == "dbm")
                {
                    QString strSize;
                    toParse >> strSize;
                    int size = strSize.toInt();
    
                    QString sqlOrder;
                    toParse >> sqlOrder;
    
                    if (datab.open()) {
                        QSqlQuery DBquery(datab.database(clientID));
                        QStringList sqlResultList;
    
                        if (DBquery.exec(sqlOrder)) {
                            while (DBquery.next()) {
                                for(int i=0; i<size; i++)
                                {
                                    sqlResultList.append(DBquery.value(i).toString());
                                }
                            }
                            qDebug() << "ClientThread::messageFromClient() -> dbm ->" << sqlOrder;
                            qDebug() << "ClientThread::messageFromClient() -> dbm ->" << sqlResultList;
                            sendSqlResultBack(sqlResultList, type);
                            return;
                        }
                        else {
                            sqlError(clientID+":\t"+DBquery.lastError().text());
                            sendSqlResultBack(DBquery.lastError().text(), "err");
                        }
                    }
                }
                else if (type == "dbs")
                {
                    QString sqlOrder;
                    toParse >> sqlOrder;
    
                    if (datab.open()) {
                        QSqlQuery DBquery(datab.database(clientID));
                        QString sqlResult;
    
                        if (DBquery.exec(sqlOrder)) {
                            if (DBquery.next()) {
                                sqlResult = DBquery.value(0).toString();
                                qDebug() << "ClientThread::messageFromClient() -> dbs ->" << sqlOrder;
                                qDebug() << "ClientThread::messageFromClient() -> dbs ->" << sqlResult;
                            }
                            sendSqlResultBack(sqlResult, type);
                            return;
                        }
                        else {
                            sqlError(clientID+":\t"+DBquery.lastError().text());
                            sendSqlResultBack(DBquery.lastError().text(), "err");
                        }
                    }
                }
                else if (type == "set")
                {
                    QString sqlOrder;
                    toParse >> sqlOrder;
    
                    if (datab.open()) {
                        QSqlQuery DBquery(datab.database(clientID));
    
                        if(DBquery.exec(sqlOrder)==false)
                        {
                            sqlError(clientID+":\t"+DBquery.lastError().text());
                            sendSqlResultBack(DBquery.lastError().text(), "err");
                        }
                        else
                        {
                            qDebug() << "ClientThread::messageFromClient() -> set ->" << sqlOrder;
                            sendSqlResultBack("true", "set");
                        }
                    }
                }
                else
                {
                    QString wtf;
                    toParse >> wtf;
                    qDebug() << "WTF: " << wtf;
                }
            }
        }
    }
    
    void ClientThread::sendUpdate(QString message, QString id)
    {
        qDebug() << "ClientThread::sendUpdate("<<message<<")";
    
        if(id != clientID)
        {
            QByteArray block;
            QDataStream out(&block, QIODevice::WriteOnly);
            out.setVersion(QDataStream::Qt_5_4);
            out << (quint32)0; //Blocksize
            out << QString("upd"); //Type
            out << id; // ID
            out << message;
            out.device()->seek(0);
            out << (quint32)(block.size() - sizeof(quint32));
    
            socket->write(block);
            socket->flush();
    
            qDebug() << "ClientThread::sendUpdate("<<message<<")" << "GESENDET";
        }
    }
    

    The ClientThread::messageFromClient() method receives the clients database query, executes it and sends the result back via QSslSocket. The ClientThread::sendUpdate(QString message, QString id) is connected to a signal in the servers main thread and executed if any client sends an update to the server.

    No here comes the client side:

    UpdateClient::UpdateClient(bool isMySQL, QWidget *parent)
        : QWidget(parent)
    {
        qDebug() << "UpdateClient::UpdateClient("<< isMySQL << "," << parent << "):" << parent;
    
        //create ID
        clientID = QUuid::createUuid();
    
        //create SSL socket
        client_socket = new QSslSocket;
        client_socket->setProtocol(QSsl::SslV3);
    
        //connect socket signals
        connect(client_socket, SIGNAL(error(QAbstractSocket::SocketError)), this, SLOT(error(QAbstractSocket::SocketError)));
        connect(client_socket, SIGNAL(sslErrors(QList<QSslError>)), this, SLOT(sslErrors(QList<QSslError>)));
        connect(client_socket, SIGNAL(readyRead()), this, SLOT(tcpReady()));
    
        mySQL = isMySQL;
    }
    
    UpdateClient::~UpdateClient()
    {
        qDebug() << "UpdateClient::~UpdateClient()";
    
        client_socket->disconnectFromHost();
        client_socket->close();
    }
    
    void UpdateClient::tcpReady()
    {
        qDebug() << "UpdateClient::tcpReady()";
    
        QDataStream in(client_socket);
        in.setVersion(QDataStream::Qt_5_4);
    
        while(client_socket->bytesAvailable())
        {
            if(client_socket->bytesAvailable() < (int)sizeof(quint32))
                return;
    
            int objectSize;
            in >> objectSize;
    
            QByteArray buffer;
            char *temp = new char[objectSize];
            int bufferSize = in.readRawData(temp, objectSize);
            buffer.append(temp, bufferSize);
    
            bool readyForParsing = false;
    
            if(buffer.size() == objectSize)
            {
                readyForParsing = true;
            }
            else if (buffer.size() > objectSize)
            {
                qDebug() << "buffer size!!!";
                readyForParsing = true;
            }
            else
            {
                while (buffer.size() < objectSize) {
                    client_socket->waitForReadyRead(5000);
                    char *temp = new char[objectSize - buffer.size()];
                    int bufferSize = in.readRawData(temp, objectSize - buffer.size());
                    buffer.append(temp, bufferSize);
                    readyForParsing = true;
                }
            }
    
            if(readyForParsing == true)
            {
                QDataStream toParse(&buffer, QIODevice::ReadOnly);
                toParse.setVersion(QDataStream::Qt_5_4);
    
                QString type;
                toParse >> type;
    
                QString ID;
                toParse >> ID;
    
                if (type == "upd")
                {
                    QString updateMessage;
                    toParse >> updateMessage;
                    updateData(updateMessage);
                }
                else if (type == "dbs")
                {
                    toParse >> dbResultString;
                    if (dbResultString == "empty_str")
                    {
                        dbResultString = "";
                    }
                    dbStringReady();
                }
                else if (type == "dbq")
                {
                    toParse >> dbResultList;
                    dbQueryReady();
                }
                else if (type == "dbm")
                {
                    toParse >> dbResultList;
                    dbMoreQueryReady();
                }
                else if (type == "set")
                {
                    toParse >> dbOrderExecuted;
                    dbOrderWasExecuted();
                }
                else if (type == "err")
                {
                    QString sqlError;
                    toParse >> sqlError;
    
                    QMessageBox *error = new QMessageBox();
                    error->setText(sqlError);
                    error->setStandardButtons(QMessageBox::Ok);
                    error->exec();
    
                    dbResultList.clear();
                    dbResultString.clear();
    
                    dbStringReady();
                    dbQueryReady();
                    dbMoreQueryReady();
                    dbOrderWasExecuted();
                }
                else
                {
                    QString input;
                    qDebug() << input;
    
                    dbStringReady();
                    dbQueryReady();
                    dbMoreQueryReady();
                    dbOrderWasExecuted();
                }
            }
        }
    }
    
    void UpdateClient::sendUpdateToServer(QString updateMessage)
    {
        qDebug() << "UpdateClient::sendUpdateToServer("<< updateMessage <<")";
    
        QString temp = updateMessage;
        temp += ";";
        temp += database;
        temp += "#";
    
        QByteArray block;
        QDataStream out(&block, QIODevice::WriteOnly);
        out.setVersion(QDataStream::Qt_5_4);
        out << (quint32)0; //Blocksize
        out << QString("upd"); //Type
        out << clientID.toString(); //ID
        out << temp;
        out.device()->seek(0);
        out << (quint32)(block.size() - sizeof(quint32));
    
        if(client_socket->isOpen()&&client_socket->isEncrypted())
        {
            client_socket->write(block);
            client_socket->flush();
        }
    }
    
    QStringList UpdateClient::getDBquery(QString SQLorder)
    {
        qDebug() << "UpdateClient::getDBquery("<<SQLorder<<")";
    
        if(mySQL==false)
        {
            SQLorder = transferMySQLToAccess(SQLorder);
        }
    
        QByteArray block;
        QDataStream out(&block, QIODevice::WriteOnly);
        out.setVersion(QDataStream::Qt_5_4);
        out << (quint32)0; //Blocksize
        out << QString("dbq"); //Type
        out << clientID.toString(); // ID
        out << SQLorder;
        out.device()->seek(0);
        out << (quint32)(block.size() - sizeof(quint32));
    
        if(client_socket->isOpen()&&client_socket->isEncrypted())
        {
            qDebug() << block.size();
    
            client_socket->write(block);
            client_socket->flush();
        }
    
        QEventLoop *loop = new QEventLoop();
        connect(this, SIGNAL(dbQueryReady()), loop, SLOT(quit()));
        loop->exec();
    
        return dbResultList;
    }
    
    QStringList UpdateClient::getDBMorequery(QString SQLorder, int size)
    {
        qDebug() << "UpdateClient::getDBMorequery("<<SQLorder<<","<<size<<")";
    
        if(mySQL==false)
        {
            SQLorder = transferMySQLToAccess(SQLorder);
        }
    
        //this function is used if more columns are queried, "size" is the number of colums
        QByteArray block;
        QDataStream out(&block, QIODevice::WriteOnly);
        out.setVersion(QDataStream::Qt_5_4);
        out << (quint32)0; //Blocksize
        out << QString("dbm"); //Type
        out << clientID.toString(); // ID
        out << QString::number(size); //Size
        out << SQLorder;
        out.device()->seek(0);
        out << (quint32)(block.size() - sizeof(quint32));
    
        if(client_socket->isOpen()&&client_socket->isEncrypted())
        {
            qDebug() << block.size();
    
            client_socket->write(block);
            client_socket->flush();
        }
    
        QEventLoop *loop = new QEventLoop();
        connect(this, SIGNAL(dbMoreQueryReady()), loop, SLOT(quit()));
        loop->exec();
    
        return dbResultList;
    }
    
    QString UpdateClient::getDBstring(QString SQLorder)
    {
        qDebug() << "UpdateClient::getDBstring("<<SQLorder<<")";
    
        if(mySQL==false)
        {
            SQLorder = transferMySQLToAccess(SQLorder);;
        }
    
        //this function is used if just a single String is queried
        QByteArray block;
        QDataStream out(&block, QIODevice::WriteOnly);
        out.setVersion(QDataStream::Qt_5_4);
        out << (quint32)0; //Blocksize
        out << QString("dbs"); //Type
        out << clientID.toString(); // ID
        out << SQLorder;
        out.device()->seek(0);
        out << (quint32)(block.size() - sizeof(quint32));
    
        if(client_socket->isOpen()&&client_socket->isEncrypted())
        {
            qDebug() << block.size();
    
            client_socket->write(block);
            client_socket->flush();
        }
    
        QEventLoop *loop = new QEventLoop();
        connect(this, SIGNAL(dbStringReady()), loop, SLOT(quit()));
        loop->exec();
    
        return dbResultString;
    }
    
    bool UpdateClient::setDBorder(QString SQLorder)
    {
        qDebug() << "UpdateClient::setDBorder("<<SQLorder<<")";
    
        if(mySQL==false)
        {
            SQLorder = transferMySQLToAccess(SQLorder);
        }
    
        //this function is used for any other regular SQL order (no query!)
        QByteArray block;
        QDataStream out(&block, QIODevice::WriteOnly);
        out.setVersion(QDataStream::Qt_5_4);
        out << (quint32)0; //Blocksize
        out << QString("set"); //Type
        out << clientID.toString(); // ID
        out << SQLorder;
        out.device()->seek(0);
        out << (quint32)(block.size() - sizeof(quint32));
    
        if(client_socket->isOpen()&&client_socket->isEncrypted())
        {
            qDebug() << block.size();
    
            client_socket->write(block);
            client_socket->flush();
        }
    
        QEventLoop *loop = new QEventLoop();
        connect(this, SIGNAL(dbOrderWasExecuted()), loop, SLOT(quit()));
        loop->exec();
    
        return dbOrderExecuted;
    }
    

    On the client side the methos UpdateClient::getDBquery(QString SQLorder), UpdateClient::getDBMorequery(QString SQLorder, int size), UpdateClient::getDBstring(QString SQLorder) are called to query the database via the server. They block via an QEventLooop until tcpready() ist emitted, the answer is successfully parsed and the signal connected to the event loops close() slot is emitted. Furthermore if a client has to send an update he uses the UpdateClient::sendUpdateToServer(QString updateMessage) method which is non blocking.

    If you need more information or code please let me now. I really got stucked here and I dont't know how to resolve that issue.



  • Ok guy thank you for that much replies :P! I solved the issue by myself anyway. In case sombebody has a similar problem here comes the solution:

    What I did wrong was that I connected a time consuming function to the UpdateClient::tcpReady() slot. In this function I did some stuff which also lead to the emission of UpdateClient::tcpReady(). This broke the QEventQueue. So to what I had to chance was to skip the direct signal slot connection an do this in the UpdateClient::tcpReady():

    QTimer::singleShot(0, this, SIGNAL(updateAvailable()));
    

    No everything is working fine ;).



Looks like your connection to Qt Forum was lost, please wait while we try to reconnect.