Important: Please read the Qt Code of Conduct - https://forum.qt.io/topic/113070/qt-code-of-conduct

QMqttClient - connection not made from another thread



  • I have created a simple MQTT client in Qt which tries to connect to an MQTT broker. If I don't use another thread and call everything from the same thread, then it works fine.

    However, when I create another thread in main.cpp and try to run the MQTT client from it, it does not connect to the MQTT broker; it only shows that it is connecting, and I also get the
    error / warning "QSocketNotifier: Socket notifiers cannot be enabled or disabled from another thread". I need to run the MQTT client in another thread.
    main.cpp

    /**
     * Application entry point: creates the QML engine, moves an MqttClient to a
     * worker thread and starts the connection from that thread via MqttClient::Init.
     *
     * @return -1 if no QML root object was loaded, otherwise the exit code of
     *         the application event loop.
     */
    int main(int argc, char *argv[])
    {
        QCoreApplication::setAttribute(Qt::AA_EnableHighDpiScaling);

        QGuiApplication app(argc, argv);
        QQmlApplicationEngine engine;
        QThread mqttclientThread;

        // Ask the worker's event loop to stop when the application shuts down.
        QObject::connect(&app, &QCoreApplication::aboutToQuit, &mqttclientThread, &QThread::quit);

        // No parent is given: the object is moved to the worker thread right below.
        MqttClient *mqttclientObj = new MqttClient;
        mqttclientObj->moveToThread(&mqttclientThread);

        // Init() is triggered by the worker thread's started() signal.
        QObject::connect(&mqttclientThread, &QThread::started, mqttclientObj, &MqttClient::Init);
        mqttclientThread.start();

        engine.rootContext()->setContextProperty("myModel", mqttclientObj);
        engine.load(QUrl(QStringLiteral("qrc:/main.qml")));

        const int exitCode = engine.rootObjects().isEmpty() ? -1 : app.exec();

        // A QThread must not be destroyed while it is still running. quit() is
        // repeated here because the event loop never ran on the -1 path, so the
        // aboutToQuit connection above may not have fired; wait() then blocks
        // until the worker's event loop has actually returned.
        mqttclientThread.quit();
        mqttclientThread.wait();

        return exitCode;
    }
    

    mqttclient.cpp

    #include "mqttclient.h"
    #include <QDebug>
    
    MqttClient::MqttClient(QObject *parent)
        : QMqttClient(parent)
    {
    }
    
    /**
     * Subscribes to @p topic with QoS 0 and wraps the result in a MqttSubscription.
     *
     * @param topic Topic filter to subscribe to.
     * @return A newly allocated MqttSubscription, or nullptr when
     *         QMqttClient::subscribe() did not hand back a subscription.
     */
    MqttSubscription* MqttClient::subscribe(const QString &topic)
    {
        QMqttSubscription *mqttSub = QMqttClient::subscribe(topic, 0);
        if (!mqttSub) {
            // The MqttSubscription constructor dereferences its subscription
            // pointer, so a null result must not be wrapped.
            qWarning() << "MqttClient::subscribe: no subscription created for topic" << topic;
            return nullptr;
        }
        return new MqttSubscription(mqttSub, this);
    }
    
    void MqttClient::Init()
    {
        this->setHostname("127.0.0.1");
        this->setPort(1883);
        this->connectToHost();
    }
    
    /**
     * Wraps a QMqttSubscription and forwards its messages to handleMessage().
     *
     * @param s Subscription to wrap; when null, no connection is made and the
     *          topic is left untouched.
     * @param c Client the subscription was obtained from.
     */
    MqttSubscription::MqttSubscription(QMqttSubscription *s, MqttClient *c)
        : sub(s)
        , client(c)
    {
        if (!sub) {
            // connect() would reject the null sender and sub->topic() would crash.
            qWarning() << "MqttSubscription: constructed without a QMqttSubscription";
            return;
        }
        connect(sub, &QMqttSubscription::messageReceived, this, &MqttSubscription::handleMessage);
        m_topic = sub->topic();
    }
    
    /// Nothing to release explicitly.
    MqttSubscription::~MqttSubscription() = default;
    
    /// Logs the payload of @p qmsg and re-emits it through sensorValueChanged().
    void MqttSubscription::handleMessage(const QMqttMessage &qmsg)
    {
        // emit messageReceived(qmsg.payload());

        const auto &payload = qmsg.payload();
        qDebug() << "value recieved " << payload;
        emit sensorValueChanged(payload);
    }
    


  • Update --- The problem is surely with the connection: I updated the classes, and the results I got confirm it.

    From the Qt documentation, I think that
    in QMqttClient state 0 means disconnected, 1 means connecting and 2 means connected, and the test below also confirms that.

    main.cpp

    // Test harness: initialises the MQTT client, subscribes to "/root/temp"
    // from a one-shot timer after 5 s, and loads the QML UI. Debug output
    // records which thread each step runs on.
    int main(int argc, char *argv[])
    {
        QCoreApplication::setAttribute(Qt::AA_EnableHighDpiScaling);
    
        QGuiApplication app(argc, argv);
    
    
        QThread thread;
    
        // Stop the worker thread's event loop when the application is about to quit.
        QObject::connect(&app, &QCoreApplication::aboutToQuit, &thread, &QThread::quit);
    
        MqttClient *client =  new MqttClient();
    
        // Single-thread variant: Init() is called directly on the calling thread.
        client->Init();
        // Worker-thread variant (disabled here): move the client to `thread` and
        // let the thread's started() signal trigger Init() instead.
    /*
        client->moveToThread(&thread);
    
        QObject::connect(&thread, &QThread::started, client, &MqttClient::Init);
        thread.start();
    */
    
        // One-shot timer: 5000 ms after start() the lambda subscribes and logs
        // the thread it is running on.
        // NOTE(review): the lambda calls client->subscribe() directly; with the
        // worker-thread variant enabled this presumably happens on the timer's
        // thread rather than the client's - confirm.
        QTimer timer;
        QObject::connect(&timer, &QTimer::timeout, [client]() {client->subscribe("/root/temp"); qDebug() << "Thread timer " <<QThread::currentThread();});
        timer.setSingleShot(true);
        timer.start(5000);
    
         // Client state at this point, before the event loop has been started.
         qDebug() << client->state();
    
         qDebug() << "Thread main " <<QThread::currentThread();
    
        QQmlApplicationEngine engine;
    
        // Expose the client to QML under the name "myModel".
        engine.rootContext()->setContextProperty("myModel",client);
    
        engine.load(QUrl(QStringLiteral("qrc:/main.qml")));
    
    
        // No root object means the QML file failed to load.
        if (engine.rootObjects().isEmpty())
            return -1;
    
        return app.exec();
    
    }
    

    mqttclient.cpp

    MqttClient::MqttClient(QObject *parent)
        : QMqttClient(parent)
    {
    }
    
    /**
     * Logs the thread this client belongs to and its state, subscribes to
     * @p topic with QoS 0 and returns the result wrapped in a new MqttSubscription.
     */
    MqttSubscription* MqttClient::subscribe(const QString &topic)
    {
        qDebug() << "Thread subscribe() " << thread();
        qDebug() << "this state" << state();

        QMqttSubscription *mqttSub = QMqttClient::subscribe(topic, 0);
        return new MqttSubscription(mqttSub, this);
    }
    
    void MqttClient::Init()
    {
        this->setHostname("127.0.0.1");
        this->setPort(1883);
        this->connectToHost();
    
        connectionThread = QThread::currentThread();
    
        qDebug() << "Thread init() " <<QThread::currentThread();
    }
    
    /// Emits sensorValueChanged() with the payload and topic name of @p qmsg, then logs the payload.
    void MqttClient::handleMessage(const QMqttMessage &qmsg)
    {
        const auto &payload = qmsg.payload();

        emit sensorValueChanged(payload, qmsg.topic().name());
        qDebug() << payload;
    }
    
    /**
     * Stores the subscription and client, logs the constructing thread plus the
     * client's thread and state, and routes incoming messages to
     * MqttClient::handleMessage.
     */
    MqttSubscription::MqttSubscription(QMqttSubscription *s, MqttClient *c)
        : sub(s)
        , client(c)
    {
        qDebug() << "Thread subscription() " << QThread::currentThread();
        qDebug() << "Thread client " << client->thread();
        qDebug() << "client state" << client->state();

        connect(sub, &QMqttSubscription::messageReceived, client, &MqttClient::handleMessage);
    }
    
    /// Nothing to release explicitly.
    MqttSubscription::~MqttSubscription() = default;
    

    The Result when running in one thread

    Thread init()  QThread(0x7f883ec02df0)
    1
    Thread main  QThread(0x7f883ec02df0)
    Thread subscribe()  QThread(0x7f883ec02df0)
    this state 2
    Thread subscription()  QThread(0x7f883ec02df0)
    Thread client  QThread(0x7f883ec02df0)
    client state 2
    Thread timer  QThread(0x7f883ec02df0)
    

    The Result when running in another thread

    0
    Thread main  QThread(0x7fc0eae011c0)
    Thread init()  QThread(0x7ffee4715a10)
    QSocketNotifier: Socket notifiers cannot be enabled or disabled from another thread
    Thread subscribe()  QThread(0x7ffee4715a10)
    this state 1
    Thread subscription()  QThread(0x7fc0eae011c0)
    Thread client  QThread(0x7ffee4715a10)
    client state 1
    QObject::connect(QMqttSubscription, MqttClient): invalid null parameter
    Thread timer  QThread(0x7fc0eae011c0)
    

    **So how can I make an MQTT connection from another / separate thread?**

    **Does it have something to do with Qsocket?**


  • Lifetime Qt Champion

    Hi,

    I haven't used nor read the sources of that module but one thing I would do is trigger the init method from the other thread.



  • @SGaist Thank you for the reply. The results listed above under "The Result when running in another thread" were obtained by commenting and uncommenting this part. I think I forgot to mention that explicitly.

    The results under "The Result when running in another thread" were obtained when main.cpp looks like this:

    // Direct call disabled for the worker-thread run:
    // client->Init();
    
     // Move the client to the worker thread.
     client->moveToThread(&thread);
    
     // Init() is now triggered by the worker thread's started() signal.
     QObject::connect(&thread, &QThread::started, client, &MqttClient::Init);
     thread.start();
    

    The rest of the files remain the same. I don't know why the MQTT client is not able to make the connection. On the MQTT broker's terminal I can see that it attempts to make the connection but cannot establish it.


  • Lifetime Qt Champion

    Then let's try something else. Don't make your MqttClient inherit from QMqttClient; inherit only from QObject, keep a QMqttClient member variable in that class, and instantiate it in your init method.



  • This post is deleted!


  • This post is deleted!


  • @SGaist

    Thank you very much for your advice. It works. However, I have a few questions:

    1. Why does inheriting from QMqttClient not work, while using it as a class member works?
    2. How do I connect the (handle message) slot from the MqttClient class?

    Currently I am using the (handle message) slot from MqttSubscription, and it works. But at a later stage I need it in MqttClient.

    I have created a small handler file which takes care of calling the MqttClient subscribe function when running in a thread.

    mqttclient.cpp
    MqttClient::MqttClient(QObject *parent) : QObject(parent)
    {
    }
    
    /**
     * Logs the inner client's thread and state, subscribes to @p topic with
     * QoS 0 through the inner QMqttClient and returns the result wrapped in a
     * new MqttSubscription.
     */
    MqttSubscription* MqttClient::subscribe(const QString &topic)
    {
        qDebug() << "Thread subscribe() " << m_QmqttClient->thread();
        qDebug() << "this state" << m_QmqttClient->state();

        //  auto sub = QMqttClient::subscribe(topic, 0);
        QMqttSubscription *mqttSub = m_QmqttClient->subscribe(topic, 0);

        return new MqttSubscription(mqttSub, m_QmqttClient);
    }
    
    /**
     * Creates the inner QMqttClient on the calling thread, points it at the
     * broker on 127.0.0.1:1883 and starts connecting. Also records and logs
     * the thread Init() was called on.
     */
    void MqttClient::Init()
    {
        // Parented to this object so the inner client is destroyed together
        // with it instead of being leaked. Init() is expected to run on the
        // thread this object lives in (it is invoked via QThread::started).
        m_QmqttClient = new QMqttClient(this);

        m_QmqttClient->setHostname("127.0.0.1");
        m_QmqttClient->setPort(1883);
        m_QmqttClient->connectToHost();

        connectionThread = QThread::currentThread();

        qDebug() << "Thread init() " << QThread::currentThread();
    }
    
    /// Emits sensorValueChanged() with the payload and topic name of @p qmsg, then logs the payload.
    void MqttClient::handleMessage(const QMqttMessage &qmsg)
    {
        const auto &payload = qmsg.payload();

        emit sensorValueChanged(payload, qmsg.topic().name());
        qDebug() << payload;
    }
    
    /**
     * Stores the subscription and client, logs the constructing thread plus the
     * client's thread and state, and routes incoming messages to handleMessage().
     */
    MqttSubscription::MqttSubscription(QMqttSubscription *s, QMqttClient *c)
        : sub(s)
        , client(c)
    {
        qDebug() << "Thread subscription() " << QThread::currentThread();
        qDebug() << "Thread client " << client->thread();
        qDebug() << "client state" << client->state();

        connect(sub, &QMqttSubscription::messageReceived, this, &MqttSubscription::handleMessage);
    }
    
    /// Nothing to release explicitly.
    MqttSubscription::~MqttSubscription() = default;
    
    /// Logs the payload of every message received on this subscription.
    void MqttSubscription::handleMessage(const QMqttMessage &qmsg)
    {
        const auto &payload = qmsg.payload();
        qDebug() << payload;
    }
    

    main.cpp

    /**
     * Test harness: runs an MqttClient on a worker thread, requests a
     * subscription to "/root/temp" through a handler object 500 ms after
     * start-up, and loads the QML UI.
     *
     * @return -1 if no QML root object was loaded, otherwise the exit code of
     *         the application event loop.
     */
    int main(int argc, char *argv[])
    {
        QCoreApplication::setAttribute(Qt::AA_EnableHighDpiScaling);

        QGuiApplication app(argc, argv);

        QThread thread;

        // Stop the worker thread's event loop when the application is about to quit.
        QObject::connect(&app, &QCoreApplication::aboutToQuit, &thread, &QThread::quit);

        MqttClient *client =  new MqttClient();

      //  client->Init();

        // Run the client on the worker thread; Init() is triggered by started().
        client->moveToThread(&thread);

        QObject::connect(&thread, &QThread::started, client, &MqttClient::Init);
        thread.start();

       // QtConcurrent::run([client](){client->Init();});

        handler hanlerobj;

        // The subscribe request goes through a signal so that the slot runs on
        // the client's thread (the posted output shows subscribe() on the worker).
        QObject::connect(&hanlerobj,&handler::handlersub_signal,client,&MqttClient::subscribe);

        // One-shot timer: 500 ms after start() hand the topic to the handler.
        // Presumably handlersub_slot() emits handlersub_signal - confirm in handler.
        QTimer timer;
        QObject::connect(&timer, &QTimer::timeout, [&hanlerobj]() {
            //client->subscribe("/root/temp");
            hanlerobj.handlersub_slot("/root/temp");
            qDebug() << "Thread timer " <<QThread::currentThread();});
        timer.setSingleShot(true);
        timer.start(500);

        // qDebug() << client->state();

        qDebug() << "Thread main " <<QThread::currentThread();

        QQmlApplicationEngine engine;

        // Expose the client to QML under the name "myModel".
        engine.rootContext()->setContextProperty("myModel",client);

        engine.load(QUrl(QStringLiteral("qrc:/main.qml")));

        const int exitCode = engine.rootObjects().isEmpty() ? -1 : app.exec();

        // A QThread must not be destroyed while it is still running. quit() is
        // repeated here because the event loop never ran on the -1 path, so the
        // aboutToQuit connection above may not have fired; wait() then blocks
        // until the worker's event loop has actually returned.
        thread.quit();
        thread.wait();

        return exitCode;
    }
    

    results

    Thread main  QThread(0x7fabbf402df0)
    Thread init()  QThread(0x7ffeea655a10)
    Thread timer  QThread(0x7fabbf402df0)
    Thread subscribe()  QThread(0x7ffeea655a10)
    this state 2
    Thread subscription()  QThread(0x7ffeea655a10)
    Thread client  QThread(0x7ffeea655a10)
    client state 2
    "50\n"
    "50\n"
    "50\n"
    "50\n"
    "50\n"
    

  • Lifetime Qt Champion

    Hi,

    1. I currently don't know because I haven't used that class yet.
    2. You should apply the single responsibility principle. Have your message handler handle messages, and connect whatever is needed to it from a manager class. If your classes start to know each other too deeply, you are going to have a lot of maintenance pain.

Log in to reply