Make app vehicle-agnostic: JSON vehicle profiles + menu bar

Vehicle data is now DATA, not code. PIDs/scaling/DTCs/presets live in
profiles/*.json; the app loads them at runtime, so it works across vehicles
and others can contribute profiles (open source).

Core:
- obdcore/formula.py: safe AST evaluator for scaling formulas (A/B/... byte
  vars, Torque/FORScan convention). Only arithmetic/bitwise + min/max/abs/
  round/int/float; names/attrs/arbitrary calls rejected at load -> a community
  profile CANNOT execute code.
- obdcore/profile.py: load/save/list profiles; compiles each formula into a
  decode callable. registry.py now profile-backed (PidRegistry/DtcDatabase
  take a Profile); hardcoded Ford table removed.
- store.py: clear()/snapshot()/export_csv() for capture management.

Profiles:
- profiles/ford-6.0-powerstroke.json (27 PIDs, verified formulas, DTCs)
- profiles/generic-obd2.json (standard SAE Mode-01 base, any vehicle)
- profiles/README.md (schema + formula language + contributing)

GUI:
- Menu bar: File (new/record/export/replay capture, quit), Profile (switch/
  load/import/reload/edit-JSON/export, live profile list), View (Graph/Table
  views, gauges P2, toggle PID dock, normalize, light/dark theme), Help
  (about/confidence legend/profile info).
- PID browser + presets rebuild on profile switch; added Table view; raw-JSON
  profile editor dialog (validates schema+formulas before saving).

Tests: profiles load+compile, formula sandbox rejects hostile input, decoders
still match real truck bytes, crank/derived/dead-PID/replay -- all pass.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_016yT89n4zR4qbrySoSiEyZs
This commit is contained in:
2026-06-30 14:34:33 -04:00
parent 45691334e1
commit f3f0bf2a77
12 changed files with 966 additions and 295 deletions
Binary file not shown.

After

Width:  |  Height:  |  Size: 111 KiB

+11 -2
View File
@@ -6,7 +6,8 @@ a timer; the scheduler thread does the serial work.
""" """
import time import time
from obdcore import PidRegistry, TimeSeriesStore, PollScheduler, CsvRecorder from obdcore import (PidRegistry, DtcDatabase, TimeSeriesStore, PollScheduler,
CsvRecorder, load_default, load_profile)
from obdcore.mock import MockLink from obdcore.mock import MockLink
# default poll rates (Hz) -- fast for the no-start metrics, slower for the rest # default poll rates (Hz) -- fast for the no-start metrics, slower for the rest
@@ -17,13 +18,21 @@ FAST_HZ = 5
class Controller: class Controller:
def __init__(self): def __init__(self):
self.reg = PidRegistry() self.profile = load_default()
self.reg = PidRegistry(self.profile)
self.dtcdb = DtcDatabase(self.profile)
self.store = TimeSeriesStore() self.store = TimeSeriesStore()
self.link = None self.link = None
self.sched = None self.sched = None
self.t0 = None self.t0 = None
self.connected = False self.connected = False
def load_profile(self, path):
"""Switch the active vehicle profile (only allowed while disconnected)."""
self.profile = load_profile(path)
self.reg = PidRegistry(self.profile)
self.dtcdb = DtcDatabase(self.profile)
def connect(self, port=None, baud=38400, mock=False): def connect(self, port=None, baud=38400, mock=False):
if mock: if mock:
self.link = MockLink(clock=time.time) self.link = MockLink(clock=time.time)
+420 -133
View File
@@ -1,111 +1,182 @@
"""ford-obd GUI -- P1 shell: connection bar, PID browser (side), live overlay plot. """ford-obd GUI -- vehicle-agnostic scanner shell.
Checking a PID in the browser subscribes it (polls + plots); unchecking removes Menu bar: File (captures) | Profile (vehicle profiles) | View | Help.
it. Preset buttons bulk-select. 'Normalize' overlays mixed-scale PIDs (ICP vs Toolbar: port / baud / mock / connect + per-profile preset buttons.
FICM) as % of each PID's range so they're all readable on one axis. Left: PID browser (live values, confidence badges, checkboxes).
Center: stacked Graph view (pyqtgraph overlay) and Table view.
Built to run against MockLink with no hardware -- pick "Mock" and Connect. Vehicle data comes entirely from the active JSON profile (profiles/*.json),
so the app is not Ford-specific -- load another profile to scan another truck.
Runs against MockLink with no hardware (tick "Mock" + Connect).
""" """
import time import os
import shutil
from PySide6 import QtCore, QtGui, QtWidgets from PySide6 import QtCore, QtGui, QtWidgets
import pyqtgraph as pg import pyqtgraph as pg
from obdcore import PRESETS from obdcore import (list_profiles, profiles_dir, save_profile, load_profile,
export_csv, replay_csv, TimeSeriesStore)
from .controller import Controller from .controller import Controller
PLOT_WINDOW_S = 60.0 # seconds of history shown PLOT_WINDOW_S = 60.0
REFRESH_MS = 100 # GUI redraw rate (10 Hz) REFRESH_MS = 100
CURVE_COLORS = [ CURVE_COLORS = [
"#e6194B", "#3cb44b", "#4363d8", "#f58231", "#911eb4", "#e6194B", "#3cb44b", "#4363d8", "#f58231", "#911eb4", "#42d4f4",
"#42d4f4", "#f032e6", "#bfef45", "#fabed4", "#469990", "#f032e6", "#bfef45", "#fabed4", "#469990", "#9A6324", "#ffe119",
"#9A6324", "#ffe119", "#000075", "#a9a9a9", "#800000", "#000075", "#a9a9a9", "#800000",
] ]
GROUP_ORDER = ["fuel", "ficm", "air", "engine", "driveline", "power", "misc"] GROUP_ORDER = ["fuel", "ficm", "air", "engine", "driveline", "power", "misc"]
GROUP_LABEL = { GROUP_LABEL = {"fuel": "Fuel / Injection", "ficm": "FICM", "air": "Air / Boost",
"fuel": "Fuel / Injection", "ficm": "FICM", "air": "Air / Boost", "engine": "Engine", "driveline": "Driveline", "power": "Power",
"engine": "Engine", "driveline": "Driveline", "power": "Power", "misc": "Other", "misc": "Other"}
}
CONF_BADGE = {"verified": "", "doc": " [DOC]", "tentative": " [?]"} CONF_BADGE = {"verified": "", "doc": " [DOC]", "tentative": " [?]"}
THEMES = {"dark": ("#111", "#ccc"), "light": ("#fafafa", "#222")}
class MainWindow(QtWidgets.QMainWindow): class MainWindow(QtWidgets.QMainWindow):
def __init__(self): def __init__(self):
super().__init__() super().__init__()
self.setWindowTitle("ford-obd -- 6.0 Power Stroke scanner")
self.resize(1100, 680)
self.ctl = Controller() self.ctl = Controller()
self.curves = {} # key -> PlotDataItem self.curves = {}
self._color_i = 0 self._color_i = 0
self._theme = "dark"
self._build_menubar()
self._build_connection_bar() self._build_connection_bar()
self._build_pid_browser() self._build_pid_browser()
self._build_plot() self._build_center()
self._build_statusbar() self._build_statusbar()
self._refresh_title()
self._rebuild_for_profile()
self.timer = QtCore.QTimer(self) self.timer = QtCore.QTimer(self)
self.timer.timeout.connect(self._tick) self.timer.timeout.connect(self._tick)
self.timer.setInterval(REFRESH_MS) self.timer.setInterval(REFRESH_MS)
# ---- UI construction ---- # ---------- menus ----------
def _build_menubar(self):
mb = self.menuBar()
filem = mb.addMenu("&File")
self._act(filem, "New Capture", self._new_capture, "Clear the current data buffers")
filem.addSeparator()
self.rec_act = self._act(filem, "Start Recording…", self._toggle_record,
"Stream samples to a CSV as they arrive")
self._act(filem, "Export Capture As… (CSV)", self._export_capture,
"Write the current in-memory buffers to a CSV")
self._act(filem, "Open Capture (Replay)…", self._open_capture,
"Load a recorded CSV and plot it (disconnect first)")
filem.addSeparator()
self._act(filem, "Quit", self.close, shortcut="Ctrl+Q")
self.profm = mb.addMenu("&Profile")
self._rebuild_profile_menu()
viewm = mb.addMenu("&View")
self.view_graph = self._act(viewm, "Graph View", lambda: self._set_view(0),
checkable=True)
self.view_table = self._act(viewm, "Table View", lambda: self._set_view(1),
checkable=True)
self.view_graph.setChecked(True)
gauges = self._act(viewm, "Gauge View (P2)", lambda: None, checkable=True)
gauges.setEnabled(False)
viewm.addSeparator()
self.show_pids = self._act(viewm, "Show PID Panel", self._toggle_pid_dock,
checkable=True)
self.show_pids.setChecked(True)
self.norm_act = self._act(viewm, "Normalize Graph (% of range)",
self._sync_norm_from_menu, checkable=True)
viewm.addSeparator()
self.theme_act = self._act(viewm, "Light Theme", self._toggle_theme, checkable=True)
helpm = mb.addMenu("&Help")
self._act(helpm, "About ford-obd", self._about)
self._act(helpm, "PID Confidence Legend", self._legend)
self._act(helpm, "Active Profile Info", self._profile_info)
def _act(self, menu, text, slot, tip="", checkable=False, shortcut=None):
a = QtGui.QAction(text, self)
a.triggered.connect(lambda _=False: slot())
if tip:
a.setStatusTip(tip)
a.setCheckable(checkable)
if shortcut:
a.setShortcut(shortcut)
menu.addAction(a)
return a
def _rebuild_profile_menu(self):
self.profm.clear()
self._profile_group = QtGui.QActionGroup(self)
self._profile_group.setExclusive(True)
active = getattr(self.ctl.profile, "path", None)
for path, meta in list_profiles():
a = QtGui.QAction(meta.get("name", os.path.basename(path)), self)
a.setCheckable(True)
a.setChecked(active and os.path.abspath(path) == os.path.abspath(active))
a.triggered.connect(lambda _=False, p=path: self._load_profile(p))
self._profile_group.addAction(a)
self.profm.addAction(a)
self.profm.addSeparator()
self._act(self.profm, "Load Profile…", self._load_profile_dialog)
self._act(self.profm, "Import Profile…", self._import_profile)
self._act(self.profm, "Reload Active", self._reload_profile)
self._act(self.profm, "Edit Active Profile (JSON)…", self._edit_profile)
self._act(self.profm, "Export Active Profile As…", self._export_profile)
# ---------- toolbar ----------
def _build_connection_bar(self): def _build_connection_bar(self):
tb = QtWidgets.QToolBar("Connection") tb = QtWidgets.QToolBar("Connection")
tb.setMovable(False) tb.setMovable(False)
self.addToolBar(tb) self.addToolBar(tb)
tb.addWidget(QtWidgets.QLabel(" Port ")) tb.addWidget(QtWidgets.QLabel(" Port "))
self.port_combo = QtWidgets.QComboBox() self.port_combo = QtWidgets.QComboBox()
self.port_combo.setMinimumWidth(180) self.port_combo.setMinimumWidth(180)
self._refresh_ports() self._refresh_ports()
tb.addWidget(self.port_combo) tb.addWidget(self.port_combo)
b = QtWidgets.QToolButton(); b.setText(""); b.clicked.connect(self._refresh_ports)
refresh = QtWidgets.QToolButton() tb.addWidget(b)
refresh.setText("")
refresh.setToolTip("Rescan serial ports")
refresh.clicked.connect(self._refresh_ports)
tb.addWidget(refresh)
tb.addWidget(QtWidgets.QLabel(" Baud ")) tb.addWidget(QtWidgets.QLabel(" Baud "))
self.baud_edit = QtWidgets.QLineEdit("38400") self.baud_edit = QtWidgets.QLineEdit("38400"); self.baud_edit.setFixedWidth(70)
self.baud_edit.setFixedWidth(70)
tb.addWidget(self.baud_edit) tb.addWidget(self.baud_edit)
self.mock_chk = QtWidgets.QCheckBox("Mock"); tb.addWidget(self.mock_chk)
self.mock_chk = QtWidgets.QCheckBox("Mock")
self.mock_chk.setToolTip("Use simulated data (no adapter needed)")
tb.addWidget(self.mock_chk)
self.connect_btn = QtWidgets.QPushButton("Connect") self.connect_btn = QtWidgets.QPushButton("Connect")
self.connect_btn.clicked.connect(self._toggle_connect) self.connect_btn.clicked.connect(self._toggle_connect)
tb.addWidget(self.connect_btn) tb.addWidget(self.connect_btn)
tb.addSeparator() tb.addSeparator()
for name in ("crank", "driving", "vitals"): self._preset_tb = tb
b = QtWidgets.QPushButton(name.capitalize()) self._preset_sep = tb.addSeparator()
b.setToolTip(f"Select the '{name}' PID set") self._preset_buttons = []
b.clicked.connect(lambda _=False, n=name: self._apply_preset(n))
b.setEnabled(False)
b.setProperty("preset", True)
tb.addWidget(b)
def _rebuild_preset_buttons(self):
for b in self._preset_buttons:
self._preset_tb.removeAction(b)
self._preset_buttons = []
for name in self.ctl.reg.preset_names():
btn = QtWidgets.QPushButton(name.capitalize())
btn.setEnabled(self.ctl.connected)
btn.clicked.connect(lambda _=False, n=name: self._apply_preset(n))
self._preset_buttons.append(self._preset_tb.addWidget(btn))
self._preset_buttons[-1].setProperty("presetbtn", True)
btn.setProperty("preset", True)
# ---------- PID browser ----------
def _build_pid_browser(self): def _build_pid_browser(self):
dock = QtWidgets.QDockWidget("PIDs", self) self.pid_dock = QtWidgets.QDockWidget("PIDs", self)
dock.setAllowedAreas(QtCore.Qt.LeftDockWidgetArea | QtCore.Qt.RightDockWidgetArea)
self.tree = QtWidgets.QTreeWidget() self.tree = QtWidgets.QTreeWidget()
self.tree.setColumnCount(2) self.tree.setColumnCount(2)
self.tree.setHeaderLabels(["Signal", "Value"]) self.tree.setHeaderLabels(["Signal", "Value"])
self.tree.setRootIsDecorated(True)
self.tree.setUniformRowHeights(True)
self.tree.itemChanged.connect(self._on_item_changed) self.tree.itemChanged.connect(self._on_item_changed)
dock.setWidget(self.tree) self.pid_dock.setWidget(self.tree)
self.addDockWidget(QtCore.Qt.LeftDockWidgetArea, dock) self.pid_dock.visibilityChanged.connect(
self._populate_tree() lambda vis: self.show_pids.setChecked(vis))
self.addDockWidget(QtCore.Qt.LeftDockWidgetArea, self.pid_dock)
def _populate_tree(self): def _populate_tree(self):
self.tree.blockSignals(True) self.tree.blockSignals(True)
self.tree.clear() self.tree.clear()
self._items = {} # key -> QTreeWidgetItem self._items = {}
groups = {} groups = {}
for p in self.ctl.reg.all(): for p in self.ctl.reg.all():
g = groups.get(p.group) g = groups.get(p.group)
@@ -118,45 +189,124 @@ class MainWindow(QtWidgets.QMainWindow):
it.setFlags(QtCore.Qt.ItemIsUserCheckable | QtCore.Qt.ItemIsEnabled) it.setFlags(QtCore.Qt.ItemIsUserCheckable | QtCore.Qt.ItemIsEnabled)
it.setCheckState(0, QtCore.Qt.Unchecked) it.setCheckState(0, QtCore.Qt.Unchecked)
it.setData(0, QtCore.Qt.UserRole, p.key) it.setData(0, QtCore.Qt.UserRole, p.key)
it.setToolTip(0, f"{p.key} (mode {p.mode} {p.pid}) {p.unit} {p.notes}") it.setToolTip(0, f"{p.key} (mode {p.mode} {p.pid}) {p.unit} {p.notes}")
g.addChild(it) g.addChild(it)
self._items[p.key] = it self._items[p.key] = it
for gk in GROUP_ORDER: for gk in GROUP_ORDER:
if gk in groups: if gk in groups:
self.tree.addTopLevelItem(groups[gk]) self.tree.addTopLevelItem(groups[gk])
for gk, g in groups.items(): # any custom groups not in GROUP_ORDER
if gk not in GROUP_ORDER:
self.tree.addTopLevelItem(g)
self.tree.expandAll() self.tree.expandAll()
self.tree.resizeColumnToContents(0) self.tree.resizeColumnToContents(0)
self.tree.blockSignals(False) self.tree.blockSignals(False)
def _build_plot(self): # ---------- center (graph + table stack) ----------
pg.setConfigOptions(antialias=True, background="#111", foreground="#ccc") def _build_center(self):
central = QtWidgets.QWidget() self.stack = QtWidgets.QStackedWidget()
lay = QtWidgets.QVBoxLayout(central)
lay.setContentsMargins(4, 4, 4, 4)
# graph page
gpage = QtWidgets.QWidget(); gl = QtWidgets.QVBoxLayout(gpage)
gl.setContentsMargins(4, 4, 4, 4)
bar = QtWidgets.QHBoxLayout() bar = QtWidgets.QHBoxLayout()
self.norm_chk = QtWidgets.QCheckBox("Normalize (% of range)") self.norm_chk = QtWidgets.QCheckBox("Normalize (% of range)")
self.norm_chk.setToolTip("Scale each curve to its min..max so mixed units " self.norm_chk.toggled.connect(lambda v: self.norm_act.setChecked(v))
"(ICP vs FICM) are all readable on one axis") bar.addWidget(self.norm_chk); bar.addStretch(1)
bar.addWidget(self.norm_chk) bar.addWidget(QtWidgets.QLabel(f"window: {int(PLOT_WINDOW_S)}s"))
bar.addStretch(1) gl.addLayout(bar)
self.window_label = QtWidgets.QLabel(f"window: {int(PLOT_WINDOW_S)}s")
bar.addWidget(self.window_label)
lay.addLayout(bar)
self.plot = pg.PlotWidget() self.plot = pg.PlotWidget()
self.plot.addLegend(offset=(10, 10)) self.plot.addLegend(offset=(10, 10))
self.plot.showGrid(x=True, y=True, alpha=0.25) self.plot.showGrid(x=True, y=True, alpha=0.25)
self.plot.setLabel("bottom", "time", units="s") self.plot.setLabel("bottom", "time", units="s")
self.plot.setLabel("left", "value") gl.addWidget(self.plot)
lay.addWidget(self.plot) self.stack.addWidget(gpage)
self.setCentralWidget(central)
# table page
self.table = QtWidgets.QTableWidget(0, 6)
self.table.setHorizontalHeaderLabels(["Signal", "Value", "Unit", "Min", "Max", "Conf"])
self.table.horizontalHeader().setStretchLastSection(True)
self.table.setEditTriggers(QtWidgets.QAbstractItemView.NoEditTriggers)
self.stack.addWidget(self.table)
self.setCentralWidget(self.stack)
self._apply_theme()
def _build_statusbar(self): def _build_statusbar(self):
self.status = self.statusBar() self.status = self.statusBar()
self.status.showMessage("Not connected. Pick a port (or Mock) and Connect.") self.status.showMessage("Not connected. Pick a port (or Mock) and Connect.")
# ---- connection ---- # ---------- profile lifecycle ----------
def _rebuild_for_profile(self):
self._populate_tree()
self._rebuild_preset_buttons()
self._populate_table_rows()
self._refresh_title()
def _refresh_title(self):
self.setWindowTitle(f"ford-obd — {self.ctl.profile.name}")
def _load_profile(self, path):
if self.ctl.connected:
QtWidgets.QMessageBox.information(self, "Disconnect first",
"Disconnect before switching vehicle profiles.")
self._rebuild_profile_menu()
return
try:
self.ctl.load_profile(path)
except Exception as e:
QtWidgets.QMessageBox.critical(self, "Profile load failed", str(e))
return
self.curves.clear(); self._color_i = 0
self._rebuild_for_profile()
self._rebuild_profile_menu()
self.status.showMessage(f"Loaded profile: {self.ctl.profile.name}")
def _load_profile_dialog(self):
path, _ = QtWidgets.QFileDialog.getOpenFileName(
self, "Load vehicle profile", profiles_dir(), "Profiles (*.json)")
if path:
self._load_profile(path)
def _import_profile(self):
path, _ = QtWidgets.QFileDialog.getOpenFileName(
self, "Import vehicle profile", "", "Profiles (*.json)")
if not path:
return
dest = os.path.join(profiles_dir(), os.path.basename(path))
try:
load_profile(path) # validate before copying in
shutil.copyfile(path, dest)
except Exception as e:
QtWidgets.QMessageBox.critical(self, "Import failed", str(e))
return
self._load_profile(dest)
def _reload_profile(self):
if getattr(self.ctl.profile, "path", None):
self._load_profile(self.ctl.profile.path)
def _export_profile(self):
path, _ = QtWidgets.QFileDialog.getSaveFileName(
self, "Export active profile", profiles_dir(), "Profiles (*.json)")
if path:
save_profile(self.ctl.profile, path)
self.status.showMessage(f"Exported profile to {path}")
def _edit_profile(self):
p = getattr(self.ctl.profile, "path", None)
if not p:
return
dlg = JsonEditDialog(p, self)
if dlg.exec() == QtWidgets.QDialog.Accepted and not self.ctl.connected:
self._load_profile(p)
def _profile_info(self):
m = self.ctl.profile.meta
text = "\n".join(f"{k}: {v}" for k, v in m.items())
QtWidgets.QMessageBox.information(self, "Active profile", text or "(no metadata)")
# ---------- connection ----------
def _refresh_ports(self): def _refresh_ports(self):
self.port_combo.clear() self.port_combo.clear()
try: try:
@@ -171,73 +321,65 @@ class MainWindow(QtWidgets.QMainWindow):
def _toggle_connect(self): def _toggle_connect(self):
if self.ctl.connected: if self.ctl.connected:
self._disconnect() self._disconnect(); return
return
mock = self.mock_chk.isChecked()
port = self.port_combo.currentData() port = self.port_combo.currentData()
try: try:
baud = int(self.baud_edit.text()) baud = int(self.baud_edit.text())
except ValueError: except ValueError:
baud = 38400 baud = 38400
try: try:
ok = self.ctl.connect(port=port, baud=baud, mock=mock) ok = self.ctl.connect(port=port, baud=baud, mock=self.mock_chk.isChecked())
except Exception as e: except Exception as e:
QtWidgets.QMessageBox.critical(self, "Connect failed", str(e)) QtWidgets.QMessageBox.critical(self, "Connect failed", str(e)); return
return self.ctl.start(); self.timer.start()
self.ctl.start()
self.timer.start()
self.connect_btn.setText("Disconnect") self.connect_btn.setText("Disconnect")
self._set_presets_enabled(True) for b in self.findChildren(QtWidgets.QPushButton):
proto = getattr(self.ctl.link, "protocol", "?") if b.property("preset"):
kind = "MOCK" if mock else "ELM327" b.setEnabled(True)
self.status.showMessage(f"Connected ({kind}) protocol {proto} " kind = "MOCK" if self.mock_chk.isChecked() else "ELM327"
f"{'(ECU answered)' if ok else '(no 0100 ack - key to RUN?)'}") self.status.showMessage(f"Connected ({kind}) protocol "
self._apply_preset("crank") f"{getattr(self.ctl.link,'protocol','?')} "
f"{'(ECU answered)' if ok else '(no 0100 ack — key to RUN?)'}")
names = self.ctl.reg.preset_names()
if names:
self._apply_preset(names[0])
def _disconnect(self): def _disconnect(self):
self.timer.stop() self.timer.stop()
for key in list(self.curves): for key in list(self.curves):
self._remove_curve(key) self._remove_curve(key)
self.ctl.stop() self.ctl.stop()
# uncheck everything self.rec_act.setText("Start Recording…")
self.tree.blockSignals(True) self.tree.blockSignals(True)
for it in self._items.values(): for it in self._items.values():
it.setCheckState(0, QtCore.Qt.Unchecked) it.setCheckState(0, QtCore.Qt.Unchecked); it.setText(1, "--")
it.setText(1, "--")
self.tree.blockSignals(False) self.tree.blockSignals(False)
self.connect_btn.setText("Connect") self.connect_btn.setText("Connect")
self._set_presets_enabled(False)
self.status.showMessage("Disconnected.")
def _set_presets_enabled(self, on):
for b in self.findChildren(QtWidgets.QPushButton): for b in self.findChildren(QtWidgets.QPushButton):
if b.property("preset"): if b.property("preset"):
b.setEnabled(on) b.setEnabled(False)
self.status.showMessage("Disconnected.")
# ---- PID selection ---- # ---------- PID selection ----------
def _apply_preset(self, name): def _apply_preset(self, name):
if not self.ctl.connected: if not self.ctl.connected:
return return
wanted = set(PRESETS.get(name, [])) wanted = set(self.ctl.reg.presets.get(name, []))
self.tree.blockSignals(True) self.tree.blockSignals(True)
for key, it in self._items.items(): for key, it in self._items.items():
want = key in wanted it.setCheckState(0, QtCore.Qt.Checked if key in wanted else QtCore.Qt.Unchecked)
it.setCheckState(0, QtCore.Qt.Checked if want else QtCore.Qt.Unchecked)
self.tree.blockSignals(False) self.tree.blockSignals(False)
# sync subscriptions/curves to the new check state
for key in self._items: for key in self._items:
self._sync_key(key) self._sync_key(key)
def _on_item_changed(self, item, col): def _on_item_changed(self, item, col):
if col != 0: if col == 0:
return key = item.data(0, QtCore.Qt.UserRole)
key = item.data(0, QtCore.Qt.UserRole) if key:
if key: self._sync_key(key)
self._sync_key(key)
def _sync_key(self, key): def _sync_key(self, key):
it = self._items[key] checked = self._items[key].checkState(0) == QtCore.Qt.Checked
checked = it.checkState(0) == QtCore.Qt.Checked
has = key in self.curves has = key in self.curves
if checked and not has: if checked and not has:
if self.ctl.connected: if self.ctl.connected:
@@ -249,46 +391,134 @@ class MainWindow(QtWidgets.QMainWindow):
def _add_curve(self, key): def _add_curve(self, key):
p = self.ctl.reg.get(key) p = self.ctl.reg.get(key)
color = CURVE_COLORS[self._color_i % len(CURVE_COLORS)] color = CURVE_COLORS[self._color_i % len(CURVE_COLORS)]; self._color_i += 1
self._color_i += 1 self.curves[key] = self.plot.plot([], [], name=f"{p.name} ({p.unit})",
pen = pg.mkPen(color=color, width=2) pen=pg.mkPen(color=color, width=2))
curve = self.plot.plot([], [], name=f"{p.name} ({p.unit})", pen=pen)
curve.setData([], [])
self.curves[key] = curve
def _remove_curve(self, key): def _remove_curve(self, key):
curve = self.curves.pop(key, None) c = self.curves.pop(key, None)
if curve is not None: if c is not None:
self.plot.removeItem(curve) self.plot.removeItem(c)
legend = self.plot.plotItem.legend leg = self.plot.plotItem.legend
if legend: if leg:
try: try:
legend.removeItem(curve) leg.removeItem(c)
except Exception: except Exception:
pass pass
# ---- periodic refresh ---- # ---------- view ----------
def _tick(self): def _set_view(self, idx):
if not self.ctl.connected: self.stack.setCurrentIndex(idx)
self.view_graph.setChecked(idx == 0)
self.view_table.setChecked(idx == 1)
def _toggle_pid_dock(self):
self.pid_dock.setVisible(self.show_pids.isChecked())
def _sync_norm_from_menu(self):
self.norm_chk.setChecked(self.norm_act.isChecked())
def _toggle_theme(self):
self._theme = "light" if self.theme_act.isChecked() else "dark"
self._apply_theme()
def _apply_theme(self):
bg, fg = THEMES[self._theme]
self.plot.setBackground(bg)
self.plot.getAxis("bottom").setPen(fg); self.plot.getAxis("left").setPen(fg)
self.plot.getAxis("bottom").setTextPen(fg); self.plot.getAxis("left").setTextPen(fg)
def _populate_table_rows(self):
pids = self.ctl.reg.all()
self.table.setRowCount(len(pids))
self._table_row = {}
for r, p in enumerate(pids):
self._table_row[p.key] = r
self.table.setItem(r, 0, QtWidgets.QTableWidgetItem(p.name))
self.table.setItem(r, 1, QtWidgets.QTableWidgetItem("--"))
self.table.setItem(r, 2, QtWidgets.QTableWidgetItem(p.unit))
self.table.setItem(r, 3, QtWidgets.QTableWidgetItem("--"))
self.table.setItem(r, 4, QtWidgets.QTableWidgetItem("--"))
self.table.setItem(r, 5, QtWidgets.QTableWidgetItem(p.confidence))
self.table.resizeColumnsToContents()
# ---------- captures ----------
def _new_capture(self):
self.ctl.store.clear()
import time
self.ctl.t0 = time.time()
self.status.showMessage("New capture — buffers cleared.")
def _toggle_record(self):
if self.ctl.store.recorder is None:
path, _ = QtWidgets.QFileDialog.getSaveFileName(
self, "Record capture to", "", "CSV (*.csv)")
if not path:
return
self.ctl.record(path)
self.rec_act.setText("Stop Recording")
self.status.showMessage(f"Recording to {path}")
else:
self.ctl.stop_record()
self.rec_act.setText("Start Recording…")
self.status.showMessage("Recording stopped.")
def _export_capture(self):
path, _ = QtWidgets.QFileDialog.getSaveFileName(self, "Export capture", "", "CSV (*.csv)")
if path:
export_csv(self.ctl.store, path)
self.status.showMessage(f"Exported capture to {path}")
def _open_capture(self):
if self.ctl.connected:
QtWidgets.QMessageBox.information(self, "Disconnect first",
"Disconnect before replaying a capture."); return
path, _ = QtWidgets.QFileDialog.getOpenFileName(self, "Open capture", "", "CSV (*.csv)")
if not path:
return return
now = self.ctl.now() store = TimeSeriesStore()
since = (self.ctl.t0 or 0) + max(0.0, now - PLOT_WINDOW_S) replay_csv(path, store)
normalize = self.norm_chk.isChecked() self.ctl.store = store
for key in list(self.curves):
# update browser values self._remove_curve(key)
import time
ts = [t for s in store.snapshot().values() for t, _ in s]
self.ctl.t0 = min(ts) if ts else time.time()
self.tree.blockSignals(True) self.tree.blockSignals(True)
for key, it in self._items.items(): for key in store.keys():
v = self.ctl.store.latest(key) if key in self._items:
p = self.ctl.reg.get(key) self._items[key].setCheckState(0, QtCore.Qt.Checked)
it.setText(1, "--" if v is None else f"{v:g} {p.unit}".strip()) self._add_curve(key)
self.tree.blockSignals(False) self.tree.blockSignals(False)
self._redraw_curves(static=True)
self.status.showMessage(f"Replay: {os.path.basename(path)} ({len(ts)} samples)")
# update plotted curves # ---------- help ----------
def _about(self):
QtWidgets.QMessageBox.about(self, "About ford-obd",
"ford-obd — vehicle-agnostic OBD-II scanner\n\n"
"Open source. Vehicle data lives in JSON profiles you can add/share.\n"
"git.jpaul.io/justin/ford-obd")
def _legend(self):
QtWidgets.QMessageBox.information(self, "PID confidence",
"verified — multi-source or read on a real vehicle\n"
"[DOC] — documented in sources, not yet read on this vehicle\n"
"[?] — single-source / disputed scaling; sanity-check first")
# ---------- refresh ----------
def _redraw_curves(self, static=False):
if self.ctl.t0 is None:
return
if static:
since = None
else:
since = (self.ctl.t0 or 0) + max(0.0, self.ctl.now() - PLOT_WINDOW_S)
normalize = self.norm_chk.isChecked()
for key, curve in self.curves.items(): for key, curve in self.curves.items():
p = self.ctl.reg.get(key) p = self.ctl.reg.get(key)
series = self.ctl.store.channel(key).series(since=since)
xs, ys = [], [] xs, ys = [], []
for t, v in series: for t, v in self.ctl.store.channel(key).series(since=since):
if v is None: if v is None:
continue continue
xs.append(t - self.ctl.t0) xs.append(t - self.ctl.t0)
@@ -299,6 +529,24 @@ class MainWindow(QtWidgets.QMainWindow):
curve.setData(xs, ys) curve.setData(xs, ys)
self.plot.setLabel("left", "% of range" if normalize else "value") self.plot.setLabel("left", "% of range" if normalize else "value")
def _tick(self):
if not self.ctl.connected:
return
self.tree.blockSignals(True)
for key, it in self._items.items():
v = self.ctl.store.latest(key)
p = self.ctl.reg.get(key)
txt = "--" if v is None else f"{v:g} {p.unit}".strip()
it.setText(1, txt)
if key in self._table_row:
r = self._table_row[key]
lo, hi = self.ctl.store.minmax(key)
self.table.item(r, 1).setText("--" if v is None else f"{v:g}")
self.table.item(r, 3).setText("--" if lo is None else f"{lo:g}")
self.table.item(r, 4).setText("--" if hi is None else f"{hi:g}")
self.tree.blockSignals(False)
self._redraw_curves()
def closeEvent(self, ev): def closeEvent(self, ev):
try: try:
self.timer.stop() self.timer.stop()
@@ -307,10 +555,49 @@ class MainWindow(QtWidgets.QMainWindow):
super().closeEvent(ev) super().closeEvent(ev)
class JsonEditDialog(QtWidgets.QDialog):
"""Minimal raw-JSON editor for the active profile (validates on save)."""
def __init__(self, path, parent=None):
super().__init__(parent)
self.path = path
self.setWindowTitle(f"Edit profile — {os.path.basename(path)}")
self.resize(720, 560)
lay = QtWidgets.QVBoxLayout(self)
self.edit = QtWidgets.QPlainTextEdit()
self.edit.setFont(QtGui.QFont("monospace"))
with open(path) as f:
self.edit.setPlainText(f.read())
lay.addWidget(self.edit)
bb = QtWidgets.QDialogButtonBox(
QtWidgets.QDialogButtonBox.Save | QtWidgets.QDialogButtonBox.Cancel)
bb.accepted.connect(self._save); bb.rejected.connect(self.reject)
lay.addWidget(bb)
def _save(self):
import json
text = self.edit.toPlainText()
try:
json.loads(text) # syntax check
except Exception as e:
QtWidgets.QMessageBox.critical(self, "Invalid JSON", str(e)); return
tmp = self.path + ".tmp"
with open(tmp, "w") as f:
f.write(text)
try:
load_profile(tmp) # schema/formula validation
except Exception as e:
os.remove(tmp)
QtWidgets.QMessageBox.critical(self, "Invalid profile", str(e)); return
os.replace(tmp, self.path)
self.accept()
def run(): def run():
import sys import sys
app = QtWidgets.QApplication(sys.argv) app = QtWidgets.QApplication(sys.argv)
win = MainWindow() win = MainWindow()
win.resize(1150, 700)
win.show() win.show()
sys.exit(app.exec()) sys.exit(app.exec())
+21 -12
View File
@@ -1,20 +1,29 @@
"""obdcore -- headless OBD-II acquisition core for the ford-obd project. """obdcore -- headless, vehicle-agnostic OBD-II acquisition core.
Layered, GUI-agnostic foundation shared by the terminal tool and the Vehicle data (PIDs, scaling, DTCs, presets) lives in JSON profiles under
forthcoming PySide6 + pyqtgraph Windows app: profiles/ -- loaded at runtime, not hardcoded -- so the app works across
vehicles and others can contribute profiles.
link.py ElmLink -- ELM327 serial transport (+ MockLink in mock.py) formula.py safe A/B/... scaling-formula evaluator (no code execution)
registry.py PidRegistry -- verified Ford 6.0 PID table + DTC database profile.py load/save/list vehicle profiles (JSON)
scheduler.py PollScheduler -- prioritized round-robin polling engine registry.py PidRegistry / DtcDatabase model + lookups
store.py TimeSeriesStore -- ring buffers, min/max, record/replay link.py ElmLink ELM327 serial transport (+ MockLink in mock.py)
scheduler.py PollScheduler prioritized polling engine
store.py TimeSeriesStore ring buffers + record/replay
See ARCHITECTURE.md for the full design and roadmap. See ARCHITECTURE.md and profiles/README.md.
""" """
from .registry import PidRegistry, DtcDatabase, Pid, Dtc, PRESETS from .registry import PidRegistry, DtcDatabase, Pid, Dtc
from .store import TimeSeriesStore, CsvRecorder, replay_csv from .profile import (Profile, load_profile, save_profile, list_profiles,
profiles_dir, default_profile_path, load_default)
from .formula import compile_formula, FormulaError
from .store import TimeSeriesStore, CsvRecorder, replay_csv, export_csv
from .scheduler import PollScheduler from .scheduler import PollScheduler
__all__ = [ __all__ = [
"PidRegistry", "DtcDatabase", "Pid", "Dtc", "PRESETS", "PidRegistry", "DtcDatabase", "Pid", "Dtc",
"TimeSeriesStore", "CsvRecorder", "replay_csv", "PollScheduler", "Profile", "load_profile", "save_profile", "list_profiles",
"profiles_dir", "default_profile_path", "load_default",
"compile_formula", "FormulaError",
"TimeSeriesStore", "CsvRecorder", "replay_csv", "export_csv", "PollScheduler",
] ]
+98
View File
@@ -0,0 +1,98 @@
"""Safe formula evaluator for vehicle-profile PID scaling.
Profiles are community-contributed data, so decode formulas must NOT be able to
execute arbitrary code. Formulas are arithmetic expressions over named
variables -- the de-facto OBD convention used by Torque / FORScan / ScanGauge:
raw-mode PIDs: variables A, B, C, ... = response data bytes 0, 1, 2, ...
e.g. "(A*256+B)*0.57" "A-40" "(A>>1)&1" "A//2"
derived PIDs: variables are other PID keys
e.g. "MAP - BARO"
Only numeric literals, the named variables, arithmetic/bitwise operators, and a
small whitelist of functions are allowed. No names, attributes, subscripts,
comprehensions, or calls outside the whitelist -- anything else raises
FormulaError at compile time, so a bad/hostile profile fails loudly on load.
"""
import ast
import operator
_BIN = {
ast.Add: operator.add, ast.Sub: operator.sub, ast.Mult: operator.mul,
ast.Div: operator.truediv, ast.FloorDiv: operator.floordiv,
ast.Mod: operator.mod, ast.Pow: operator.pow,
ast.BitAnd: operator.and_, ast.BitOr: operator.or_, ast.BitXor: operator.xor,
ast.LShift: operator.lshift, ast.RShift: operator.rshift,
}
_UNARY = {ast.USub: operator.neg, ast.UAdd: operator.pos, ast.Invert: operator.invert}
_FUNCS = {"min": min, "max": max, "abs": abs, "round": round,
"int": int, "float": float}
class FormulaError(ValueError):
pass
def _validate(node, allowed):
if isinstance(node, ast.Expression):
return _validate(node.body, allowed)
if isinstance(node, ast.BinOp):
if type(node.op) not in _BIN:
raise FormulaError(f"operator not allowed: {type(node.op).__name__}")
_validate(node.left, allowed)
_validate(node.right, allowed)
return
if isinstance(node, ast.UnaryOp):
if type(node.op) not in _UNARY:
raise FormulaError(f"unary op not allowed: {type(node.op).__name__}")
_validate(node.operand, allowed)
return
if isinstance(node, ast.Constant):
if not isinstance(node.value, (int, float)) or isinstance(node.value, bool):
raise FormulaError("only numeric constants allowed")
return
if isinstance(node, ast.Name):
if node.id not in allowed:
raise FormulaError(f"unknown variable {node.id!r} (allowed: {sorted(allowed)})")
return
if isinstance(node, ast.Call):
if not isinstance(node.func, ast.Name) or node.func.id not in _FUNCS:
raise FormulaError("only min/max/abs/round/int/float calls allowed")
if node.keywords:
raise FormulaError("keyword args not allowed")
for a in node.args:
_validate(a, allowed)
return
raise FormulaError(f"expression not allowed: {type(node).__name__}")
def _eval(node, names):
if isinstance(node, ast.Expression):
return _eval(node.body, names)
if isinstance(node, ast.BinOp):
return _BIN[type(node.op)](_eval(node.left, names), _eval(node.right, names))
if isinstance(node, ast.UnaryOp):
return _UNARY[type(node.op)](_eval(node.operand, names))
if isinstance(node, ast.Constant):
return node.value
if isinstance(node, ast.Name):
return names[node.id]
if isinstance(node, ast.Call):
return _FUNCS[node.func.id](*[_eval(a, names) for a in node.args])
raise FormulaError(f"expression not allowed: {type(node).__name__}")
def compile_formula(expr, allowed_names):
"""Return fn(names_dict) -> number. Raises FormulaError on disallowed input."""
try:
tree = ast.parse(expr, mode="eval")
except SyntaxError as e:
raise FormulaError(f"bad formula {expr!r}: {e}")
allowed = set(allowed_names)
_validate(tree, allowed)
def fn(names):
return _eval(tree, names)
fn.expr = expr
return fn
+155
View File
@@ -0,0 +1,155 @@
"""Vehicle profiles -- load/save/list the JSON files under profiles/.
A profile is pure data: vehicle metadata, PID definitions (with safe formula
strings), DTC meanings, and named presets (perspectives). Loading a profile
compiles each PID's formula into a decode callable; nothing in a profile can
execute arbitrary code (see formula.py).
JSON schema (schema=1):
{
"schema": 1,
"meta": {"name","make","model","years","engine","author","version",
"protocol","notes"},
"pids": [{"key","name","mode","pid","nbytes","formula","unit","group",
"vmin","vmax","confidence","round","deps","notes"}, ...],
"presets": {"crank":[keys...], ...},
"dtcs": [{"code","desc","system","no_start","causes"}, ...]
}
"""
import glob
import json
import os
from dataclasses import dataclass, field
from .formula import compile_formula
from .registry import Pid, Dtc
SCHEMA = 1
BYTE_VARS = [chr(65 + i) for i in range(8)] # A..H
@dataclass
class Profile:
meta: dict
pids: list
dtcs: list
presets: dict
path: str = None
@property
def name(self):
return self.meta.get("name", "Unnamed profile")
def profiles_dir():
return os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
"profiles")
def _round(v, rnd):
if rnd is None:
return v
return int(round(v)) if rnd == 0 else round(v, rnd)
def _build_decode(d):
mode = d.get("mode", "22")
rnd = d.get("round")
if mode == "atrv":
return None
formula = d.get("formula", "")
if mode == "derived":
deps = tuple(d.get("deps", ()))
fn = compile_formula(formula, deps)
def dec(vals, fn=fn, deps=deps, rnd=rnd):
return _round(fn(dict(zip(deps, vals))), rnd)
return dec
fn = compile_formula(formula, BYTE_VARS)
def dec(raw, fn=fn, rnd=rnd):
names = {BYTE_VARS[i]: raw[i] for i in range(min(len(raw), 8))}
return _round(fn(names), rnd)
return dec
def _pid_from_dict(d):
return Pid(
key=d["key"], name=d.get("name", d["key"]), mode=d.get("mode", "22"),
pid=d.get("pid", ""), nbytes=d.get("nbytes", 2),
formula=d.get("formula", ""), decode=_build_decode(d),
unit=d.get("unit", ""), group=d.get("group", "misc"),
vmin=d.get("vmin", 0.0), vmax=d.get("vmax", 100.0),
confidence=d.get("confidence", "verified"), round=d.get("round"),
deps=tuple(d.get("deps", ())), notes=d.get("notes", ""),
)
def load_profile(path):
with open(path) as f:
raw = json.load(f)
if raw.get("schema", 1) != SCHEMA:
raise ValueError(f"unsupported profile schema {raw.get('schema')} in {path}")
pids = [_pid_from_dict(d) for d in raw.get("pids", [])]
dtcs = [Dtc(code=x["code"], desc=x.get("desc", ""), system=x.get("system", "powertrain"),
no_start=x.get("no_start", False), causes=x.get("causes", ""))
for x in raw.get("dtcs", [])]
return Profile(meta=raw.get("meta", {}), pids=pids, dtcs=dtcs,
presets=raw.get("presets", {}), path=path)
def _pid_to_dict(p):
d = {"key": p.key, "name": p.name, "mode": p.mode}
if p.pid:
d["pid"] = p.pid
if p.mode in ("01", "22"):
d["nbytes"] = p.nbytes
if p.formula:
d["formula"] = p.formula
if p.deps:
d["deps"] = list(p.deps)
d.update({"unit": p.unit, "group": p.group, "vmin": p.vmin, "vmax": p.vmax,
"confidence": p.confidence})
if p.round is not None:
d["round"] = p.round
if p.notes:
d["notes"] = p.notes
return d
def save_profile(profile, path=None):
path = path or profile.path
out = {
"schema": SCHEMA,
"meta": profile.meta,
"pids": [_pid_to_dict(p) for p in profile.pids],
"presets": profile.presets,
"dtcs": [{"code": d.code, "desc": d.desc, "system": d.system,
"no_start": d.no_start, "causes": d.causes} for d in profile.dtcs],
}
with open(path, "w") as f:
json.dump(out, f, indent=2)
return path
def list_profiles(directory=None):
"""Return [(path, meta_dict), ...] for every *.json profile in directory."""
directory = directory or profiles_dir()
out = []
for p in sorted(glob.glob(os.path.join(directory, "*.json"))):
try:
with open(p) as f:
meta = json.load(f).get("meta", {})
out.append((p, meta))
except Exception:
continue
return out
DEFAULT_PROFILE = "ford-6.0-powerstroke.json"
def default_profile_path():
return os.path.join(profiles_dir(), DEFAULT_PROFILE)
def load_default():
return load_profile(default_profile_path())
+30 -144
View File
@@ -1,137 +1,49 @@
"""PID + DTC registry for the Ford 6.0L Power Stroke (plus generic OBD-II). """PID + DTC data model and registry, backed by a vehicle Profile.
Canonical home for the verified Mode-22 addresses, scaling, and the DTC The actual PID numbers, scaling formulas, and DTC meanings live in JSON
database. Decoders are plain callables on the raw byte list. Confidence: vehicle profiles under profiles/ (data, not code) so the app is vehicle-
verified -- multi-source AND confirmed on the truck's scan/crank agnostic and others can contribute profiles. This module is the in-memory
doc -- corroborated in sources, not (yet) read on the truck model + lookups; profile.py loads/saves the JSON.
tentative -- single-source or disputed scaling
PID numbers/scaling corrected 2026-06-29 by the ford-60-pid-hunt workflow;
see diagnostics/2026-06-29-no-start/pid-research.md. 09D0 (FICM Main) was
confirmed on-truck 2026-06-30 (read 48.0V during a crank, intermittent).
""" """
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import Callable, Tuple from typing import Callable, Tuple
def _u16(b):
return (b[0] << 8) + b[1]
@dataclass @dataclass
class Pid: class Pid:
key: str key: str
name: str name: str
mode: str # "01" | "22" | "atrv" | "derived" mode: str = "22" # "01" | "22" | "atrv" | "derived"
pid: str = "" # hex: "1446" (m22) or "0C" (m01) pid: str = "" # hex: "1446" (m22) or "0C" (m01)
nbytes: int = 2 nbytes: int = 2
decode: Callable = None # m01/m22: f(raw_bytes); derived: f(dep_values) formula: str = "" # scaling expr in A/B/... (raw) or dep keys (derived)
decode: Callable = None # built from formula by profile loader
unit: str = "" unit: str = ""
group: str = "misc" # fuel | ficm | air | engine | driveline | power group: str = "misc" # fuel | ficm | air | engine | driveline | power | misc
vmin: float = 0.0 vmin: float = 0.0
vmax: float = 100.0 vmax: float = 100.0
confidence: str = "verified" confidence: str = "verified" # verified | doc | tentative
deps: Tuple[str, ...] = () # for derived channels round: int = None # display rounding (None=raw float, 0=int)
deps: Tuple[str, ...] = ()
notes: str = "" notes: str = ""
def _build(): @dataclass
P = [] class Dtc:
a = P.append code: str
# ---- Ford-enhanced Mode 22 -- pressures / fuel ---- desc: str
a(Pid("ICP", "Injection Control Pressure", "22", "1446", 2, system: str = "powertrain"
lambda b: round(_u16(b) * 0.57, 1), "psi", "fuel", 0, 3500, no_start: bool = False
"verified", notes="need ~500+ psi to fire")) causes: str = ""
a(Pid("ICP_V", "ICP Sensor Voltage", "22", "16AD", 2,
lambda b: round(_u16(b) * 0.000072, 4), "V", "fuel", 0, 5,
"tentative", notes="single-source"))
a(Pid("IPR", "Injection Pressure Regulator", "22", "1434", 1,
lambda b: round(b[0] * 13.53 / 35, 1), "%", "fuel", 0, 100,
"tentative", notes="KOEO ~14-15%, cranking ~30-40%"))
a(Pid("MAP", "Manifold Absolute Pressure", "22", "1440", 2,
lambda b: round(_u16(b) * 0.03625, 2), "psia", "air", 0, 60,
"verified"))
a(Pid("BARO", "Barometric Pressure", "22", "1442", 2,
lambda b: round(_u16(b) * 0.03625, 2), "psia", "air", 0, 20,
"verified"))
a(Pid("EBP", "Exhaust Back Pressure", "22", "1445", 2,
lambda b: round(_u16(b) * 0.03625, 2), "psia", "air", 0, 60,
"verified", notes="minus BARO = gauge"))
a(Pid("EOT", "Engine Oil Temperature", "22", "1310", 2,
lambda b: round(_u16(b) / 100.0 - 40, 1), "C", "engine", -40, 160,
"verified"))
# ---- FICM ----
a(Pid("FICM_M", "FICM Main Power", "22", "09D0", 2,
lambda b: round(_u16(b) / 256.0, 1), "V", "ficm", 0, 55,
"verified", notes="~48V; <45 suspect; reads intermittently while cranking"))
a(Pid("FICM_L", "FICM Logic Power", "22", "09CF", 2,
lambda b: round(_u16(b) / 256.0, 1), "V", "ficm", 0, 16,
"doc"))
a(Pid("FICM_V", "FICM Vehicle Power", "22", "09CE", 2,
lambda b: round(_u16(b) / 256.0, 1), "V", "ficm", 0, 16,
"doc"))
a(Pid("FICM_SYNC", "FICM Sync", "22", "09CD", 1,
lambda b: (b[0] >> 1) & 1, "", "ficm", 0, 1,
"doc", notes="1=in sync, 0=no sync"))
# ---- Driveline ----
a(Pid("GEAR", "Current Gear", "22", "11B3", 1,
lambda b: b[0] // 2, "", "driveline", 0, 6, "verified"))
a(Pid("TSS", "Trans Input Shaft Speed", "22", "11B4", 2,
lambda b: round(_u16(b) / 4), "rpm", "driveline", 0, 4000, "verified"))
# ---- Generic Mode 01 ----
a(Pid("RPM", "Engine RPM", "01", "0C", 2,
lambda b: round(_u16(b) / 4), "rpm", "engine", 0, 4000, "verified"))
a(Pid("ECT", "Engine Coolant Temp", "01", "05", 1,
lambda b: b[0] - 40, "C", "engine", -40, 160, "verified"))
a(Pid("IAT", "Intake Air Temp", "01", "0F", 1,
lambda b: b[0] - 40, "C", "air", -40, 160, "verified"))
a(Pid("LOAD", "Engine Load", "01", "04", 1,
lambda b: round(b[0] * 100 / 255), "%", "engine", 0, 100, "verified"))
a(Pid("VPCM", "Module Voltage", "01", "42", 2,
lambda b: round(_u16(b) / 1000.0, 2), "V", "power", 0, 16, "verified"))
# ---- More documented PIDs from the workflow (not yet truck-verified) ----
a(Pid("VGT", "VGT Duty Cycle", "22", "096D", 2,
lambda b: round(_u16(b) * 100 / 32767, 1), "%", "air", 0, 100,
"doc", notes="turbo vane duty"))
a(Pid("FAN", "Fan Speed", "22", "099F", 2,
lambda b: round(_u16(b) / 4), "rpm", "engine", 0, 4000,
"doc", notes="real ceiling ~3500"))
a(Pid("INJ_TIMING", "Injection Timing", "22", "09CC", 2,
lambda b: round(_u16(b) * 10 / 64, 1), "degBTDC", "fuel", -10, 30,
"tentative", notes="scaling disputed; using *10/64 (ScanGauge), not /10"))
a(Pid("VBAT", "Battery (PCM)", "22", "1172", 1,
lambda b: round(b[0] / 16, 1), "V", "power", 0, 16,
"tentative", notes="PCM-reported B+; distinct from ATRV port voltage"))
a(Pid("FUEL_PUMP", "Fuel Pump Duty (HFCM)", "22", "1672", 1,
lambda b: round(b[0] * 100 / 128, 1), "%", "fuel", 0, 100,
"tentative", notes="sits ~100%, drops on high EOT"))
a(Pid("FUEL_LVL", "Fuel Level", "22", "16C1", 2,
lambda b: round(_u16(b) * 100 / 328, 1), "%", "misc", 0, 100,
"tentative", notes="UNCALIBRATED -- needs per-truck full/empty cal"))
a(Pid("MFDES", "Mass Fuel Desired", "22", "1411", 2,
lambda b: _u16(b), "raw", "fuel", 0, 65535,
"tentative", notes="~mg/stroke internal count; no verified GPH formula"))
# ---- Pseudo / derived ----
a(Pid("BATT", "Battery (OBD port)", "atrv", "", 0,
None, "V", "power", 0, 16, "verified"))
a(Pid("BOOST", "Boost (MGP)", "derived", "", 0,
lambda vals: round(vals[0] - vals[1], 2), "psi", "air", -5, 40,
"verified", deps=("MAP", "BARO"), notes="MAP - BARO"))
return P
# Subscription presets per perspective (key -> default poll Hz set by scheduler)
PRESETS = {
"crank": ["ICP", "FICM_M", "BATT", "RPM"],
"driving": ["BOOST", "VGT", "EOT", "ECT", "EBP", "LOAD", "RPM", "IPR", "BATT"],
"vitals": ["ICP", "FICM_M", "FICM_L", "IPR", "BATT", "RPM", "ECT", "EOT",
"IAT", "VPCM"],
}
class PidRegistry: class PidRegistry:
def __init__(self): """In-memory PID set + presets for the active vehicle profile."""
self._by_key = {p.key: p for p in _build()}
def __init__(self, profile):
self.profile = profile
self._by_key = {p.key: p for p in profile.pids}
self.presets = dict(profile.presets)
def get(self, key): def get(self, key):
return self._by_key.get(key) return self._by_key.get(key)
@@ -143,41 +55,15 @@ class PidRegistry:
return [p for p in self._by_key.values() if p.group == g] return [p for p in self._by_key.values() if p.group == g]
def preset(self, name): def preset(self, name):
return [self._by_key[k] for k in PRESETS.get(name, []) if k in self._by_key] return [self._by_key[k] for k in self.presets.get(name, []) if k in self._by_key]
def preset_names(self):
# --------------------------------------------------------------------------- return list(self.presets.keys())
# DTC database -- generic SAE + notable Ford 6.0 codes. The full Ford code
# DB is being built by a separate cross-verified workflow; this is the seed.
# ---------------------------------------------------------------------------
@dataclass
class Dtc:
code: str
desc: str
system: str = "powertrain"
no_start: bool = False
causes: str = ""
def _dtcs():
rows = [
Dtc("P0087", "Fuel rail/system pressure too LOW", "fuel", True),
Dtc("P0088", "Fuel rail/system pressure too HIGH", "fuel"),
Dtc("P0148", "Fuel delivery error (low pressure / HPOP / IPR)", "fuel", True),
Dtc("P0335", "Crankshaft position (CKP) sensor circuit", "engine", True),
Dtc("P0340", "Camshaft position (CMP) sensor circuit", "engine", True),
Dtc("P0611", "FICM performance", "ficm", True),
Dtc("P1316", "Injector circuit/FICM codes detected", "ficm", True),
Dtc("P0606", "PCM processor fault", "power", True),
Dtc("U0100", "Lost communication with PCM/ECM", "network", True),
Dtc("P0670", "Glow plug control module circuit", "engine"),
]
return {d.code: d for d in rows}
class DtcDatabase: class DtcDatabase:
def __init__(self): def __init__(self, profile):
self._db = _dtcs() self._db = {d.code: d for d in profile.dtcs}
def get(self, code): def get(self, code):
return self._db.get(code) or Dtc(code, "(unknown - look up this code)") return self._db.get(code) or Dtc(code, "(unknown - look up this code)")
+29
View File
@@ -87,6 +87,35 @@ class TimeSeriesStore:
with self._lock: with self._lock:
return list(self._ch.keys()) return list(self._ch.keys())
def clear(self):
"""Empty every channel's history + min/max (start a fresh capture)."""
with self._lock:
chans = list(self._ch.values())
for c in chans:
with c._lock:
c.buf.clear()
c.lo = c.hi = c.last_v = c.last_t = None
def snapshot(self):
"""Return {key: [(t, v), ...]} of all current channel history."""
with self._lock:
chans = dict(self._ch)
return {k: c.series() for k, c in chans.items()}
def export_csv(store, path):
"""Write a store's current buffers to a long-format CSV (t,key,value)."""
rows = []
for key, series in store.snapshot().items():
for t, v in series:
rows.append((t, key, v))
rows.sort(key=lambda r: r[0])
with open(path, "w") as f:
f.write("t,key,value\n")
for t, key, v in rows:
f.write(f"{t:.3f},{key},{'' if v is None else v}\n")
return path
class CsvRecorder: class CsvRecorder:
"""Long-format session recorder: one row per sample (t,key,value). """Long-format session recorder: one row per sample (t,key,value).
+79
View File
@@ -0,0 +1,79 @@
# Vehicle Profiles
Each `*.json` file here is a **vehicle profile** — pure data that makes the
ford-obd app vehicle-agnostic. A profile defines a vehicle's PIDs (with safe
scaling formulas), DTC meanings, and named presets. Load one in the app via
**Profile → Load**, or drop a new file in this folder and it appears in the list.
**Contributions welcome** — add a profile for your vehicle and open a PR.
## Current profiles
| File | Vehicle | Notes |
|---|---|---|
| `ford-6.0-powerstroke.json` | Ford 6.0L Power Stroke (20032007) | Verified Mode-22 PIDs (ICP, FICM, EBP, MAP/BARO, EOT, …) + DTCs |
| `generic-obd2.json` | Any OBD-II vehicle (1996+) | Standard SAE Mode-01 PIDs only — a base to fork from |
## Schema (`schema: 1`)
```jsonc
{
"schema": 1,
"meta": {
"name": "Ford 6.0L Power Stroke", // shown in the Profile menu
"make": "Ford", "model": "...", "years": "2003-2007",
"engine": "6.0L Power Stroke diesel",
"author": "you", "version": "1.0.0",
"protocol": "auto", // ELM ATSP target, or "auto"
"notes": "provenance / confidence policy / caveats"
},
"presets": { "crank": ["ICP","FICM_M","BATT","RPM"], "...": [] },
"pids": [ /* see below */ ],
"dtcs": [ {"code":"P0087","desc":"...","system":"fuel","no_start":true,"causes":""} ]
}
```
### PID fields
| Field | Meaning |
|---|---|
| `key` | short unique id used in presets/derived (e.g. `ICP`) |
| `name` | display name |
| `mode` | `01` (generic SAE), `22` (manufacturer-enhanced), `atrv` (adapter pin voltage), `derived` (computed from other PIDs) |
| `pid` | request id hex — `0C` (mode 01) or `1446` (mode 22) |
| `nbytes` | expected data bytes in the response |
| `formula` | scaling expression (see below) |
| `round` | display rounding: omit = raw, `0` = integer, `2` = 2 dp |
| `unit`, `group` | display unit; group = `fuel\|ficm\|air\|engine\|driveline\|power\|misc` |
| `vmin`,`vmax` | range (used for gauges + the Normalize overlay) |
| `confidence` | `verified` (multi-source / read on a real vehicle), `doc` (sourced, unconfirmed), `tentative` (single-source / disputed) |
| `deps` | for `derived`: the PID keys the formula references |
| `notes` | freeform; surfaced as a tooltip |
### Formula language
Arithmetic over **data-byte variables** `A, B, C, …` (byte 0, 1, 2, …) — the
same convention as Torque/FORScan/ScanGauge:
```
(A*256+B)*0.57 # 16-bit * scale (ICP psi)
A-40 # 8-bit temp
(A>>1)&1 # a status bit
A//2 # integer divide (gear)
```
For `derived` PIDs the variables are **other PID keys**: `"MAP - BARO"` with
`"deps": ["MAP","BARO"]`.
Formulas are evaluated by a **safe AST evaluator** (`obdcore/formula.py`):
only numbers, the declared variables, arithmetic/bitwise operators, and
`min/max/abs/round/int/float` are allowed. Anything else (names, attribute
access, arbitrary calls) is rejected at load — so a community profile **cannot
execute code**.
## Caveats worth recording in `notes`
- Manufacturer-enhanced (`22`) PIDs vary by model year and PCM strategy.
- Some signals aren't on the OBD stream at all (e.g. the 6.0 has no EGT or
lube-oil-pressure PID — only ICP and EOT). Don't invent them.
- Mark single-source numbers `tentative` and say so in `notes`.
+60
View File
@@ -0,0 +1,60 @@
{
"schema": 1,
"meta": {
"name": "Ford 6.0L Power Stroke",
"make": "Ford",
"model": "Super Duty / Excursion",
"years": "2003-2007",
"engine": "6.0L Power Stroke diesel",
"author": "ford-obd project",
"version": "1.1.0",
"protocol": "auto",
"notes": "PID addresses + scaling corrected/verified by the ford-60-pid-hunt workflow (2026-06-29) and on-truck reads (2026-06-30). confidence: verified = multi-source or read on a real 6.0; doc = corroborated in sources, not yet read on-vehicle; tentative = single-source / disputed scaling. ICP_DES (desired ICP) has no public Mode-22 DID -> FORScan-only, not included."
},
"presets": {
"crank": ["ICP", "FICM_M", "BATT", "RPM"],
"driving": ["BOOST", "VGT", "EOT", "ECT", "EBP", "LOAD", "RPM", "IPR", "BATT"],
"vitals": ["ICP", "FICM_M", "FICM_L", "IPR", "BATT", "RPM", "ECT", "EOT", "IAT", "VPCM"]
},
"pids": [
{"key": "ICP", "name": "Injection Control Pressure", "mode": "22", "pid": "1446", "nbytes": 2, "formula": "(A*256+B)*0.57", "round": 1, "unit": "psi", "group": "fuel", "vmin": 0, "vmax": 3500, "confidence": "verified", "notes": "need ~500+ psi to fire"},
{"key": "ICP_V", "name": "ICP Sensor Voltage", "mode": "22", "pid": "16AD", "nbytes": 2, "formula": "(A*256+B)*0.000072", "round": 4, "unit": "V", "group": "fuel", "vmin": 0, "vmax": 5, "confidence": "tentative", "notes": "single-source"},
{"key": "IPR", "name": "Injection Pressure Regulator", "mode": "22", "pid": "1434", "nbytes": 1, "formula": "A*13.53/35", "round": 1, "unit": "%", "group": "fuel", "vmin": 0, "vmax": 100, "confidence": "tentative", "notes": "KOEO ~14-15%, cranking ~30-40%"},
{"key": "INJ_TIMING", "name": "Injection Timing", "mode": "22", "pid": "09CC", "nbytes": 2, "formula": "(A*256+B)*10/64", "round": 1, "unit": "degBTDC", "group": "fuel", "vmin": -10, "vmax": 30, "confidence": "tentative", "notes": "scaling disputed; using *10/64 (ScanGauge), not /10"},
{"key": "FUEL_PUMP", "name": "Fuel Pump Duty (HFCM)", "mode": "22", "pid": "1672", "nbytes": 1, "formula": "A*100/128", "round": 1, "unit": "%", "group": "fuel", "vmin": 0, "vmax": 100, "confidence": "tentative", "notes": "sits ~100%, drops on high EOT"},
{"key": "MFDES", "name": "Mass Fuel Desired", "mode": "22", "pid": "1411", "nbytes": 2, "formula": "A*256+B", "round": 0, "unit": "raw", "group": "fuel", "vmin": 0, "vmax": 65535, "confidence": "tentative", "notes": "~mg/stroke internal count; no verified GPH formula"},
{"key": "FICM_M", "name": "FICM Main Power", "mode": "22", "pid": "09D0", "nbytes": 2, "formula": "(A*256+B)/256", "round": 1, "unit": "V", "group": "ficm", "vmin": 0, "vmax": 55, "confidence": "verified", "notes": "~48V; <45 suspect; reads intermittently while cranking"},
{"key": "FICM_L", "name": "FICM Logic Power", "mode": "22", "pid": "09CF", "nbytes": 2, "formula": "(A*256+B)/256", "round": 1, "unit": "V", "group": "ficm", "vmin": 0, "vmax": 16, "confidence": "doc"},
{"key": "FICM_V", "name": "FICM Vehicle Power", "mode": "22", "pid": "09CE", "nbytes": 2, "formula": "(A*256+B)/256", "round": 1, "unit": "V", "group": "ficm", "vmin": 0, "vmax": 16, "confidence": "doc"},
{"key": "FICM_SYNC", "name": "FICM Sync", "mode": "22", "pid": "09CD", "nbytes": 1, "formula": "(A>>1)&1", "round": 0, "unit": "", "group": "ficm", "vmin": 0, "vmax": 1, "confidence": "doc", "notes": "1=in sync, 0=no sync"},
{"key": "MAP", "name": "Manifold Absolute Pressure", "mode": "22", "pid": "1440", "nbytes": 2, "formula": "(A*256+B)*0.03625", "round": 2, "unit": "psia", "group": "air", "vmin": 0, "vmax": 60, "confidence": "verified"},
{"key": "BARO", "name": "Barometric Pressure", "mode": "22", "pid": "1442", "nbytes": 2, "formula": "(A*256+B)*0.03625", "round": 2, "unit": "psia", "group": "air", "vmin": 0, "vmax": 20, "confidence": "verified"},
{"key": "EBP", "name": "Exhaust Back Pressure", "mode": "22", "pid": "1445", "nbytes": 2, "formula": "(A*256+B)*0.03625", "round": 2, "unit": "psia", "group": "air", "vmin": 0, "vmax": 60, "confidence": "verified", "notes": "minus BARO = gauge"},
{"key": "VGT", "name": "VGT Duty Cycle", "mode": "22", "pid": "096D", "nbytes": 2, "formula": "(A*256+B)*100/32767", "round": 1, "unit": "%", "group": "air", "vmin": 0, "vmax": 100, "confidence": "doc", "notes": "turbo vane duty"},
{"key": "EOT", "name": "Engine Oil Temperature", "mode": "22", "pid": "1310", "nbytes": 2, "formula": "(A*256+B)/100-40", "round": 1, "unit": "C", "group": "engine", "vmin": -40, "vmax": 160, "confidence": "verified"},
{"key": "FAN", "name": "Fan Speed", "mode": "22", "pid": "099F", "nbytes": 2, "formula": "(A*256+B)/4", "round": 0, "unit": "rpm", "group": "engine", "vmin": 0, "vmax": 4000, "confidence": "doc", "notes": "real ceiling ~3500"},
{"key": "GEAR", "name": "Current Gear", "mode": "22", "pid": "11B3", "nbytes": 1, "formula": "A//2", "round": 0, "unit": "", "group": "driveline", "vmin": 0, "vmax": 6, "confidence": "verified"},
{"key": "TSS", "name": "Trans Input Shaft Speed", "mode": "22", "pid": "11B4", "nbytes": 2, "formula": "(A*256+B)/4", "round": 0, "unit": "rpm", "group": "driveline", "vmin": 0, "vmax": 4000, "confidence": "verified"},
{"key": "RPM", "name": "Engine RPM", "mode": "01", "pid": "0C", "nbytes": 2, "formula": "(A*256+B)/4", "round": 0, "unit": "rpm", "group": "engine", "vmin": 0, "vmax": 4000, "confidence": "verified"},
{"key": "ECT", "name": "Engine Coolant Temp", "mode": "01", "pid": "05", "nbytes": 1, "formula": "A-40", "round": 0, "unit": "C", "group": "engine", "vmin": -40, "vmax": 160, "confidence": "verified"},
{"key": "IAT", "name": "Intake Air Temp", "mode": "01", "pid": "0F", "nbytes": 1, "formula": "A-40", "round": 0, "unit": "C", "group": "air", "vmin": -40, "vmax": 160, "confidence": "verified"},
{"key": "LOAD", "name": "Engine Load", "mode": "01", "pid": "04", "nbytes": 1, "formula": "A*100/255", "round": 0, "unit": "%", "group": "engine", "vmin": 0, "vmax": 100, "confidence": "verified"},
{"key": "VPCM", "name": "Module Voltage", "mode": "01", "pid": "42", "nbytes": 2, "formula": "(A*256+B)/1000", "round": 2, "unit": "V", "group": "power", "vmin": 0, "vmax": 16, "confidence": "verified"},
{"key": "VBAT", "name": "Battery (PCM)", "mode": "22", "pid": "1172", "nbytes": 1, "formula": "A/16", "round": 1, "unit": "V", "group": "power", "vmin": 0, "vmax": 16, "confidence": "tentative", "notes": "PCM-reported B+; distinct from ATRV port voltage"},
{"key": "FUEL_LVL", "name": "Fuel Level", "mode": "22", "pid": "16C1", "nbytes": 2, "formula": "(A*256+B)*100/328", "round": 1, "unit": "%", "group": "misc", "vmin": 0, "vmax": 100, "confidence": "tentative", "notes": "UNCALIBRATED -- needs per-truck full/empty cal"},
{"key": "BATT", "name": "Battery (OBD port)", "mode": "atrv", "unit": "V", "group": "power", "vmin": 0, "vmax": 16, "confidence": "verified", "notes": "ELM327 ATRV pin voltage"},
{"key": "BOOST", "name": "Boost (MGP)", "mode": "derived", "formula": "MAP-BARO", "deps": ["MAP", "BARO"], "round": 2, "unit": "psi", "group": "air", "vmin": -5, "vmax": 40, "confidence": "verified", "notes": "MAP - BARO"}
],
"dtcs": [
{"code": "P0087", "desc": "Fuel rail/system pressure too LOW", "system": "fuel", "no_start": true},
{"code": "P0088", "desc": "Fuel rail/system pressure too HIGH", "system": "fuel"},
{"code": "P0148", "desc": "Fuel delivery error (low pressure / HPOP / IPR)", "system": "fuel", "no_start": true},
{"code": "P0335", "desc": "Crankshaft position (CKP) sensor circuit", "system": "engine", "no_start": true},
{"code": "P0340", "desc": "Camshaft position (CMP) sensor circuit", "system": "engine", "no_start": true},
{"code": "P0611", "desc": "FICM performance", "system": "ficm", "no_start": true},
{"code": "P1316", "desc": "Injector circuit/FICM codes detected", "system": "ficm", "no_start": true},
{"code": "P0606", "desc": "PCM processor fault", "system": "power", "no_start": true},
{"code": "U0100", "desc": "Lost communication with PCM/ECM", "system": "network", "no_start": true},
{"code": "P0670", "desc": "Glow plug control module circuit", "system": "engine"}
]
}
+31
View File
@@ -0,0 +1,31 @@
{
"schema": 1,
"meta": {
"name": "Generic OBD-II",
"make": "Any",
"model": "Any OBD-II vehicle (1996+)",
"years": "1996+",
"engine": "any",
"author": "ford-obd project",
"version": "1.0.0",
"protocol": "auto",
"notes": "Standard SAE J1979 Mode-01 PIDs only -- supported by essentially every OBD-II vehicle. Use as a base/starting point for a new vehicle profile, then add manufacturer-enhanced Mode-22 PIDs. Decodes are the SAE-standard formulas."
},
"presets": {
"basic": ["RPM", "SPEED", "ECT", "IAT", "MAP", "THROTTLE", "LOAD", "BATT"]
},
"pids": [
{"key": "RPM", "name": "Engine RPM", "mode": "01", "pid": "0C", "nbytes": 2, "formula": "(A*256+B)/4", "round": 0, "unit": "rpm", "group": "engine", "vmin": 0, "vmax": 8000, "confidence": "verified"},
{"key": "SPEED", "name": "Vehicle Speed", "mode": "01", "pid": "0D", "nbytes": 1, "formula": "A", "round": 0, "unit": "km/h", "group": "driveline", "vmin": 0, "vmax": 255, "confidence": "verified"},
{"key": "ECT", "name": "Engine Coolant Temp", "mode": "01", "pid": "05", "nbytes": 1, "formula": "A-40", "round": 0, "unit": "C", "group": "engine", "vmin": -40, "vmax": 215, "confidence": "verified"},
{"key": "IAT", "name": "Intake Air Temp", "mode": "01", "pid": "0F", "nbytes": 1, "formula": "A-40", "round": 0, "unit": "C", "group": "air", "vmin": -40, "vmax": 215, "confidence": "verified"},
{"key": "MAP", "name": "Intake Manifold Pressure", "mode": "01", "pid": "0B", "nbytes": 1, "formula": "A", "round": 0, "unit": "kPa", "group": "air", "vmin": 0, "vmax": 255, "confidence": "verified"},
{"key": "MAF", "name": "Mass Air Flow", "mode": "01", "pid": "10", "nbytes": 2, "formula": "(A*256+B)/100", "round": 2, "unit": "g/s", "group": "air", "vmin": 0, "vmax": 655, "confidence": "verified"},
{"key": "THROTTLE", "name": "Throttle Position", "mode": "01", "pid": "11", "nbytes": 1, "formula": "A*100/255", "round": 0, "unit": "%", "group": "engine", "vmin": 0, "vmax": 100, "confidence": "verified"},
{"key": "LOAD", "name": "Calculated Load", "mode": "01", "pid": "04", "nbytes": 1, "formula": "A*100/255", "round": 0, "unit": "%", "group": "engine", "vmin": 0, "vmax": 100, "confidence": "verified"},
{"key": "TIMING", "name": "Timing Advance", "mode": "01", "pid": "0E", "nbytes": 1, "formula": "A/2-64", "round": 1, "unit": "deg", "group": "engine", "vmin": -64, "vmax": 64, "confidence": "verified"},
{"key": "VPCM", "name": "Module Voltage", "mode": "01", "pid": "42", "nbytes": 2, "formula": "(A*256+B)/1000", "round": 2, "unit": "V", "group": "power", "vmin": 0, "vmax": 16, "confidence": "verified"},
{"key": "BATT", "name": "Battery (OBD port)", "mode": "atrv", "unit": "V", "group": "power", "vmin": 0, "vmax": 16, "confidence": "verified", "notes": "ELM327 ATRV pin voltage"}
],
"dtcs": []
}
+32 -4
View File
@@ -11,7 +11,9 @@ import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from obdcore import PidRegistry, TimeSeriesStore, PollScheduler, CsvRecorder, replay_csv from obdcore import (PidRegistry, TimeSeriesStore, PollScheduler, CsvRecorder,
replay_csv, load_default, load_profile, default_profile_path,
list_profiles, compile_formula, FormulaError)
from obdcore.mock import MockLink from obdcore.mock import MockLink
@@ -28,7 +30,7 @@ class FakeClock:
def _setup(specs): def _setup(specs):
clk = FakeClock() clk = FakeClock()
reg = PidRegistry() reg = PidRegistry(load_default())
store = TimeSeriesStore() store = TimeSeriesStore()
link = MockLink(clock=clk) link = MockLink(clock=clk)
sch = PollScheduler(link, reg, store, clock=clk) sch = PollScheduler(link, reg, store, clock=clk)
@@ -36,8 +38,33 @@ def _setup(specs):
return clk, reg, store, sch return clk, reg, store, sch
def test_profiles_load_and_validate():
profs = list_profiles()
assert any("ford-6.0" in p for p, _ in profs), "ford profile should be listed"
for path, meta in profs:
prof = load_profile(path) # compiles every formula -> raises if bad
assert prof.meta.get("name")
assert all(p.decode or p.mode == "atrv" for p in prof.pids)
print(f" {len(profs)} profiles load + compile clean: OK")
def test_formula_is_sandboxed():
# legit
fn = compile_formula("(A*256+B)*0.57", "ABCDEFGH")
assert abs(fn({"A": 0, "B": 22}) - 12.54) < 0.01
# hostile / disallowed -> rejected at compile
for bad in ("__import__('os').system('x')", "open('/etc/passwd')",
"A.__class__", "Z+1", "A if B else C"):
try:
compile_formula(bad, "ABC")
raise AssertionError(f"should have rejected: {bad}")
except FormulaError:
pass
print(" formula evaluator rejects code/unknowns: OK")
def test_registry_decoders_match_truck_bytes(): def test_registry_decoders_match_truck_bytes():
reg = PidRegistry() reg = PidRegistry(load_default())
cases = { cases = {
"ICP": ([0x00, 0x16], 12.5), "EBP": ([0x01, 0x8F], 14.46), "ICP": ([0x00, 0x16], 12.5), "EBP": ([0x01, 0x8F], 14.46),
"MAP": ([0x01, 0x89], 14.25), "BARO": ([0x01, 0x88], 14.21), "MAP": ([0x01, 0x89], 14.25), "BARO": ([0x01, 0x88], 14.21),
@@ -110,7 +137,8 @@ def test_record_replay_roundtrip(tmp_path=None):
if __name__ == "__main__": if __name__ == "__main__":
for fn in [test_registry_decoders_match_truck_bytes, test_crank_ramp_and_peak, for fn in [test_profiles_load_and_validate, test_formula_is_sandboxed,
test_registry_decoders_match_truck_bytes, test_crank_ramp_and_peak,
test_derived_boost_channel, test_dead_pid_parks_and_revives, test_derived_boost_channel, test_dead_pid_parks_and_revives,
test_record_replay_roundtrip]: test_record_replay_roundtrip]:
fn() fn()