How to use MVC with PySerial and QtCharts?
-
Hello, I am new to Qt and I am working on a program that plot data from a dozen different COM ports. I think what I need is a Model and a View. I was able to display a QChartView in my main window, but I am having trouble understanding the documentation.
The data is a simple pressure(float), timestamp(datetime), and port_id(int). Each device is connected to a port 1-16 and using a command and PySerial I will retrieve the pressure and the timestamp of that measurement. That part I have down and can structure this output however necessary (numpy, dataframe, list, json..).
I believe I need to use one of the QXYMapper classes, or a QAbstractItemModel with a QChartView, But the documentation is so ambiguous or lacking I don't understand how to use them. I have also tried using the QSQLTableModel since I need to collect hours worth of data, it's going to be stored in a SQL database anyway. But I also did not understand the QtSQL documentation.
TL;DR I am collecting data from several serial devices (over ~2.5 hours) with custom classes and need to both store for later analysis and present this data in graphs in real-time. Is QXYMapper and QChartView the best solution for this? If not, what would you consider?
I have plot real time data with PyQtGraph and PySide6.QtCharts but it was little more than proof of concept with very poor implementation. Any assistance or input on this matter would be greatly appreciated. Thanks :)
# I have trimmed out lines I felt were not needed to express my issue. class Gauge: def __init__(self, port_id: int): self.com = Serial("COM4") self.port_id = port_id ... def read_pressure(self): ... press: float = self.com.send_command("pressure_cmd") return (self.port_id, press, datetime.now()) class GaugeCluster: def __init__(self): # Note: key == port_id self.cluster: List[Gauge] = [Gauge(1), Gauge(2), Gauge(3)] def read_pressure(self): pt_ls = [] for device in self.cluster: pt_ls.append(device.read_pressure()) return pt_ls # Note: will look like -> [(1, 5.00e-05, datetime.datetime(2021, 8, 17, 16, 19, 19, 630906), # (2, 5.00e-05, datetime.datetime(2021, 8, 17, 16, 19, 20, 630906), # (3, 5.00e-05, datetime.datetime(2021, 8, 17, 16, 19, 21, 630906), # (1, 5.00e-05, datetime.datetime(2021, 8, 17, 16, 19, 22, 630906), # (2, 5.00e-05, datetime.datetime(2021, 8, 17, 16, 19, 23, 630906), # ... etc..] class MainContent(QWidget): def __init__(self): QWidget.__init__(self) self.pressure_plot = QChart() self.pressure_plot.setTitle("Pressure") self.pressure_plot_view = QChartView(self.pressure_plot) self.tabs = QTabWidget() self.tabs.addTab(self.pressure_plot_view, "Pressure") grid = QGridLayout() grid.addWidget(self.tabs, 0, 0, 2, 1) grid.setRowStretch(1, 1) grid.setColumnStretch(0, 1) self.setLayout(grid) # noinspection PyAttributeOutsideInit class MainWindow(QMainWindow): def __init__(self): QMainWindow.__init__(self) self.setWindowTitle("System Pressure Control") ... # Menubar and Statusbar stuff # Main Content self.main_content = MainContent() # Window dimensions self.setCentralWidget(self.main_content) @Slot() def _update_statusbar_text(self, db_conn): self.db_conn_lbl.setText( f"Database: {'Connected' if db_conn else 'Disconnected'}" ) @Slot() def exit_app(self, checked): sys.exit() if __name__ == "__main__": try: app = QApplication(sys.argv) app.setStyle("Fusion") # ['Breeze', 'Oxygen', 'QtCurve', 'Windows', 'Fusion'] window = MainWindow() window.show() window.resize(700, 500) sys.exit(app.exec()) except Exception as e: logger.error(e)
-
Hello, I am new to Qt and I am working on a program that plot data from a dozen different COM ports. I think what I need is a Model and a View. I was able to display a QChartView in my main window, but I am having trouble understanding the documentation.
The data is a simple pressure(float), timestamp(datetime), and port_id(int). Each device is connected to a port 1-16 and using a command and PySerial I will retrieve the pressure and the timestamp of that measurement. That part I have down and can structure this output however necessary (numpy, dataframe, list, json..).
I believe I need to use one of the QXYMapper classes, or a QAbstractItemModel with a QChartView, But the documentation is so ambiguous or lacking I don't understand how to use them. I have also tried using the QSQLTableModel since I need to collect hours worth of data, it's going to be stored in a SQL database anyway. But I also did not understand the QtSQL documentation.
TL;DR I am collecting data from several serial devices (over ~2.5 hours) with custom classes and need to both store for later analysis and present this data in graphs in real-time. Is QXYMapper and QChartView the best solution for this? If not, what would you consider?
I have plot real time data with PyQtGraph and PySide6.QtCharts but it was little more than proof of concept with very poor implementation. Any assistance or input on this matter would be greatly appreciated. Thanks :)
# I have trimmed out lines I felt were not needed to express my issue. class Gauge: def __init__(self, port_id: int): self.com = Serial("COM4") self.port_id = port_id ... def read_pressure(self): ... press: float = self.com.send_command("pressure_cmd") return (self.port_id, press, datetime.now()) class GaugeCluster: def __init__(self): # Note: key == port_id self.cluster: List[Gauge] = [Gauge(1), Gauge(2), Gauge(3)] def read_pressure(self): pt_ls = [] for device in self.cluster: pt_ls.append(device.read_pressure()) return pt_ls # Note: will look like -> [(1, 5.00e-05, datetime.datetime(2021, 8, 17, 16, 19, 19, 630906), # (2, 5.00e-05, datetime.datetime(2021, 8, 17, 16, 19, 20, 630906), # (3, 5.00e-05, datetime.datetime(2021, 8, 17, 16, 19, 21, 630906), # (1, 5.00e-05, datetime.datetime(2021, 8, 17, 16, 19, 22, 630906), # (2, 5.00e-05, datetime.datetime(2021, 8, 17, 16, 19, 23, 630906), # ... etc..] class MainContent(QWidget): def __init__(self): QWidget.__init__(self) self.pressure_plot = QChart() self.pressure_plot.setTitle("Pressure") self.pressure_plot_view = QChartView(self.pressure_plot) self.tabs = QTabWidget() self.tabs.addTab(self.pressure_plot_view, "Pressure") grid = QGridLayout() grid.addWidget(self.tabs, 0, 0, 2, 1) grid.setRowStretch(1, 1) grid.setColumnStretch(0, 1) self.setLayout(grid) # noinspection PyAttributeOutsideInit class MainWindow(QMainWindow): def __init__(self): QMainWindow.__init__(self) self.setWindowTitle("System Pressure Control") ... # Menubar and Statusbar stuff # Main Content self.main_content = MainContent() # Window dimensions self.setCentralWidget(self.main_content) @Slot() def _update_statusbar_text(self, db_conn): self.db_conn_lbl.setText( f"Database: {'Connected' if db_conn else 'Disconnected'}" ) @Slot() def exit_app(self, checked): sys.exit() if __name__ == "__main__": try: app = QApplication(sys.argv) app.setStyle("Fusion") # ['Breeze', 'Oxygen', 'QtCurve', 'Windows', 'Fusion'] window = MainWindow() window.show() window.resize(700, 500) sys.exit(app.exec()) except Exception as e: logger.error(e)
@SIG_KILL You can use the following demo where:
- The DataProvider (which in your case collects the data) will send information every T seconds.
- The SqlManager that saves the data, has a model that has the last N samples and provides proxies for each sensor.
- The view associates each proxy with a string and displays it.
from functools import cached_property import random from PySide6.QtCore import QDateTime, QObject, QSortFilterProxyModel, Qt, QTimer, Signal from PySide6.QtSql import QSqlDatabase, QSqlQuery, QSqlQueryModel from PySide6.QtWidgets import QApplication, QMainWindow from PySide6.QtCharts import ( QChartView, QDateTimeAxis, QLineSeries, QValueAxis, QVXYModelMapper, ) def create_connection(): db = QSqlDatabase.addDatabase("QSQLITE") db.setDatabaseName("test.db") if not db.open(): print(db.lastError().text()) return False return True class DataProvider(QObject): date_changed = Signal(str, float, QDateTime) @cached_property def timer(self): return QTimer(interval=500, timeout=self.handle_timeout) def start(self): self.timer.start() @cached_property def id_sensors(self): return [f"sensor_{i}" for i in range(5)] def handle_timeout(self): for id_sensor in self.id_sensors: value = random.uniform(0, 10) dt = QDateTime.currentDateTime() self.date_changed.emit(id_sensor, value, dt) class FilterProxyModel(QSortFilterProxyModel): def data(self, index, role=Qt.DisplayRole): value = super().data(index, role) if index.column() == 3: return QDateTime.fromString(value, Qt.ISODate).toMSecsSinceEpoch() return value class SqlManager(QObject): range_changed = Signal(QDateTime, QDateTime) def __init__(self, parent=None): super().__init__(parent) self.create_table() self.refresh_model() @cached_property def sql_model(self): return QSqlQueryModel() def proxy_model_by_sensor(self, sensor_id): column_sensor = self.sql_model.record().indexOf("id_sensor") proxy_model = FilterProxyModel(self) proxy_model.setSourceModel(self.sql_model) proxy_model.setFilterFixedString(sensor_id) proxy_model.setFilterKeyColumn(column_sensor) return proxy_model def create_table(self): query = QSqlQuery( """ CREATE TABLE IF NOT EXISTS Sensors ( id INTEGER PRIMARY KEY AUTOINCREMENT, id_sensor INTEGER NOT NULL, value REAL NOT NULL, dt DATETIME NOT NULL ) """ ) if not query.exec(): print(query.lastError().text()) def insert_data(self, id_sensor, value, dt): query = QSqlQuery() query.prepare( """INSERT INTO Sensors (id_sensor, value, dt) VALUES (?, ?, datetime(?))""" ) query.addBindValue(id_sensor) query.addBindValue(value) query.addBindValue(dt) if not query.exec(): print(query.lastError().text()) self.refresh_model() def refresh_model(self): self.sql_model.setQuery("SELECT * FROM Sensors ORDER BY rowid DESC LIMIT 200") if self.sql_model.lastError().isValid(): print(self.sql_model.lastError().text()) return dt_max = QDateTime.fromString(self.sql_model.record(0).value("dt"), Qt.ISODate) dt_min = QDateTime.fromString( self.sql_model.record(self.sql_model.rowCount() - 1).value("dt"), Qt.ISODate ) self.range_changed.emit(dt_min, dt_max) class MainWindow(QMainWindow): def __init__(self, parent=None): super().__init__(parent) self.setCentralWidget(self.chart_view) self.axis_y.setRange(-1, 11) self.chart_view.chart().addAxis(self.axis_x, Qt.AlignBottom) self.chart_view.chart().addAxis(self.axis_y, Qt.AlignLeft) @cached_property def chart_view(self): return QChartView() @cached_property def axis_x(self): return QDateTimeAxis(format="dd-MM-yyyy h:mm:ss") @cached_property def axis_y(self): return QValueAxis() def add_sensor(self, model, name): series = QLineSeries(name=name) self.chart_view.chart().addSeries(series) series.attachAxis(self.axis_x) series.attachAxis(self.axis_y) model_mapper = QVXYModelMapper(self) model_mapper.setSeries(series) model_mapper.setModel(model) model_mapper.setXColumn(3) model_mapper.setYColumn(2) def update_range(self, dt_min, dt_max): self.axis_x.setRange(dt_min, dt_max) def main(): import sys app = QApplication(sys.argv) if not create_connection(): sys.exit(-1) data_provider = DataProvider() sql_manager = SqlManager() data_provider.date_changed.connect(sql_manager.insert_data) data_provider.start() view = MainWindow() for id_sensor in data_provider.id_sensors: proxy_model = sql_manager.proxy_model_by_sensor(id_sensor) view.add_sensor(proxy_model, id_sensor) sql_manager.range_changed.connect(view.update_range) view.resize(640, 480) view.show() sys.exit(app.exec()) if __name__ == "__main__": main()
-
@SIG_KILL You can use the following demo where:
- The DataProvider (which in your case collects the data) will send information every T seconds.
- The SqlManager that saves the data, has a model that has the last N samples and provides proxies for each sensor.
- The view associates each proxy with a string and displays it.
from functools import cached_property import random from PySide6.QtCore import QDateTime, QObject, QSortFilterProxyModel, Qt, QTimer, Signal from PySide6.QtSql import QSqlDatabase, QSqlQuery, QSqlQueryModel from PySide6.QtWidgets import QApplication, QMainWindow from PySide6.QtCharts import ( QChartView, QDateTimeAxis, QLineSeries, QValueAxis, QVXYModelMapper, ) def create_connection(): db = QSqlDatabase.addDatabase("QSQLITE") db.setDatabaseName("test.db") if not db.open(): print(db.lastError().text()) return False return True class DataProvider(QObject): date_changed = Signal(str, float, QDateTime) @cached_property def timer(self): return QTimer(interval=500, timeout=self.handle_timeout) def start(self): self.timer.start() @cached_property def id_sensors(self): return [f"sensor_{i}" for i in range(5)] def handle_timeout(self): for id_sensor in self.id_sensors: value = random.uniform(0, 10) dt = QDateTime.currentDateTime() self.date_changed.emit(id_sensor, value, dt) class FilterProxyModel(QSortFilterProxyModel): def data(self, index, role=Qt.DisplayRole): value = super().data(index, role) if index.column() == 3: return QDateTime.fromString(value, Qt.ISODate).toMSecsSinceEpoch() return value class SqlManager(QObject): range_changed = Signal(QDateTime, QDateTime) def __init__(self, parent=None): super().__init__(parent) self.create_table() self.refresh_model() @cached_property def sql_model(self): return QSqlQueryModel() def proxy_model_by_sensor(self, sensor_id): column_sensor = self.sql_model.record().indexOf("id_sensor") proxy_model = FilterProxyModel(self) proxy_model.setSourceModel(self.sql_model) proxy_model.setFilterFixedString(sensor_id) proxy_model.setFilterKeyColumn(column_sensor) return proxy_model def create_table(self): query = QSqlQuery( """ CREATE TABLE IF NOT EXISTS Sensors ( id INTEGER PRIMARY KEY AUTOINCREMENT, id_sensor INTEGER NOT NULL, value REAL NOT NULL, dt DATETIME NOT NULL ) """ ) if not query.exec(): print(query.lastError().text()) def insert_data(self, id_sensor, value, dt): query = QSqlQuery() query.prepare( """INSERT INTO Sensors (id_sensor, value, dt) VALUES (?, ?, datetime(?))""" ) query.addBindValue(id_sensor) query.addBindValue(value) query.addBindValue(dt) if not query.exec(): print(query.lastError().text()) self.refresh_model() def refresh_model(self): self.sql_model.setQuery("SELECT * FROM Sensors ORDER BY rowid DESC LIMIT 200") if self.sql_model.lastError().isValid(): print(self.sql_model.lastError().text()) return dt_max = QDateTime.fromString(self.sql_model.record(0).value("dt"), Qt.ISODate) dt_min = QDateTime.fromString( self.sql_model.record(self.sql_model.rowCount() - 1).value("dt"), Qt.ISODate ) self.range_changed.emit(dt_min, dt_max) class MainWindow(QMainWindow): def __init__(self, parent=None): super().__init__(parent) self.setCentralWidget(self.chart_view) self.axis_y.setRange(-1, 11) self.chart_view.chart().addAxis(self.axis_x, Qt.AlignBottom) self.chart_view.chart().addAxis(self.axis_y, Qt.AlignLeft) @cached_property def chart_view(self): return QChartView() @cached_property def axis_x(self): return QDateTimeAxis(format="dd-MM-yyyy h:mm:ss") @cached_property def axis_y(self): return QValueAxis() def add_sensor(self, model, name): series = QLineSeries(name=name) self.chart_view.chart().addSeries(series) series.attachAxis(self.axis_x) series.attachAxis(self.axis_y) model_mapper = QVXYModelMapper(self) model_mapper.setSeries(series) model_mapper.setModel(model) model_mapper.setXColumn(3) model_mapper.setYColumn(2) def update_range(self, dt_min, dt_max): self.axis_x.setRange(dt_min, dt_max) def main(): import sys app = QApplication(sys.argv) if not create_connection(): sys.exit(-1) data_provider = DataProvider() sql_manager = SqlManager() data_provider.date_changed.connect(sql_manager.insert_data) data_provider.start() view = MainWindow() for id_sensor in data_provider.id_sensors: proxy_model = sql_manager.proxy_model_by_sensor(id_sensor) view.add_sensor(proxy_model, id_sensor) sql_manager.range_changed.connect(view.update_range) view.resize(640, 480) view.show() sys.exit(app.exec()) if __name__ == "__main__": main()
@eyllanesc this is a fantastic example, and very understandable.
Thank you for this! :)