How do I pass data between QThreads correctly?
-
I recently wrote a small project. It uses a QTcpServer that creates a QTcpSocket in a new QThread for every incoming connection. From here on I will call this per-connection object a 'ConnectionEntry'. A connection entry needs to request some information from the database through a signal/slot connection. The database is a separate entity that runs in its own QThread. So I created a slot on the database and a signal on the connection entry.
Where should I connect the database slot to the connection entry signal?
At the moment I connect them in the QTcpServer, at the point where I create the ConnectionEntry instance. Everything works, but I get the following message in the console:
QObject: Cannot create children for a parent that is in a different thread. (Parent is QNativeSocketEngine(0x7f1070003290), parent's thread is CConnectionEntry(0x5631beb596f0), current thread is QThread(0x5631beb3bfd0)
It is a little annoying. Please help if you know where I should connect these objects, or which object should be made the parent of the other.
Thank you in advance!
P.S. You can find some code below if you need it ;)
CConnectionCtrl (Server)
class CConnectionCtrl : public QTcpServer , public IServiceBase , public IConnectionService { Q_OBJECT public: explicit CConnectionCtrl(QObject* parent = nullptr); virtual void incomingConnection(qintptr handle) override final; ... ... ... virtual void init() override final; ... ... ... private: bool mbIsServiceAvailable; int mCountOfPendingConnections; unsigned int mPort; unsigned int mMaxConnectionCount; IDatabaseBroker* mDatabaseBrokerPtr; std::unordered_map<qintptr, CConnectionEntry*> mActiveConnectionsList; } CConnectionCtrl::CConnectionCtrl(QObject* parent) : QTcpServer(parent) , mbIsServiceAvailable(false) , mCountOfPendingConnections(0) , mPort(13666) , mMaxConnectionCount(15) { FNC_MSG(); } void CConnectionCtrl::init() { qRegisterMetaType<myDatabase::SRequestsQueueEntry>("myDatabase::SRequestsQueueEntry"); setMaxPendingConnections(mMaxConnectionCount); if ( listen(QHostAddress::Any, mPort) ) { DBG_MSG("Server is listening on PORT: [%d], max users count is [%d]", mPort, mMaxConnectionCount); mbIsServiceAvailable = true; } else { ERR_MSG("Server can not listen!"); mbIsServiceAvailable = false; } } void CConnectionCtrl::incomingConnection(qintptr handle) { ++mCountOfPendingConnections; DBG_MSG("Detected new connection, handle ID: [%d], Current connection count: [%d]", handle, mCountOfPendingConnections); CConnectionEntry *connectionEntry = new CConnectionEntry(handle, mDatabaseBrokerPtr, this); connect(connectionEntry, SIGNAL(finished()), connectionEntry, SLOT(deleteLater())); connect(connectionEntry, SIGNAL(clientIsDisconnected(int)), this, SLOT(disconected(int)) ); connect(connectionEntry, SIGNAL(passRequestToBroker(myDatabase::SRequestsQueueEntry)), mDatabaseBrokerPtr, SLOT(addRequest(myDatabase::SRequestsQueueEntry))); connectionEntry->start(); connectionEntry->setParent(this); mActiveConnectionsList.insert(std::pair<qintptr, CConnectionEntry*>(handle, connectionEntry)); }
CConnectionEntry (the per-connection thread created by the server; it owns the QTcpSocket)
class IConnectionEntry : public QThread { Q_OBJECT public: explicit IConnectionEntry(QObject* parent = nullptr) : QThread(parent) { } public slots: virtual void sendReply(const SReply& replyStruct) = 0; }; class CConnectionEntry : public IConnectionEntry { Q_OBJECT public: explicit CConnectionEntry(const qintptr& handleID, IDatabaseBroker* brokerPtr, QObject* parent = 0); public slots: virtual void sendReply(const SReply& replyStruct) override final; void readyRead(); protected: void run(); signals: void passRequestToBroker(const myDatabase::SRequestsQueueEntry& request); ... ... ... private: QTcpSocket* mSocketPtr; QDataStream mDataStreamIn; QDataStream mDataStreamOut; QByteArray mSendedDataBlock; qintptr mHandleID; IDatabaseBroker* mDatabaseBrokerPtr; }; CConnectionEntry::CConnectionEntry(const qintptr& handleID, IDatabaseBroker* brokerPtr, QObject* parent) : IConnectionEntry(parent) , mHandleID(handleID) , mDatabaseBrokerPtr(brokerPtr) , mDataStreamOut(&mSendedDataBlock, QIODevice::WriteOnly) { } void CConnectionEntry::sendReply(const SReply& replyStruct) { //Some code with QTcpSoket } void CConnectionEntry::readyRead() { ... ... ... if (isRecivedStructValid) { DBG_MSG("Handle ID: [%d] Recived data is valid. Request: [%s]", mHandleID, toString(recivedStruct.requestName).c_str()); emit passRequestToBroker(myDatabase::SRequestsQueueEntry(recivedStruct, eProcessingPriority::PRIORITY_MIDDLE, this)); } ... ... ... 
} void CConnectionEntry::run() { DBG_MSG("Handle ID: [%d] Start working thread.", mHandleID); mSocketPtr = new QTcpSocket(); if(!mSocketPtr->setSocketDescriptor(this->mHandleID)) { emit error(mSocketPtr->error()); return; } connect(mSocketPtr, SIGNAL(readyRead()), this, SLOT(readyRead()), Qt::DirectConnection); connect(mSocketPtr, SIGNAL(disconnected()), this, SLOT(disconnected()), Qt::DirectConnection); mDataStreamIn.setDevice(mSocketPtr); mDataStreamOut.setDevice(mSocketPtr); mDataStreamIn.setVersion(QDataStream::Qt_4_0); mDataStreamOut.setVersion(QDataStream::Qt_4_0); DBG_MSG("Handle ID: [%d] Client is connected.", mHandleID); exec(); }
CDatabaseBroker (this is the database component described above)
class IDatabaseBroker : public QThread { Q_OBJECT public: explicit IDatabaseBroker(QObject* parent = nullptr) : QThread(parent) { } public slots: virtual void addRequest(const myDatabase::SRequestsQueueEntry& request) = 0; }; class CDatabaseBroker : public IDatabaseBroker , public IServiceBase , public IDatabaseClient , public IConnectionClient { Q_OBJECT ... ... ... public slots: virtual void addRequest(const myDatabase::SRequestsQueueEntry& request) override final; ... ... ... private: void onRequestIsProcessed(const SReply& reply); ... ... ... signals: void replyOnRequest(const SReply&); private: ... ... ... QMutex mThreadSaver; bool mbIsBrockerAvailable; IDatabaseService* mDatabasePtr; IConnectionService* mConnectionPtr; std::priority_queue<SRequestsQueueEntry, std::vector<SRequestsQueueEntry>, CPriorityPairComparision> mRequestsQueue; }; void CDatabaseBroker::addRequest(const myDatabase::SRequestsQueueEntry& request) { mThreadSaver.lock(); DBG_MSG("Adding new request [%s] with priority [%s].", toString(request.request.requestName).c_str(), toString(request.priority).c_str()); if (mbIsBrockerAvailable) { mRequestsQueue.push(request); } mThreadSaver.unlock(); } void CDatabaseBroker::onRequestIsProcessed(const SReply& reply) { IConnectionEntry* receiverPtr = mRequestsQueue.top().responseReceiverPtr; if (nullptr != receiverPtr) { connect(this, SIGNAL(replyOnRequest(SReply)), receiverPtr, SLOT(sendReply(SReply))); emit replyOnRequest(reply); disconnect(this, SIGNAL(replyOnRequest(SReply)), receiverPtr, SLOT(sendReply(SReply))); } else { WRG_MSG("nullptr == receiverPtr!"); } mRequestsQueue.pop(); }
Also, if anybody knows a way to emit a signal to one specific client (given a pointer to that client), please tell me in the comments. At the moment I connect and then disconnect the broker and the connection entry every time I want to send a message to a particular connection entry.