Blocking Queue Between QThreads



  • Hello everyone,

    Suppose I have an application with two threads, first one (producer) produces some custom messages (a-ka commands with some additional data), while the second one (executor) is responsible for executing these commands.

    The command execution is essentially some transformation plus re-transmission of those via low-level interface (SPI). The protocol there is organised such that the executor will probably have to wait for response on SPI before it can continue processing other messages, that is why I need to queue the commands.

    I think that the usual solution using signals+slots mechanism is not suitable, since what I think I actually need is the Blocking queue - i.e. I need the second thread to block until I receive a command from the producer. At the same time, the executor has to block only for a certain amount of time and if there is no new commands in the queue for, e.g., 50 ms, it has to do some regular stuff.

    So, I think I need something like
    Producer:

    BlockingQueue.enqueue(command);
    

    Executor:

    if BlockingQueue.poll(50); // argument - timeout to wait
    { // There is a new message
        command = BlockingQueue.dequeue();
        // execute command, wait for response
    }
    else
    { // There were no commands for 50 ms
        // Do some regular stuff - e.g. send some regular request via SPI
    }
    

    There is some relevant topic for this question, and I think that it is possible to implement Blocking Queue as suggested here additionally using QSemaphore::tryAcquire(int n, int timeout).

    Is that the right approach in my case? Can the signals+slots mechanism be adapted to my use case?

    Thanks for advice in advance =)


  • Qt Champions 2018

    Maybe http://doc.qt.io/qt-5/qtcore-threads-waitconditions-example.html will help you to give you the basic idea how to implement it :)


  • Lifetime Qt Champion

    Hi,

    To add to @Christian-Ehrlicher, the QSemaphore example provides another possibility.


  • Qt Champions 2017

    @podkiva said in Blocking Queue Between QThreads:

    There is some relevant topic for this question, and I think that it is possible to implement Blocking Queue as suggested here additionally using QSemaphore::tryAcquire(int n, int timeout).

    Yes, you can use that particular example with QSemaphore::tryAcquire.

    Is that the right approach in my case?

    No clue, this is something you should decide by yourself.

    Can the signals+slots mechanism be adapted to my use case?

    Sure. The usual worker object approach can work here as well. You add a slot in the worker of the consumer to handle the new commands and the implementation is trivially putting them in a queue. You also keep some kind of state (e.g. a boolean flag) if you're currently processing a command, if not you go on to process the pending ones, if not start a timer with the timeout to notify you if you don't receive any command an in the slot you execute whatever is the idle operation. Something along those lines:

    class Worker : public QObject
    {
        Q_OBJECT
    
    public:
        Worker(QObject * parent = nullptr)
            : QObejct(parent), processing(false)
    
    public slots:
        void handleCommand(const Command & cmd)
        {
            pending.enqueue(cmd);
            QMetaObject::invokeMethod(this, &Worker::process);
        }
    
    protected:
         void process(const Command & cmd) // < doing the processing
         {
              procesing = true;
              // Do whatever it is you need ...
              // ...
         }
    
    protected slots:
         void process()
         {
             if (processing)
                 return; //< Do nothing, waiting for the current command to finish processing
    
             if (!pending.empty())
                 process(pending.dequeue())
             else
                 QTimer::singleShot(50, this, &Worker::doIdle)
         }
    
         void finishProcessing()
         {
              processing = false;
              QMetaObject::invokeMethod(this, &Worker::process);
         }
    
         void doIdle()
         {
             if (processing)
                 return; //< We started some kind of task, don't do idle processing
    
             // ...
             QMetaObject::invokeMethod(this, &Worker::doIdle);
         }
    
    private:
        QQueue<Command> pending;
        bool processing;
    };
    

    Bear in mind I don't ordinarily test example code. You would use the above as usual for worker objects, but there some details you should work out depending on your exact task.



  • @kshegunov Thank a lot for advices, it seems to me that the Semaphores solution is the most elegant in my case, since the signal+slots option still requires some extra workaround in my scenario.

    P.S. It seems you forgot to call finishProcessing() at the end of process(const Command & cmd) {...}.


  • Qt Champions 2017

    @podkiva said in Blocking Queue Between QThreads:

    P.S. It seems you forgot to call finishProcessing() at the end of process(const Command & cmd) {...}.

    No, it's not an oversight. I left it for you to call whenever appropriate, as you might be doing some asynchronous operation in process(const Command & cmd), like reading a QProcess output, or reading a socket or w/e. In that case you'd connect the signal signifying the end of that async operation to the mentioned slot to push the next command to be handled.