Important: Please read the Qt Code of Conduct - https://forum.qt.io/topic/113070/qt-code-of-conduct

How to pass data via QThreads correctly?



  • Recently I write little project. I use QTcpServer which creates QTcpSockets in new QThreads after each new incomming connection. The QTcpSocket instance, next I will name it 'ConnectionEntry'. Connection entry should request from the database some info via signal/slot connection. The database it is the separate essence which running in it own QThread. So, I create the slot for the database and the signal for the connection entry.

    Where I should connect database slot and connection entry signal?

    I'm connecting it in QTcpServer when I create the instance of ConnectionEntry. Everything works but I get next message in console:

    QObject: Cannot create children for a parent that is in a different thread. (Parent is QNativeSocketEngine(0x7f1070003290), parent's thread is CConnectionEntry(0x5631beb596f0), current thread is QThread(0x5631beb3bfd0)

    It's little bit annoying me. Please, help if you know where I should connect these essences or which essences I should make parent for another.

    Thank you in advance!

    P.S. You can found some code below if you need ;)

    CConnectionCtrl (Server)

    class CConnectionCtrl : public QTcpServer
                          , public IServiceBase
                          , public IConnectionService
    {
       Q_OBJECT
    
       public:
    
          explicit CConnectionCtrl(QObject* parent = nullptr);
    
          virtual void incomingConnection(qintptr handle) override final;
    
          ... ... ...
    
          virtual void init() override final;
    
          ... ... ...
    
       private:
    
          bool             mbIsServiceAvailable;
          int              mCountOfPendingConnections;
          unsigned int     mPort;
          unsigned int     mMaxConnectionCount;
          IDatabaseBroker* mDatabaseBrokerPtr;
          std::unordered_map<qintptr, CConnectionEntry*> mActiveConnectionsList;
    }
    
    CConnectionCtrl::CConnectionCtrl(QObject* parent)
    : QTcpServer(parent)
    , mbIsServiceAvailable(false)
    , mCountOfPendingConnections(0)
    , mPort(13666)
    , mMaxConnectionCount(15)
    {
       FNC_MSG();
    }
    
    void CConnectionCtrl::init()
    {
       qRegisterMetaType<myDatabase::SRequestsQueueEntry>("myDatabase::SRequestsQueueEntry");
    
       setMaxPendingConnections(mMaxConnectionCount);
       if ( listen(QHostAddress::Any, mPort) )
       {
          DBG_MSG("Server is listening on PORT: [%d], max users count is [%d]", mPort, mMaxConnectionCount);
          mbIsServiceAvailable = true;
       }
       else
       {
          ERR_MSG("Server can not listen!");
          mbIsServiceAvailable = false;
       }
    }
    
    void CConnectionCtrl::incomingConnection(qintptr handle)
    {
       ++mCountOfPendingConnections;
       DBG_MSG("Detected new connection, handle ID: [%d], Current connection count: [%d]", handle, mCountOfPendingConnections);
       CConnectionEntry *connectionEntry = new CConnectionEntry(handle, mDatabaseBrokerPtr, this);
       connect(connectionEntry, SIGNAL(finished()), connectionEntry, SLOT(deleteLater()));
       connect(connectionEntry, SIGNAL(clientIsDisconnected(int)), this, SLOT(disconected(int)) );
       connect(connectionEntry, SIGNAL(passRequestToBroker(myDatabase::SRequestsQueueEntry)), mDatabaseBrokerPtr, SLOT(addRequest(myDatabase::SRequestsQueueEntry)));
       connectionEntry->start();
       connectionEntry->setParent(this);
       mActiveConnectionsList.insert(std::pair<qintptr, CConnectionEntry*>(handle, connectionEntry));
    }
    

    CConnectionEntry (QTcp socket created by server)

    class IConnectionEntry : public QThread
    {
       Q_OBJECT
    
       public:
    
          explicit IConnectionEntry(QObject* parent = nullptr) : QThread(parent) { }
    
       public slots:
    
          virtual void sendReply(const SReply& replyStruct) = 0;
    };
    
    class CConnectionEntry : public IConnectionEntry
    {
       Q_OBJECT
    
       public:
    
          explicit CConnectionEntry(const qintptr& handleID, IDatabaseBroker* brokerPtr, QObject* parent = 0);
    
       public slots:
    
          virtual void sendReply(const SReply& replyStruct) override final;
          void readyRead();
    
       protected:
    
          void run();
    
       signals:
    
    
          void passRequestToBroker(const myDatabase::SRequestsQueueEntry& request);
    
          ... ... ...
    
       private:
    
           QTcpSocket*      mSocketPtr;
           QDataStream      mDataStreamIn;
           QDataStream      mDataStreamOut;
           QByteArray       mSendedDataBlock;
           qintptr          mHandleID;
           IDatabaseBroker* mDatabaseBrokerPtr;
    };
    
    CConnectionEntry::CConnectionEntry(const qintptr& handleID, IDatabaseBroker* brokerPtr, QObject* parent)
    : IConnectionEntry(parent)
    , mHandleID(handleID)
    , mDatabaseBrokerPtr(brokerPtr)
    , mDataStreamOut(&mSendedDataBlock, QIODevice::WriteOnly)
    {
    
    }
    
    void CConnectionEntry::sendReply(const SReply& replyStruct)
    {
       //Some code with QTcpSoket
    }
    
    void CConnectionEntry::readyRead()
    {
       ... ... ...
    
       if (isRecivedStructValid)
       {
          DBG_MSG("Handle ID: [%d] Recived data is valid. Request: [%s]", mHandleID, toString(recivedStruct.requestName).c_str());
    
          emit passRequestToBroker(myDatabase::SRequestsQueueEntry(recivedStruct, eProcessingPriority::PRIORITY_MIDDLE, this));
       }
    
       ... ... ...
    }
    
    void CConnectionEntry::run()
    {
        DBG_MSG("Handle ID: [%d] Start working thread.", mHandleID);
    
        mSocketPtr = new QTcpSocket();
        if(!mSocketPtr->setSocketDescriptor(this->mHandleID))
        {
            emit error(mSocketPtr->error());
            return;
        }
    
        connect(mSocketPtr, SIGNAL(readyRead()), this, SLOT(readyRead()), Qt::DirectConnection);
        connect(mSocketPtr, SIGNAL(disconnected()), this, SLOT(disconnected()), Qt::DirectConnection);
    
        mDataStreamIn.setDevice(mSocketPtr);
        mDataStreamOut.setDevice(mSocketPtr);
        mDataStreamIn.setVersion(QDataStream::Qt_4_0);
        mDataStreamOut.setVersion(QDataStream::Qt_4_0);
    
        DBG_MSG("Handle ID: [%d] Client is connected.", mHandleID);
    
        exec();
    }
    

    CDatabaseBroker (This is the database which I described)

    class IDatabaseBroker : public QThread
    {
       Q_OBJECT
    
       public:
    
          explicit IDatabaseBroker(QObject* parent = nullptr) : QThread(parent) { }
    
       public slots:
    
          virtual void addRequest(const myDatabase::SRequestsQueueEntry& request) = 0;
    };
    
    class CDatabaseBroker : public IDatabaseBroker
                          , public IServiceBase
                          , public IDatabaseClient
                          , public IConnectionClient
    {
       Q_OBJECT
    
          ... ... ...
    
       public slots:
    
          virtual void addRequest(const myDatabase::SRequestsQueueEntry& request) override final;
    
          ... ... ...
    
       private:
    
          void onRequestIsProcessed(const SReply& reply);
    
          ... ... ...
    
       signals:
    
          void replyOnRequest(const SReply&);
    
       private:
    
          ... ... ...
    
          QMutex              mThreadSaver;
          bool                mbIsBrockerAvailable;
          IDatabaseService*   mDatabasePtr;
          IConnectionService* mConnectionPtr;
          std::priority_queue<SRequestsQueueEntry, std::vector<SRequestsQueueEntry>, CPriorityPairComparision> mRequestsQueue;
    };
    
    void CDatabaseBroker::addRequest(const myDatabase::SRequestsQueueEntry& request)
    {
       mThreadSaver.lock();
    
       DBG_MSG("Adding new request [%s] with priority [%s].", toString(request.request.requestName).c_str(), toString(request.priority).c_str());
    
       if (mbIsBrockerAvailable)
       {
    
          mRequestsQueue.push(request);
       }
    
       mThreadSaver.unlock();
    }
    
    void CDatabaseBroker::onRequestIsProcessed(const SReply& reply)
    {
       IConnectionEntry* receiverPtr = mRequestsQueue.top().responseReceiverPtr;
    
       if (nullptr != receiverPtr)
       {
          connect(this, SIGNAL(replyOnRequest(SReply)), receiverPtr, SLOT(sendReply(SReply)));
          emit replyOnRequest(reply);
          disconnect(this, SIGNAL(replyOnRequest(SReply)), receiverPtr, SLOT(sendReply(SReply)));
       }
       else
       {
          WRG_MSG("nullptr == receiverPtr!");
       }
    
       mRequestsQueue.pop();
    }
    

    Also, if somebody knows the way to emit signal for specific client (if I have it pointer on this client) please tell in comments. Because now I connect/disconnect broker with connection entry each time when I want to sent message to certain connection entry.


Log in to reply