Skip to content
  • Categories
  • Recent
  • Tags
  • Popular
  • Users
  • Groups
  • Search
  • Get Qt Extensions
  • Unsolved
Collapse
Brand Logo
  1. Home
  2. Qt Development
  3. General and Desktop
  4. Proper way to Manage Information Between Threads
QtWS25 Last Chance

Proper way to Manage Information Between Threads

Scheduled Pinned Locked Moved Solved General and Desktop
6 Posts 4 Posters 5.2k Views
  • Oldest to Newest
  • Newest to Oldest
  • Most Votes
Reply
  • Reply as topic
Log in to reply
This topic has been deleted. Only users with topic management privileges can see it.
  • D Offline
    D Offline
    darkp03
    wrote on last edited by darkp03
    #1

    Hi everybody.
    Im currently working on an aplication with several threads. Everything is working fine, but I should want to know if im doing things on the "correct way" or maybe what im doing is an exageration.

    I have a producer class, that has an QUdpSocket. It recieves data through UDP, does some processing, and sends it to the consumer thread. The consumer thread must be processing at all times, and if theres no more data just wait for it.
    I managed this with 1 QMutex, 1 QQueue and 1 QSemaphore

    I have the following variables that can be seen by either the QThread or the Producer

    QMutex MutexProtectionQueue;//Mutex ment to protect my queue
    QQueue<QByteArray> Queue; //This is the Queue of messages
    QSemaphore SyncSemaphore;//Semaphore to sync producer with consumer
    

    On my producer class I have the following code

    QByteArray Data;//This is the data that i want to process
    
    MutexProtectionQueue.lock(); //Protect the queue
    Queue.enqueue(Data);
    MutexProtectionQueue.unlock(); //release the queue
    SyncSemaphore.release(); //Activate consumer
    

    On my consumer class i have

    void ClassProducer::run() //Reimplemented run method
    {
         while(1):
         {
              SyncSemaphore.acquire();
              MutexProtectionQueue.lock();
              QByteArray Data=Queue.dequeue();
              MutexProtectionQueue.unlock();
    
              //Process my QByteArray
         }
    }
    

    My question, is that Mutex necesary? I am afraid that if the producer wants to add data at the same time that the consumer wants to recieve it, i might get some trouble. I dont know if the class QQueue can be accesed by multiple threads at the same time.
    Is this the correct approach? should I do some changes?
    Thanks in advance

    1 Reply Last reply
    0
    • J.HilkJ Offline
      J.HilkJ Offline
      J.Hilk
      Moderators
      wrote on last edited by
      #2

      Hi,

      I'm fairly certain, that QQueue is not explicitly thread save, someone else might correct me here, therefore a QMutex is neccessary to prevent potential crashes.

      I would suggest using Signal/Slots to transfer data between 2 threads.
      Signal Slot Connections are thread save without you doing any extra work.


      Be aware of the Qt Code of Conduct, when posting : https://forum.qt.io/topic/113070/qt-code-of-conduct


      Q: What's that?
      A: It's blue light.
      Q: What does it do?
      A: It turns blue.

      1 Reply Last reply
      1
      • VRoninV Offline
        VRoninV Offline
        VRonin
        wrote on last edited by VRonin
        #3

        The semaphore is redundant here. It's still unclear how you pass the queue to the consumer class though.

        Reimplementing QThread, however, is almost never the right way to go (I know, the docs say you should do it but they are wrong)

        I'd use a worker object:

        #include <QObject>
        #include <QByteArray>
        class DataConsumer : public QObject
        {
            Q_OBJECT
            Q_DISABLE_COPY(DataConsumer)
        public:
            explicit DataConsumer(QObject *parent = nullptr)
                :QObject(parent)
            {}
        
        signals:
            void dataConsumed(int result); // return the result of the consumption 
        public slots:
            void enqueueData(const QByteArray& data){
                processData(data);
            }
        private:
            void processData(const QByteArray& data){
                // do something with data
                // send the result (here just the size) outside
                emit dataConsumed(data.size());
            }
        };
        

        now in the producer class define a signal like void rawDataRecived(const QByteArray& data);

        all you have to do now is

        consumerThread=new QThread(this);
        DataConsumer* worker=new DataConsumer;
        worker->moveToThread(consumerThread);
        connect(consumerThread,SIGNAL(finished()),worker,SLOT(deleteLater()));
        connect(this,SIGNAL(rawDataRecived(QByteArray)),worker,SLOT(enqueueData(QByteArray)));
        consumerThread->start();
        

        now you can emit rawDataRecived from the producer to start the processing. The queue is there, it's just invisible, it's managed via the implicit Qt::QueuedConnection

        "La mort n'est rien, mais vivre vaincu et sans gloire, c'est mourir tous les jours"
        ~Napoleon Bonaparte

        On a crusade to banish setIndexWidget() from the holy land of Qt

        1 Reply Last reply
        3
        • D Offline
          D Offline
          darkp03
          wrote on last edited by
          #4

          Hi,

          Thanks for the answers. I was quite disappointed to know that reimplementing the run method isnt the correct way, but everywhere on internet says that.
          The thing is, that I dont completely understand the worker approach.
          On my program, I have one thread that recieves UDP packets every 2.5 ms or less. Each of those packets needs to be processed, (second thread) and then need to be resent to a different Ip (I do this on a third thread).
          The processing thread recieves data really often, so thats why I think that having a thread that runs all the times and recieves its information via a QQueue protected by a Mutex its better than, each time I recieve a UDP Packet, creating a worker , moving it to the thread, and starting the thread (from what I understood thats what I should do if I implement the worker approach)

          Im not interested on when the thread finishes, so theres no need to emit signals when a packet is procesed. I just need a thread that never stops, so that I can recieve data and process it at the same time.

          They way I do it is, i have a thread recieving UDP data. When a packet is recieved, it places it on a Qqueue (protected by a Mutex, and then I release a Semaphore.
          I have a second thread, with reimplemented run function, that all the time is waiting for this semaphore, when it manages to .acquire() it, it takes a packet from the Qqueue, and the process beings.
          It sounds pretty safe, and its working fine, nevertheless im getting the following error message
          "Cannot create children for a parent that is in a different thread
          Parent is QUdpSocket, parents thread is QThread, current thread is ClassEnvio"

          So even if its working im guessing that I am doing something wrong.

          Could someone give me some tips on how I should implement this by the worker approach?
          When i recieve an udp packet...
          Should I create a new worker and move it to a QThread?
          Cant I just pass the information to a single working QThread?
          Should I call the "start" method each time a packet arrives? Or should I ask each time if the thread is running, and only start it if its not?

          Regarding the "worker"...
          What should I have to place on the worker? Can i simply copy the code i had on my reimplemented run() method, and place it on the :process() method of the worker?

          The main thing that Im having trouble understading is the concept of starting and finishing the thread. I want a single thread that runs all the time on a while(1) (at least thats how i learnt at school that multiple threads where managed) instead of something that starts and finishes. My program manages many connections, each connection recieves packets every 10 or 20 ms, The amount of packets processed is extremely high, thats why I thought of a Queue instead of many single smaller workers.

          Any ideas would be appreciated

          Thanks

          kshegunovK VRoninV 2 Replies Last reply
          0
          • D darkp03

            Hi,

            Thanks for the answers. I was quite disappointed to know that reimplementing the run method isnt the correct way, but everywhere on internet says that.
            The thing is, that I dont completely understand the worker approach.
            On my program, I have one thread that recieves UDP packets every 2.5 ms or less. Each of those packets needs to be processed, (second thread) and then need to be resent to a different Ip (I do this on a third thread).
            The processing thread recieves data really often, so thats why I think that having a thread that runs all the times and recieves its information via a QQueue protected by a Mutex its better than, each time I recieve a UDP Packet, creating a worker , moving it to the thread, and starting the thread (from what I understood thats what I should do if I implement the worker approach)

            Im not interested on when the thread finishes, so theres no need to emit signals when a packet is procesed. I just need a thread that never stops, so that I can recieve data and process it at the same time.

            They way I do it is, i have a thread recieving UDP data. When a packet is recieved, it places it on a Qqueue (protected by a Mutex, and then I release a Semaphore.
            I have a second thread, with reimplemented run function, that all the time is waiting for this semaphore, when it manages to .acquire() it, it takes a packet from the Qqueue, and the process beings.
            It sounds pretty safe, and its working fine, nevertheless im getting the following error message
            "Cannot create children for a parent that is in a different thread
            Parent is QUdpSocket, parents thread is QThread, current thread is ClassEnvio"

            So even if its working im guessing that I am doing something wrong.

            Could someone give me some tips on how I should implement this by the worker approach?
            When i recieve an udp packet...
            Should I create a new worker and move it to a QThread?
            Cant I just pass the information to a single working QThread?
            Should I call the "start" method each time a packet arrives? Or should I ask each time if the thread is running, and only start it if its not?

            Regarding the "worker"...
            What should I have to place on the worker? Can i simply copy the code i had on my reimplemented run() method, and place it on the :process() method of the worker?

            The main thing that Im having trouble understading is the concept of starting and finishing the thread. I want a single thread that runs all the time on a while(1) (at least thats how i learnt at school that multiple threads where managed) instead of something that starts and finishes. My program manages many connections, each connection recieves packets every 10 or 20 ms, The amount of packets processed is extremely high, thats why I thought of a Queue instead of many single smaller workers.

            Any ideas would be appreciated

            Thanks

            kshegunovK Offline
            kshegunovK Offline
            kshegunov
            Moderators
            wrote on last edited by kshegunov
            #5

            @VRonin said in Proper way to Manage Information Between Threads:

            The semaphore is redundant here.

            No, it is not!!!

            @darkp03 said in Proper way to Manage Information Between Threads:

            I was quite disappointed to know that reimplementing the run method isnt the correct way, but everywhere on internet says that.

            Don't be, the internet is wrong. It's a valid thing to do if you don't need to use any of the signal-slot machinery. However your queue implementation looks a bit off. Consider this one:
            Producer(s):

            QByteArray data; //< Data to put in the queue
            
            inputNeeds.acquire();	// First wait for more data to be consumed if needed (do not overfill the queue).
            
            QMutexLocker lock(&queueMutex);
            queue.enqueue(data);
            queueAvailability.release();
            

            Consumer(s):

            queueAvailability.acquire();
            QMutexLocker lock(&queueMutex);
            if (queue.size() <= 0)
                return; //< So graceful exit is possible
            
            QByteArray data = queue.dequeue();
            inputNeeds.release();
            

            Where variables are:

            QSemaphore queueAvailability, inputNeeds; //< inputNeeds is initialized to some number (e.g. 10000) in the beginning - that's the maximum number of items the queue will hold so it doesn't overflow if the consumer(s) lag behind
            QMutex queueMutex;
            QQueue<QByteArray> queue;
            

            Read and abide by the Qt Code of Conduct

            1 Reply Last reply
            2
            • D darkp03

              Hi,

              Thanks for the answers. I was quite disappointed to know that reimplementing the run method isnt the correct way, but everywhere on internet says that.
              The thing is, that I dont completely understand the worker approach.
              On my program, I have one thread that recieves UDP packets every 2.5 ms or less. Each of those packets needs to be processed, (second thread) and then need to be resent to a different Ip (I do this on a third thread).
              The processing thread recieves data really often, so thats why I think that having a thread that runs all the times and recieves its information via a QQueue protected by a Mutex its better than, each time I recieve a UDP Packet, creating a worker , moving it to the thread, and starting the thread (from what I understood thats what I should do if I implement the worker approach)

              Im not interested on when the thread finishes, so theres no need to emit signals when a packet is procesed. I just need a thread that never stops, so that I can recieve data and process it at the same time.

              They way I do it is, i have a thread recieving UDP data. When a packet is recieved, it places it on a Qqueue (protected by a Mutex, and then I release a Semaphore.
              I have a second thread, with reimplemented run function, that all the time is waiting for this semaphore, when it manages to .acquire() it, it takes a packet from the Qqueue, and the process beings.
              It sounds pretty safe, and its working fine, nevertheless im getting the following error message
              "Cannot create children for a parent that is in a different thread
              Parent is QUdpSocket, parents thread is QThread, current thread is ClassEnvio"

              So even if its working im guessing that I am doing something wrong.

              Could someone give me some tips on how I should implement this by the worker approach?
              When i recieve an udp packet...
              Should I create a new worker and move it to a QThread?
              Cant I just pass the information to a single working QThread?
              Should I call the "start" method each time a packet arrives? Or should I ask each time if the thread is running, and only start it if its not?

              Regarding the "worker"...
              What should I have to place on the worker? Can i simply copy the code i had on my reimplemented run() method, and place it on the :process() method of the worker?

              The main thing that Im having trouble understading is the concept of starting and finishing the thread. I want a single thread that runs all the time on a while(1) (at least thats how i learnt at school that multiple threads where managed) instead of something that starts and finishes. My program manages many connections, each connection recieves packets every 10 or 20 ms, The amount of packets processed is extremely high, thats why I thought of a Queue instead of many single smaller workers.

              Any ideas would be appreciated

              Thanks

              VRoninV Offline
              VRoninV Offline
              VRonin
              wrote on last edited by
              #6

              @darkp03 said in Proper way to Manage Information Between Threads:

              creating a worker , moving it to the thread, and starting the thread (from what I understood thats what I should do if I implement the worker approach)

              That's not what I'm doing here

              consumerThread will run continuously until you call quit and the consumer will remain ready to receive more data all the time.

              Execute the block that creates the worker just once, then emit the rawDataRecived signal every time you get something via the socket the queue is handled by Qt's connections, you don't need to implement it yourself

              "La mort n'est rien, mais vivre vaincu et sans gloire, c'est mourir tous les jours"
              ~Napoleon Bonaparte

              On a crusade to banish setIndexWidget() from the holy land of Qt

              1 Reply Last reply
              1

              • Login

              • Login or register to search.
              • First post
                Last post
              0
              • Categories
              • Recent
              • Tags
              • Popular
              • Users
              • Groups
              • Search
              • Get Qt Extensions
              • Unsolved