Important: Please read the Qt Code of Conduct - https://forum.qt.io/topic/113070/qt-code-of-conduct

Keeping track of threads



  • Hello,

    This is my first kick at QT Threading.

    I have a program that up until now ran everything in the main thread. The GUI program has 25+ user input variables, reads data from a db, performs calculations on the data and spits out a spreadsheet. This can take up to 1 minute to accomplish.

    Now I wish to do this in a loop of up to 1000 so I would like to multithread it for performance.

    I have it almost complete for running it once and now have some questions and looking for some advise on best practice.

    The threads need to update their progress while running and then send back the information when complete.

    I am using QThread with slots/signals to communicate from the thread to the main thread and the Worker variables to communicate to the thread. My worker class is:

    class Worker : public QObject
    {
        Q_OBJECT
    
    private:
        //variables used in the calculations    
    public:
        ///25 user input variables here
    
        explicit Worker(QObject *parent = 0);
    
    signals:
        void sendRunOnce(std::vector<QString> rows);
        void sendProgress(int amount);
        void sendFinalAmount(int id, double amount);//used when looping the mutiple thread runOnce
        void sendLog(QString text);
    
    public slots:
        double doWork();//where the calculations are done
    
    };
    
    

    Then I create and run the thread/worker here:

            QThread* thread = new QThread();
            Worker* worker = new Worker();
            //get all the ui elements and
            //set the 25 worker variables
    
            worker->moveToThread(thread);
            //set connections
    
            connect(thread, SIGNAL(finished()),worker, SLOT(deleteLater()));
            connect(thread, SIGNAL(started()), worker, SLOT(doWork()));
            connect(worker, SIGNAL(sendRunOnce(std::vector<QString>)),this, SLOT(receiveRunOnce(std::vector<QString>)));
    
            thread->start();
    

    This all works fine.

    So my questions are:
    1: am I doing this via best practice
    2: what is the best way to monitor this worker thread to see when it is done as I wish to grey the UI until its complete(this will also assist in question 3)
    3: The big question....Now I wish to loop this process and keep track of the threads created, and when they are done.

    I am thinking of making a vector of Threads, using QThread::idealThreadCount to see how many should be running at one time and of course looping to watch the vector for completion of threads and then starting a new one in its place.

    I am just unsure how to accomplish this. I dont think I can do a while loop to monitor the threads withing the main thread as the signals will not come through? Do I create a 'monitor' thread to do this work, if so what would that look like?

    I have done research on this but cannot seem to put it together.

    Any insight would be appreciated.

    Thanks,
    --James



  • example:

    worker.h

    #ifndef WORKER_H
    #define WORKER_H
    #include <QObject>
    #include <QTimer>
    class Worker : public QObject{
        Q_OBJECT
        Q_DISABLE_COPY_MOVE(Worker)
    public:
        explicit Worker(QObject* parent = nullptr)
            : QObject(parent)
            , m_input(0)
        {}
        int input() const {return m_input;}
        void setInput(int value){m_input = value;}
    public slots:
        void doWork(){
            // simulate a calculation that takes 500ms to return the double of input
            QTimer::singleShot(500,this,std::bind(&Worker::workerFinished,this,m_input*2));
        }
    signals:
        void workerFinished(int result);
    private:
        int m_input;
    };
    #endif
    

    mainobject.h

    #ifndef MAINOBJECT_H
    #define MAINOBJECT_H
    #include <QObject>
    #include <QQueue>
    #include <QVector>
    #include <QThread>
    #include <functional>
    #include "worker.h"
    #include <iostream>
    #include <QTimer>
    class MainObject : public QObject{
        Q_OBJECT
        Q_DISABLE_COPY_MOVE(MainObject)
    public:
        explicit MainObject(QObject* parent = nullptr)
            :QObject(parent)
            ,resultsToFetch(0)
        {}
        void addInputToCalculate(int value){
            inputsToCalculate.append(value);
        }
        ~MainObject(){
            for(QThread* thread : qAsConst(threads)){
                thread->quit();
                thread->wait();
            }
            qDeleteAll(workers);
        }
    public slots:
        void calculate(){
            if(inputsToCalculate.isEmpty())
                return;
            resultsToFetch = inputsToCalculate.size();
            for(int i : qAsConst(inputsToCalculate)){
                Worker* worker = new Worker;
                worker->setInput(i);
                workers.enqueue(worker);
            }
            for(int i=0;i<qMin(inputsToCalculate.size(),qMax(1,QThread::idealThreadCount()));++i){
                QThread* thread = new QThread;
                thread->start();
                threads.append(thread);
                connect(thread,&QThread::finished,thread, &QThread::deleteLater);
                startNextWorker(thread);
            }
        }
    private slots:
        void fetchWorkerResult(int input, int result){
            std::cout << "Input: " << input << " Result: " << result << std::endl;
            if(--resultsToFetch<=0)
                calculationComplete();
        }
        void startNextWorker(QThread* threadToUse){
            if(workers.isEmpty()){
                threadToUse->quit();
                threadToUse->wait();
                threads.removeAll(threadToUse);
                return;
            }
            Worker* worker = workers.dequeue();
            const int workInput = worker->input();
            worker->moveToThread(threadToUse);
            connect(worker, &Worker::workerFinished, this, std::bind(&MainObject::startNextWorker, this, threadToUse));
            connect(worker, &Worker::workerFinished, this, std::bind(&MainObject::fetchWorkerResult, this, workInput , std::placeholders::_1));
            connect(worker,&Worker::workerFinished,worker,&Worker::deleteLater);
            QTimer::singleShot(0,worker,&Worker::doWork);
        }
    signals:
        void calculationComplete();
    private:
        QVector<int> inputsToCalculate;
        QQueue<Worker*> workers;
        QVector<QThread*> threads;
        int resultsToFetch;
    };
    
    #endif // MAINOBJECT_H
    

    main.cpp

    #include "mainobject.h"
    #include <QCoreApplication>
    
    int main(int argc, char *argv[])
    {
        QCoreApplication a(argc, argv);
        MainObject obj;
        for(int i=0;i<100;++i)
            obj.addInputToCalculate(i);
        obj.calculate();
        QObject::connect(&obj,&MainObject::calculationComplete,[](){std::cout << "Completed!";});
        QObject::connect(&obj,&MainObject::calculationComplete,&a,&QCoreApplication::quit);
        return a.exec();
    }
    


  • I think instead of moving a QObject to a thread, you could subclass QThread (the second example in QThread doc).
    Write your doWork() codes in the run() function.
    When the run() ends, the thread is finished.
    And you can get the thread state by QThread::isFinished() or QThread::isRunning() when doing a loop.
    [ADDED]
    Another approach is to use QThreadPool and subclass QRunnable. QtConcurrent is also an option.
    If they meet your requirements, there's no need to manage the threads yourself.



  • @Bonnie
    I looked at QThreadPool but the documentation I read says you cannot use signals/slots IE you cannot pass information which I need to do.

    Ill look into subclassing. Thanks



  • @Bonnie
    I looked into subclassing. I do not think it will work as everything Constructor etc is in the old thread, only the run() function is moved to the new thread so I do not think I would have access to the member variables I require(didnt test this).

    --James



  • @JSher Don't quite understand what you need.
    Anything that the first example is able to do, the second is also capable of and more powerful (my thought).
    [EDITED]
    I see, it's signal/slot related, right?
    The subclassing thread cannot invoke a slot in the new thread. (Oh, that's what it cannot do :( )
    But as I saw you code, you seem not need to.
    And you can access member varibles in the run funtion, just careful about synchronizing.


  • Lifetime Qt Champion

    @JSher

    1. Is fine
    2. You already connected a slot to finished() signal, so you get notified when a thread finishes. So, what exactly is the problem here?
    3. Yes, you can use a vector. No need to use any loop or additional "monitoring" thread. Just start a new thread (if one is waiting) when another one finishes, or if you still did not reach max amount of threads.


  • @Bonnie thanks for the info however I think Ill stick with the worker class system as it appears more robust and I may need slots.

    @jsulm

    Isnt deleteLater() a built in function that destroys the worker object? I am unsure how this helps me?

    I think I see where your going though:

    1 Have a class member vector of threads
    2 When thread dies, run a function that starts another if there is still need of more so no loop

    Again though, I am unsure the exact logistics of this unless I can connect finished() to another SLOT?

    Thanks,
    --James


  • Lifetime Qt Champion

    @JSher said in Keeping track of threads:

    Again though, I am unsure the exact logistics of this unless I can connect finished() to another SLOT?

    Yes, connect it to another slot and remove the connection to delete later (call deleteLater() in the new slot). In that slot implement the logic.



  • You can have a look at this example (in particular ChatServer::incomingConnection). It keeps 2 vecors, one with the threads and one of of integers that determine how much each thread is "loaded" with work. Every time a new worker needs to be created it finds out the thread with the least amount of workers currently running and uses it.



  • @jsulm

    So the issue I cannot get by is if I connect the thread's finished() to another SLOT lets say threadFinished(), since the thread finished() does not have any parameters, I do not know what worker it was associated to that thread so I can activate that worker's deleteLater(). Best I can think of is keeping flag in the worker so it knows when its finished and check the whole list to see what ones need deleted but this would be inefficient...

    Any ideas?

    @VRonin

    Thank you, this is a great example and helped. It is not the same logic as it keeps threads going and adding workers to them to balance them so it just links finished() with deleteLater() but calls deleteLater() prior if the worker is done. Mine will use 1 worker to 1 thread at a time as each worker will max the core usage.

    Thanks,
    --James



  • @JSher said in Keeping track of threads:

    Mine will use 1 worker to 1 thread at a time as each worker will max the core usage.

    In this case I'd spawn qMin(qMax(1,QThread::idealThreadCount()),numberOfWorkers) spread the workers among the threads and chain the finish signal of 1 worker to the doWork slot of the next worker. This way you avoid the overhead of creating and destorying a thread for each worker



  • @VRonin

    I like this idea but am unsure how to implement it.

    I will ALWAYS have more workers than threads. I already spawn idealThreadcount - 1 Threads.

    It is functioning correctly except the thread does not die so my threadFinished() is never called from the connector

    connect(thread, SIGNAL(finished()),this, SLOT(threadFinished()));
    

    My design was once a thread dies, start another and throw a worker into it. I am receiving the information back from the worker when its complete through connect(worker, SIGNAL(sendWorkerInfo(int,double)),this,SLOT(receiveWorkerInfo(int,double)));.

    Can you describe a little more detail of how to chain the workers together as well as why my threads are not dying? I thought they died when doWork() ended?

    Thanks,
    --James



  • I have it working but there is a warning being thrown QObject::connect: No such slot Worker::threadFinished() and I cannot figure out why. Not sure if it is done the correct way but here it is:

    I have 2 vectors for threads and workers:

    std::vector<Worker*> workers;
    std::vector<QThread*> threads;
    

    I create the workers in a loop here:

    Worker *worker = new Worker();
    fillWorker(worker);
    worker->id = wid;
    wid++;
    worker->decisionMonths = bt;
     worker->recalcFreq = rc;
    worker->percentSummerized = ps;
    worker->decTrade = f;
    connect(worker, SIGNAL(sendWorkerInfo(int,double)),this,SLOT(receiveWorkerInfo(int,double)));
    workers.push_back(worker);
    

    Then I create the threads(cpu-1) here:

     QThread *thread = new QThread();
    Worker *worker = getWorker();//grabs the next non started worker from vector
    worker->isStarted = true;
    worker->moveToThread(thread);
    connect(thread, SIGNAL(finished()),this, SLOT(threadFinished()));
    connect(thread, SIGNAL(started()), worker, SLOT(doWork()));
    connect(worker, SIGNAL(finished()), thread, SLOT(quit()), Qt::DirectConnection);
    thread->start();
    threads.push_back(thread);
    

    Then when the worker is complete, it sends required info to receiveWorkerInfo(int,double) and I store it then deal with it once all is complete.
    When the thread is finished(not destroyed), threadFinished() runs. It grabs a non running thread and throws another worker in and starts it here:

    Worker *worker = getWorker();//grabs a non started worker
        if(worker != nullptr){
            //got a worker, get an idle thread and use it
            QThread *thread = nullptr;
            while(thread == nullptr){
                thread = getThread();//gets an idle thread from vector
            }
           
            worker->isStarted = true;
            worker->moveToThread(thread);
            connect(thread, SIGNAL(finished()),worker, SLOT(threadFinished()));//WARNING THROWN HERE
            connect(thread, SIGNAL(started()), worker, SLOT(doWork()));
            connect(worker, SIGNAL(finished()), thread, SLOT(quit()), Qt::DirectConnection);
            thread->start();
        }
    

    Although it works, why am I getting the warning when trying to connect the SLOT threadFinished()?
    EDIT: Im an idiot, this is not a new thread and cannot connect the SLOT again...;) But is this the correct way to accomplish this?

    Also, is this the correct way to do this?

    Thanks,
    --James



  • @JSher
    If you have a list of data sets that can be worked on independently, there may be simpler solutions:

    • QtConcurrent::map works on a sequence or container, and will call a processing method on each item (in separate threads, of course). Progress monitoring is built-in with QFuture, which updates you on the progress using signals and slots
    • std::transform with a parallel execution policy works in a similar fashion, but without the bells and whistles of QFuture


  • @JSher said in Keeping track of threads:

    connect(worker, SIGNAL(finished()), thread, SLOT(quit()), Qt::DirectConnection);

    Why? 1st don't use Qt::DirectConnection for items on different threads, 2nd if you still need the thread for another worker there is no point stopping it.

    Put the workers in a QQueue, dequeue a worker, move it to a thread and then use QTimer::singleShot(0,worker,&Worker::doWork); to start it, do it for as many threads as you have.
    When a worker is done (i.e. when it emits finished), dequeue another worker and do the same.
    Once you have no more workers to process stop and cleanup the threads



  • @VRonin said in Keeping track of threads:

    connect(worker, SIGNAL(finished()), thread, SLOT(quit()), Qt::DirectConnection);

    I obtained that code from a post marked as the correct answer on a situation like mine(trying to see when a thread's worker was complete) Basically matching the thread to the worker and starting its work method.

    So how do I target a thread that has a finished worker to move another worker to? Only way I can think of is create a map with the thread's position in the vector with the workerID, when the worker sends its info, match the ID with the thread position then move a new worker to that thread and QTimer::singleShot(0,worker,&Worker::doWork); it to start?

    Thanks,
    --James



  • example:

    worker.h

    #ifndef WORKER_H
    #define WORKER_H
    #include <QObject>
    #include <QTimer>
    class Worker : public QObject{
        Q_OBJECT
        Q_DISABLE_COPY_MOVE(Worker)
    public:
        explicit Worker(QObject* parent = nullptr)
            : QObject(parent)
            , m_input(0)
        {}
        int input() const {return m_input;}
        void setInput(int value){m_input = value;}
    public slots:
        void doWork(){
            // simulate a calculation that takes 500ms to return the double of input
            QTimer::singleShot(500,this,std::bind(&Worker::workerFinished,this,m_input*2));
        }
    signals:
        void workerFinished(int result);
    private:
        int m_input;
    };
    #endif
    

    mainobject.h

    #ifndef MAINOBJECT_H
    #define MAINOBJECT_H
    #include <QObject>
    #include <QQueue>
    #include <QVector>
    #include <QThread>
    #include <functional>
    #include "worker.h"
    #include <iostream>
    #include <QTimer>
    class MainObject : public QObject{
        Q_OBJECT
        Q_DISABLE_COPY_MOVE(MainObject)
    public:
        explicit MainObject(QObject* parent = nullptr)
            :QObject(parent)
            ,resultsToFetch(0)
        {}
        void addInputToCalculate(int value){
            inputsToCalculate.append(value);
        }
        ~MainObject(){
            for(QThread* thread : qAsConst(threads)){
                thread->quit();
                thread->wait();
            }
            qDeleteAll(workers);
        }
    public slots:
        void calculate(){
            if(inputsToCalculate.isEmpty())
                return;
            resultsToFetch = inputsToCalculate.size();
            for(int i : qAsConst(inputsToCalculate)){
                Worker* worker = new Worker;
                worker->setInput(i);
                workers.enqueue(worker);
            }
            for(int i=0;i<qMin(inputsToCalculate.size(),qMax(1,QThread::idealThreadCount()));++i){
                QThread* thread = new QThread;
                thread->start();
                threads.append(thread);
                connect(thread,&QThread::finished,thread, &QThread::deleteLater);
                startNextWorker(thread);
            }
        }
    private slots:
        void fetchWorkerResult(int input, int result){
            std::cout << "Input: " << input << " Result: " << result << std::endl;
            if(--resultsToFetch<=0)
                calculationComplete();
        }
        void startNextWorker(QThread* threadToUse){
            if(workers.isEmpty()){
                threadToUse->quit();
                threadToUse->wait();
                threads.removeAll(threadToUse);
                return;
            }
            Worker* worker = workers.dequeue();
            const int workInput = worker->input();
            worker->moveToThread(threadToUse);
            connect(worker, &Worker::workerFinished, this, std::bind(&MainObject::startNextWorker, this, threadToUse));
            connect(worker, &Worker::workerFinished, this, std::bind(&MainObject::fetchWorkerResult, this, workInput , std::placeholders::_1));
            connect(worker,&Worker::workerFinished,worker,&Worker::deleteLater);
            QTimer::singleShot(0,worker,&Worker::doWork);
        }
    signals:
        void calculationComplete();
    private:
        QVector<int> inputsToCalculate;
        QQueue<Worker*> workers;
        QVector<QThread*> threads;
        int resultsToFetch;
    };
    
    #endif // MAINOBJECT_H
    

    main.cpp

    #include "mainobject.h"
    #include <QCoreApplication>
    
    int main(int argc, char *argv[])
    {
        QCoreApplication a(argc, argv);
        MainObject obj;
        for(int i=0;i<100;++i)
            obj.addInputToCalculate(i);
        obj.calculate();
        QObject::connect(&obj,&MainObject::calculationComplete,[](){std::cout << "Completed!";});
        QObject::connect(&obj,&MainObject::calculationComplete,&a,&QCoreApplication::quit);
        return a.exec();
    }
    


  • @VRonin

    Wow thank you. Your code is very advanced, hard for us new folks to follow:)

    Couple of things:

    connect(worker, &Worker::workerFinished, this, std::bind(&MainObject::startNextWorker, this, threadToUse));

    This is what I was missing. I do not understand how it puts the thread in startNextWorker as an argument but I guess I do not need to:)

    Also I noticed that for connections you do connect(worker, &worker::workerFinished...., where I use, and most examples I see, are connect(worker, SIGNAL(workerFinished())...... I do not understand how yours works and/or why it is used in this way

    So basically my design was to create the thread, add the worker, start the thread(starts worker), worker stops stop thread, rinse and repeat.

    Your design is to create the thread and start it, then add workers, when they finish, delete and add another.

    Thanks!
    --James



  • @JSher said in Keeping track of threads:

    Also I noticed that for connections you do connect(worker, &worker::workerFinished...., where I use, and most examples I see, are connect(worker, SIGNAL(workerFinished())...... I do not understand how yours works and/or why it is used in this way

    The way you are doing was the pre-Qt5 way. When Qt5 was released the new (and strictly superior) syntax was introduced. It's explained well in this article

    Couple of things:
    connect(worker, &Worker::workerFinished, this, std::bind(&MainObject::startNextWorker, this, threadToUse));
    This is what I was missing. I do not understand how it puts the thread in startNextWorker as an argument but I guess I do not need to:)

    std::bind is the same as a lambda, it's the same as
    connect(worker, &Worker::workerFinished, this, [this,threadToUse](){startNextWorker(threadToUse);});



  • Easier example. Here all the chaining of workers and assigning them to threads is done upfront

    worker.h

    #ifndef WORKER_H
    #define WORKER_H
    #include <QObject>
    #include <QTimer>
    class Worker : public QObject{
        Q_OBJECT
        Q_DISABLE_COPY_MOVE(Worker)
    public:
        explicit Worker(QObject* parent = nullptr)
            : QObject(parent)
            , m_input(0)
        {}
        int input() const {return m_input;}
        void setInput(int value){m_input = value;}
    public slots:
        void doWork(){
            // simulate a calculation that takes 500ms to return the double of input
            QTimer::singleShot(500,this,std::bind(&Worker::workerFinished,this,m_input,m_input*2));
        }
    signals:
        void workerFinished(int input, int result);
    private:
        int m_input;
    };
    #endif
    

    mainobject.h

    #ifndef MAINOBJECT_H
    #define MAINOBJECT_H
    #include <QObject>
    #include <QPointer>
    #include <QVector>
    #include <QThread>
    #include "worker.h"
    #include <iostream>
    #include <QTimer>
    class MainObject : public QObject{
        Q_OBJECT
        Q_DISABLE_COPY_MOVE(MainObject)
    public:
        explicit MainObject(QObject* parent = nullptr)
            :QObject(parent)
            ,resultsToFetch(0)
        {
            connect(this, &MainObject::calculationComplete, this, &MainObject::cleanUp);
        }
        void addInputToCalculate(int value){
            inputsToCalculate.append(value);
        }
        ~MainObject(){
            cleanUp();
        }
    public slots:
        void calculate(){
            if(inputsToCalculate.isEmpty())
                return;
            cleanUp();
            resultsToFetch = inputsToCalculate.size();
            const int numThreads = qMin(inputsToCalculate.size(),qMax(1,QThread::idealThreadCount()));
            for(int i=0;i<numThreads;++i){
                QThread* thread = new QThread;
                threads.append(thread);
                connect(thread,&QThread::finished, thread, &QThread::deleteLater);
            }
            QVector<Worker*> previousWorker(numThreads, nullptr);
            for(int i=0;i<inputsToCalculate.size();++i){
                Worker* worker = new Worker;
                worker->setInput(inputsToCalculate.at(i));
                connect(worker,&Worker::workerFinished,this,&MainObject::fetchWorkerResult);
                connect(worker,&Worker::workerFinished,worker,&Worker::deleteLater);
                connect(thread,&QThread::finished, worker, &QThread::deleteLater);
                if(previousWorker.at(i%numThreads))
                    connect(previousWorker.at(i%numThreads),&Worker::workerFinished,worker,&Worker::doWork);
                else
                    connect(threads.at(i%numThreads),&QThread::started,worker,&Worker::doWork);
                previousWorker[i%numThreads] = worker;
                worker->moveToThread(threads.at(i%numThreads));
            }
            for(int i=0;i<previousWorker.size();++i)
                connect(previousWorker.at(i%numThreads),&Worker::workerFinished,threads.at(i%numThreads),&QThread::quit);
            for(QThread* thread : qAsConst(threads))
                thread->start();
    
        }
    private slots:
        void fetchWorkerResult(int input, int result){
            std::cout << "Input: " << input << " Result: " << result << std::endl;
            if(--resultsToFetch<=0)
                calculationComplete();
        }
        void cleanUp(){
            for(QThread* thread : qAsConst(threads)){
                if(thread){
                    thread->quit();
                    thread->wait();
                }
            }
            threads.clear();
        }
    signals:
        void calculationComplete();
    private:
        QVector<int> inputsToCalculate;
        QVector<QPointer<QThread> > threads;
        int resultsToFetch;
    
    };
    
    #endif // MAINOBJECT_H
    

    main.cpp

    #include "mainobject.h"
    #include <QCoreApplication>
    
    int main(int argc, char *argv[])
    {
        QCoreApplication a(argc, argv);
        MainObject obj;
        for(int i=0;i<100;++i)
            obj.addInputToCalculate(i);
        obj.calculate();
        QObject::connect(&obj,&MainObject::calculationComplete,[](){std::cout << "Completed!";});
        QObject::connect(&obj,&MainObject::calculationComplete,&a,&QCoreApplication::quit);
        return a.exec();
    }
    


  • @VRonin

    I have it implemented and it works great. The new QT5 way is much better as you do not have to worry about paramaters in the calls. Just an FYI, when I did it my way it worked fine until the number of workers reached about 50+ then I started received errors like QThread destroyed... once I implemented your way it works fine and I believe its faster as well(never tested).

    Thanks very much for your assistance!

    --James


Log in to reply