QTcpServer, QTcpSocket and LOST packets!
-
Hey
This was a fun week of screams, I've finally narrowed it down to... well I have no idea what to be honest. So maybe some1 can enlighten me here.
I have a server that creates connection in worker thread, I also have a client-socket spawner that makes 400 sockets in 40 threads and bombards the server with connections.Now the good news is that server accepts all 400 connections, the bad news is server mises 10-20% of messages from those connections! ; O !!
Here is the server :
#include "QThread" #include "QApplication" #include "QTcpServer" #include "QTcpSocket" #include "QHostAddress" #include "QWidget" class myServerSocket : public QTcpSocket { Q_OBJECT public: myServerSocket(QThread *workerThread); ~myServerSocket(); public Q_SLOTS: void handleAvailableData(); Q_SIGNALS: void newData(myServerSocket *senderPtr, int dataSize); }; myServerSocket::myServerSocket(QThread *workerThread) : QTcpSocket(nullptr) { moveToThread(workerThread); connect(this, &QTcpSocket::readyRead, this, &myServerSocket::handleAvailableData); } myServerSocket::~myServerSocket() { } void myServerSocket::handleAvailableData() { auto data = readAll(); Q_EMIT newData(this, data.size()); if (bytesAvailable() > 0) { QMetaObject::invokeMethod(this, [this]() { handleAvailableData(); }, Qt::QueuedConnection); } } class myServer : public QTcpServer { Q_OBJECT QThread *mClientWorker; std::atomic<unsigned int> mConCount; protected: void incomingConnection(qintptr handle) override; public: myServer(); ~myServer(); public Q_SLOTS: void handleItemData(myServerSocket *client, int dataSize); void handleItemDeleted(QObject *itemPtr); }; myServer::myServer() { mClientWorker = new QThread(); mClientWorker->setObjectName("clientWorker"); mClientWorker->start(); listen(QHostAddress("localhost"), 40000); } myServer::~myServer() { } void myServer::handleItemData(myServerSocket *client, int dataSize) { qDebug() << QThread::currentThread << "Got data : " << dataSize << client; } void myServer::incomingConnection(qintptr handle) { auto myClient = new myServerSocket(mClientWorker); myClient->setSocketOption(QAbstractSocket::KeepAliveOption, true); QMetaObject::invokeMethod(myClient, [&, cl = myClient, ptr = handle]() { cl->setSocketDescriptor(ptr); }, Qt::QueuedConnection); mConCount++; qDebug() << "Got new con! : " << mConCount; connect(myClient, &myServerSocket::newData, this, &myServer::handleItemData, Qt::DirectConnection); // we want to emit in client thread for now connect(myClient, &myServerSocket::destroyed, this, &myServer::handleItemDeleted, Qt::QueuedConnection); connect(myClient, &myServerSocket::disconnected, myClient, &myServerSocket::deleteLater, Qt::QueuedConnection); } void myServer::handleItemDeleted(QObject *itemPtr) { mConCount--; qDebug() << "Losing client : " << mConCount; } int main(int argc, char *argv[]) { auto app = QApplication(argc, argv); myServer s; QWidget w; w.show(); auto ax = app.exec(); return ax; }
Here is Client bombardier :
#include <QThreadPool> #include <QtConcurrent/QtConcurrent> #include "QThread" #include "QApplication" #include "QTcpServer" #include "QTcpSocket" #include "QHostAddress" class myClient : public QTcpSocket { Q_OBJECT QString ip; unsigned int port; unsigned int clientId; unsigned int workerId; public: myClient(const QString &i, const unsigned int p, const unsigned int cId, const unsigned int wId, QObject *parent); ~myClient(); void sendData(); }; myClient::myClient(const QString &i, const unsigned int p, const unsigned int cId, const unsigned int wId, QObject *parent) : QTcpSocket(parent) { ip = i; port = p; clientId = cId; workerId = wId; } myClient::~myClient() { } void myClient::sendData() { QMetaObject::invokeMethod(this, [this]() { /// send to client thread. connectToHost(ip, port); waitForConnected(); qDebug() << QThread::currentThread() << clientId << workerId << "Connecting"; if (state() == QTcpSocket::ConnectedState) { QByteArray data; data.resize(724); auto w = write(data); flush(); qDebug() << clientId << workerId << "I send : " << w << " Data"; } else { qWarning() << clientId << workerId << "Could not send data as I didnt connect..."; } }, Qt::QueuedConnection); } int main(int argc, char *argv[]) { auto app = QApplication(argc, argv); std::atomic<unsigned int> maxProcess = 40; const unsigned int maxClient = 10; const unsigned int aliveTime = 5 * 1000;// 5 seconds; QThreadPool pool; pool.setMaxThreadCount(maxProcess); std::atomic<unsigned int> currentCount; while (true) { #pragma omp parallel for for (int ca = 0; ca < maxProcess; ca++) { if (maxProcess > currentCount) { QtConcurrent::run(&pool, [&]() { currentCount++; QThread *myWorker = new QThread(); myWorker->setObjectName("Worker id : " + QString::number(currentCount)); QObject *parent = new QObject(); myWorker->start(); std::vector<myClient *> clients(maxClient); for (int x = 0; x < maxClient; ++x) { auto c = new myClient("localhost", 40000, x, currentCount, parent); clients[x] = c; } parent->moveToThread(myWorker); app.processEvents(); QThread::msleep(500); app.processEvents();; #pragma omp parallel for for (int x = 0; x < maxClient; ++x) { clients[x]->sendData(); } unsigned int wait = 0; while (true) { QThread::msleep(1); app.processEvents(); wait++; if (wait > aliveTime) { break; } } for (auto &client:clients) { client->deleteLater(); } parent->deleteLater(); myWorker->quit(); myWorker->wait(); myWorker->deleteLater(); currentCount--; } ); } } app.processEvents(); } auto ax = app.exec(); return ax; }
The question is... how do I not lose the messages? :D I have a "massive" app that relies on a similar system and over there out of 1200 connections/messages I get around ... 20 messages through...! Painful week, can any1 shed information on where I'm doing a mistake here?
I've also tried delaying message send on the client side, say send 3s after connection, still the same issue! :- (((
The way I test it is I run the server/client until clients start to disconnect, then I pause the app, and run a search over the output. Usually searching for
got new c
will yeld 400 results, but searching forGot data
will be less than 400, thus I have message loss.TIA!
Uuuu a side note, when I run serverClients in "main" thread, and not my worker thread, I did not lose messages, but in worker thread I lose them ! :- (
-
I may have a breakthrough... need to test more... but it looks like settings
connect(myClient, &myServerSocket::newData, this, &myServer::handleItemData, Qt::DirectConnection); // we want to emit in client thread for now connect(myClient, &myServerSocket::destroyed, this, &myServer::handleItemDeleted, Qt::QueuedConnection); connect(myClient, &myServerSocket::disconnected, myClient, &myServerSocket::deleteLater, Qt::QueuedConnection);
Before setting socketDescriptor... fixed the "lost" packages... need to test more, but maybe ?
Hmmm, does it matter on which thread I create those connections?
-
How do you check you are losing messages?
P.S.
#pragma omp parallel for
should nowadays be replaced withstd::for_each(std::execution::par
- you are leaking objects (
parent
for example) - You are making a mess with threads, mixing concurrent with QThread for no real reason tbh.
- The repository attached to this example might be of help to you. It looks like you are overcomplicating things. You can also check the commonlib branch of the repo for a more advanced example
-
@VRonin When I see disconnections start(as all 400 die at the same time) I pause app and run a search string over the output log.
I then can see that I have 400 connections & 400> messages. So I know I lost packets as I should have 400 mesages/400 connections.
#pragma omp parallel for should nowadays be replaced with std::for_each(std::execution::par
woa! Will dig in to that, thanks!you are leaking objects (parent for example)
Parent gets deleted in client at the end. So no leak there.for (auto &client:clients) { client->deleteLater(); } parent->deleteLater(); /// < here myWorker->quit(); myWorker->wait(); myWorker->deleteLater();
You are making a mess with threads, mixing concurrent with QThread for no real reason tbh.
In client ? Yeah... its a bit of a "hacky" method. But I added#pragma omp parallel for for (int ca = 0; ca < maxProcess; ca++) { if (maxProcess > currentCount) {
5 min before posting the topic and I didn't realize I can drop concurrent, still should not cause "real" issue, as there are no warning from Qt and in my other app I'm loosing messages without the concurent bananza.
The repository attached to this example might be of help to you. It looks like you are overcomplicating things. You can also check the commonlib branch of the repo for a more advanced example
Sweet thanks, will check it out!Again, odd thing is that if I run all my serverClients in main thread = no packet loss, if I use my worker thred = packet loss... Its as if worker thread is somewhat leaking data.
TIA!
-
@VRonin said in QTcpServer, QTcpSocket and LOST packets!:
@Dariusz said in QTcpServer, QTcpSocket and LOST packets!:
400 mesages
By "messages" you mean
qDebug() << QThread::currentThread << "Got data : " << dataSize << client;
?Yep I run search in my ide over console log for "Got Data"... I kinda expect them to get printed... like... yea... if that does not print then I have a problem :D
clientWorker here is the client socket worker I move it in to. While in it there is packet loss.
-
I may have a breakthrough... need to test more... but it looks like settings
connect(myClient, &myServerSocket::newData, this, &myServer::handleItemData, Qt::DirectConnection); // we want to emit in client thread for now connect(myClient, &myServerSocket::destroyed, this, &myServer::handleItemDeleted, Qt::QueuedConnection); connect(myClient, &myServerSocket::disconnected, myClient, &myServerSocket::deleteLater, Qt::QueuedConnection);
Before setting socketDescriptor... fixed the "lost" packages... need to test more, but maybe ?
Hmmm, does it matter on which thread I create those connections?
-
@VRonin yes thats right. But the example here is simplified to the bone to track down that evil issue...
Currently my server in release mode with profiler enable pushes around 3000 connections in 592ms, so 6k per second +/-... I've no idea if thats good or bad, But looking at profiler >
The top green are new connection handlers, all it does is grab a handle > pushes it to vector. Its about 0.001 ms
the second row is the creation of objects for each handle, which is done in worker and the "tall" one are all the pragma omp helping multithread different parts of app.
Seems pretty fast given that in that time >- accept connections,
- push them to monitor for display to show that there are new connections - this is batched too
- Set connections details from packed I received
- Validate connection credentials
some more work.
Yeah, I'm amazed o.o, but I have no idea if thats good/bad performance-wise. Learning networking here lol. Possibly 360k Connections /validations/ displaying per minute ? I can't spawn enough test connections per second to test how much can I push lol.