Important: Please read the Qt Code of Conduct - https://forum.qt.io/topic/113070/qt-code-of-conduct

Multiple Threads (with Paho MQTT) - UI Window now not refreshing



  • This is a follow on from a previous forum question: Qt Designer - avoiding 'different thread' type errors
    That problem was ostensibly solved though maybe not, as the display is not now refreshing!

    The signals, emits, and connects solution to the previous problem appear to be working ok. The MyApp UI window methods that change UI widgets (see code below) are correctly executing with arguments without apparent errors. Except the UI window is not updating after any widget value changes!
    If the window is recreated with a self.show() then the widgets are redrawn with the correct new values.
    The application reads and displays values from two different MQTT brokers.
    Am a little stuck now. Any assistance would be appreciated.
    New to PyQT5 it could easily be something simple.

    Have reproduced the entire code below apart from Qt Designer generated file and authentication details that are in a json file:

    import paho.mqtt.client as mqtt
    import json
    import base64
    import time
    import sys
    import threading
    import multiprocessing as mp
    import logging
    import binascii
    import array
    from PyQt5 import QtCore, QtGui, QtWidgets, uic
    from PyQt5.QtCore import QObject, pyqtSignal, pyqtSlot
    import pyqtgraph as pg

    class MyApp(QtWidgets.QMainWindow):

    lcd1Signal =  pyqtSignal(str)
    lcd2Signal =  pyqtSignal(str)
    plotSignal =  pyqtSignal(list)
    valveSignal = pyqtSignal(int)
    
    def __init__(self, *args, **kwargs):
        super(MyApp, self).__init__(*args, **kwargs)
        uic.loadUi("/media/sam/venvs/tankdisplay/levels.ui",self)
        bargraph = pg.BarGraphItem(x=list(range(-2,-62,-2)), height=[0]*30, width=1.5, brush='b')
        self.graphWidget.addItem(bargraph)
        self.graphWidget.setXRange(-60,-2, padding = 0.1)
        self.graphWidget.setYRange(0,10, padding = 0.1)
        self.graphWidget.setTitle("Bucket Tips")
        self.graphWidget.setLabel('left', 'Tips')
        self.graphWidget.setLabel('bottom', 'Minutes')
        self.graphWidget.setBackground((138,69,19))
        self.lcd1Signal.connect(self.lcdNumber1val)
        self.lcd2Signal.connect(self.lcdNumber2val)
        self.plotSignal.connect(self.plotvals)
        self.valveSignal.connect(self.valveval)
    
    
    @QtCore.pyqtSlot(str)
    def lcdNumber1val(self, val):
        # self.lcdNumber1.value(val)
        print("Tank Level = " + val)
        self.lcdNumber1.setProperty("value", val)
        percentage = int(val) * 100 / 300
        self.houseLevelProgress.setProperty("value", percentage)
    
    @QtCore.pyqtSlot(str)
    def lcdNumber2val(self, val):
        print("litres/hour = " + val)
        # self.lcdNumber2.display(val)
        self.lcdNumber2.setProperty("value", val)
    
    def valveval(self, val):
        self.valve.setStyleSheet("QProgressBar::chunk "
                                "{"
                                "background-color: red;"
                                "}")
        if (val == 0):  # 1 is open 0 is closed
            self.valve.setProperty("value", 100)
            self.valvelabel.setText("Closed")
        else:
            self.valve.setProperty("value", 0)
            self.valvelabel.setText("Open")
        self.valvelabel.adjustSize()
    
    @QtCore.pyqtSlot(list)
    def plotvals(self, y):
        bargraph = pg.BarGraphItem(x=list(range(-2,-62,-2)), height=y, width=1.5, brush='b')
        self.graphWidget.addItem(bargraph)
    

    def twoscomp(x):
    """This returns a 16-bit signed number (two's complement)"""
    if (0x8000 & x):
    x = - (0x010000 - x)
    return x

    class houseTank(QtCore.QObject):
    # def init(self):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        
        t = int(time.time())
        self.clientid = "housetank" + str(t)
        self.client = mqtt.Client(self.clientid)
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        self.client.on_publish = self.on_publish
        self.client.on_subscribe = self.on_subscribe
        self.client.on_log = self.on_log
        self.client.on_disconnect = self.on_disconnect
        data = json.load(open("mqtt_brokers.jsn"))
        self.client.username_pw_set(data['mail']['username'], data['mail']['password'])
        self.client.connect(data['mail']['host'], data['mail']['port'], data['mail']['timeout'])
        self.myapp = MyApp()
        
    def start(self):
        print("housetank starting")
        self.client.loop_start()
        print("housetank loop started")
        # threading.Thread(target=self._init_bucle, daemon=True).start()
    
    def stop(self):
        print("housetank stopping")
        self.client.loop_stop()
        print("housetank loop stopped")
    
    def on_connect(self, mqtt_housetank, obj, flags, rc):
        print("rc: "+str(rc))
        print("housetank connected")
        self.client.subscribe("application/10/device/70b3d5cd000103ee/rx", 1)
    
    def on_message(self, mqtt_housetank, obj, msg):
        print("housetank message arrived")
        print(msg.topic+" "+str(msg.qos)+" "+str(msg.payload))
        # try:
        data = json.loads(msg.payload.decode('utf-8'))
        if 'data' in data:
            result = base64.b64decode(str(data['data']) + "=====").hex()
            print(result[0:16])
            serial = int(result[0:2]+result[2:4], 16)
            depth = int(result[6:8]+result[8:10], 16)
            temp = int(result[10:12]+result[12:14], 16)/100.0
            bat = int(result[14:16], 16)/10
            print("depth = {:3d} cm".format(depth) + " temperature = {:.2f}".format(
                temp) + " deg C  battery = {:.1f}".format(bat) + " volts")
            self.myapp.lcd1Signal.emit("{0:<3d}".format(depth))
            payload = {'depth': depth, 'temperature': temp, 'battery': bat}
            topic = "application/10/device/" + str(serial)
            info = mqtt_housetank.publish(
                topic, json.dumps(payload).strip('"'))
        # except:
        #     print("data error")
    
    def on_publish(self, mqtt_housetank, obj, mid):
        print("mid: "+str(mid))
    
    
    def on_subscribe(self, mqtt_housetank, obj, mid, granted_qos):
        print("Housetank Subscribed: "+str(mid)+" "+str(granted_qos))
    
    
    def on_log(self, mqtt_housetank, obj, level, string):
        print(string)
    
    
    def on_disconnect(self, mqtt_housetank, userdata, rc=0):
        logging.debug("DisConnected result code "+str(rc))
        print("housetank disconnected")
        self.client.loop_stop()
    

    class roofWater(QtCore.QObject):
    # def init(self):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        
        t = int(time.time())
        self.clientid = "roofwater" + str(t)
        self.client = mqtt.Client(self.clientid)
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        self.client.on_publish = self.on_publish
        self.client.on_subscribe = self.on_subscribe
        self.client.on_log = self.on_log
        self.client.on_disconnect = self.on_disconnect
        data = json.load(open("mqtt_brokers.jsn"))
        self.client.username_pw_set(data['noddy']['username'], data['noddy']['password'])
        self.client.connect(data['noddy']['host'], data['noddy']['port'], data['noddy']['timeout'])
        self.myapp = MyApp()
        
    def start(self):
        print("roofwater starting")
        self.client.loop_start()
        print("roofwater loop started")
        # threading.Thread(target=self._init_bucle, daemon=True).start()
    
    def stop(self):
        print("roofwater stopping")
        self.client.loop_stop()
        print("roofwater loop stopped")
    
    def on_connect(self, mqtt_roofwater, obj, flags, rc):
        print("rc: "+str(rc))
        self.client.subscribe("esp32/davis/litresperhour", 1)
    
    
    def on_message(self, mqtt_roofwater, obj, msg):
        print(msg.topic+" "+str(msg.qos)+" "+str(msg.payload))
        try:
            data = json.loads(msg.payload.decode('utf-8'))
            if 'value' in data:
                print("roofwater = {0:<3d}".format(data['value']))
                # self.myapp.lcd2Signal.emit("{0:<3d}".format(data['value']))
                # Test value below
                self.myapp.lcd2Signal.emit(str(20))
        except:
            print("data error")
    
    
    def on_publish(self, mqtt_roofwater, obj, mid):
        print("mid: "+str(mid))
    
    
    def on_subscribe(self, mqtt_roofwater, obj, mid, granted_qos):
        print("Subscribed: "+str(mid)+" "+str(granted_qos))
    
    
    def on_log(self, mqtt_roofwater, obj, level, string):
        print(string)
    
    
    def on_disconnect(self, mqtt_roofwater, userdata, rc=0):
        logging.debug("DisConnected result code "+str(rc))
        mqtt_roofwater.loop_stop()
    

    class valveStatus(QtCore.QObject):
    # def init(self):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        
        t = int(time.time())
        self.clientid = "valvestatus" + str(t)
        self.client = mqtt.Client(self.clientid)
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        self.client.on_publish = self.on_publish
        self.client.on_subscribe = self.on_subscribe
        self.client.on_log = self.on_log
        self.client.on_disconnect = self.on_disconnect
        data = json.load(open("mqtt_brokers.jsn"))
        self.client.username_pw_set(data['noddy']['username'], data['noddy']['password'])
        self.client.connect(data['noddy']['host'], data['noddy']['port'], data['noddy']['timeout'])
        self.myapp = MyApp()
    
    def start(self):
        print("valvestatus starting")
        self.client.loop_start()
        print("valvestatus loop started")
        # threading.Thread(target=self._init_bucle, daemon=True).start()
    
    def stop(self):
        print("valvestatus stopping")
        self.client.loop_stop()
        print("valvestatus loop stopped")
    
    
    def on_connect(self, mqtt_valvestatus, obj, flags, rc):
        print("rc: "+str(rc))
        self.client.subscribe("esp32/valve/status/1", 1)
    
    
    def on_message(self, mqtt_valvestatus, obj, msg):
        print(msg.topic+" "+str(msg.qos)+" "+str(msg.payload))
        try:
            data = json.loads(msg.payload.decode('utf-8'))
            if data is not None:
                print("Valvestatus = {:d}".format(data))
                self.myapp.valveSignal.emit(data)
        except:
            print("data error")
    
    
    def on_publish(self, mqtt_valvestatus, obj, mid):
        print("mid: "+str(mid))
    
    
    def on_subscribe(self, mqtt_valvestatus, obj, mid, granted_qos):
        print("Subscribed: "+str(mid)+" "+str(granted_qos))
    
    
    def on_log(self, mqtt_valvestatus, obj, level, string):
        print(string)
    
    
    def on_disconnect(self, mqtt_valvestatus, userdata, rc=0):
        logging.debug("DisConnected result code "+str(rc))
        self.client.loop_stop()
    

    class lastHoursRain(QtCore.QObject):
    # def init(self):

    # plotSignal =  pyqtSignal(list)
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        
        t = int(time.time())
        self.clientid = "lasthoursrain" + str(t)
        self.client = mqtt.Client(self.clientid)
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        self.client.on_publish = self.on_publish
        self.client.on_subscribe = self.on_subscribe
        self.client.on_log = self.on_log
        self.client.on_disconnect = self.on_disconnect
        data = json.load(open("mqtt_brokers.jsn"))
        self.client.username_pw_set(data['noddy']['username'], data['noddy']['password'])
        self.client.connect(data['noddy']['host'], data['noddy']['port'], data['noddy']['timeout'])
        self.myapp = MyApp()
    
    def start(self):
        print("lasthoursrain starting")
        self.client.loop_start()
        print("lasthoursrain loop started")
        # threading.Thread(target=self._init_bucle, daemon=True).start()
    
    def stop(self):
        print("lasthoursrain stopping")
        self.client.loop_stop()
        print("lasthoursrain loop stopped")
    
    
    def on_connect(self, mqtt_lasthoursrain, obj, flags, rc):
        print("rc: "+str(rc))
        self.client.subscribe("esp32/davis/lasthoursrain", 1)
    
    
    def on_message(self, mqtt_lasthoursrain, obj, msg):
        print(msg.topic+" "+str(msg.qos)+" "+str(msg.payload))
        # try:
        data = json.loads(msg.payload.decode('utf-8'))
        if data is not None:
            rainlist = []
            for i in range(len(data)):
                rainlist.append(data[i])
            self.myapp.plotSignal.emit(rainlist)
            # myapp.plotvals(rainlist)
        # except:
            # print("data error")
    
    
    def on_publish(self, mqtt_lasthoursrain, obj, mid):
        print("mid: "+str(mid))
    
    
    def on_subscribe(self, mqtt_lasthoursrain, obj, mid, granted_qos):
        print("Subscribed: "+str(mid)+" "+str(granted_qos))
    
    
    def on_log(self, mqtt_lasthoursrain, obj, level, string):
        print(string)
    
    
    def on_disconnect(self, mqtt_lasthoursrain, userdata, rc=0):
        logging.debug("DisConnected result code "+str(rc))
        self.client.loop_stop()
    

    if name == 'main':

    # task queue to overcome issue with paho when using multiple threads:
    #   https://github.com/eclipse/paho.mqtt.python/issues/354
    task_queue = mp.Queue()
    
    app = QtWidgets.QApplication(sys.argv)
    
    data = json.load(open("mqtt_brokers.jsn"))
    myapp = MyApp()
    myapp.lcdNumber1val("{0:<3d}".format(0))
    myapp.lcdNumber2val("{0:<3d}".format(0))
    myapp.valveval(1)  # Open
    myapp.show()
    
    mqtt_housetank = houseTank()
    mqtt_housetank.start()
    mqtt_roofwater = roofWater()
    mqtt_roofwater.start()
    mqtt_valvestatus = valveStatus()
    mqtt_valvestatus.start()
    mqtt_lasthoursrain = lastHoursRain()
    mqtt_lasthoursrain.start()
    print("mqtt clients are looping")
    
    sys.exit(app.exec_())
    
    # process all tasks on queue
    try:
        while True:
            task = task_queue.get()
            task()
    except (KeyboardInterrupt, SystemExit):
        print("Received keyboard interrupt, quitting ...")
        # mqtt_housetank.loop_stop()
        mqtt_housetank.stop()
        mqtt_roofwater.stop()
        mqtt_valvestatus.loop_stop()
        mqtt_lasthoursrain.stop()
        exit(0)

  • Lifetime Qt Champion

    Hi,

    You are interacting with two different MyApp objects.


  • Lifetime Qt Champion

    Hi,

    You are interacting with two different MyApp objects.



  • Hi SGaist,
    Thank you. That was a pretty fundamental class error you picked up, clearly nothing to do with Qt, so appreciate your help.
    Have reproduced the code again below, which seems to be working fine now. The different MQTT classes aren't really needed so I will refactor eventually, but at least it is an example of simple use of signal, connect, emit in later PyQt4/5, particularly as many tutorials out there are based on earlier versions of Qt.

    #!/media/sam/venvs/tankdisplay/bin/python3

    import paho.mqtt.client as mqtt
    import json
    import base64
    import time
    import sys
    import threading
    import multiprocessing as mp
    import logging
    import binascii
    import array
    from PyQt5 import QtCore, QtGui, QtWidgets, uic
    from PyQt5.QtCore import QObject, pyqtSignal, pyqtSlot
    import pyqtgraph as pg

    class MainWindow(QtWidgets.QMainWindow):

    lcd1Signal =  pyqtSignal(str)
    lcd2Signal =  pyqtSignal(str)
    plotSignal =  pyqtSignal(list)
    valveSignal = pyqtSignal(int)
    
    def __init__(self, *args, **kwargs):
        super(MainWindow, self).__init__(*args, **kwargs)
        uic.loadUi("/media/sam/venvs/tankdisplay/levels.ui",self)
        bargraph = pg.BarGraphItem(x=list(range(-2,-62,-2)), height=[0]*30, width=1.5, brush='b')
        self.graphWidget.addItem(bargraph)
        self.graphWidget.setXRange(-60,-2, padding = 0.1)
        self.graphWidget.setYRange(0,10, padding = 0.1)
        self.graphWidget.setTitle("Bucket Tips")
        self.graphWidget.setLabel('left', 'Tips')
        self.graphWidget.setLabel('bottom', 'Minutes')
        self.graphWidget.setBackground((138,69,19))
        self.lcdNumber1val("{0:<3d}".format(0))
        self.lcdNumber2val("{0:<3d}".format(0))
        self.valveval(1)  # Open
        self.lcd1Signal.connect(self.lcdNumber1val)
        self.lcd2Signal.connect(self.lcdNumber2val)
        self.plotSignal.connect(self.plotvals)
        self.valveSignal.connect(self.valveval)
    
    
    @QtCore.pyqtSlot(str)
    def lcdNumber1val(self, val):
        print("Tank Level = " + val)
        self.lcdNumber1.setProperty("value", val)
        percentage = int(val) * 100 / 300
        self.houseLevelProgress.setProperty("value", percentage)
    
    @QtCore.pyqtSlot(str)
    def lcdNumber2val(self, val):
        print("Litres/hour = " + val)
        # self.lcdNumber2.display(val)
        self.lcdNumber2.setProperty("value", val)
    
    @QtCore.pyqtSlot(int)
    def valveval(self, val):
        self.valve.setStyleSheet("QProgressBar::chunk "
                                "{"
                                "background-color: red;"
                                "}")
        if (val == 0):  # 1 is open 0 is closed
            self.valve.setProperty("value", 100)
            self.valvelabel.setText("Closed")
            print("Valve Closed")
        else:
            self.valve.setProperty("value", 0)
            self.valvelabel.setText("Open")
            print("Valve Open")
        self.valvelabel.adjustSize()
    
    @QtCore.pyqtSlot(list)
    def plotvals(self, y):
        bargraph = pg.BarGraphItem(x=list(range(-2,-62,-2)), height=y, width=1.5, brush='b')
        self.graphWidget.addItem(bargraph)
        self.graphWidget.repaint()
        # print("Bar graph updated")
    

    class houseTank(QtCore.QObject):
    def init(self, *args, **kwargs):
    super().init(*args, **kwargs)

        t = int(time.time())
        self.clientid = "housetank" + str(t)
        self.client = mqtt.Client(self.clientid)
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        self.client.on_publish = self.on_publish
        self.client.on_subscribe = self.on_subscribe
        self.client.on_log = self.on_log
        self.client.on_disconnect = self.on_disconnect
        data = json.load(open("mqtt_brokers.jsn"))
        self.client.username_pw_set(data['mail']['username'], data['mail']['password'])
        self.client.connect(data['mail']['host'], data['mail']['port'], data['mail']['timeout'])
        
    def start(self):
        print("housetank starting")
        self.client.loop_start()
        print("housetank loop started")
    
    def stop(self):
        print("housetank stopping")
        self.client.loop_stop()
        print("housetank loop stopped")
    
    def on_connect(self, mqtt_housetank, obj, flags, rc):
        print("rc: "+str(rc))
        print("housetank connected")
        self.client.subscribe("application/10/device/70b3d5cd000103ee/rx", 1)
    
    def on_message(self, mqtt_housetank, obj, msg):
        print("housetank message arrived")
        print(msg.topic+" "+str(msg.qos)+" "+str(msg.payload))
        try:
            data = json.loads(msg.payload.decode('utf-8'))
            if 'data' in data:
                result = base64.b64decode(str(data['data']) + "=====").hex()
                # print(result[0:16])
                serial = int(result[0:2]+result[2:4], 16)
                depth = int(result[6:8]+result[8:10], 16)
                temp = int(result[10:12]+result[12:14], 16)/100.0
                bat = int(result[14:16], 16)/10
                # print("depth = {:3d} cm".format(depth) + " temperature = {:.2f}".format(
                #     temp) + " deg C  battery = {:.1f}".format(bat) + " volts")
                mainwindow.lcd1Signal.emit("{0:<3d}".format(depth))
                payload = {'depth': depth, 'temperature': temp, 'battery': bat}
                topic = "application/10/device/" + str(serial)
                info = mqtt_housetank.publish(
                    topic, json.dumps(payload).strip('"'))
        except:
            print("data error")
    
    def on_publish(self, mqtt_housetank, obj, mid):
        print("mid: "+str(mid))
    
    
    def on_subscribe(self, mqtt_housetank, obj, mid, granted_qos):
        print("Housetank Subscribed: "+str(mid)+" "+str(granted_qos))
    
    
    def on_log(self, mqtt_housetank, obj, level, string):
        print(string)
    
    
    def on_disconnect(self, mqtt_housetank, userdata, rc=0):
        logging.debug("DisConnected result code "+str(rc))
        print("housetank disconnected")
        self.client.loop_stop()
    

    class roofWater(QtCore.QObject):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        
        t = int(time.time())
        self.clientid = "roofwater" + str(t)
        self.client = mqtt.Client(self.clientid)
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        self.client.on_publish = self.on_publish
        self.client.on_subscribe = self.on_subscribe
        self.client.on_log = self.on_log
        self.client.on_disconnect = self.on_disconnect
        data = json.load(open("mqtt_brokers.jsn"))
        self.client.username_pw_set(data['noddy']['username'], data['noddy']['password'])
        self.client.connect(data['noddy']['host'], data['noddy']['port'], data['noddy']['timeout'])
        
    def start(self):
        print("roofwater starting")
        self.client.loop_start()
        print("roofwater loop started")
        # threading.Thread(target=self._init_bucle, daemon=True).start()
    
    def stop(self):
        print("roofwater stopping")
        self.client.loop_stop()
        print("roofwater loop stopped")
    
    def on_connect(self, mqtt_roofwater, obj, flags, rc):
        print("rc: "+str(rc))
        self.client.subscribe("esp32/davis/litresperhour", 1)
    
    
    def on_message(self, mqtt_roofwater, obj, msg):
        print(msg.topic+" "+str(msg.qos)+" "+str(msg.payload))
        try:
            data = json.loads(msg.payload.decode('utf-8'))
            if 'value' in data:
                # print("roofwater = {0:<3d}".format(data['value']))
                mainwindow.lcd2Signal.emit("{0:<3d}".format(data['value']))
                # Test value below
                # mainwindow.lcd2Signal.emit(str(20))
        except:
            print("data error")
    
    
    def on_publish(self, mqtt_roofwater, obj, mid):
        print("mid: "+str(mid))
    
    
    def on_subscribe(self, mqtt_roofwater, obj, mid, granted_qos):
        print("Subscribed: "+str(mid)+" "+str(granted_qos))
    
    
    def on_log(self, mqtt_roofwater, obj, level, string):
        print(string)
    
    
    def on_disconnect(self, mqtt_roofwater, userdata, rc=0):
        logging.debug("DisConnected result code "+str(rc))
        mqtt_roofwater.loop_stop()
    

    class valveStatus(QtCore.QObject):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        
        t = int(time.time())
        self.clientid = "valvestatus" + str(t)
        self.client = mqtt.Client(self.clientid)
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        self.client.on_publish = self.on_publish
        self.client.on_subscribe = self.on_subscribe
        self.client.on_log = self.on_log
        self.client.on_disconnect = self.on_disconnect
        data = json.load(open("mqtt_brokers.jsn"))
        self.client.username_pw_set(data['noddy']['username'], data['noddy']['password'])
        self.client.connect(data['noddy']['host'], data['noddy']['port'], data['noddy']['timeout'])
    
    def start(self):
        print("valvestatus starting")
        self.client.loop_start()
        print("valvestatus loop started")
        # threading.Thread(target=self._init_bucle, daemon=True).start()
    
    def stop(self):
        print("valvestatus stopping")
        self.client.loop_stop()
        print("valvestatus loop stopped")
    
    
    def on_connect(self, mqtt_valvestatus, obj, flags, rc):
        print("rc: "+str(rc))
        self.client.subscribe("esp32/valve/status/1", 1)
    
    
    def on_message(self, mqtt_valvestatus, obj, msg):
        print(msg.topic+" "+str(msg.qos)+" "+str(msg.payload))
        try:
            data = json.loads(msg.payload.decode('utf-8'))
            if data is not None:
                # print("Valvestatus = {:d}".format(data))
                mainwindow.valveSignal.emit(data)
        except:
            print("data error")
    
    
    def on_publish(self, mqtt_valvestatus, obj, mid):
        print("mid: "+str(mid))
    
    
    def on_subscribe(self, mqtt_valvestatus, obj, mid, granted_qos):
        print("Subscribed: "+str(mid)+" "+str(granted_qos))
    
    
    def on_log(self, mqtt_valvestatus, obj, level, string):
        print(string)
    
    
    def on_disconnect(self, mqtt_valvestatus, userdata, rc=0):
        logging.debug("DisConnected result code "+str(rc))
        self.client.loop_stop()
    

    class lastHoursRain(QtCore.QObject):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        
        t = int(time.time())
        self.clientid = "lasthoursrain" + str(t)
        self.client = mqtt.Client(self.clientid)
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        self.client.on_publish = self.on_publish
        self.client.on_subscribe = self.on_subscribe
        self.client.on_log = self.on_log
        self.client.on_disconnect = self.on_disconnect
        data = json.load(open("mqtt_brokers.jsn"))
        self.client.username_pw_set(data['noddy']['username'], data['noddy']['password'])
        self.client.connect(data['noddy']['host'], data['noddy']['port'], data['noddy']['timeout'])
    
    def start(self):
        print("lasthoursrain starting")
        self.client.loop_start()
        print("lasthoursrain loop started")
        # threading.Thread(target=self._init_bucle, daemon=True).start()
    
    def stop(self):
        print("lasthoursrain stopping")
        self.client.loop_stop()
        print("lasthoursrain loop stopped")
    
    
    def on_connect(self, mqtt_lasthoursrain, obj, flags, rc):
        print("rc: "+str(rc))
        self.client.subscribe("esp32/davis/lasthoursrain", 1)
    
    
    def on_message(self, mqtt_lasthoursrain, obj, msg):
        print(msg.topic+" "+str(msg.qos)+" "+str(msg.payload))
        try:
            data = json.loads(msg.payload.decode('utf-8'))
            if data is not None:
                rainlist = []
                for i in range(len(data)):
                    rainlist.append(data[i])
                mainwindow.plotSignal.emit(rainlist)
        except:
            print("data error")
    
    
    def on_publish(self, mqtt_lasthoursrain, obj, mid):
        print("mid: "+str(mid))
    
    
    def on_subscribe(self, mqtt_lasthoursrain, obj, mid, granted_qos):
        print("Subscribed: "+str(mid)+" "+str(granted_qos))
    
    
    def on_log(self, mqtt_lasthoursrain, obj, level, string):
        print(string)
    
    
    def on_disconnect(self, mqtt_lasthoursrain, userdata, rc=0):
        logging.debug("DisConnected result code "+str(rc))
        self.client.loop_stop()
    

    if name == 'main':

    # task queue to overcome issue with paho when using multiple threads:
    #   https://github.com/eclipse/paho.mqtt.python/issues/354
    task_queue = mp.Queue()
    
    app = QtWidgets.QApplication(sys.argv)
    
    data = json.load(open("mqtt_brokers.jsn"))
    
    mainwindow = MainWindow()
    mainwindow.show()
    
    
    mqtt_housetank = houseTank()
    mqtt_housetank.start()
    mqtt_roofwater = roofWater()
    mqtt_roofwater.start()
    mqtt_valvestatus = valveStatus()
    mqtt_valvestatus.start()
    mqtt_lasthoursrain = lastHoursRain()
    mqtt_lasthoursrain.start()
    
    print("mqtt clients are looping")
    
    sys.exit(app.exec_())
    
    # process all tasks on queue
    try:
        while True:
            task = task_queue.get()
            task()
    except (KeyboardInterrupt, SystemExit):
        print("Received keyboard interrupt, quitting ...")
        mqtt_housetank.stop()
        mqtt_roofwater.stop()
        mqtt_valvestatus.stop()
        mqtt_lasthoursrain.stop()
        exit(0)

Log in to reply