"""ford-obd GUI -- P1 shell: connection bar, PID browser (side), live overlay plot.

Checking a PID in the browser subscribes it (polls + plots); unchecking removes it.
Preset buttons bulk-select. 'Normalize' overlays mixed-scale PIDs (ICP vs FICM) as
% of each PID's range so they're all readable on one axis.

Built to run against MockLink with no hardware -- pick "Mock" and Connect.
"""

import contextlib
import time

from PySide6 import QtCore, QtGui, QtWidgets
import pyqtgraph as pg

from obdcore import PRESETS
from .controller import Controller

PLOT_WINDOW_S = 60.0    # seconds of history shown
REFRESH_MS = 100        # GUI redraw rate (10 Hz)
DEFAULT_BAUD = 38400    # pre-filled baud rate, also the fallback for bad input

CURVE_COLORS = [
    "#e6194B", "#3cb44b", "#4363d8", "#f58231", "#911eb4",
    "#42d4f4", "#f032e6", "#bfef45", "#fabed4", "#469990",
    "#9A6324", "#ffe119", "#000075", "#a9a9a9", "#800000",
]

# Display order of the top-level groups in the PID browser.
GROUP_ORDER = ["fuel", "ficm", "air", "engine", "driveline", "power", "misc"]
GROUP_LABEL = {
    "fuel": "Fuel / Injection",
    "ficm": "FICM",
    "air": "Air / Boost",
    "engine": "Engine",
    "driveline": "Driveline",
    "power": "Power",
    "misc": "Other",
}
# Suffix appended to a PID's name in the browser, keyed by its confidence.
CONF_BADGE = {"verified": "", "doc": " [DOC]", "tentative": " [?]"}


class MainWindow(QtWidgets.QMainWindow):
    """Main scanner window: connection toolbar, PID tree dock and live plot."""

    def __init__(self):
        super().__init__()
        self.setWindowTitle("ford-obd -- 6.0 Power Stroke scanner")
        self.resize(1100, 680)

        self.ctl = Controller()
        self.curves = {}        # key -> PlotDataItem
        self._items = {}        # key -> QTreeWidgetItem (filled by _populate_tree)
        self._pending = set()   # keys with a curve that are not subscribed yet
        self._color_i = 0

        self._build_connection_bar()
        self._build_pid_browser()
        self._build_plot()
        self._build_statusbar()

        self.timer = QtCore.QTimer(self)
        self.timer.timeout.connect(self._tick)
        self.timer.setInterval(REFRESH_MS)

    # ---- helpers ----
    @contextlib.contextmanager
    def _tree_signals_blocked(self):
        """Block the tree's signals for the duration of a ``with`` body.

        The previous blocked state is restored even if the body raises, so a
        failure while updating items cannot leave ``itemChanged`` muted.
        """
        was_blocked = self.tree.blockSignals(True)
        try:
            yield
        finally:
            self.tree.blockSignals(was_blocked)

    # ---- UI construction ----
    def _build_connection_bar(self):
        """Create the toolbar: port picker, baud, Mock, Connect, preset buttons."""
        tb = QtWidgets.QToolBar("Connection")
        tb.setMovable(False)
        self.addToolBar(tb)

        tb.addWidget(QtWidgets.QLabel(" Port "))
        self.port_combo = QtWidgets.QComboBox()
        self.port_combo.setMinimumWidth(180)
        self._refresh_ports()
        tb.addWidget(self.port_combo)

        refresh = QtWidgets.QToolButton()
        refresh.setText("↻")
        refresh.setToolTip("Rescan serial ports")
        refresh.clicked.connect(self._refresh_ports)
        tb.addWidget(refresh)

        tb.addWidget(QtWidgets.QLabel(" Baud "))
        self.baud_edit = QtWidgets.QLineEdit(str(DEFAULT_BAUD))
        self.baud_edit.setFixedWidth(70)
        tb.addWidget(self.baud_edit)

        self.mock_chk = QtWidgets.QCheckBox("Mock")
        self.mock_chk.setToolTip("Use simulated data (no adapter needed)")
        tb.addWidget(self.mock_chk)

        self.connect_btn = QtWidgets.QPushButton("Connect")
        self.connect_btn.clicked.connect(self._toggle_connect)
        tb.addWidget(self.connect_btn)

        tb.addSeparator()
        for name in ("crank", "driving", "vitals"):
            b = QtWidgets.QPushButton(name.capitalize())
            b.setToolTip(f"Select the '{name}' PID set")
            # n=name binds the current name; a plain closure would late-bind.
            b.clicked.connect(lambda _=False, n=name: self._apply_preset(n))
            b.setEnabled(False)
            # Marker property used by _set_presets_enabled to find these buttons.
            b.setProperty("preset", True)
            tb.addWidget(b)

    def _build_pid_browser(self):
        """Create the dockable PID tree and fill it from the registry."""
        dock = QtWidgets.QDockWidget("PIDs", self)
        dock.setAllowedAreas(
            QtCore.Qt.LeftDockWidgetArea | QtCore.Qt.RightDockWidgetArea
        )
        self.tree = QtWidgets.QTreeWidget()
        self.tree.setColumnCount(2)
        self.tree.setHeaderLabels(["Signal", "Value"])
        self.tree.setRootIsDecorated(True)
        self.tree.setUniformRowHeights(True)
        self.tree.itemChanged.connect(self._on_item_changed)
        dock.setWidget(self.tree)
        self.addDockWidget(QtCore.Qt.LeftDockWidgetArea, dock)
        self._populate_tree()

    def _populate_tree(self):
        """Rebuild the PID tree: one bold group row per group, one row per PID.

        Groups listed in GROUP_ORDER come first, in that order; any other group
        found in the registry is appended afterwards so its PIDs stay visible.
        """
        with self._tree_signals_blocked():
            self.tree.clear()
            self._items = {}    # key -> QTreeWidgetItem
            groups = {}         # group name -> top-level QTreeWidgetItem
            for p in self.ctl.reg.all():
                g = groups.get(p.group)
                if g is None:
                    g = QtWidgets.QTreeWidgetItem(
                        [GROUP_LABEL.get(p.group, p.group), ""]
                    )
                    # Enabled but neither checkable nor selectable.
                    g.setFlags(QtCore.Qt.ItemIsEnabled)
                    f = g.font(0)
                    f.setBold(True)
                    g.setFont(0, f)
                    groups[p.group] = g
                it = QtWidgets.QTreeWidgetItem(
                    [f"{p.name}{CONF_BADGE.get(p.confidence,'')}", "--"]
                )
                it.setFlags(QtCore.Qt.ItemIsUserCheckable | QtCore.Qt.ItemIsEnabled)
                it.setCheckState(0, QtCore.Qt.Unchecked)
                it.setData(0, QtCore.Qt.UserRole, p.key)
                it.setToolTip(
                    0, f"{p.key} (mode {p.mode} {p.pid}) {p.unit} {p.notes}"
                )
                g.addChild(it)
                self._items[p.key] = it

            ordered = [gk for gk in GROUP_ORDER if gk in groups]
            # Groups missing from GROUP_ORDER would otherwise never be attached
            # to the tree even though their PIDs are tracked in self._items.
            extras = [gk for gk in groups if gk not in GROUP_ORDER]
            for gk in ordered + extras:
                self.tree.addTopLevelItem(groups[gk])
            self.tree.expandAll()
            self.tree.resizeColumnToContents(0)

    def _build_plot(self):
        """Create the central widget: normalize checkbox, window label, plot."""
        pg.setConfigOptions(antialias=True, background="#111", foreground="#ccc")
        central = QtWidgets.QWidget()
        lay = QtWidgets.QVBoxLayout(central)
        lay.setContentsMargins(4, 4, 4, 4)

        bar = QtWidgets.QHBoxLayout()
        self.norm_chk = QtWidgets.QCheckBox("Normalize (% of range)")
        self.norm_chk.setToolTip("Scale each curve to its min..max so mixed units "
                                 "(ICP vs FICM) are all readable on one axis")
        bar.addWidget(self.norm_chk)
        bar.addStretch(1)
        self.window_label = QtWidgets.QLabel(f"window: {int(PLOT_WINDOW_S)}s")
        bar.addWidget(self.window_label)
        lay.addLayout(bar)

        self.plot = pg.PlotWidget()
        self.plot.addLegend(offset=(10, 10))
        self.plot.showGrid(x=True, y=True, alpha=0.25)
        self.plot.setLabel("bottom", "time", units="s")
        self.plot.setLabel("left", "value")
        lay.addWidget(self.plot)
        self.setCentralWidget(central)

    def _build_statusbar(self):
        """Create the status bar and show the initial hint."""
        self.status = self.statusBar()
        self.status.showMessage("Not connected. Pick a port (or Mock) and Connect.")

    # ---- connection ----
    def _refresh_ports(self):
        """Repopulate the port combo box from ``obdcore.link.find_ports``."""
        self.port_combo.clear()
        try:
            from obdcore.link import find_ports
            ports = find_ports()
        except Exception:
            # Best effort: any import or scan failure is shown as "no ports".
            ports = []
        for p in ports:
            self.port_combo.addItem(f"{p.device} ({p.description})", p.device)
        if not ports:
            self.port_combo.addItem("(no ports found)", None)

    def _toggle_connect(self):
        """Connect using the toolbar settings, or disconnect if connected.

        A failed connect is reported in a message box and leaves the window
        disconnected. On success polling and the redraw timer are started, the
        'crank' preset is applied, and PIDs that were checked before connecting
        are subscribed.
        """
        if self.ctl.connected:
            self._disconnect()
            return

        mock = self.mock_chk.isChecked()
        port = self.port_combo.currentData()
        try:
            baud = int(self.baud_edit.text())
        except ValueError:
            baud = DEFAULT_BAUD
            # Show the value actually used instead of the unparsable text.
            self.baud_edit.setText(str(baud))

        try:
            ok = self.ctl.connect(port=port, baud=baud, mock=mock)
        except Exception as e:
            QtWidgets.QMessageBox.critical(self, "Connect failed", str(e))
            return

        self.ctl.start()
        self.timer.start()
        self.connect_btn.setText("Disconnect")
        self._set_presets_enabled(True)
        proto = getattr(self.ctl.link, "protocol", "?")
        kind = "MOCK" if mock else "ELM327"
        self.status.showMessage(
            f"Connected ({kind}) protocol {proto} "
            f"{'(ECU answered)' if ok else '(no 0100 ack - key to RUN?)'}"
        )
        self._apply_preset("crank")
        # The preset sync skips keys that already own a curve, so anything
        # checked while disconnected has to be subscribed explicitly here.
        self._subscribe_pending()

    def _subscribe_pending(self):
        """Subscribe every curve that was created while disconnected."""
        for key in list(self._pending):
            self.ctl.subscribe(key)
        self._pending.clear()

    def _disconnect(self):
        """Stop polling and redraws, drop all curves and reset the browser."""
        self.timer.stop()
        for key in list(self.curves):
            self._remove_curve(key)
        self._pending.clear()
        # NOTE(review): curves are dropped without ctl.unsubscribe(); this
        # assumes Controller.stop() also ends their polling -- confirm.
        self.ctl.stop()
        # uncheck everything
        with self._tree_signals_blocked():
            for it in self._items.values():
                it.setCheckState(0, QtCore.Qt.Unchecked)
                it.setText(1, "--")
        self.connect_btn.setText("Connect")
        self._set_presets_enabled(False)
        self.status.showMessage("Disconnected.")

    def _set_presets_enabled(self, on):
        """Enable or disable every button carrying the 'preset' property."""
        for b in self.findChildren(QtWidgets.QPushButton):
            if b.property("preset"):
                b.setEnabled(on)

    # ---- PID selection ----
    def _apply_preset(self, name):
        """Check exactly the PIDs of preset *name*; no-op while disconnected.

        An unknown preset name selects nothing, i.e. unchecks every PID.
        """
        if not self.ctl.connected:
            return
        wanted = set(PRESETS.get(name, []))
        with self._tree_signals_blocked():
            for key, it in self._items.items():
                state = QtCore.Qt.Checked if key in wanted else QtCore.Qt.Unchecked
                it.setCheckState(0, state)
        # sync subscriptions/curves to the new check state
        for key in self._items:
            self._sync_key(key)

    def _on_item_changed(self, item, col):
        """React to a checkbox toggle in column 0 of a PID row."""
        if col != 0:
            return
        key = item.data(0, QtCore.Qt.UserRole)
        if key:     # group rows carry no key
            self._sync_key(key)

    def _sync_key(self, key):
        """Bring subscription and curve for *key* in line with its checkbox."""
        it = self._items[key]
        checked = it.checkState(0) == QtCore.Qt.Checked
        has = key in self.curves
        if checked and not has:
            if self.ctl.connected:
                self.ctl.subscribe(key)
            else:
                # Remember it so _toggle_connect can subscribe once connected.
                self._pending.add(key)
            self._add_curve(key)
        elif not checked and has:
            if key in self._pending:
                # Never subscribed, so there is nothing to unsubscribe.
                self._pending.discard(key)
            else:
                self.ctl.unsubscribe(key)
            self._remove_curve(key)

    def _add_curve(self, key):
        """Create an empty plot curve for *key* using the next palette colour."""
        p = self.ctl.reg.get(key)
        color = CURVE_COLORS[self._color_i % len(CURVE_COLORS)]
        self._color_i += 1
        pen = pg.mkPen(color=color, width=2)
        curve = self.plot.plot([], [], name=f"{p.name} ({p.unit})", pen=pen)
        curve.setData([], [])
        self.curves[key] = curve

    def _remove_curve(self, key):
        """Remove the curve for *key* from the plot and legend, if present."""
        curve = self.curves.pop(key, None)
        if curve is None:
            return
        self.plot.removeItem(curve)
        legend = self.plot.plotItem.legend
        if legend:
            try:
                legend.removeItem(curve)
            except Exception:
                # Deliberate best effort: the legend entry may already be gone.
                pass

    # ---- periodic refresh ----
    def _tick(self):
        """Timer slot: refresh the browser values and redraw every curve."""
        if not self.ctl.connected:
            return
        now = self.ctl.now()
        # One guarded origin for both the window start and the x-axis, so an
        # unset t0 cannot raise when computing the x values.
        t0 = self.ctl.t0 or 0
        since = t0 + max(0.0, now - PLOT_WINDOW_S)
        normalize = self.norm_chk.isChecked()

        # update browser values
        with self._tree_signals_blocked():
            for key, it in self._items.items():
                v = self.ctl.store.latest(key)
                p = self.ctl.reg.get(key)
                it.setText(1, "--" if v is None else f"{v:g} {p.unit}".strip())

        # update plotted curves
        for key, curve in self.curves.items():
            p = self.ctl.reg.get(key)
            series = self.ctl.store.channel(key).series(since=since)
            # A zero-width range cannot be normalised; plot raw values instead.
            scale = normalize and p.vmax != p.vmin
            xs, ys = [], []
            for t, v in series:
                if v is None:
                    continue
                xs.append(t - t0)
                if scale:
                    ys.append((v - p.vmin) / (p.vmax - p.vmin) * 100.0)
                else:
                    ys.append(v)
            curve.setData(xs, ys)
        self.plot.setLabel("left", "% of range" if normalize else "value")

    def closeEvent(self, ev):
        """Stop the redraw timer and the controller before the window closes."""
        try:
            self.timer.stop()
            self.ctl.stop()
        finally:
            super().closeEvent(ev)


def run():
    """Create the application and main window, then run the Qt event loop."""
    import sys
    app = QtWidgets.QApplication(sys.argv)
    win = MainWindow()
    win.show()
    sys.exit(app.exec())


if __name__ == "__main__":
    run()