Files
obdash/gui/main.py
T
justin f2308cd4eb P2: true multi-axis overlay + gauge view
gui/widgets.py:
- MultiAxisPlot -- overlay with one Y axis PER UNIT (psi/V/rpm/C/%), linked
  ViewBoxes on X, so mixed-scale signals are readable at true values
  (base left axis + up to 4 right axes).
- SinglePlot -- one shared axis for the Normalize (% of range) mode.
- ArcGauge -- 270deg arc gauge with peak tick + numeric readout, own dark bg.
- GaugeGrid -- scrollable grid of gauges.

gui/main.py:
- Graph page is now a multi-axis/single-axis sub-stack; Normalize toggles
  between true multi-axis (raw) and single-axis (%). curves map key->color;
  plot ops route to the active graph widget.
- Gauge View menu enabled (3rd center page); gauges update on tick with peak.
- Theme applies to both plot widgets; profile switch clears graphs/gauges.

Fix: ArcGauge QPen built via setCapStyle (the QPen(...cap=...) kwarg segfaults
PySide6). Validated headless: driving preset -> 6 unit groups across 5 axes,
gauge view renders, normalize round-trips, profile-switch clears cleanly.
obdcore + diagnostics tests still pass.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_016yT89n4zR4qbrySoSiEyZs
2026-06-30 15:07:57 -04:00

789 lines
32 KiB
Python

"""ford-obd GUI -- vehicle-agnostic scanner shell.
Menu bar: File (captures) | Profile (vehicle profiles) | View | Help.
Toolbar: port / baud / mock / connect + per-profile preset buttons.
Left: PID browser (live values, confidence badges, checkboxes).
Center: stacked Graph view (pyqtgraph overlay), Table view and Gauge view.
Vehicle data comes entirely from the active JSON profile (profiles/*.json),
so the app is not Ford-specific -- load another profile to scan another truck.
Runs against MockLink with no hardware (tick "Mock" + Connect).
"""
import os
import shutil
from PySide6 import QtCore, QtGui, QtWidgets
import pyqtgraph as pg
from obdcore import (list_profiles, profiles_dir, save_profile, load_profile,
export_csv, replay_csv, TimeSeriesStore)
from .controller import Controller
from .widgets import MultiAxisPlot, SinglePlot, GaugeGrid
# Length in seconds of the trailing window used for live plots; also shown in
# the "window: Ns" label on the graph page.
PLOT_WINDOW_S = 60.0
# Interval of the UI refresh timer, in milliseconds.
REFRESH_MS = 100
# Curve colour palette; new curves take the next entry and wrap around.
CURVE_COLORS = [
"#e6194B", "#3cb44b", "#4363d8", "#f58231", "#911eb4", "#42d4f4",
"#f032e6", "#bfef45", "#fabed4", "#469990", "#9A6324", "#ffe119",
"#000075", "#a9a9a9", "#800000",
]
# Order in which the known PID groups are listed in the PID tree; groups not
# named here are appended after these.
GROUP_ORDER = ["fuel", "ficm", "air", "engine", "driveline", "power", "misc"]
# Display label for each known group key (unknown keys are shown as-is).
GROUP_LABEL = {"fuel": "Fuel / Injection", "ficm": "FICM", "air": "Air / Boost",
"engine": "Engine", "driveline": "Driveline", "power": "Power",
"misc": "Other"}
# Suffix appended to a PID's name in the tree for each confidence level.
CONF_BADGE = {"verified": "", "doc": " [DOC]", "tentative": " [?]"}
# Theme name -> colour pair. Only the first colour is read in this file (as the
# plot background); the second is unpacked but unused.
THEMES = {"dark": ("#111", "#ccc"), "light": ("#fafafa", "#222")}
class MainWindow(QtWidgets.QMainWindow):
def __init__(self):
    """Create the controller, build each UI region and prepare the refresh timer.

    The timer is wired to ``_tick`` and given its interval here, but it is
    not started.
    """
    super().__init__()
    self.ctl = Controller()
    self.curves = {}
    self._color_i = 0
    self._theme = "dark"

    # Builders run in this fixed order, menu bar first.
    builders = (
        self._build_menubar,
        self._build_connection_bar,
        self._build_pid_browser,
        self._build_diag_dock,
        self._build_center,
        self._build_statusbar,
    )
    for build in builders:
        build()

    self._refresh_title()
    self._rebuild_for_profile()

    refresh_timer = QtCore.QTimer(self)
    refresh_timer.timeout.connect(self._tick)
    refresh_timer.setInterval(REFRESH_MS)
    self.timer = refresh_timer
# ---------- menus ----------
def _build_menubar(self):
    """Populate the menu bar: File, Profile, Diagnostics, View and Help.

    Several actions are stored on ``self`` (``rec_act``, ``read_dtc_act``,
    ``clear_dtc_act``, ``view_graph``/``view_table``/``view_gauge``,
    ``show_pids``, ``show_diag``, ``norm_act``, ``theme_act``).
    """
    bar = self.menuBar()
    add = self._act

    # --- File: capture handling ---
    file_menu = bar.addMenu("&File")
    add(file_menu, "New Capture", self._new_capture,
        "Clear the current data buffers")
    file_menu.addSeparator()
    self.rec_act = add(file_menu, "Start Recording…", self._toggle_record,
                       "Stream samples to a CSV as they arrive")
    add(file_menu, "Export Capture As… (CSV)", self._export_capture,
        "Write the current in-memory buffers to a CSV")
    add(file_menu, "Open Capture (Replay)…", self._open_capture,
        "Load a recorded CSV and plot it (disconnect first)")
    file_menu.addSeparator()
    add(file_menu, "Quit", self.close, shortcut="Ctrl+Q")

    # --- Profile: entries are generated by _rebuild_profile_menu ---
    self.profm = bar.addMenu("&Profile")
    self._rebuild_profile_menu()

    # --- Diagnostics ---
    diag_menu = bar.addMenu("&Diagnostics")
    self.read_dtc_act = add(diag_menu, "Read Codes", self._read_codes,
                            "Read stored / pending / permanent trouble codes")
    self.clear_dtc_act = add(diag_menu, "Clear Codes…", self._clear_codes,
                             "Erase stored codes + freeze frame (mode 04)")

    # --- View: page switches, dock toggles, normalize and theme ---
    view_menu = bar.addMenu("&View")
    self.view_graph = add(view_menu, "Graph View",
                          lambda: self._set_view(0), checkable=True)
    self.view_table = add(view_menu, "Table View",
                          lambda: self._set_view(1), checkable=True)
    self.view_gauge = add(view_menu, "Gauge View",
                          lambda: self._set_view(2), checkable=True)
    self.view_graph.setChecked(True)
    view_menu.addSeparator()
    self.show_pids = add(view_menu, "Show PID Panel",
                         self._toggle_pid_dock, checkable=True)
    self.show_pids.setChecked(True)
    self.show_diag = add(view_menu, "Show Diagnostics Panel",
                         self._toggle_diag_dock, checkable=True)
    self.show_diag.setChecked(True)
    self.norm_act = add(view_menu, "Normalize Graph (% of range)",
                        self._sync_norm_from_menu, checkable=True)
    view_menu.addSeparator()
    self.theme_act = add(view_menu, "Light Theme",
                         self._toggle_theme, checkable=True)

    # --- Help ---
    help_menu = bar.addMenu("&Help")
    add(help_menu, "About ford-obd", self._about)
    add(help_menu, "PID Confidence Legend", self._legend)
    add(help_menu, "Active Profile Info", self._profile_info)
def _act(self, menu, text, slot, tip="", checkable=False, shortcut=None):
    """Create a QAction, append it to ``menu`` and return it.

    ``slot`` is invoked with no arguments when the action is triggered.
    ``tip`` and ``shortcut`` are applied only when truthy.
    """
    action = QtGui.QAction(text, self)
    # The lambda absorbs the argument delivered by `triggered`, so `slot`
    # is always called without parameters.
    action.triggered.connect(lambda _=False: slot())
    action.setCheckable(checkable)
    if tip:
        action.setStatusTip(tip)
    if shortcut:
        action.setShortcut(shortcut)
    menu.addAction(action)
    return action
def _rebuild_profile_menu(self):
    """Regenerate the Profile menu from ``list_profiles()``.

    One checkable, mutually exclusive entry is added per discovered
    profile; the entry whose path equals the active profile's path is
    checked. The fixed management entries (load / import / reload / edit /
    export) follow after a separator.
    """
    self.profm.clear()
    self._profile_group = QtGui.QActionGroup(self)
    self._profile_group.setExclusive(True)

    active = getattr(self.ctl.profile, "path", None)
    active_abs = os.path.abspath(active) if active else None

    for path, meta in list_profiles():
        a = QtGui.QAction(meta.get("name", os.path.basename(path)), self)
        a.setCheckable(True)
        # Always hand setChecked a real bool: the previous
        # `active and ...` expression evaluated to None (or "") when the
        # active profile has no path.
        a.setChecked(active_abs is not None
                     and os.path.abspath(path) == active_abs)
        # `p=path` binds the current path; a plain closure would see only
        # the last loop value.
        a.triggered.connect(lambda _=False, p=path: self._load_profile(p))
        self._profile_group.addAction(a)
        self.profm.addAction(a)

    self.profm.addSeparator()
    self._act(self.profm, "Load Profile…", self._load_profile_dialog)
    self._act(self.profm, "Import Profile…", self._import_profile)
    self._act(self.profm, "Reload Active", self._reload_profile)
    self._act(self.profm, "Edit Active Profile (JSON)…", self._edit_profile)
    self._act(self.profm, "Export Active Profile As…", self._export_profile)
# ---------- toolbar ----------
def _build_connection_bar(self):
    """Build the fixed "Connection" toolbar: port, baud, mock and connect.

    The toolbar, a trailing separator and an (initially empty) list of
    preset entries are stored on ``self`` for ``_rebuild_preset_buttons``.
    """
    toolbar = QtWidgets.QToolBar("Connection")
    toolbar.setMovable(False)
    self.addToolBar(toolbar)

    # Port selector with a rescan button.
    toolbar.addWidget(QtWidgets.QLabel(" Port "))
    self.port_combo = QtWidgets.QComboBox()
    self.port_combo.setMinimumWidth(180)
    self._refresh_ports()
    toolbar.addWidget(self.port_combo)
    rescan = QtWidgets.QToolButton()
    rescan.setText("↻")
    rescan.clicked.connect(self._refresh_ports)
    toolbar.addWidget(rescan)

    # Baud rate entry.
    toolbar.addWidget(QtWidgets.QLabel(" Baud "))
    self.baud_edit = QtWidgets.QLineEdit("38400")
    self.baud_edit.setFixedWidth(70)
    toolbar.addWidget(self.baud_edit)

    # Mock toggle and the connect/disconnect button.
    self.mock_chk = QtWidgets.QCheckBox("Mock")
    toolbar.addWidget(self.mock_chk)
    self.connect_btn = QtWidgets.QPushButton("Connect")
    self.connect_btn.clicked.connect(self._toggle_connect)
    toolbar.addWidget(self.connect_btn)

    toolbar.addSeparator()
    self._preset_tb = toolbar
    self._preset_sep = toolbar.addSeparator()
    self._preset_buttons = []
def _rebuild_preset_buttons(self):
    """Replace the toolbar's preset buttons with those of the active registry.

    ``self._preset_buttons`` holds the QActions returned by
    ``QToolBar.addWidget`` (not the buttons themselves). Each button
    carries the dynamic property ``"preset"``, which the connect and
    disconnect handlers use to find it.
    """
    for action in self._preset_buttons:
        self._preset_tb.removeAction(action)
        # removeAction only detaches the action from the toolbar. Without
        # an explicit delete, the action and the button it wraps would
        # stay alive and pile up on every profile switch.
        action.deleteLater()
    self._preset_buttons = []

    for name in self.ctl.reg.preset_names():
        btn = QtWidgets.QPushButton(name.capitalize())
        btn.setEnabled(self.ctl.connected)
        btn.setProperty("preset", True)
        # `n=name` binds the current preset name for this button.
        btn.clicked.connect(lambda _=False, n=name: self._apply_preset(n))
        action = self._preset_tb.addWidget(btn)
        action.setProperty("presetbtn", True)
        self._preset_buttons.append(action)
# ---------- PID browser ----------
def _build_pid_browser(self):
    """Build the left-hand "PIDs" dock holding the two-column signal tree."""
    tree = QtWidgets.QTreeWidget()
    tree.setColumnCount(2)
    tree.setHeaderLabels(["Signal", "Value"])
    tree.itemChanged.connect(self._on_item_changed)
    self.tree = tree

    dock = QtWidgets.QDockWidget("PIDs", self)
    dock.setWidget(tree)
    # Mirror the dock's visibility into the View menu check mark.
    dock.visibilityChanged.connect(
        lambda visible: self.show_pids.setChecked(visible))
    self.pid_dock = dock
    self.addDockWidget(QtCore.Qt.LeftDockWidgetArea, dock)
def _populate_tree(self):
    """Rebuild the PID tree from the active registry.

    PIDs are grouped under bold header rows; each PID row is checkable,
    starts unchecked with a "--" value, and stores its key under
    ``UserRole``. ``self._items`` is reset to map PID key -> tree row.
    """
    tree = self.tree
    # Signals stay blocked so itemChanged is not emitted while rows are built.
    tree.blockSignals(True)
    tree.clear()
    self._items = {}
    headers = {}

    for pid in self.ctl.reg.all():
        header = headers.get(pid.group)
        if header is None:
            header = QtWidgets.QTreeWidgetItem(
                [GROUP_LABEL.get(pid.group, pid.group), ""])
            header.setFlags(QtCore.Qt.ItemIsEnabled)
            bold = header.font(0)
            bold.setBold(True)
            header.setFont(0, bold)
            headers[pid.group] = header

        row = QtWidgets.QTreeWidgetItem(
            [f"{pid.name}{CONF_BADGE.get(pid.confidence,'')}", "--"])
        row.setFlags(QtCore.Qt.ItemIsUserCheckable | QtCore.Qt.ItemIsEnabled)
        row.setCheckState(0, QtCore.Qt.Unchecked)
        row.setData(0, QtCore.Qt.UserRole, pid.key)
        row.setToolTip(
            0, f"{pid.key} (mode {pid.mode} {pid.pid}) {pid.unit} {pid.notes}")
        header.addChild(row)
        self._items[pid.key] = row

    # Known groups first, in GROUP_ORDER; then any custom groups in the
    # order they were first encountered.
    ordered = [name for name in GROUP_ORDER if name in headers]
    ordered.extend(name for name in headers if name not in GROUP_ORDER)
    for name in ordered:
        tree.addTopLevelItem(headers[name])

    tree.expandAll()
    tree.resizeColumnToContents(0)
    tree.blockSignals(False)
# ---------- diagnostics dock (DTCs) ----------
def _build_diag_dock(self):
    """Build the right-hand "Diagnostics" dock.

    The dock stacks a Read/Clear button row, a three-column code tree and
    a word-wrapped hint label.
    """
    dock = QtWidgets.QDockWidget("Diagnostics", self)
    self.diag_dock = dock
    container = QtWidgets.QWidget()
    column = QtWidgets.QVBoxLayout(container)
    column.setContentsMargins(4, 4, 4, 4)

    # Button row.
    self.diag_read_btn = QtWidgets.QPushButton("Read Codes")
    self.diag_read_btn.clicked.connect(self._read_codes)
    self.diag_clear_btn = QtWidgets.QPushButton("Clear Codes…")
    self.diag_clear_btn.clicked.connect(self._clear_codes)
    buttons = QtWidgets.QHBoxLayout()
    for button in (self.diag_read_btn, self.diag_clear_btn):
        buttons.addWidget(button)
    buttons.addStretch(1)
    column.addLayout(buttons)

    # Code tree; only the Description column stretches.
    tree = QtWidgets.QTreeWidget()
    tree.setColumnCount(3)
    tree.setHeaderLabels(["Code", "Description", "System"])
    tree.setRootIsDecorated(True)
    header = tree.header()
    header.setStretchLastSection(False)
    header.setSectionResizeMode(1, QtWidgets.QHeaderView.Stretch)
    self.diag_tree = tree
    column.addWidget(tree)

    # Hint label.
    self.diag_hint = QtWidgets.QLabel(
        "Connect, then Read Codes. Bold red = no-start / drive-disabling.")
    self.diag_hint.setWordWrap(True)
    column.addWidget(self.diag_hint)

    dock.setWidget(container)
    # Mirror the dock's visibility into the View menu check mark.
    dock.visibilityChanged.connect(
        lambda visible: self.show_diag.setChecked(visible))
    self.addDockWidget(QtCore.Qt.RightDockWidgetArea, dock)
# (bucket key looked up in the codes dict, heading shown in the diagnostics
# tree), listed in display order.
_DIAG_GROUPS = [("stored", "Stored (mode 03)"),
("pending", "Pending (mode 07)"),
("permanent", "Permanent (mode 0A)")]
def _populate_diag(self, codes):
    """Fill the diagnostics tree from ``codes`` and return the code count.

    ``codes`` maps a bucket name to a list of codes:
    ``{'stored': [...], 'pending': [...], 'permanent': [...]}``. Missing
    buckets are treated as empty. Each bucket becomes an expanded bold
    header row. Codes flagged ``no_start`` in the DTC database are drawn
    bold red.

    Returns:
        Total number of codes across all buckets.
    """
    self.diag_tree.clear()
    total = 0
    for bucket, label in self._DIAG_GROUPS:
        lst = codes.get(bucket, [])
        total += len(lst)

        top = QtWidgets.QTreeWidgetItem([f"{label} ({len(lst)})", "", ""])
        f = top.font(0)
        f.setBold(True)
        top.setFont(0, f)
        top.setFirstColumnSpanned(False)
        self.diag_tree.addTopLevelItem(top)

        if not lst:
            none = QtWidgets.QTreeWidgetItem(["—", "(no codes)", ""])
            none.setForeground(1, QtGui.QBrush(QtGui.QColor("#888")))
            top.addChild(none)

        for code in lst:
            d = self.ctl.dtcdb.get(code)
            # Read the entry defensively: a code missing from the database
            # (lookup yields None) must still be listed, not crash the
            # whole read. Known entries render exactly as before.
            desc = getattr(d, "desc", "")
            system = getattr(d, "system", "")
            it = QtWidgets.QTreeWidgetItem([code, desc, system])
            it.setData(0, QtCore.Qt.UserRole, code)
            if getattr(d, "no_start", False):
                red = QtGui.QBrush(QtGui.QColor("#e6194B"))
                bf = it.font(0)
                bf.setBold(True)
                for c in range(3):
                    it.setFont(c, bf)
                    it.setForeground(c, red)
                it.setToolTip(0, "No-start / drive-disabling fault")
            top.addChild(it)
        top.setExpanded(True)

    self.diag_tree.resizeColumnToContents(0)
    self.diag_tree.resizeColumnToContents(2)
    return total
def _read_codes(self):
    """Read trouble codes from the controller and show them in the dock.

    Requires a connection. A read error is reported in a dialog. On
    success the diagnostics dock is made visible and the status bar gets a
    summary.
    """
    if not self.ctl.connected:
        QtWidgets.QMessageBox.information(
            self, "Not connected", "Connect (or tick Mock) before reading codes.")
        return

    try:
        codes = self.ctl.read_dtcs()
    except Exception as exc:
        QtWidgets.QMessageBox.critical(self, "Read failed", str(exc))
        return

    total = self._populate_diag(codes)
    self.diag_dock.setVisible(True)
    self.show_diag.setChecked(True)

    if total:
        summary = (
            f"Read codes: {total} found "
            f"(stored {len(codes.get('stored', []))}, "
            f"pending {len(codes.get('pending', []))}, "
            f"permanent {len(codes.get('permanent', []))})."
        )
    else:
        summary = "Read codes: none stored."
    self.status.showMessage(summary)
def _clear_codes(self):
    """Ask for confirmation, clear trouble codes, then re-read them.

    Requires a connection. Nothing is sent unless the user answers Yes.
    After an acknowledged clear the codes are read again so that anything
    which came straight back is displayed. If that re-read fails, the
    failure is reported instead of claiming that no codes remain.
    """
    if not self.ctl.connected:
        QtWidgets.QMessageBox.information(
            self, "Not connected", "Connect (or tick Mock) before clearing codes.")
        return

    btn = QtWidgets.QMessageBox.warning(
        self, "Clear codes?",
        "This erases stored + pending codes AND freeze-frame data, and "
        "resets emissions monitors.\n\n"
        "Write the codes down first — and read them on a no-start before "
        "clearing. If the fault is still present the code comes right back.\n"
        "Permanent codes (mode 0A) will NOT clear until the fault is fixed "
        "and the vehicle self-clears them over several drive cycles.\n\n"
        "Clear codes now?",
        QtWidgets.QMessageBox.Yes | QtWidgets.QMessageBox.No,
        QtWidgets.QMessageBox.No)
    if btn != QtWidgets.QMessageBox.Yes:
        self.status.showMessage("Cancelled. No codes cleared.")
        return

    try:
        ok = self.ctl.clear_dtcs()
    except Exception as e:
        QtWidgets.QMessageBox.critical(self, "Clear failed", str(e))
        return
    if not ok:
        QtWidgets.QMessageBox.warning(
            self, "No acknowledgement",
            "The ECU did not acknowledge the clear.\n"
            "Make sure the key is in RUN and the vehicle is connected, then "
            "try again.")
        self.status.showMessage("Clear not acknowledged by ECU.")
        return

    # Re-read immediately so anything that came straight back is shown.
    try:
        codes = self.ctl.read_dtcs()
    except Exception as e:
        # The clear succeeded but the outcome is unknown. Empty the tree
        # (its contents are stale) and say so, rather than reporting
        # "No codes on re-read" for a read that never completed.
        self.diag_tree.clear()
        self.status.showMessage(f"Cleared, but the re-read failed: {e}")
        return

    returned = self._populate_diag(codes)
    if returned:
        self.status.showMessage(
            f"Cleared — but {returned} code(s) returned immediately "
            "(active fault present).")
    else:
        self.status.showMessage("Cleared. No codes on re-read.")
def _toggle_diag_dock(self):
    """Show or hide the Diagnostics dock to match its View-menu check mark."""
    wanted = self.show_diag.isChecked()
    self.diag_dock.setVisible(wanted)
# ---------- center (graph + table stack) ----------
def _build_center(self):
    """Build the central stack: graph page (0), table (1) and gauges (2).

    The graph page holds a normalize checkbox above a sub-stack with the
    multi-axis plot at index 0 and the single-axis plot at index 1.
    """
    # --- graph page header: normalize toggle + window length ---
    self.norm_chk = QtWidgets.QCheckBox("Normalize (% of range)")
    self.norm_chk.toggled.connect(self._set_normalize)

    graph_page = QtWidgets.QWidget()
    graph_layout = QtWidgets.QVBoxLayout(graph_page)
    graph_layout.setContentsMargins(4, 4, 4, 4)

    header = QtWidgets.QHBoxLayout()
    header.addWidget(self.norm_chk)
    header.addWidget(
        QtWidgets.QLabel(" (off = true multi-axis, one Y scale per unit)"))
    header.addStretch(1)
    header.addWidget(QtWidgets.QLabel(f"window: {int(PLOT_WINDOW_S)}s"))
    graph_layout.addLayout(header)

    # --- graph sub-stack: 0 = multi-axis (raw), 1 = single axis (normalized) ---
    self.multi = MultiAxisPlot()
    self.single = SinglePlot()
    self.graph_stack = QtWidgets.QStackedWidget()
    for plot in (self.multi, self.single):
        self.graph_stack.addWidget(plot)
    graph_layout.addWidget(self.graph_stack)

    # --- table page ---
    self.table = QtWidgets.QTableWidget(0, 6)
    self.table.setHorizontalHeaderLabels(
        ["Signal", "Value", "Unit", "Min", "Max", "Conf"])
    self.table.horizontalHeader().setStretchLastSection(True)
    self.table.setEditTriggers(QtWidgets.QAbstractItemView.NoEditTriggers)

    # --- gauge page ---
    self.gauges = GaugeGrid()

    # Pages are added in index order: graph, table, gauges.
    self.stack = QtWidgets.QStackedWidget()
    for page in (graph_page, self.table, self.gauges):
        self.stack.addWidget(page)
    self.setCentralWidget(self.stack)
    self._apply_theme()
def _graph(self):
    """Return the plot widget in use: single-axis when Normalize is ticked, else multi-axis."""
    if self.norm_chk.isChecked():
        return self.single
    return self.multi
def _build_statusbar(self):
    """Keep a reference to the window's status bar and show the initial hint."""
    status_bar = self.statusBar()
    self.status = status_bar
    status_bar.showMessage("Not connected. Pick a port (or Mock) and Connect.")
# ---------- profile lifecycle ----------
def _rebuild_for_profile(self):
    """Refresh the profile-dependent UI: PID tree, preset buttons, table rows, title."""
    steps = (
        self._populate_tree,
        self._rebuild_preset_buttons,
        self._populate_table_rows,
        self._refresh_title,
    )
    for step in steps:
        step()
def _refresh_title(self):
    """Set the window title to include the active profile's name."""
    profile_name = self.ctl.profile.name
    self.setWindowTitle(f"ford-obd — {profile_name}")
def _load_profile(self, path):
    """Switch to the profile at ``path`` and rebuild the profile-dependent UI.

    Refused while connected; in that case only the profile menu is rebuilt.
    A load error is reported in a dialog and leaves the UI untouched.
    """
    if self.ctl.connected:
        QtWidgets.QMessageBox.information(
            self, "Disconnect first",
            "Disconnect before switching vehicle profiles.")
        self._rebuild_profile_menu()
        return

    try:
        self.ctl.load_profile(path)
    except Exception as exc:
        QtWidgets.QMessageBox.critical(self, "Profile load failed", str(exc))
        return

    # Drop all plotted state from the previous profile.
    self.curves.clear()
    self._color_i = 0
    for plot in (self.multi, self.single):
        plot.clear()
    self.gauges.rebuild([])

    self._rebuild_for_profile()
    self._rebuild_profile_menu()
    self.status.showMessage(f"Loaded profile: {self.ctl.profile.name}")
def _load_profile_dialog(self):
    """Let the user pick a profile JSON (starting in the profiles directory) and load it."""
    chosen, _filter = QtWidgets.QFileDialog.getOpenFileName(
        self, "Load vehicle profile", profiles_dir(), "Profiles (*.json)")
    if not chosen:
        return
    self._load_profile(chosen)
def _import_profile(self):
    """Validate a user-chosen profile, copy it into the profiles directory and load it.

    The file is parsed with ``load_profile`` before anything is copied, so
    an invalid profile never lands in the profiles directory. Any failure
    is reported in a dialog.
    """
    path, _ = QtWidgets.QFileDialog.getOpenFileName(
        self, "Import vehicle profile", "", "Profiles (*.json)")
    if not path:
        return
    dest = os.path.join(profiles_dir(), os.path.basename(path))
    try:
        load_profile(path)  # validate before copying in
        # Picking a file that already lives in the profiles directory used
        # to fail with shutil.SameFileError; it is already "imported", so
        # skip the copy and just load it.
        already_there = os.path.exists(dest) and os.path.samefile(path, dest)
        if not already_there:
            shutil.copyfile(path, dest)
    except Exception as e:
        QtWidgets.QMessageBox.critical(self, "Import failed", str(e))
        return
    self._load_profile(dest)
def _reload_profile(self):
    """Load the active profile again from its own path; do nothing if it has none."""
    if not getattr(self.ctl.profile, "path", None):
        return
    self._load_profile(self.ctl.profile.path)
def _export_profile(self):
    """Save the active profile to a user-chosen JSON path.

    A failure while saving is reported in a dialog, matching the import
    and load paths, instead of escaping the slot with no visible feedback.
    """
    path, _ = QtWidgets.QFileDialog.getSaveFileName(
        self, "Export active profile", profiles_dir(), "Profiles (*.json)")
    if not path:
        return
    try:
        save_profile(self.ctl.profile, path)
    except Exception as e:
        QtWidgets.QMessageBox.critical(self, "Export failed", str(e))
        return
    self.status.showMessage(f"Exported profile to {path}")
def _edit_profile(self):
    """Open the raw-JSON editor on the active profile's file.

    The profile is reloaded after a successful save, but only when not
    connected. Does nothing if the profile has no path.
    """
    profile_path = getattr(self.ctl.profile, "path", None)
    if not profile_path:
        return
    editor = JsonEditDialog(profile_path, self)
    saved = editor.exec() == QtWidgets.QDialog.Accepted
    if saved and not self.ctl.connected:
        self._load_profile(profile_path)
def _profile_info(self):
    """Show the active profile's metadata as ``key: value`` lines in a dialog."""
    meta = self.ctl.profile.meta
    lines = [f"{name}: {value}" for name, value in meta.items()]
    body = "\n".join(lines)
    QtWidgets.QMessageBox.information(
        self, "Active profile", body or "(no metadata)")
# ---------- connection ----------
def _refresh_ports(self):
    """Repopulate the port combo box from ``obdcore.link.find_ports``.

    Any error while importing or scanning is treated as "no ports". Each
    entry's item data is the device name; the placeholder entry carries
    ``None``.
    """
    combo = self.port_combo
    combo.clear()
    try:
        from obdcore.link import find_ports
        found = find_ports()
    except Exception:
        # Best effort: fall through to the placeholder entry.
        found = []
    for port in found:
        combo.addItem(f"{port.device} ({port.description})", port.device)
    if not found:
        combo.addItem("(no ports found)", None)
def _toggle_connect(self):
    """Connect when disconnected, otherwise disconnect.

    On connect: starts the controller and the refresh timer, enables the
    preset buttons, reports the link in the status bar and applies the
    first preset if the profile has any. A non-numeric baud entry falls
    back to 38400.
    """
    if self.ctl.connected:
        self._disconnect()
        return

    port = self.port_combo.currentData()
    try:
        baud = int(self.baud_edit.text())
    except ValueError:
        baud = 38400

    try:
        ok = self.ctl.connect(
            port=port, baud=baud, mock=self.mock_chk.isChecked())
    except Exception as exc:
        QtWidgets.QMessageBox.critical(self, "Connect failed", str(exc))
        return

    self.ctl.start()
    self.timer.start()
    self.connect_btn.setText("Disconnect")
    for button in self.findChildren(QtWidgets.QPushButton):
        if button.property("preset"):
            button.setEnabled(True)

    kind = "MOCK" if self.mock_chk.isChecked() else "ELM327"
    protocol = getattr(self.ctl.link, 'protocol', '?')
    ack = '(ECU answered)' if ok else '(no 0100 ack — key to RUN?)'
    self.status.showMessage(f"Connected ({kind}) protocol {protocol} {ack}")

    preset_names = self.ctl.reg.preset_names()
    if preset_names:
        self._apply_preset(preset_names[0])
def _disconnect(self):
    """Stop polling and return the UI to its disconnected state.

    Removes every curve, stops the controller, unchecks and blanks all
    tree rows, resets the Record and Connect labels and disables the
    preset buttons.
    """
    self.timer.stop()
    # Iterate over a copy: _remove_curve mutates self.curves.
    for key in tuple(self.curves):
        self._remove_curve(key)
    self.ctl.stop()
    self.rec_act.setText("Start Recording…")

    self.tree.blockSignals(True)
    for row in self._items.values():
        row.setCheckState(0, QtCore.Qt.Unchecked)
        row.setText(1, "--")
    self.tree.blockSignals(False)

    self.connect_btn.setText("Connect")
    preset_buttons = [
        button for button in self.findChildren(QtWidgets.QPushButton)
        if button.property("preset")
    ]
    for button in preset_buttons:
        button.setEnabled(False)
    self.status.showMessage("Disconnected.")
# ---------- PID selection ----------
def _apply_preset(self, name):
    """Check exactly the PIDs listed in preset ``name`` and sync their curves.

    Ignored while disconnected. An unknown preset name selects nothing.
    """
    if not self.ctl.connected:
        return
    selected = set(self.ctl.reg.presets.get(name, []))

    # Set all check states with signals blocked, then sync each key once.
    self.tree.blockSignals(True)
    for key, row in self._items.items():
        if key in selected:
            row.setCheckState(0, QtCore.Qt.Checked)
        else:
            row.setCheckState(0, QtCore.Qt.Unchecked)
    self.tree.blockSignals(False)

    for key in self._items:
        self._sync_key(key)
def _on_item_changed(self, item, col):
    """React to a tree row change: sync the curve for changes in column 0.

    Rows without a key stored under ``UserRole`` are ignored.
    """
    if col != 0:
        return
    key = item.data(0, QtCore.Qt.UserRole)
    if not key:
        return
    self._sync_key(key)
def _sync_key(self, key):
    """Make the curve for ``key`` match its tree checkbox.

    Newly checked: subscribe (only while connected) and add the curve.
    Newly unchecked: unsubscribe and remove the curve. No-op when the two
    already agree.
    """
    wanted = self._items[key].checkState(0) == QtCore.Qt.Checked
    plotted = key in self.curves
    if wanted == plotted:
        return
    if wanted:
        if self.ctl.connected:
            self.ctl.subscribe(key)
        self._add_curve(key)
    else:
        self.ctl.unsubscribe(key)
        self._remove_curve(key)
def _add_curve(self, key):
    """Give ``key`` the next palette colour, add it to the active plot and refresh the gauges."""
    pid = self.ctl.reg.get(key)
    palette_index = self._color_i % len(CURVE_COLORS)
    self._color_i += 1
    color = CURVE_COLORS[palette_index]
    # self.curves maps PID key -> colour.
    self.curves[key] = color
    label = f"{pid.name} ({pid.unit})"
    self._graph().add_curve(key, label, pid.unit, color)
    self._refresh_gauges()
def _remove_curve(self, key):
    """Forget ``key``'s curve, remove it from both plots and refresh the gauges.

    Does nothing when ``key`` has no curve.
    """
    color = self.curves.pop(key, None)
    if color is None:
        return
    for plot in (self.multi, self.single):
        plot.remove_curve(key)
    self._refresh_gauges()
def _set_normalize(self, on):
    """Switch between the multi-axis (raw) plot and the single-axis (% of range) plot.

    Both plots are cleared, the selected one is shown and given every
    current curve again, and the data is redrawn (without a time window
    when disconnected).
    """
    self.norm_act.setChecked(on)
    for plot in (self.multi, self.single):
        plot.clear()
    self.graph_stack.setCurrentIndex(1 if on else 0)

    target = self._graph()
    for key, color in self.curves.items():
        pid = self.ctl.reg.get(key)
        target.add_curve(key, f"{pid.name} ({pid.unit})", pid.unit, color)

    self._apply_theme()
    disconnected = not self.ctl.connected
    self._redraw_curves(static=disconnected)
def _refresh_gauges(self):
    """Rebuild the gauge grid with one gauge per current curve.

    Each spec is ``(key, name, unit, vmin, vmax, colour)``.
    """
    registry = self.ctl.reg
    resolved = (
        (key, registry.get(key), color) for key, color in self.curves.items()
    )
    specs = [
        (key, pid.name, pid.unit, pid.vmin, pid.vmax, color)
        for key, pid, color in resolved
    ]
    self.gauges.rebuild(specs)
# ---------- view ----------
def _set_view(self, idx):
    """Show center page ``idx`` (0 graph, 1 table, 2 gauges) and update the View menu checks."""
    self.stack.setCurrentIndex(idx)
    view_actions = (self.view_graph, self.view_table, self.view_gauge)
    for page, action in enumerate(view_actions):
        action.setChecked(page == idx)
def _toggle_pid_dock(self):
    """Show or hide the PID dock to match its View-menu check mark."""
    wanted = self.show_pids.isChecked()
    self.pid_dock.setVisible(wanted)
def _sync_norm_from_menu(self):
    """Copy the Normalize menu action's check state onto the graph-page checkbox."""
    menu_state = self.norm_act.isChecked()
    self.norm_chk.setChecked(menu_state)
def _toggle_theme(self):
    """Select the light or dark theme from the menu check mark and apply it."""
    if self.theme_act.isChecked():
        self._theme = "light"
    else:
        self._theme = "dark"
    self._apply_theme()
def _apply_theme(self):
    """Apply the current theme's background colour to both plot widgets."""
    background, _foreground = THEMES[self._theme]
    for plot in (self.multi, self.single):
        plot.set_background(background)
def _populate_table_rows(self):
    """Create one table row per registry PID and record each key's row index.

    Value, Min and Max start as "--". ``self._table_row`` is reset to map
    PID key -> row number.
    """
    table = self.table
    pids = self.ctl.reg.all()
    table.setRowCount(len(pids))
    self._table_row = {}
    for row, pid in enumerate(pids):
        self._table_row[pid.key] = row
        # Column order: Signal, Value, Unit, Min, Max, Conf.
        cells = (pid.name, "--", pid.unit, "--", "--", pid.confidence)
        for column, text in enumerate(cells):
            table.setItem(row, column, QtWidgets.QTableWidgetItem(text))
    table.resizeColumnsToContents()
# ---------- captures ----------
def _new_capture(self):
    """Clear the data store and restart the capture clock at the current time."""
    import time

    self.ctl.store.clear()
    self.ctl.t0 = time.time()
    self.status.showMessage("New capture — buffers cleared.")
def _toggle_record(self):
    """Start recording to a user-chosen CSV, or stop the recording in progress.

    Which one happens depends on whether the store currently has a
    recorder. Cancelling the file dialog changes nothing.
    """
    if self.ctl.store.recorder is not None:
        self.ctl.stop_record()
        self.rec_act.setText("Start Recording…")
        self.status.showMessage("Recording stopped.")
        return

    target, _filter = QtWidgets.QFileDialog.getSaveFileName(
        self, "Record capture to", "", "CSV (*.csv)")
    if not target:
        return
    self.ctl.record(target)
    self.rec_act.setText("Stop Recording")
    self.status.showMessage(f"Recording to {target}")
def _export_capture(self):
    """Write the in-memory capture to a user-chosen CSV file.

    A failure while exporting is reported in a dialog instead of escaping
    the slot with no visible feedback.
    """
    path, _ = QtWidgets.QFileDialog.getSaveFileName(
        self, "Export capture", "", "CSV (*.csv)")
    if not path:
        return
    try:
        export_csv(self.ctl.store, path)
    except Exception as e:
        QtWidgets.QMessageBox.critical(self, "Export failed", str(e))
        return
    self.status.showMessage(f"Exported capture to {path}")
def _open_capture(self):
    """Load a recorded CSV into a fresh store and plot it statically.

    Refused while connected. The capture is parsed before any UI state is
    touched, so a file that fails to load leaves the current view intact
    and is reported in a dialog. Afterwards exactly the PIDs present in
    the capture (and known to the active profile) are checked and plotted.
    """
    if self.ctl.connected:
        QtWidgets.QMessageBox.information(
            self, "Disconnect first",
            "Disconnect before replaying a capture.")
        return
    path, _ = QtWidgets.QFileDialog.getOpenFileName(
        self, "Open capture", "", "CSV (*.csv)")
    if not path:
        return

    store = TimeSeriesStore()
    try:
        replay_csv(path, store)
    except Exception as e:
        QtWidgets.QMessageBox.critical(self, "Open failed", str(e))
        return

    self.ctl.store = store
    for key in list(self.curves):
        self._remove_curve(key)

    import time
    ts = [t for s in store.snapshot().values() for t, _ in s]
    # Plot relative to the earliest sample; fall back to "now" when empty.
    self.ctl.t0 = min(ts) if ts else time.time()

    self.tree.blockSignals(True)
    try:
        # Uncheck everything first. Rows checked before this replay would
        # otherwise stay ticked although their curves were just removed.
        for it in self._items.values():
            it.setCheckState(0, QtCore.Qt.Unchecked)
        for key in store.keys():
            if key in self._items:
                self._items[key].setCheckState(0, QtCore.Qt.Checked)
                self._add_curve(key)
    finally:
        # Never leave the tree muted, even if adding a curve fails.
        self.tree.blockSignals(False)

    self._redraw_curves(static=True)
    self.status.showMessage(
        f"Replay: {os.path.basename(path)} ({len(ts)} samples)")
# ---------- help ----------
def _about(self):
    """Show the About box."""
    blurb = (
        "ford-obd — vehicle-agnostic OBD-II scanner\n\n"
        "Open source. Vehicle data lives in JSON profiles you can add/share.\n"
        "git.jpaul.io/justin/ford-obd"
    )
    QtWidgets.QMessageBox.about(self, "About ford-obd", blurb)
def _legend(self):
    """Show a dialog explaining the PID confidence badges."""
    legend = (
        "verified — multi-source or read on a real vehicle\n"
        "[DOC] — documented in sources, not yet read on this vehicle\n"
        "[?] — single-source / disputed scaling; sanity-check first"
    )
    QtWidgets.QMessageBox.information(self, "PID confidence", legend)
# ---------- refresh ----------
def _redraw_curves(self, static=False):
    """Push each curve's samples from the store into the active plot.

    Args:
        static: When true, the whole series is drawn. Otherwise only
            samples from the trailing ``PLOT_WINDOW_S`` seconds are
            requested.

    X values are seconds relative to ``ctl.t0``. With Normalize on, values
    are rescaled to percent of the PID's ``vmin``..``vmax`` span; PIDs
    whose span is zero are plotted raw. Does nothing while ``ctl.t0`` is
    unset.
    """
    origin = self.ctl.t0
    if origin is None:
        return

    since = None
    if not static:
        since = (origin or 0) + max(0.0, self.ctl.now() - PLOT_WINDOW_S)

    normalize = self.norm_chk.isChecked()
    plot = self._graph()
    for key in self.curves:
        pid = self.ctl.reg.get(key)
        rescale = normalize and pid.vmax != pid.vmin
        times, values = [], []
        for stamp, value in self.ctl.store.channel(key).series(since=since):
            if value is None:
                # Samples without a value are skipped entirely.
                continue
            times.append(stamp - origin)
            if rescale:
                values.append(
                    (value - pid.vmin) / (pid.vmax - pid.vmin) * 100.0)
            else:
                values.append(value)
        plot.set_data(key, times, values)

    if normalize:
        self.single.set_y_label("% of range")
def _tick(self):
    """Timer slot: refresh tree values, table cells, and gauges or curves.

    Skipped while disconnected. Gauges are updated only when the gauge
    page (center index 2) is showing; on any other page the curves are
    redrawn.
    """
    if not self.ctl.connected:
        return

    def fmt(number):
        # Shared "--"-or-compact-number formatting for the table cells.
        return "--" if number is None else f"{number:g}"

    self.tree.blockSignals(True)
    for key, row_item in self._items.items():
        latest = self.ctl.store.latest(key)
        pid = self.ctl.reg.get(key)
        if latest is None:
            row_item.setText(1, "--")
        else:
            row_item.setText(1, f"{latest:g} {pid.unit}".strip())

        if key not in self._table_row:
            continue
        row = self._table_row[key]
        low, high = self.ctl.store.minmax(key)
        # Table columns: 1 = Value, 3 = Min, 4 = Max.
        for column, number in ((1, latest), (3, low), (4, high)):
            self.table.item(row, column).setText(fmt(number))
    self.tree.blockSignals(False)

    if self.stack.currentIndex() != 2:
        self._redraw_curves()
        return

    # Gauge view: the running maximum is passed as each gauge's peak.
    for key in self.curves:
        _low, peak = self.ctl.store.minmax(key)
        self.gauges.set_value(key, self.ctl.store.latest(key), peak=peak)
def closeEvent(self, ev):
    """Stop the refresh timer and the controller, then pass the event to the base class.

    The base-class handler runs even if stopping raises.
    """
    try:
        for shut_down in (self.timer.stop, self.ctl.stop):
            shut_down()
    finally:
        super().closeEvent(ev)
class JsonEditDialog(QtWidgets.QDialog):
    """Minimal raw-JSON editor for the active profile (validates on save)."""

    def __init__(self, path, parent=None):
        """Load the text of ``path`` into a monospace editor with Save/Cancel buttons."""
        super().__init__(parent)
        self.path = path
        self.setWindowTitle(f"Edit profile — {os.path.basename(path)}")
        self.resize(720, 560)
        lay = QtWidgets.QVBoxLayout(self)
        self.edit = QtWidgets.QPlainTextEdit()
        self.edit.setFont(QtGui.QFont("monospace"))
        # JSON is UTF-8 by specification; do not depend on the locale's
        # default encoding.
        with open(path, encoding="utf-8") as f:
            self.edit.setPlainText(f.read())
        lay.addWidget(self.edit)
        bb = QtWidgets.QDialogButtonBox(
            QtWidgets.QDialogButtonBox.Save | QtWidgets.QDialogButtonBox.Cancel)
        bb.accepted.connect(self._save)
        bb.rejected.connect(self.reject)
        lay.addWidget(bb)

    @staticmethod
    def _discard(tmp):
        """Best-effort removal of the temporary file; a missing file is fine."""
        try:
            os.remove(tmp)
        except OSError:
            # Nothing useful to do if the cleanup itself fails.
            pass

    def _save(self):
        """Validate the edited text and atomically replace the profile file.

        The text must parse as JSON and then load as a profile from a
        sibling ``.tmp`` file. Only then does the temporary file replace
        the original. Every failure keeps the dialog open, shows the
        error and removes the temporary file.
        """
        import json
        text = self.edit.toPlainText()
        try:
            json.loads(text)  # syntax check
        except Exception as e:
            QtWidgets.QMessageBox.critical(self, "Invalid JSON", str(e))
            return

        tmp = self.path + ".tmp"
        try:
            with open(tmp, "w", encoding="utf-8") as f:
                f.write(text)
        except OSError as e:
            self._discard(tmp)
            QtWidgets.QMessageBox.critical(self, "Save failed", str(e))
            return

        try:
            load_profile(tmp)  # schema/formula validation
        except Exception as e:
            self._discard(tmp)
            QtWidgets.QMessageBox.critical(self, "Invalid profile", str(e))
            return

        try:
            os.replace(tmp, self.path)
        except OSError as e:
            self._discard(tmp)
            QtWidgets.QMessageBox.critical(self, "Save failed", str(e))
            return
        self.accept()
def run():
    """Create the application and main window, show it, and exit with the event loop's code."""
    import sys

    application = QtWidgets.QApplication(sys.argv)
    window = MainWindow()
    window.resize(1150, 700)
    window.show()
    exit_code = application.exec()
    sys.exit(exit_code)


if __name__ == "__main__":
    run()