Skip to content
  • Categories
  • Recent
  • Tags
  • Popular
  • Users
  • Groups
  • Search
  • Get Qt Extensions
  • Unsolved
Collapse
Brand Logo
  1. Home
  2. Qt Development
  3. General and Desktop
  4. Keeping track of threads
Forum Updated to NodeBB v4.3 + New Features

Keeping track of threads

Scheduled Pinned Locked Moved Solved General and Desktop
21 Posts 5 Posters 5.6k Views 1 Watching
  • Oldest to Newest
  • Newest to Oldest
  • Most Votes
Reply
  • Reply as topic
Log in to reply
This topic has been deleted. Only users with topic management privileges can see it.
  • J Offline
    J Offline
    JSher
    wrote on last edited by
    #12

    @VRonin

    I like this idea but am unsure how to implement it.

    I will ALWAYS have more workers than threads. I already spawn idealThreadcount - 1 Threads.

    It is functioning correctly except the thread does not die so my threadFinished() is never called from the connector

    connect(thread, SIGNAL(finished()),this, SLOT(threadFinished()));
    

    My design was once a thread dies, start another and throw a worker into it. I am receiving the information back from the worker when its complete through connect(worker, SIGNAL(sendWorkerInfo(int,double)),this,SLOT(receiveWorkerInfo(int,double)));.

    Can you describe a little more detail of how to chain the workers together as well as why my threads are not dying? I thought they died when doWork() ended?

    Thanks,
    --James

    1 Reply Last reply
    0
    • J Offline
      J Offline
      JSher
      wrote on last edited by JSher
      #13

      I have it working but there is a warning being thrown QObject::connect: No such slot Worker::threadFinished() and I cannot figure out why. Not sure if it is done the correct way but here it is:

      I have 2 vectors for threads and workers:

      std::vector<Worker*> workers;
      std::vector<QThread*> threads;
      

      I create the workers in a loop here:

      Worker *worker = new Worker();
      fillWorker(worker);
      worker->id = wid;
      wid++;
      worker->decisionMonths = bt;
       worker->recalcFreq = rc;
      worker->percentSummerized = ps;
      worker->decTrade = f;
      connect(worker, SIGNAL(sendWorkerInfo(int,double)),this,SLOT(receiveWorkerInfo(int,double)));
      workers.push_back(worker);
      

      Then I create the threads(cpu-1) here:

       QThread *thread = new QThread();
      Worker *worker = getWorker();//grabs the next non started worker from vector
      worker->isStarted = true;
      worker->moveToThread(thread);
      connect(thread, SIGNAL(finished()),this, SLOT(threadFinished()));
      connect(thread, SIGNAL(started()), worker, SLOT(doWork()));
      connect(worker, SIGNAL(finished()), thread, SLOT(quit()), Qt::DirectConnection);
      thread->start();
      threads.push_back(thread);
      

      Then when the worker is complete, it sends required info to receiveWorkerInfo(int,double) and I store it then deal with it once all is complete.
      When the thread is finished(not destroyed), threadFinished() runs. It grabs a non running thread and throws another worker in and starts it here:

      Worker *worker = getWorker();//grabs a non started worker
          if(worker != nullptr){
              //got a worker, get an idle thread and use it
              QThread *thread = nullptr;
              while(thread == nullptr){
                  thread = getThread();//gets an idle thread from vector
              }
             
              worker->isStarted = true;
              worker->moveToThread(thread);
              connect(thread, SIGNAL(finished()),worker, SLOT(threadFinished()));//WARNING THROWN HERE
              connect(thread, SIGNAL(started()), worker, SLOT(doWork()));
              connect(worker, SIGNAL(finished()), thread, SLOT(quit()), Qt::DirectConnection);
              thread->start();
          }
      

      Although it works, why am I getting the warning when trying to connect the SLOT threadFinished()?
      EDIT: Im an idiot, this is not a new thread and cannot connect the SLOT again...;) But is this the correct way to accomplish this?

      Also, is this the correct way to do this?

      Thanks,
      --James

      VRoninV 1 Reply Last reply
      0
      • J JSher

        Hello,

        This is my first kick at QT Threading.

        I have a program that up until now ran everything in the main thread. The GUI program has 25+ user input variables, reads data from a db, performs calculations on the data and spits out a spreadsheet. This can take up to 1 minute to accomplish.

        Now I wish to do this in a loop of up to 1000 so I would like to multithread it for performance.

        I have it almost complete for running it once and now have some questions and looking for some advise on best practice.

        The threads need to update their progress while running and then send back the information when complete.

        I am using QThread with slots/signals to communicate from the thread to the main thread and the Worker variables to communicate to the thread. My worker class is:

        class Worker : public QObject
        {
            Q_OBJECT
        
        private:
            //variables used in the calculations    
        public:
            ///25 user input variables here
        
            explicit Worker(QObject *parent = 0);
        
        signals:
            void sendRunOnce(std::vector<QString> rows);
            void sendProgress(int amount);
            void sendFinalAmount(int id, double amount);//used when looping the mutiple thread runOnce
            void sendLog(QString text);
        
        public slots:
            double doWork();//where the calculations are done
        
        };
        
        

        Then I create and run the thread/worker here:

                QThread* thread = new QThread();
                Worker* worker = new Worker();
                //get all the ui elements and
                //set the 25 worker variables
        
                worker->moveToThread(thread);
                //set connections
        
                connect(thread, SIGNAL(finished()),worker, SLOT(deleteLater()));
                connect(thread, SIGNAL(started()), worker, SLOT(doWork()));
                connect(worker, SIGNAL(sendRunOnce(std::vector<QString>)),this, SLOT(receiveRunOnce(std::vector<QString>)));
        
                thread->start();
        

        This all works fine.

        So my questions are:
        1: am I doing this via best practice
        2: what is the best way to monitor this worker thread to see when it is done as I wish to grey the UI until its complete(this will also assist in question 3)
        3: The big question....Now I wish to loop this process and keep track of the threads created, and when they are done.

        I am thinking of making a vector of Threads, using QThread::idealThreadCount to see how many should be running at one time and of course looping to watch the vector for completion of threads and then starting a new one in its place.

        I am just unsure how to accomplish this. I dont think I can do a while loop to monitor the threads withing the main thread as the signals will not come through? Do I create a 'monitor' thread to do this work, if so what would that look like?

        I have done research on this but cannot seem to put it together.

        Any insight would be appreciated.

        Thanks,
        --James

        A Offline
        A Offline
        Asperamanca
        wrote on last edited by
        #14

        @JSher
        If you have a list of data sets that can be worked on independently, there may be simpler solutions:

        • QtConcurrent::map works on a sequence or container, and will call a processing method on each item (in separate threads, of course). Progress monitoring is built-in with QFuture, which updates you on the progress using signals and slots
        • std::transform with a parallel execution policy works in a similar fashion, but without the bells and whistles of QFuture
        1 Reply Last reply
        0
        • J JSher

          I have it working but there is a warning being thrown QObject::connect: No such slot Worker::threadFinished() and I cannot figure out why. Not sure if it is done the correct way but here it is:

          I have 2 vectors for threads and workers:

          std::vector<Worker*> workers;
          std::vector<QThread*> threads;
          

          I create the workers in a loop here:

          Worker *worker = new Worker();
          fillWorker(worker);
          worker->id = wid;
          wid++;
          worker->decisionMonths = bt;
           worker->recalcFreq = rc;
          worker->percentSummerized = ps;
          worker->decTrade = f;
          connect(worker, SIGNAL(sendWorkerInfo(int,double)),this,SLOT(receiveWorkerInfo(int,double)));
          workers.push_back(worker);
          

          Then I create the threads(cpu-1) here:

           QThread *thread = new QThread();
          Worker *worker = getWorker();//grabs the next non started worker from vector
          worker->isStarted = true;
          worker->moveToThread(thread);
          connect(thread, SIGNAL(finished()),this, SLOT(threadFinished()));
          connect(thread, SIGNAL(started()), worker, SLOT(doWork()));
          connect(worker, SIGNAL(finished()), thread, SLOT(quit()), Qt::DirectConnection);
          thread->start();
          threads.push_back(thread);
          

          Then when the worker is complete, it sends required info to receiveWorkerInfo(int,double) and I store it then deal with it once all is complete.
          When the thread is finished(not destroyed), threadFinished() runs. It grabs a non running thread and throws another worker in and starts it here:

          Worker *worker = getWorker();//grabs a non started worker
              if(worker != nullptr){
                  //got a worker, get an idle thread and use it
                  QThread *thread = nullptr;
                  while(thread == nullptr){
                      thread = getThread();//gets an idle thread from vector
                  }
                 
                  worker->isStarted = true;
                  worker->moveToThread(thread);
                  connect(thread, SIGNAL(finished()),worker, SLOT(threadFinished()));//WARNING THROWN HERE
                  connect(thread, SIGNAL(started()), worker, SLOT(doWork()));
                  connect(worker, SIGNAL(finished()), thread, SLOT(quit()), Qt::DirectConnection);
                  thread->start();
              }
          

          Although it works, why am I getting the warning when trying to connect the SLOT threadFinished()?
          EDIT: Im an idiot, this is not a new thread and cannot connect the SLOT again...;) But is this the correct way to accomplish this?

          Also, is this the correct way to do this?

          Thanks,
          --James

          VRoninV Offline
          VRoninV Offline
          VRonin
          wrote on last edited by VRonin
          #15

          @JSher said in Keeping track of threads:

          connect(worker, SIGNAL(finished()), thread, SLOT(quit()), Qt::DirectConnection);

          Why? 1st don't use Qt::DirectConnection for items on different threads, 2nd if you still need the thread for another worker there is no point stopping it.

          Put the workers in a QQueue, dequeue a worker, move it to a thread and then use QTimer::singleShot(0,worker,&Worker::doWork); to start it, do it for as many threads as you have.
          When a worker is done (i.e. when it emits finished), dequeue another worker and do the same.
          Once you have no more workers to process stop and cleanup the threads

          "La mort n'est rien, mais vivre vaincu et sans gloire, c'est mourir tous les jours"
          ~Napoleon Bonaparte

          On a crusade to banish setIndexWidget() from the holy land of Qt

          1 Reply Last reply
          3
          • J Offline
            J Offline
            JSher
            wrote on last edited by
            #16

            @VRonin said in Keeping track of threads:

            connect(worker, SIGNAL(finished()), thread, SLOT(quit()), Qt::DirectConnection);

            I obtained that code from a post marked as the correct answer on a situation like mine(trying to see when a thread's worker was complete) Basically matching the thread to the worker and starting its work method.

            So how do I target a thread that has a finished worker to move another worker to? Only way I can think of is create a map with the thread's position in the vector with the workerID, when the worker sends its info, match the ID with the thread position then move a new worker to that thread and QTimer::singleShot(0,worker,&Worker::doWork); it to start?

            Thanks,
            --James

            1 Reply Last reply
            0
            • VRoninV Offline
              VRoninV Offline
              VRonin
              wrote on last edited by VRonin
              #17

              example:

              worker.h

              #ifndef WORKER_H
              #define WORKER_H
              #include <QObject>
              #include <QTimer>
              class Worker : public QObject{
                  Q_OBJECT
                  Q_DISABLE_COPY_MOVE(Worker)
              public:
                  explicit Worker(QObject* parent = nullptr)
                      : QObject(parent)
                      , m_input(0)
                  {}
                  int input() const {return m_input;}
                  void setInput(int value){m_input = value;}
              public slots:
                  void doWork(){
                      // simulate a calculation that takes 500ms to return the double of input
                      QTimer::singleShot(500,this,std::bind(&Worker::workerFinished,this,m_input*2));
                  }
              signals:
                  void workerFinished(int result);
              private:
                  int m_input;
              };
              #endif
              

              mainobject.h

              #ifndef MAINOBJECT_H
              #define MAINOBJECT_H
              #include <QObject>
              #include <QQueue>
              #include <QVector>
              #include <QThread>
              #include <functional>
              #include "worker.h"
              #include <iostream>
              #include <QTimer>
              class MainObject : public QObject{
                  Q_OBJECT
                  Q_DISABLE_COPY_MOVE(MainObject)
              public:
                  explicit MainObject(QObject* parent = nullptr)
                      :QObject(parent)
                      ,resultsToFetch(0)
                  {}
                  void addInputToCalculate(int value){
                      inputsToCalculate.append(value);
                  }
                  ~MainObject(){
                      for(QThread* thread : qAsConst(threads)){
                          thread->quit();
                          thread->wait();
                      }
                      qDeleteAll(workers);
                  }
              public slots:
                  void calculate(){
                      if(inputsToCalculate.isEmpty())
                          return;
                      resultsToFetch = inputsToCalculate.size();
                      for(int i : qAsConst(inputsToCalculate)){
                          Worker* worker = new Worker;
                          worker->setInput(i);
                          workers.enqueue(worker);
                      }
                      for(int i=0;i<qMin(inputsToCalculate.size(),qMax(1,QThread::idealThreadCount()));++i){
                          QThread* thread = new QThread;
                          thread->start();
                          threads.append(thread);
                          connect(thread,&QThread::finished,thread, &QThread::deleteLater);
                          startNextWorker(thread);
                      }
                  }
              private slots:
                  void fetchWorkerResult(int input, int result){
                      std::cout << "Input: " << input << " Result: " << result << std::endl;
                      if(--resultsToFetch<=0)
                          calculationComplete();
                  }
                  void startNextWorker(QThread* threadToUse){
                      if(workers.isEmpty()){
                          threadToUse->quit();
                          threadToUse->wait();
                          threads.removeAll(threadToUse);
                          return;
                      }
                      Worker* worker = workers.dequeue();
                      const int workInput = worker->input();
                      worker->moveToThread(threadToUse);
                      connect(worker, &Worker::workerFinished, this, std::bind(&MainObject::startNextWorker, this, threadToUse));
                      connect(worker, &Worker::workerFinished, this, std::bind(&MainObject::fetchWorkerResult, this, workInput , std::placeholders::_1));
                      connect(worker,&Worker::workerFinished,worker,&Worker::deleteLater);
                      QTimer::singleShot(0,worker,&Worker::doWork);
                  }
              signals:
                  void calculationComplete();
              private:
                  QVector<int> inputsToCalculate;
                  QQueue<Worker*> workers;
                  QVector<QThread*> threads;
                  int resultsToFetch;
              };
              
              #endif // MAINOBJECT_H
              

              main.cpp

              #include "mainobject.h"
              #include <QCoreApplication>
              
              int main(int argc, char *argv[])
              {
                  QCoreApplication a(argc, argv);
                  MainObject obj;
                  for(int i=0;i<100;++i)
                      obj.addInputToCalculate(i);
                  obj.calculate();
                  QObject::connect(&obj,&MainObject::calculationComplete,[](){std::cout << "Completed!";});
                  QObject::connect(&obj,&MainObject::calculationComplete,&a,&QCoreApplication::quit);
                  return a.exec();
              }
              

              "La mort n'est rien, mais vivre vaincu et sans gloire, c'est mourir tous les jours"
              ~Napoleon Bonaparte

              On a crusade to banish setIndexWidget() from the holy land of Qt

              1 Reply Last reply
              5
              • J Offline
                J Offline
                JSher
                wrote on last edited by
                #18

                @VRonin

                Wow thank you. Your code is very advanced, hard for us new folks to follow:)

                Couple of things:

                connect(worker, &Worker::workerFinished, this, std::bind(&MainObject::startNextWorker, this, threadToUse));

                This is what I was missing. I do not understand how it puts the thread in startNextWorker as an argument but I guess I do not need to:)

                Also I noticed that for connections you do connect(worker, &worker::workerFinished...., where I use, and most examples I see, are connect(worker, SIGNAL(workerFinished())...... I do not understand how yours works and/or why it is used in this way

                So basically my design was to create the thread, add the worker, start the thread(starts worker), worker stops stop thread, rinse and repeat.

                Your design is to create the thread and start it, then add workers, when they finish, delete and add another.

                Thanks!
                --James

                VRoninV 1 Reply Last reply
                0
                • J JSher

                  @VRonin

                  Wow thank you. Your code is very advanced, hard for us new folks to follow:)

                  Couple of things:

                  connect(worker, &Worker::workerFinished, this, std::bind(&MainObject::startNextWorker, this, threadToUse));

                  This is what I was missing. I do not understand how it puts the thread in startNextWorker as an argument but I guess I do not need to:)

                  Also I noticed that for connections you do connect(worker, &worker::workerFinished...., where I use, and most examples I see, are connect(worker, SIGNAL(workerFinished())...... I do not understand how yours works and/or why it is used in this way

                  So basically my design was to create the thread, add the worker, start the thread(starts worker), worker stops stop thread, rinse and repeat.

                  Your design is to create the thread and start it, then add workers, when they finish, delete and add another.

                  Thanks!
                  --James

                  VRoninV Offline
                  VRoninV Offline
                  VRonin
                  wrote on last edited by
                  #19

                  @JSher said in Keeping track of threads:

                  Also I noticed that for connections you do connect(worker, &worker::workerFinished...., where I use, and most examples I see, are connect(worker, SIGNAL(workerFinished())...... I do not understand how yours works and/or why it is used in this way

                  The way you are doing was the pre-Qt5 way. When Qt5 was released the new (and strictly superior) syntax was introduced. It's explained well in this article

                  Couple of things:
                  connect(worker, &Worker::workerFinished, this, std::bind(&MainObject::startNextWorker, this, threadToUse));
                  This is what I was missing. I do not understand how it puts the thread in startNextWorker as an argument but I guess I do not need to:)

                  std::bind is the same as a lambda, it's the same as
                  connect(worker, &Worker::workerFinished, this, [this,threadToUse](){startNextWorker(threadToUse);});

                  "La mort n'est rien, mais vivre vaincu et sans gloire, c'est mourir tous les jours"
                  ~Napoleon Bonaparte

                  On a crusade to banish setIndexWidget() from the holy land of Qt

                  1 Reply Last reply
                  3
                  • VRoninV Offline
                    VRoninV Offline
                    VRonin
                    wrote on last edited by VRonin
                    #20

                    Easier example. Here all the chaining of workers and assigning them to threads is done upfront

                    worker.h

                    #ifndef WORKER_H
                    #define WORKER_H
                    #include <QObject>
                    #include <QTimer>
                    class Worker : public QObject{
                        Q_OBJECT
                        Q_DISABLE_COPY_MOVE(Worker)
                    public:
                        explicit Worker(QObject* parent = nullptr)
                            : QObject(parent)
                            , m_input(0)
                        {}
                        int input() const {return m_input;}
                        void setInput(int value){m_input = value;}
                    public slots:
                        void doWork(){
                            // simulate a calculation that takes 500ms to return the double of input
                            QTimer::singleShot(500,this,std::bind(&Worker::workerFinished,this,m_input,m_input*2));
                        }
                    signals:
                        void workerFinished(int input, int result);
                    private:
                        int m_input;
                    };
                    #endif
                    

                    mainobject.h

                    #ifndef MAINOBJECT_H
                    #define MAINOBJECT_H
                    #include <QObject>
                    #include <QPointer>
                    #include <QVector>
                    #include <QThread>
                    #include "worker.h"
                    #include <iostream>
                    #include <QTimer>
                    class MainObject : public QObject{
                        Q_OBJECT
                        Q_DISABLE_COPY_MOVE(MainObject)
                    public:
                        explicit MainObject(QObject* parent = nullptr)
                            :QObject(parent)
                            ,resultsToFetch(0)
                        {
                            connect(this, &MainObject::calculationComplete, this, &MainObject::cleanUp);
                        }
                        void addInputToCalculate(int value){
                            inputsToCalculate.append(value);
                        }
                        ~MainObject(){
                            cleanUp();
                        }
                    public slots:
                        void calculate(){
                            if(inputsToCalculate.isEmpty())
                                return;
                            cleanUp();
                            resultsToFetch = inputsToCalculate.size();
                            const int numThreads = qMin(inputsToCalculate.size(),qMax(1,QThread::idealThreadCount()));
                            for(int i=0;i<numThreads;++i){
                                QThread* thread = new QThread;
                                threads.append(thread);
                                connect(thread,&QThread::finished, thread, &QThread::deleteLater);
                            }
                            QVector<Worker*> previousWorker(numThreads, nullptr);
                            for(int i=0;i<inputsToCalculate.size();++i){
                                Worker* worker = new Worker;
                                worker->setInput(inputsToCalculate.at(i));
                                connect(worker,&Worker::workerFinished,this,&MainObject::fetchWorkerResult);
                                connect(worker,&Worker::workerFinished,worker,&Worker::deleteLater);
                                connect(thread,&QThread::finished, worker, &QThread::deleteLater);
                                if(previousWorker.at(i%numThreads))
                                    connect(previousWorker.at(i%numThreads),&Worker::workerFinished,worker,&Worker::doWork);
                                else
                                    connect(threads.at(i%numThreads),&QThread::started,worker,&Worker::doWork);
                                previousWorker[i%numThreads] = worker;
                                worker->moveToThread(threads.at(i%numThreads));
                            }
                            for(int i=0;i<previousWorker.size();++i)
                                connect(previousWorker.at(i%numThreads),&Worker::workerFinished,threads.at(i%numThreads),&QThread::quit);
                            for(QThread* thread : qAsConst(threads))
                                thread->start();
                    
                        }
                    private slots:
                        void fetchWorkerResult(int input, int result){
                            std::cout << "Input: " << input << " Result: " << result << std::endl;
                            if(--resultsToFetch<=0)
                                calculationComplete();
                        }
                        void cleanUp(){
                            for(QThread* thread : qAsConst(threads)){
                                if(thread){
                                    thread->quit();
                                    thread->wait();
                                }
                            }
                            threads.clear();
                        }
                    signals:
                        void calculationComplete();
                    private:
                        QVector<int> inputsToCalculate;
                        QVector<QPointer<QThread> > threads;
                        int resultsToFetch;
                    
                    };
                    
                    #endif // MAINOBJECT_H
                    

                    main.cpp

                    #include "mainobject.h"
                    #include <QCoreApplication>
                    
                    int main(int argc, char *argv[])
                    {
                        QCoreApplication a(argc, argv);
                        MainObject obj;
                        for(int i=0;i<100;++i)
                            obj.addInputToCalculate(i);
                        obj.calculate();
                        QObject::connect(&obj,&MainObject::calculationComplete,[](){std::cout << "Completed!";});
                        QObject::connect(&obj,&MainObject::calculationComplete,&a,&QCoreApplication::quit);
                        return a.exec();
                    }
                    

                    "La mort n'est rien, mais vivre vaincu et sans gloire, c'est mourir tous les jours"
                    ~Napoleon Bonaparte

                    On a crusade to banish setIndexWidget() from the holy land of Qt

                    1 Reply Last reply
                    4
                    • J Offline
                      J Offline
                      JSher
                      wrote on last edited by
                      #21

                      @VRonin

                      I have it implemented and it works great. The new QT5 way is much better as you do not have to worry about paramaters in the calls. Just an FYI, when I did it my way it worked fine until the number of workers reached about 50+ then I started received errors like QThread destroyed... once I implemented your way it works fine and I believe its faster as well(never tested).

                      Thanks very much for your assistance!

                      --James

                      1 Reply Last reply
                      0

                      • Login

                      • Login or register to search.
                      • First post
                        Last post
                      0
                      • Categories
                      • Recent
                      • Tags
                      • Popular
                      • Users
                      • Groups
                      • Search
                      • Get Qt Extensions
                      • Unsolved