
Signaling threadpool finished without blocking UI + slot thread confusion



  • Hello,

    I'm quite new to Qt and PySide, so I'd like to get some thoughts on my application.

    In general, I need to start several long-running tasks and be notified when all of the work is done. It seems to me that QThreadPool with QRunnable could be the right tool for that. It does not have a signal for the end of execution of all threads, so I probably need to start another thread that calls waitForDone() if I do not want to block my UI. Following the "QThread done right" approach, I created the following code. Can you please tell me whether this is the correct Qt way to do such a thing?

    Unfortunately, the code also has a strange issue with threads and slots. The slots write() and write2() are fired in different threads based only on the argument type. If it is int, then write2() is executed not in MainThread but in the self._thread context. How can I control which thread a slot is executed in? And why does this depend on the slot argument type? (A bug in PySide?)

    Moreover, when self.edit is changed from a thread other than the main one, the application segfaults. I thought that a mutex should solve it, but that is not the case...

    best regards
    Jan

    from PySide2 import QtCore, QtGui
    from PySide2.QtWidgets import *
    from PySide2.QtCore import *
    import random
    import time
    import threading
    
    
    class GenericWorker(QObject):
    
        def __init__(self, function, *args, **kwargs):
            super(GenericWorker, self).__init__()
    
            self.function = function
            self.args = args
            self.kwargs = kwargs
    
        result = Signal(object)
        finished = Signal()
    
        @Slot()
        def run(self):
            try:
                result = self.function(*self.args, **self.kwargs)
            except BaseException as ex:
                print(ex)
            else:
                self.result.emit(result)
            finally:
                self.finished.emit()
    
    
    class WorkerSignals(QObject):
        finished = Signal()
        error = Signal()
        result = Signal(object)
        progress = Signal(int)
    
    
    class Worker(QRunnable):
        def __init__(self, fn, *args, **kwargs):
            super(Worker, self).__init__()
    
            self.fn = fn
            self.args = args
            self.kwargs = kwargs
            self.signals = WorkerSignals()
            self.kwargs['progress_callback'] = self.signals.progress
    
        @Slot()
        def run(self):
            try:
                result = self.fn(*self.args, **self.kwargs)
            except BaseException as ex:
                print(ex)
                self.signals.error.emit()
            else:
                self.signals.result.emit(result)
            finally:
                self.signals.finished.emit()
    
    
    class Form(QDialog):
    
        def __init__(self, parent=None):
            super().__init__(parent)
            self.setWindowTitle("My Form")
    
            self.edit = QPlainTextEdit()
            self.button = QPushButton("Test")
    
            layout = QVBoxLayout()
            layout.addWidget(self.edit)
            layout.addWidget(self.button)
            self.setLayout(layout)
    
            self.threadpool = QtCore.QThreadPool()
    
            self.button.clicked.connect(self.run_threadpool)
    
            self.mtx = QMutex()
    
        def _write(self, text):
            self.mtx.lock()
            self.edit.moveCursor(QtGui.QTextCursor.End)
            self.edit.insertPlainText(text + f' {threading.current_thread().name}' + '\n')
            self.mtx.unlock()
    
        @Slot(str)  # runs in MainThread
        def write(self, text=''):
            text = f'w {text} {threading.current_thread().name}'
            self._write(text)
            print(text)
    
        @Slot(int)  # if the argument is int, this does not run in MainThread
        def write2(self, i):
            text = f'w2 {i} {threading.current_thread().name}'
            self._write(text)
            print(text)
    
        def worker(self, id, *args, **kwargs):
            s = random.randint(1, 5)
            time.sleep(s)
            kwargs['progress_callback'].emit(id)
            return
    
        def _run_threadpool(self):
            print(f'_run_threadpool {threading.current_thread().name}')
            for i in range(5):
                w = Worker(self.worker, i)
                w.signals.result.connect(self.write)
                w.signals.progress.connect(self.write2)
                self.threadpool.start(w)
    
            self.threadpool.waitForDone()
    
        def run_threadpool(self):
            print(f'run_get_info {threading.current_thread().name}')
            self.edit.clear()
            self.edit.setDisabled(True)
    
            self._thread = QtCore.QThread()
            self._g_worker = GenericWorker(self._run_threadpool)
            self._g_worker.moveToThread(self._thread)
            self._thread.started.connect(self._g_worker.run)
            self._g_worker.finished.connect(lambda: self.edit.setDisabled(False))
            self._g_worker.finished.connect(self._thread.quit)
            self._g_worker.finished.connect(self._g_worker.deleteLater)
            self._thread.finished.connect(self._thread.deleteLater)
    
            self._thread.start()
    
    
    if __name__ == '__main__':
        app = QApplication()
    
        form = Form()
        form.show()
    
        app.exec_()
    

    The output of this code is as follows:

    run_get_info MainThread
    _run_threadpool Dummy-1
    w None MainThread
    w None MainThread
    w None MainThread
    w None MainThread
    w None MainThread
    w2 0 Dummy-1
    w2 1 Dummy-1
    

  • Banned

    Perhaps I am not understanding your question; can you please clarify? You state that you want to start some process that the main application needs to wait on for completion. That seems counterintuitive to the purpose of launching a thread. Threads are created so that the main application need not wait while the threaded processes are running and can go about doing other things. So is the main application actually waiting? If yes, then just call a normal function.

    Now, as to getting a response: I believe there is a triggered event that needs to be set up within the main application to receive this, and I think that is the purpose of the WhenDone signal/slot mechanism. Note that the proper terminology evades me right now, but look it up; it's there.

    Lastly, I use PyQt5, not PySide, so some of the references might be a bit different, but I believe the main functionality is the same.



  • @denni-0 I updated the wording of my question a bit; maybe it is more understandable now. I want to get a signal when all threads from the thread pool have finished...


  • Banned

    Okay @xhpohanka, first, the statement "It does not have any signal for the end of execution" is partially false, as you can create a signal for anything, including when the work is done. Here is something I came across when I was digging into threading:

    import sys
    import traceback

    from PyQt5.QtCore import QObject, QRunnable, pyqtSignal, pyqtSlot

    class WorkerSignals(QObject):
     #  Defines the signals available from a running worker thread.
     #  Supported signals are:
     #     WrksDone
     #        No data
     #     WrkError
     #        `tuple` (exctype, value, traceback.format_exc() )
     #     WrkRsult
     #        `object` data returned from processing, anything
     #     WrkPrgrs
     #        `int` indicating % progress 
     #
        WrksDone = pyqtSignal()
        WrkError = pyqtSignal(tuple)
        WrkRsult = pyqtSignal(object)
        WrkPrgrs = pyqtSignal(int)
    
    class Worker(QRunnable):
     #  Worker thread
     #     Inherits from QRunnable to handle worker thread setup, signals and wrap-up.
     #        :param callback: The function callback to run on this worker thread. Supplied 
     #                         args and kwargs will be passed through to the runner.
     #        :type callback: function
     #        :param args: Arguments to pass to the callback function
     #        :param kwargs: Keywords to pass to the callback function
     #
        def __init__(self, fn, *args, **kwargs):
            super(Worker, self).__init__()
            # Store constructor arguments (re-used for processing)
            self.fn = fn
            self.args = args
            self.kwargs = kwargs
            self.signals = WorkerSignals()    
    
            # Add the callback to our kwargs
            self.kwargs['progress_callback'] = self.signals.WrkPrgrs        
    
        @pyqtSlot()
        def run(self):
          # Initialise the runner function with passed args, kwargs.
          # Retrieve args/kwargs here; and fire processing using them
            try:
                WrkRsult = self.fn(*self.args, **self.kwargs)
            except:
                traceback.print_exc()
                exctype, value = sys.exc_info()[:2]
                self.signals.WrkError.emit((exctype, value, traceback.format_exc()))
            else:
                self.signals.WrkRsult.emit(WrkRsult)  # Return the results of the processing
            finally:
                self.signals.WrksDone.emit()      # Done
    
    # ... somewhere in your main
                # Pass the function to execute
                worker = Worker(self.execute_this_fn) # Any other args, kwargs are passed to the run function
                worker.signals.WrkRsult.connect(self.print_output)
                worker.signals.WrksDone.connect(self.thread_complete)
                worker.signals.WrkPrgrs.connect(self.progress_fn)
                
                # Execute
                self.threadpool.start(worker)
    
    

    As you can see, it has a work-is-done signal that it sends out. I did not end up using this because I do not have a work-is-done state, but maybe it would be useful to you. I would give you the URL, but I could not find it.



  • @denni-0 Sorry, but we do not understand each other. You have pasted almost exactly the same code that I am using and showed in my first post. You are right that there is a signal for the end of each thread, but my question is about getting a signal when ALL threads from the thread pool are done. QThreadPool provides the function waitForDone(), so I am actually starting another thread that does this waiting and emits a signal at the end. It works, but I'd like to know if there is some other or better way.


  • Banned

    Oh okay, yes, that was not clear in your initial post. I am not aware of anything that works like what you ask about, but it could easily be engineered with a central counter that keeps track of threads as they are opened and as they close: CurrentOpenThreads += 1 when opening a new thread, and CurrentOpenThreads -= 1 when a thread signals that it is done. When the counter returns to zero, all of the work is finished. It should be a fairly simple and straightforward mechanism to use within code.
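
    A minimal sketch of that counter idea, written in plain Python so it runs without Qt. The class name PendingCounter and its methods task_started() and task_finished() are hypothetical names chosen for illustration; they are not part of Qt or PySide. The lock is only needed if the counter is updated from several threads.

```python
import threading


class PendingCounter:
    """Counts outstanding tasks and calls on_all_done when the count returns to zero."""

    def __init__(self, on_all_done):
        self._on_all_done = on_all_done  # hypothetical "all work finished" callback
        self._pending = 0
        self._lock = threading.Lock()

    def task_started(self):
        # Call once for every task before it is handed to the pool.
        with self._lock:
            self._pending += 1

    def task_finished(self):
        # Call once for every task when it reports that it is done.
        with self._lock:
            self._pending -= 1
            all_done = self._pending == 0
        if all_done:
            self._on_all_done()


# Stand-alone demonstration using plain threads instead of QRunnable workers.
events = []
counter = PendingCounter(lambda: events.append("all done"))


def job():
    counter.task_finished()


threads = [threading.Thread(target=job) for _ in range(5)]

# Register every task before starting any of them, so the count
# cannot reach zero while tasks are still being queued.
for _ in threads:
    counter.task_started()
for t in threads:
    t.start()
for t in threads:
    t.join()
```

    In the application from the first post, one possible use would be to call task_started() before each self.threadpool.start(w) and to connect each worker's finished signal to task_finished(), with on_all_done re-enabling self.edit. That would replace the extra thread that blocks in waitForDone(), assuming the finished signals are delivered to the main thread.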

