Proper way to Manage Information Between Threads



  • Hi everybody.
    I'm currently working on an application with several threads. Everything is working fine, but I would like to know whether I'm doing things the "correct way", or whether what I'm doing is overkill.

    I have a producer class that has a QUdpSocket. It receives data through UDP, does some processing, and sends it to the consumer thread. The consumer thread must be processing at all times and, if there's no more data, just wait for it.
    I managed this with 1 QMutex, 1 QQueue and 1 QSemaphore.

    I have the following variables, which are visible to both the QThread and the producer:

    QMutex MutexProtectionQueue;//Mutex meant to protect my queue
    QQueue<QByteArray> Queue; //This is the Queue of messages
    QSemaphore SyncSemaphore;//Semaphore to sync producer with consumer
    

    In my producer class I have the following code:

    QByteArray Data;//This is the data that i want to process
    
    MutexProtectionQueue.lock(); //Protect the queue
    Queue.enqueue(Data);
    MutexProtectionQueue.unlock(); //release the queue
    SyncSemaphore.release(); //Activate consumer
    

    In my consumer class I have:

    void ClassProducer::run() //Reimplemented run method
    {
         while(1)
         {
              SyncSemaphore.acquire();
              MutexProtectionQueue.lock();
              QByteArray Data=Queue.dequeue();
              MutexProtectionQueue.unlock();
    
              //Process my QByteArray
         }
    }
    

    My question: is that mutex necessary? I am afraid that if the producer wants to add data at the same time that the consumer wants to receive it, I might get into trouble. I don't know whether QQueue can be accessed by multiple threads at the same time.
    Is this the correct approach? Should I make any changes?
    Thanks in advance.



  • Hi,

    I'm fairly certain that QQueue is not explicitly thread-safe (someone else might correct me here), so a QMutex is necessary to prevent potential crashes.

    I would suggest using signals and slots to transfer data between the two threads.
    Signal/slot connections are thread-safe without any extra work on your part.



  • The semaphore is redundant here. It's still unclear how you pass the queue to the consumer class though.

    Reimplementing QThread, however, is almost never the right way to go (I know, the docs say you should do it, but they are wrong).

    I'd use a worker object:

    #include <QObject>
    #include <QByteArray>
    class DataConsumer : public QObject
    {
        Q_OBJECT
        Q_DISABLE_COPY(DataConsumer)
    public:
        explicit DataConsumer(QObject *parent = nullptr)
            :QObject(parent)
        {}
    
    signals:
        void dataConsumed(int result); // return the result of the consumption 
    public slots:
        void enqueueData(const QByteArray& data){
            processData(data);
        }
    private:
        void processData(const QByteArray& data){
            // do something with data
            // send the result (here just the size) outside
            emit dataConsumed(data.size());
        }
    };
    

    Now, in the producer class, define a signal like void rawDataRecived(const QByteArray& data);

    All you have to do now is:

    consumerThread=new QThread(this);
    DataConsumer* worker=new DataConsumer;
    worker->moveToThread(consumerThread);
    connect(consumerThread,SIGNAL(finished()),worker,SLOT(deleteLater()));
    connect(this,SIGNAL(rawDataRecived(QByteArray)),worker,SLOT(enqueueData(QByteArray)));
    consumerThread->start();
    

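    Now you can emit rawDataRecived from the producer to start the processing. The queue is still there, it's just invisible: because the worker lives in another thread, the connection is implicitly a Qt::QueuedConnection, so each emission is posted to the consumer thread's event queue and the slot runs in that thread.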
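
    To make the "invisible queue" more concrete, here is a rough conceptual model written with the C++ standard library only, so it compiles without Qt. It is not how Qt is implemented. WorkerLoop, post(), quit() and consumeAll() are hypothetical names: post() stands in for emitting a signal over a queued connection, and quit() for QThread::quit() followed by QThread::wait(). One simplification to be aware of: this quit() drains every pending task before the thread exits, which a real event loop does not promise.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Conceptual stand-in for a thread running an event loop (hypothetical class).
class WorkerLoop {
public:
    WorkerLoop() : thread_([this] { run(); }) {}
    ~WorkerLoop() { quit(); }

    // Analogue of emitting a signal across threads: the call is queued here
    // and executed later in the worker thread.
    void post(std::function<void()> task) {
        assert(task); // an empty callable would throw when invoked
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.push(std::move(task));
        }
        wakeUp_.notify_one();
    }

    // Ask the loop to stop once the queue is empty, then join the thread.
    void quit() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            quitting_ = true;
        }
        wakeUp_.notify_one();
        if (thread_.joinable()) thread_.join();
    }

private:
    void run() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                // Sleep until there is work or a quit request.
                wakeUp_.wait(lock, [this] { return quitting_ || !tasks_.empty(); });
                if (tasks_.empty()) return; // quit requested and nothing left
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            task(); // runs in the worker thread, outside the lock
        }
    }

    std::mutex mutex_;
    std::condition_variable wakeUp_;
    std::queue<std::function<void()>> tasks_;
    bool quitting_ = false;
    std::thread thread_; // declared last so the members above exist before run() starts
};

// Hypothetical demo: hand each packet to the worker and collect the sizes it reports.
std::vector<std::size_t> consumeAll(const std::vector<std::string>& packets) {
    std::vector<std::size_t> sizes; // touched only by the worker until quit() returns
    WorkerLoop loop;                // the thread is started once, not once per packet
    for (const std::string& packet : packets) {
        loop.post([&sizes, packet] { sizes.push_back(packet.size()); });
    }
    loop.quit();
    return sizes;
}
```

    The point of the model is that the thread is created once and then sleeps between packets; delivering a packet is only a push onto an already running loop.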



  • Hi,

    Thanks for the answers. I was quite disappointed to know that reimplementing the run method isn't the correct way, but everywhere on the internet says that.
    The thing is that I don't completely understand the worker approach.
    In my program, I have one thread that receives UDP packets every 2.5 ms or less. Each of those packets needs to be processed (second thread) and then resent to a different IP (I do this on a third thread).
    The processing thread receives data really often, which is why I think that having a thread that runs all the time and receives its information via a QQueue protected by a mutex is better than, each time I receive a UDP packet, creating a worker, moving it to the thread, and starting the thread (from what I understood, that's what I should do if I implement the worker approach).

    I'm not interested in when the thread finishes, so there's no need to emit signals when a packet is processed. I just need a thread that never stops, so that I can receive data and process it at the same time.

    The way I do it is this: I have a thread receiving UDP data. When a packet is received, it places it on a QQueue (protected by a mutex) and then releases a semaphore.
    I have a second thread, with a reimplemented run function, that is always waiting for this semaphore. When it manages to .acquire() it, it takes a packet from the QQueue and the processing begins.
    It sounds pretty safe, and it's working fine. Nevertheless, I'm getting the following error message:
    "Cannot create children for a parent that is in a different thread
    Parent is QUdpSocket, parents thread is QThread, current thread is ClassEnvio"

    So even if it's working, I'm guessing that I am doing something wrong.

    Could someone give me some tips on how I should implement this with the worker approach?
    When I receive a UDP packet...
    Should I create a new worker and move it to a QThread?
    Can't I just pass the information to a single working QThread?
    Should I call the "start" method each time a packet arrives? Or should I check each time whether the thread is running, and only start it if it's not?

    Regarding the "worker"...
    What should I put in the worker? Can I simply copy the code I had in my reimplemented run() method and place it in the process() method of the worker?

    The main thing that I'm having trouble understanding is the concept of starting and finishing the thread. I want a single thread that runs all the time in a while(1) (at least that's how I learnt at school that multiple threads were managed) instead of something that starts and finishes. My program manages many connections, and each connection receives packets every 10 or 20 ms. The number of packets processed is extremely high, which is why I thought of a queue instead of many small single-use workers.

    Any ideas would be appreciated.

    Thanks


  • Qt Champions 2016

    @VRonin said in Proper way to Manage Information Between Threads:

    The semaphore is redundant here.

    No, it is not!!!

    @darkp03 said in Proper way to Manage Information Between Threads:

    I was quite disappointed to know that reimplementing the run method isn't the correct way, but everywhere on the internet says that.

    Don't be, the internet is wrong. It's a valid thing to do if you don't need to use any of the signal-slot machinery. However, your queue implementation looks a bit off. Consider this one:
    Producer(s):

    QByteArray data; //< Data to put in the queue
    
    inputNeeds.acquire();	// First wait for more data to be consumed if needed (do not overfill the queue).
    
    QMutexLocker lock(&queueMutex);
    queue.enqueue(data);
    queueAvailability.release();
    

    Consumer(s):

    queueAvailability.acquire();
    QMutexLocker lock(&queueMutex);
    if (queue.size() <= 0)
        return; //< So graceful exit is possible
    
    QByteArray data = queue.dequeue();
    inputNeeds.release();
    

    Where variables are:

    QSemaphore queueAvailability, inputNeeds; //< inputNeeds is initialized to some number (e.g. 10000) in the beginning - that's the maximum number of items the queue will hold so it doesn't overflow if the consumer(s) lag behind
    QMutex queueMutex;
    QQueue<QByteArray> queue;
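
    For anyone who wants to try the two-semaphore scheme outside a Qt project, here is a self-contained sketch of the same idea using only the C++ standard library. Everything in it is an assumption made for illustration: the small Semaphore class stands in for QSemaphore, freeSlots_ plays the role of inputNeeds, usedSlots_ plays the role of queueAvailability, and relay() with its empty-string stop marker is a hypothetical demo. Unlike the snippets above, it releases the semaphore after unlocking the mutex and it leaves out the empty-queue check used for graceful exit.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Minimal counting semaphore standing in for QSemaphore (acquire/release only).
class Semaphore {
public:
    explicit Semaphore(int initial = 0) : count_(initial) { assert(initial >= 0); }

    void acquire() {
        std::unique_lock<std::mutex> lock(mutex_);
        nonZero_.wait(lock, [this] { return count_ > 0; });
        --count_;
    }

    void release() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            ++count_;
        }
        nonZero_.notify_one();
    }

private:
    std::mutex mutex_;
    std::condition_variable nonZero_;
    int count_;
};

// Bounded queue: producers block when it is full, consumers block when it is empty.
class BoundedQueue {
public:
    explicit BoundedQueue(int capacity) : freeSlots_(capacity) { assert(capacity > 0); }

    void push(std::string data) {
        freeSlots_.acquire(); // wait until the consumer has made room
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(std::move(data));
        }
        usedSlots_.release(); // signal that one more item is available
    }

    std::string pop() {
        usedSlots_.acquire(); // wait until at least one item is available
        std::string data;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            data = std::move(queue_.front());
            queue_.pop();
        }
        freeSlots_.release(); // signal that one slot is free again
        return data;
    }

private:
    Semaphore freeSlots_;    // counts empty slots, starts at capacity
    Semaphore usedSlots_{0}; // counts queued items, starts at zero
    std::mutex mutex_;       // protects queue_ itself
    std::queue<std::string> queue_;
};

// Hypothetical demo: one producer, one consumer, capacity 2.
// An empty string is used here as an end-of-stream marker.
std::vector<std::string> relay(const std::vector<std::string>& packets) {
    BoundedQueue queue(2);
    std::vector<std::string> received;
    std::thread consumer([&queue, &received] {
        for (;;) {
            std::string data = queue.pop();
            if (data.empty()) return;
            received.push_back(std::move(data));
        }
    });
    for (const std::string& packet : packets) queue.push(packet);
    queue.push(std::string()); // stop marker
    consumer.join();
    return received;
}
```

    The mutex and the semaphores do different jobs: the mutex keeps two threads from touching the container at the same moment, while the semaphores only count items and free slots so that each side knows when to sleep.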
    


  • @darkp03 said in Proper way to Manage Information Between Threads:

    creating a worker, moving it to the thread, and starting the thread (from what I understood, that's what I should do if I implement the worker approach)

    That's not what I'm doing here.

    consumerThread will run continuously until you call quit, and the consumer will remain ready to receive more data the whole time.

    Execute the block that creates the worker just once, then emit the rawDataRecived signal every time you get something via the socket. The queue is handled by Qt's connections, so you don't need to implement it yourself.

