QRunnable not blocking requests in QThreadPool
-
So I have a Python GUI (PySide6) that needs a worker to send HTTP requests and aggregate the results in a single dict (or other structure). I've been butting my head for a while on Signals before I realized that my worker simply does not wait for a response and dashes straight to the finishing emissions.
class RestWorkerSignals(QObject): started = Signal() finished = Signal() returned = Signal(dict) class SingleBayWorker(QRunnable): def __init__(self, i): super().__init__() self.i = i self.signals = RestWorkerSignals() def run(self): pydevd.settrace(suspend=False) self.signals.started.emit() bay_data = requests.get(f'http://{CONFIGS["MODBUS"]["HOST"]}/data/json?{self.i}').json() print(self.i, bay_data) self.signals.finished.emit() self.signals.returned.emit({str(self.i): bay_data})
The caller function looks like :
def refresh_ui(self): mutex = QMutex() results = {"bays": {}} def update_results(data): print("Updating:", data) with QMutexLocker(mutex): results["bays"].update(data) bay_workers = [SingleBayWorker(i) for i in range(1, 16)] for worker in bay_workers: worker.signals.started.connect(print("started")) worker.signals.finished.connect(print("finished")) worker.signals.returned.connect(update_results) self.threadpool.start(worker) self.threadpool.waitForDone()
My console just prints "started"/"finished" 15 times straight, tho when I step in the run() with a debugger to force some wait time, when I step out it goes into update_results() properly.
How can I get QThreadPool to properly wait for my requests to complete?
From what I found online, it's simpler to just use a regular threadpool so I ended up just instanciating a multiprocessing.pool.ThreadPool in my main and passing it to a worker handling all my requests :
def run(self): pydevd.settrace(suspend=False) self.signals.started.emit() results = { "position": get_current_position(), "bays": {} } def fetch(data): response = requests.get(data[0]) print("Fetched", data[1]) return (response.json(), data[1]) urls = [(f'http://{CONFIGS["MODBUS"]["HOST"]}/data/json?Bay{i}', i) for i in range(1, 16)] bays = self.pool.map(fetch, urls) for bay, i in bays: results["bays"][i] = bay self.signals.finished.emit(results)
But I'm afraid it might cause some issues around memory or race condition to have a separate threadpool from Qt's implementation.
Can't I stick to QThreadPool for this? Thank you!!
-
@NikolaQt said in QRunnable not blocking requests in QThreadPool:
My console just prints "started"/"finished" 15 times straight
I presume this not quite what you see? Certainly it misled me reading your question. Do you not see the
self.i
fromprint(self.i, bay_data)
in between them each time? And what you are reporting is that thebay_data
is empty?Please add a
print(self.i)
just above therequests.get()
line. Just to confirm your output has1,1 2,2 3,3 ...
rather than1,2,3 ... 1,2,3
so we're quite sure of behaviour.I don't know if this is relevant, but where is
requests
(which I assume is a singleQNetworkRequest
) defined (does it live once in main thread or in each sub-thread)? Actually, I don't understand what class is forrequests.get(f'http:...')
, could you give me a docs reference? -
EDIT: I'll leave this here, but I think the solution to my problem was the signal connection, using a DirectConnection for the returned signal seem to do the trick. Thanks for the help!
I didn't realize there's a Qt module for requests, perhaps it would play nicer with Qt's threadpool, tho since I'll have to implement some PyModbus calls in a pool too so I should ideally get to the root cause... which turns out wasn't from requests at all.
They say that asking a question is answering it, I've retyped this post like 3 times because of what I kept realizing while testing the minimal demo to share. I'll skip the details of all that I had wrong in the original post to keep it concise, but this example translates properly to my actual code :
import requests import pydevd import sys import traceback from PySide6.QtWidgets import QMainWindow, QApplication from PySide6.QtCore import QObject, QRunnable, Signal, QThreadPool, QMutex, QMutexLocker class MainWindow(QMainWindow): def __init__(self): super().__init__() self.threadpool = QThreadPool() self.refresh_ui() def refresh_ui(self): mutex = QMutex() results = {} def update_results(data): pydevd.settrace(suspend=False) print("Update:", data) with QMutexLocker(mutex): results.update(data) print("Updated:", results) workers = [SingleBayWorker(i) for i in range(1, 6)] for worker in workers: worker.signals.started.connect(lambda: print("started")) worker.signals.finished.connect(lambda: print("finished")) worker.signals.returned.connect(update_results) self.threadpool.start(worker) self.threadpool.waitForDone() self.parse_results(results) def parse_results(self, results): print("Results:", results) class RestWorkerSignals(QObject): started = Signal() finished = Signal() returned = Signal(dict) class SingleBayWorker(QRunnable): def __init__(self, i): super().__init__() self.i = i self.signals = RestWorkerSignals() def run(self): self.signals.started.emit() pydevd.settrace(suspend=False) data = requests.get(f'https://example.com').status_code print("Run:", self.i, data) self.signals.finished.emit() self.signals.returned.emit({str(self.i): data}) if __name__ == "__main__": try: app = QApplication(sys.argv) window = MainWindow() window.show() sys.exit(app.exec()) except Exception as e: print(traceback.format_exc()) sys.exit(1)
The output almost works :
Run: 3 200 Run: 1 200 Run: 4 200 Run: 2 200 Run: 5 200 Results: {} started started started started finished Update: {'3': 200} Updated: {'3': 200} started finished Update: {'1': 200} Updated: {'3': 200, '1': 200} finished Update: {'4': 200} Updated: {'3': 200, '1': 200, '4': 200} finished Update: {'2': 200} Updated: {'3': 200, '1': 200, '4': 200, '2': 200} finished Update: {'5': 200} Updated: {'3': 200, '1': 200, '4': 200, '2': 200, '5': 200}
But there's something I'm mismanaging for the thread. Looking at my app loop :
app = QApplication(sys.argv) window = MainWindow() window.show() sys.exit(app.exec())
The
run
functions and the "results" trigger within my MainWindow() instanciation, but myupdate_results
callbacks only run after theapp.exec()
is triggered.If I put my refresh_ui in a loop (as I have in my main code) with :
self.timer = QTimer(self) self.timer.timeout.connect(self.refresh_ui) self.timer.start(10)
The print outputs remains the same : run x n -> empty results -> update x n.
So I imagine it's something along the lines of my slots callbacks getting added to the end of the main thread's queue while the parse_results is ahead of them, but I'm a bit lost as to what is misimplemented here. I'm not sure if it's poor design from all the processing I have inside my MainWindow class, or if I'm just missing something to properly synchronize my executions.
Thanks a bunch <3
(resubmitted so it's not under a post reply)
-
@NikolaQt said in QRunnable not blocking requests in QThreadPool:
I didn't realize there's a Qt module for requests, perhaps it would play nicer with Qt's threadpool
QNetworkAccessManager uses event loop. No explicit thread pool required.
Heavily edited to skip to the point:
class MainWindow(QMainWindow): def refresh_ui(self): workers = [SingleBayWorker(i) for i in range(1, 6)]
SingleBayWorker instances are created in the GUI thread...
class SingleBayWorker(QRunnable): def __init__(self, i): self.signals = RestWorkerSignals()
...which means that RestWorkerSignals instances are also created in the GUI thread...
def run(self): self.signals.started.emit()
... but QRunnable.run() is called from a pool thread. The RestWorkerSignals instances maintain GUI thread affinity because they are not explicitly moved. As such, their concept of the correct way to deliver a signal to a GUI thread object for an automatic connection is wrong.
The symptom can be fixed by changing the connection type, or moving the signal-emitting objects. The architecture seems to be unnecessarily complicated anyway. For example, the mutex locker in update_results() is probably unnecessary, as queued signal delivery will serialize invocations of the slot anyway.