QThread and EventLoop: how can I wait for all the events (workers) to be done?



  • Hi,
    I have a worker thread to which I move some workers.
    I need to be able to wait until all of them have been processed.
    How can I get the event loop of the worker thread? Is there an easy way to wait until all its events have been processed? Is there a limit on the size of the event loop's queue?
    Cheers


  • Qt Champions 2018

    @mbruel said in QThread and EventLoop: how can I wait for all the events (workers) to be done?:

    I need to be able to wait that all of them are processed.

    Why do you want to wait? Waiting in an event-driven application is a bad idea.
    Why not let your main thread know that everything was processed?
    When your worker thread has finished all its jobs, it can emit a signal to notify the main thread.



  • Well, I'm using a special taskThread to which I move Tasks. Each Task just runs one slot when it is triggered from another thread (not the GUI one). After running that slot, the Task triggers its own destruction (deleteLater).

    So basically, from my simulationThread, I create Tasks on the heap and move them to taskThread. I don't store a list of those Tasks in taskThread because I didn't see the point, but maybe that is what I should do.

    At some point I want to wait until all those Tasks have been processed. I know that they should all have been triggered, and thus that there should be events in the EventLoop of the taskThread. I was thinking there might be a way to purge this EventLoop (wait until all its events are done).

    Is the use case clear?

    I suppose I could store a list of all my Tasks and make sure they are all processed, but this means I would end up with a busy while loop on the size of that list. This is not really pretty... Is there another way I could achieve this, using the EventLoop or not?


  • Qt Champions 2018

    This task looks more like something for QtConcurrent.



  • @mbruel said in QThread and EventLoop: how can I wait for all the events (workers) to be done?:

    I was thinking there might be a way to purge (wait for all the events are done) this EventLoop.

    You can check whether any pending events exist with QThread::currentThread()->eventDispatcher()->hasPendingEvents().
    I think this is what you are looking for.

    You can check this at the end of the slot and emit a signal when the event queue is empty.



  • @Christian-Ehrlicher said in QThread and EventLoop: how can I wait for all the events (workers) to be done?:

    This task looks more like something for QtConcurrent.

    Well, I chose this design because I already have 2 thread pools for heavier computation tasks and I didn't want to create another one. Plus, those specific Tasks are triggered by a signal, and I think the threads of a thread pool don't have their event loop started. Am I wrong?

    @KroMignon said in QThread and EventLoop: how can I wait for all the events (workers) to be done?:

    You can check if any pending event exists with QThread::currentThread()->eventDispatcher()->hasPendingEvents().
    I think this is what you are looking for.
    You can check this at the end of the slot process and raise a signal to inform about empty event list.

    Yes thanks, this could be what I'm looking for.
    I guess that instead of a signal I would rather use a QWaitCondition, woken at the end of the processing slot whenever hasPendingEvents returns false.



  • QAbstractEventDispatcher::hasPendingEvents is marked as obsolete.
    What shall I use instead to achieve the same goal?

    Another question: will hasPendingEvents return true while I'm processing the last event (I mean inside the slot of the last event), or will it return false?



  • @mbruel said in QThread and EventLoop: how can I wait for all the events (workers) to be done?:

    I need to be able to wait that all of them are processed.

    Is there a safe way to have a shared counter that is incremented when a thread starts and decremented when it ends? Probably through some sort of atomic operation. Once that counter reaches 0 all threads are done.

    Edit: Maybe counter is in main thread and signals decrement on thread completion.
    Edit2: The last thread decrement signal would also check the counter is zero and emit another signal/call a function for what has to be done now that all threads are done.
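
    The shared-counter idea can be sketched in plain standard C++, with no Qt involved. This is only an illustration under assumptions: the helper name runTasksAndCountNotifications and the "notification" counter are hypothetical stand-ins for real tasks and for emitting a "finished" signal. The task that brings the counter from 1 to 0 is the one that notifies.

```cpp
#include <atomic>
#include <thread>
#include <vector>

// Hypothetical helper: runs `taskCount` dummy tasks on separate threads and
// returns how many times the "all done" notification fired.
int runTasksAndCountNotifications(int taskCount)
{
    std::atomic<int> pending{taskCount};   // set before any task starts
    std::atomic<int> notifications{0};     // stand-in for "emit allDone()"

    std::vector<std::thread> threads;
    for (int i = 0; i < taskCount; ++i) {
        threads.emplace_back([&pending, &notifications] {
            // ... the task's real work would go here ...

            // fetch_sub returns the previous value, so exactly one task sees 1
            // and knows it was the last one to finish.
            if (pending.fetch_sub(1) == 1)
                notifications.fetch_add(1);
        });
    }

    // Joined only so this sketch can return a result to the caller.
    for (std::thread &t : threads)
        t.join();
    return notifications.load();
}
```

    The counter is set to the full task count before any task starts, so it cannot hit zero prematurely while tasks are still being created.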



  • @fcarney said in QThread and EventLoop: how can I wait for all the events (workers) to be done?:

    Is there a safe way to have a shared counter that is incremented when a thread starts and decremented when it ends? Probably through some sort of atomic operation. Once that counter reaches 0 all threads are done.

    Yes. A semaphore can be used for this purpose.


  • Qt Champions 2017

    @fcarney said in QThread and EventLoop: how can I wait for all the events (workers) to be done?:

    Is there a safe way to have a shared counter that is incremented when a thread starts and decremented when it ends? Probably through some sort of atomic operation. Once that counter reaches 0 all threads are done.

    Yes, and a pretty simple one:

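    class SomeClass : public QObject
    {
        Q_OBJECT
    signals:
        void finishedJob();
    private:
        void someMethod();
        int processedTasks = 0; // tasks still alive; only touched from this object's thread
    };
    
    void SomeClass::someMethod()
    {
        QObject * worker = new ...;
        // `this` is the context object, so the lambda runs in SomeClass's thread
        QObject::connect(worker, &QObject::destroyed, this, [this] () -> void {
            processedTasks--;
            if (processedTasks == 0)
                emit finishedJob();
        });
    
        processedTasks++;
        worker->moveToThread(...);    
    }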
    

    @Kent-Dorfman said in QThread and EventLoop: how can I wait for all the events (workers) to be done?:

    Yes. A semaphore can be used for this purpose.

    That's an overkill. If you're only counting, then it's atomic integers, you don't need blocking nor the complexity of a semaphore.



  • @kshegunov said in QThread and EventLoop: how can I wait for all the events (workers) to be done?:

    That's an overkill. If you're only counting, then it's atomic integers, you don't need blocking nor the complexity of a semaphore.

    What complexity? You don't HAVE to use the semaphore for blocking, and its access is guaranteed to be atomic. Simply initialize a counting semaphore with the max number of allowed threads and decrement when they are started, and increment when they complete.

    IMHO, either approach is equally valid. Just depends on your particular bias. I prefer the semaphore because it has the side-effect of being a lock if you want it to be.



  • @fcarney said in QThread and EventLoop: how can I wait for all the events (workers) to be done?:

    Is there a safe way to have a shared counter that is incremented when a thread starts and decremented when it ends? Probably through some sort of atomic operation. Once that counter reaches 0 all threads are done.

    Hello,
    What I need to do is wait until the event queue of my _eventThread has been consumed. There is only one thread, to which several hundred workers have been moved and in which they are supposed to be triggered.
    The thing is that I want to wait first until they have all started, and then until they have all finished. The issue I have is making sure they have all started. They will be started sequentially in the event loop of _eventThread. That is why I wanted to be able to query the event loop directly, to know how many pending events it has and when it has processed them all (knowing that no new events will be generated).

    For now, this is what I've done:
    I'm holding a list of all the pending event Tasks. In my waitForDone method, I use a QWaitCondition that is woken when an event Task finishes and the list is empty.
    It seems to work.
    Any comments or advice?

    class LazyComputationManager : public QObject // QObject base is needed for connect() and emit
    {
       Q_OBJECT
    private:
       QThreadPool             _threadPool;
       
       QThread                 _eventThread;
       QList<ComputationTask*> _pendingEventTasks;
       QWaitCondition          _waitPendingEvents;
       QMutex                  _securePendingEvents;
       
    public:
    
       ComputationTask *_appendEventTask(ComputationTask *task)
       {
           task->moveToThread(&_eventThread);
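           // Note: with the default auto connection, handleEventTaskDone() runs in the thread this
           // manager lives in, so that thread must not be the one blocked in waitForDone().
           connect(task, &ComputationTask::eventTaskDone, this, &LazyComputationManager::handleEventTaskDone);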
    
           QMutexLocker lockEventThread(&_securePendingEvents);
           _pendingEventTasks.append(task);
           return task;
       }
    
       void handleEventTaskDone(ComputationTask *task)
       {
           QMutexLocker lockEventThread(&_securePendingEvents);
           _pendingEventTasks.removeOne(task);
    
           if (_pendingEventTasks.isEmpty())
               _waitPendingEvents.wakeAll();
    
           emit task->readyForDestruction(); // this trigger the destruction of the task (deleteLater)
       }
    
       void waitForDone()
       {
           // this may trigger some event tasks (cloning or summation)
           _threadPool.waitForDone();
    
           _securePendingEvents.lock();
           while (!_pendingEventTasks.isEmpty()) // loop: re-check the condition after every wake-up
               _waitPendingEvents.wait(&_securePendingEvents);
           _securePendingEvents.unlock();
    
           // wait for potential new Tasks that have been triggered
           _threadPool.waitForDone();
       }
    };
    
    And basically, my ComputationTasks emit eventTaskDone at the end of the processing slot, and they have readyForDestruction connected to deleteLater.
    


  • Something to consider. Not related to threading.


  • Qt Champions 2017

    @mbruel said in QThread and EventLoop: how can I wait for all the events (workers) to be done?:

    There is only one Thread where several hundreds workers have been moved and are supposed to be triggered.

    If all the workers are living in the same thread there's little point in having mutexes, wait conditions and other synchronization primitives; nor multiple workers for that matter. Just schedule a custom event through the thread's event loop and have one worker process them one by one.

    @Kent-Dorfman said in QThread and EventLoop: how can I wait for all the events (workers) to be done?:

    What complexity?

    The implementation complexity they carry. A semaphore (depending on how it's implemented) is going to require a context switch into kernel space. For example, with the simple implementation built from a mutex, a counter and a wait condition (which, if memory serves me, is how Qt implements them), you lock a mutex whenever you manipulate the counter. If the mutex is an ordinary pthread mutex (i.e. a slow one), this is a kernel call for each read/write of the counter. Even if you use a futex to spare some calls, on contention you're again going to require a context switch. Calling into the kernel is expensive, so it's not warranted if you can avoid it.

    You don't HAVE to use the semaphore for blocking, and its access is guaranteed to be atomic.

    True, I hadn't claimed otherwise.
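
    The single-worker suggestion above can be sketched in plain standard C++ rather than with Qt's event loop. This is only an illustration under assumptions: SerialWorker, post() and waitUntilIdle() are hypothetical names, and a std::function queue stands in for custom events. One thread runs the jobs one by one and keeps a count of outstanding jobs, so a caller can block until the queue has drained, including jobs posted by other jobs.

```cpp
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical single worker: one thread, one FIFO of jobs, processed one by one.
class SerialWorker
{
public:
    SerialWorker() : _thread([this] { run(); }) {}

    ~SerialWorker()
    {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _stopping = true;
        }
        _wakeWorker.notify_one();
        _thread.join();   // remaining jobs are still run before the thread exits
    }

    void post(std::function<void()> job)
    {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _jobs.push_back(std::move(job));
            ++_outstanding;
        }
        _wakeWorker.notify_one();
    }

    // Blocks the caller until every job posted so far has finished running.
    void waitUntilIdle()
    {
        std::unique_lock<std::mutex> lock(_mutex);
        _idle.wait(lock, [this] { return _outstanding == 0; });
    }

private:
    void run()
    {
        std::unique_lock<std::mutex> lock(_mutex);
        for (;;) {
            _wakeWorker.wait(lock, [this] { return _stopping || !_jobs.empty(); });
            if (_jobs.empty())
                return;   // stopping and nothing left to do
            std::function<void()> job = std::move(_jobs.front());
            _jobs.pop_front();
            lock.unlock();
            job();        // run outside the lock so a job may post more jobs
            lock.lock();
            if (--_outstanding == 0)
                _idle.notify_all();
        }
    }

    std::mutex _mutex;
    std::condition_variable _wakeWorker;
    std::condition_variable _idle;
    std::deque<std::function<void()>> _jobs;
    int _outstanding = 0;
    bool _stopping = false;
    std::thread _thread;   // declared last so the other members exist before run() starts
};
```

    Because the count is incremented in post() and only decremented after a job returns, a job that schedules a follow-up job keeps the count above zero until the follow-up has run as well.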



  • @kshegunov said in QThread and EventLoop: how can I wait for all the events (workers) to be done?:

    If all the workers are living in the same thread there's little point in having mutexes, wait conditions and other synchronization primitives; nor multiple workers for that matter. Just schedule a custom event through the thread's event loop and have one worker process them one by one.

    Well, by design of the application, I need those multiple worker tasks, as they are triggered by heavier computation tasks running in a QThreadPool. When a heavy (threaded) computation finishes, a "light" event-oriented one can be scheduled, and it may in turn trigger a new heavy computation.
    The thing is, I don't know when they will be triggered nor in what order. This is not a problem; it will happen at some particular time.

    At some point I need to be able to force all of them to happen and wait for them (first the heavy Tasks, then the event-oriented ones, and finally the heavy ones again, as an event-oriented one can schedule one last heavy Task).
    So the event Tasks will already have been triggered and thus be scheduled in the event loop of my _eventThread. I don't see how I could use another worker to "wait for them" or "make sure they have been processed".

    Your point about all the workers living in the same QThread is true. But the call to waitForDone is made from another QThread. That is why I was thinking I needed thread synchronisation and thus a QWaitCondition.

    I'm not familiar with semaphores.
    Could I use one instead? Would it be better? Basically, I need my "mainThread" to block on it until its value goes down to zero. So all the workers in _eventThread should be able to decrement the semaphore while the main thread is still waiting. Is that possible?
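
    One way to picture the semaphore variant, as a sketch in plain standard C++: a counting semaphore does not let a thread block "until zero", but the direction can be inverted. Each worker releases one unit when it finishes, and the waiting thread acquires as many units as there are workers. The CountingSemaphore class and waitForTasksWithSemaphore below are hypothetical stand-ins written for this illustration. QSemaphore offers acquire(n) and release(n) with the same meaning, but nothing here is Qt code.

```cpp
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Minimal counting semaphore built from a mutex, a counter and a condition
// variable (an assumption for this sketch, not Qt's implementation).
class CountingSemaphore
{
public:
    explicit CountingSemaphore(int initial = 0) : _available(initial) {}

    void release(int n = 1)
    {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _available += n;
        }
        _changed.notify_all();
    }

    // Blocks until n units are available, then takes them.
    void acquire(int n = 1)
    {
        std::unique_lock<std::mutex> lock(_mutex);
        _changed.wait(lock, [this, n] { return _available >= n; });
        _available -= n;
    }

    int available()
    {
        std::lock_guard<std::mutex> lock(_mutex);
        return _available;
    }

private:
    std::mutex _mutex;
    std::condition_variable _changed;
    int _available;
};

// Hypothetical driver: every task releases one unit when done; the caller
// blocks in a single acquire(taskCount) until all of them have finished.
// Returns the number of finished tasks observed right after the wait.
int waitForTasksWithSemaphore(int taskCount)
{
    CountingSemaphore done(0);
    std::atomic<int> finished{0};

    std::vector<std::thread> workers;
    for (int i = 0; i < taskCount; ++i) {
        workers.emplace_back([&done, &finished] {
            finished.fetch_add(1);   // the task's work would go here
            done.release();          // signal completion
        });
    }

    done.acquire(taskCount);         // returns once every task has released
    const int seen = finished.load();

    for (std::thread &w : workers)
        w.join();
    return seen;
}
```

    Note that every release() and acquire() takes the mutex, which is the cost discussed earlier in the thread. The waiter also has to know how many completions to expect.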



  • In the end, to make sure all the events are processed (as others may be triggered by the last one), I've just updated my waitForDone method to use QThread::eventDispatcher()->hasPendingEvents():

    void waitForDone()
    {
        // this may trigger some event tasks (cloning or summation)
        _threadPool.waitForDone();
    
    
        _securePendingEvents.lock();
        if (_eventThread.eventDispatcher()->hasPendingEvents() || _pendingEventTasks.size())
        {
            if (!_waitPendingEvents.wait(&_securePendingEvents, 10000)) // 10sec as we might wait for heavy tasks
            {
                _securePendingEvents.unlock();
                qCritical() << "[MB_TRACE][LazyComputationManager::waitForDone] Timeout..., there are still : " << _pendingEventTasks.size();
                while(_eventThread.eventDispatcher()->hasPendingEvents() || _pendingEventTasks.size())
                {
                    if (_pendingEventTasks.isEmpty())
                        break; // only non-task events are left; first() must not be called on an empty list
                    ComputationTask *pendingTask = _pendingEventTasks.first();
                    qCritical() << "[MB_TRACE][LazyComputationManager::waitForDone] pendingTask "
                                << pendingTask->getTypeName() << ": " << pendingTask;
                    pendingTask->runAsEvent(nullptr);
                }
            }
            else
                _securePendingEvents.unlock();
        }
        else
            _securePendingEvents.unlock();
    
        // wait for potential new Tasks that have been triggered
        _threadPool.waitForDone();
    }
    
