diff --git a/README.md b/README.md index c10c855..2f1ac83 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,9 @@ a new vehicle is data, not code. Runs on **Windows, macOS, and Linux**. report. **Vehicle info** — VIN, calibration IDs, ECU name (Mode 09). - **Trip / Performance** — live MPG, trip distance/fuel, and **0-60 mph & 1/4-mile** timers (auto-detected from a standing start). +- **Bi-directional / service functions** — actuator tests, service resets, etc., + defined per-vehicle in the profile and run behind risk-based confirmations + (ships with safe standard actions; OBDash never synthesizes command bytes). - **Vehicle profiles** — switch/import/edit vehicles from the Profile menu. - **Units** — °C/°F toggle (US/metric). - **Captures** — record a session to CSV and replay it. diff --git a/gui/controller.py b/gui/controller.py index ff5311a..23a6e53 100644 --- a/gui/controller.py +++ b/gui/controller.py @@ -153,6 +153,14 @@ class Controller: def read_freeze_frame(self): return self._oneoff(lambda: self.link.read_freeze_frame()) + # -- bi-directional / service actions -- + def actions(self): + return self.profile.actions or [] + + def run_action(self, action): + from obdcore.actions import run_action + return self._oneoff(lambda: run_action(action, self.link), timeout=20.0) + # -- trip / performance (fed from the live store each GUI tick) -- def update_trip(self): spd = self.store.latest(self.speed_key) if self.speed_key else None diff --git a/gui/main.py b/gui/main.py index 69d557b..752ab18 100644 --- a/gui/main.py +++ b/gui/main.py @@ -87,6 +87,9 @@ class MainWindow(QtWidgets.QMainWindow): "I/M readiness monitors + MIL (will it pass inspection?)") self._act(diagm, "Vehicle Info (VIN)", self._vehicle_info, "VIN, calibration IDs, ECU name (mode 09)") + diagm.addSeparator() + self._act(diagm, "Service & Bi-directional…", self._service_actions, + "Actuator tests, service resets, and other profile-defined functions") viewm = mb.addMenu("&View") self.view_graph = self._act(viewm, "Graph View", lambda: self._set_view(0), @@ -483,6 +486,68 @@ class MainWindow(QtWidgets.QMainWindow): bb.rejected.connect(dlg.reject); lay.addWidget(bb) dlg.exec() + _RISK_COLOR = {"safe": "#3cb44b", "caution": "#e6a23c", "danger": "#e6194B"} + + def _service_actions(self): + if not self._need_connection(): + return + acts = self.ctl.actions() + dlg = QtWidgets.QDialog(self); dlg.setWindowTitle("Service & Bi-directional Functions") + dlg.resize(560, 420) + lay = QtWidgets.QVBoxLayout(dlg) + if not acts: + lay.addWidget(QtWidgets.QLabel( + "No service functions are defined for this vehicle profile yet.\n\n" + "These are manufacturer-specific — add them to the profile's \"actions\"\n" + "block (see profiles/PROFILE_SPEC.md). OBDash never synthesizes command\n" + "bytes; it only runs what a verified profile defines.")) + else: + lay.addWidget(QtWidgets.QLabel( + "Caution: these send commands to the vehicle. Read each warning.")) + scroll = QtWidgets.QScrollArea(); scroll.setWidgetResizable(True) + inner = QtWidgets.QWidget(); il = QtWidgets.QVBoxLayout(inner) + for a in acts: + row = QtWidgets.QFrame() + row.setStyleSheet("QFrame{border:1px solid #333;border-radius:6px;}") + rl = QtWidgets.QHBoxLayout(row) + txt = QtWidgets.QLabel( + f"{a.name} " + f"[{a.risk}]" + f"
{a.description}") + txt.setWordWrap(True) + rl.addWidget(txt, 1) + btn = QtWidgets.QPushButton("Run") + btn.clicked.connect(lambda _=False, act=a: self._run_action(act)) + rl.addWidget(btn) + il.addWidget(row) + il.addStretch(1) + scroll.setWidget(inner); lay.addWidget(scroll) + bb = QtWidgets.QDialogButtonBox(QtWidgets.QDialogButtonBox.Close) + bb.rejected.connect(dlg.reject); lay.addWidget(bb) + dlg.exec() + + def _run_action(self, action): + if action.risk in ("caution", "danger"): + msg = (action.warning or "This sends a command to the vehicle.") + \ + "\n\nProceed?" + btn = QtWidgets.QMessageBox.warning( + self, f"Run: {action.name}", msg, + QtWidgets.QMessageBox.Yes | QtWidgets.QMessageBox.No, + QtWidgets.QMessageBox.No) + if btn != QtWidgets.QMessageBox.Yes: + return + try: + res = self.ctl.run_action(action) or {} + except Exception as e: + QtWidgets.QMessageBox.critical(self, action.name, str(e)); return + if res.get("ok"): + QtWidgets.QMessageBox.information(self, action.name, res.get("message", "Done.")) + self.status.showMessage(f"{action.name}: {res.get('message','done')}") + else: + QtWidgets.QMessageBox.warning(self, action.name, + "Failed: " + res.get("message", "no response")) + self.status.showMessage(f"{action.name} failed: {res.get('message','')}") + # ---------- center (graph + table stack) ---------- def _build_center(self): self.stack = QtWidgets.QStackedWidget() diff --git a/obdcore/actions.py b/obdcore/actions.py new file mode 100644 index 0000000..d7331b6 --- /dev/null +++ b/obdcore/actions.py @@ -0,0 +1,131 @@ +"""Bi-directional / service functions -- profile-defined command sequences. + +FORScan-class functions (actuator tests, service resets, module writes) are +manufacturer-specific UDS (ISO 14229) sequences, so OBDash models them as DATA +in the vehicle profile (an `actions` block) rather than hardcoded logic. An +Action is a small sequence the runner executes through the ELM link: + + optional Mode 10 diagnostic session + optional Mode 27 security access (seed -> key) + one+ raw command(s) (Mode 2F I/O control, Mode 31 routine, Mode 11 + ECU reset, Mode 3E tester-present, ...) with response checks + +SAFETY: +- Every action carries a `risk` (safe | caution | danger); the GUI gates + caution/danger behind an explicit confirmation. +- The runner sends ONLY the hex bytes defined in the profile -- nothing is + synthesized. +- Security-access KEY algorithms are per-vehicle secrets and are deliberately + NOT bundled; only a few trivial/standard transforms are known. An action that + needs an unknown algorithm is BLOCKED (fails safe) rather than guessed. +""" +from dataclasses import dataclass, field + + +@dataclass +class ActionStep: + send: str # hex bytes to send, e.g. "1101" + expect: str = "" # hex the response must contain; "" = accept UDS positive response + + +@dataclass +class Action: + key: str + name: str + kind: str = "test" # test | actuator | reset | write + risk: str = "safe" # safe | caution | danger + description: str = "" + warning: str = "" # shown in the confirmation for caution/danger + session: str = None # Mode 10 subfunction hex (e.g. "03" extended) + security: dict = None # {"level": "01", "algorithm": ""} + steps: list = field(default_factory=list) # list[ActionStep] + success_msg: str = "Done." + + +# Known, NON-secret security transforms. Real per-vehicle seed->key algorithms +# are proprietary and intentionally absent -- unknown algorithm => action blocked. +SECURITY_ALGOS = { + "xor-ff": lambda seed: bytes((b ^ 0xFF) & 0xFF for b in seed), + "invert": lambda seed: bytes((~b) & 0xFF for b in seed), + "add-ff": lambda seed: bytes((b + 0xFF) & 0xFF for b in seed), +} + + +def _hex(byte_list): + return "".join(f"{b:02X}" for b in byte_list) + + +def _positive(data, req_sid): + """UDS positive response = request service id + 0x40 present in the data.""" + return (req_sid + 0x40) in data + + +def _is_negative(data): + return 0x7F in data + + +def validate_action(a): + """Raise ValueError if an action has malformed hex / structure.""" + for st in a.steps: + bytes.fromhex(st.send) # raises if not valid hex + if st.expect: + bytes.fromhex(st.expect) + if a.session is not None: + bytes.fromhex(a.session) + if a.security is not None and "algorithm" not in a.security: + raise ValueError(f"action {a.key}: security block needs an 'algorithm'") + + +def run_action(action, link, log=None): + """Execute an Action through `link` (must expose read_raw(hex)->list[int]). + Returns {"ok": bool, "message": str, "responses": [list[int], ...]}.""" + def note(m): + if log: + log(m) + + def send(hexstr): + return link.read_raw(hexstr) + + # 1. diagnostic session + if action.session: + d = send("10" + action.session) + if not _positive(d, 0x10): + return {"ok": False, "message": "could not enter diagnostic session", "responses": [d]} + + # 2. security access (seed -> key) + if action.security: + algo = action.security.get("algorithm") + fn = SECURITY_ALGOS.get(algo) + if fn is None: + return {"ok": False, "message": f"security algorithm '{algo}' not available " + "(per-vehicle secret) — action blocked for safety", "responses": []} + level = action.security.get("level", "01") + seed_resp = send("27" + level) + if not _positive(seed_resp, 0x27): + return {"ok": False, "message": "security seed request failed", "responses": [seed_resp]} + i = seed_resp.index(0x67) + seed = seed_resp[i + 2:] + key = fn(bytes(seed)) + lvl2 = f"{int(level, 16) + 1:02X}" + kr = send("27" + lvl2 + _hex(key)) + if not _positive(kr, 0x27): + return {"ok": False, "message": "security unlock rejected", "responses": [kr]} + note("security unlocked") + + # 3. the command steps + responses = [] + for st in action.steps: + d = send(st.send) + responses.append(d) + req_sid = int(st.send[:2], 16) + if st.expect: + ok = bytes.fromhex(st.expect)[0] in d + else: + ok = _positive(d, req_sid) + if not ok: + reason = "ECU negative response" if _is_negative(d) else "no valid response" + return {"ok": False, "message": f"{reason} to {st.send} (got {_hex(d) or 'nothing'})", + "responses": responses} + note(f"{st.send} -> {_hex(d)}") + + return {"ok": True, "message": action.success_msg, "responses": responses} diff --git a/obdcore/link.py b/obdcore/link.py index 29ec1bb..47c7518 100644 --- a/obdcore/link.py +++ b/obdcore/link.py @@ -175,6 +175,11 @@ class ElmLink: except ValueError: return None + def read_raw(self, hexcmd, timeout=2.0): + """Send an arbitrary hex command and return the flattened response + bytes (for bi-directional actions / service routines).""" + return self._bytes(self.cmd(hexcmd, timeout=timeout)) + # -- DTCs -- def read_dtcs(self, mode, svc, timeout=5.0): lines = self.cmd(mode, timeout=timeout) diff --git a/obdcore/mock.py b/obdcore/mock.py index 60bb094..c0a8339 100644 --- a/obdcore/mock.py +++ b/obdcore/mock.py @@ -79,5 +79,14 @@ class MockLink: "values": [("Engine RPM", 240, "rpm"), ("Coolant Temp", 33, "C"), ("Engine Load", 18, "%"), ("Vehicle Speed", 0, "km/h")]} + def read_raw(self, hexcmd, timeout=2.0): + # Return a UDS positive response (sid+0x40) so actions succeed in mock. + h = hexcmd.replace(" ", "") + sid = int(h[:2], 16) + rest = [int(h[i:i + 2], 16) for i in range(2, len(h) - 1, 2)] + if sid == 0x27 and len(rest) == 1: # security seed request -> return a seed + return [0x67, rest[0], 0x11, 0x22, 0x33, 0x44] + return [(sid + 0x40) & 0xFF] + rest + def close(self): pass diff --git a/obdcore/profile.py b/obdcore/profile.py index b94fbd6..2e5aafc 100644 --- a/obdcore/profile.py +++ b/obdcore/profile.py @@ -24,6 +24,7 @@ from dataclasses import dataclass, field from .formula import compile_formula from .registry import Pid, Dtc +from .actions import Action, ActionStep, validate_action SCHEMA = 1 BYTE_VARS = [chr(65 + i) for i in range(8)] # A..H @@ -36,6 +37,7 @@ class Profile: dtcs: list presets: dict path: str = None + actions: list = None @property def name(self): @@ -100,8 +102,20 @@ def load_profile(path): dtcs = [Dtc(code=x["code"], desc=x.get("desc", ""), system=x.get("system", "powertrain"), no_start=x.get("no_start", False), causes=x.get("causes", "")) for x in raw.get("dtcs", [])] + actions = [] + for a in raw.get("actions", []): + act = Action( + key=a["key"], name=a.get("name", a["key"]), kind=a.get("kind", "test"), + risk=a.get("risk", "safe"), description=a.get("description", ""), + warning=a.get("warning", ""), session=a.get("session"), + security=a.get("security"), + steps=[ActionStep(send=s["send"], expect=s.get("expect", "")) + for s in a.get("steps", [])], + success_msg=a.get("success_msg", "Done.")) + validate_action(act) # rejects malformed hex + actions.append(act) return Profile(meta=raw.get("meta", {}), pids=pids, dtcs=dtcs, - presets=raw.get("presets", {}), path=path) + presets=raw.get("presets", {}), path=path, actions=actions) def _pid_to_dict(p): @@ -136,6 +150,18 @@ def save_profile(profile, path=None): "dtcs": [{"code": d.code, "desc": d.desc, "system": d.system, "no_start": d.no_start, "causes": d.causes} for d in profile.dtcs], } + if profile.actions: + out["actions"] = [] + for a in profile.actions: + ad = {"key": a.key, "name": a.name, "kind": a.kind, "risk": a.risk} + if a.description: ad["description"] = a.description + if a.warning: ad["warning"] = a.warning + if a.session: ad["session"] = a.session + if a.security: ad["security"] = a.security + ad["steps"] = [{"send": s.send, **({"expect": s.expect} if s.expect else {})} + for s in a.steps] + if a.success_msg != "Done.": ad["success_msg"] = a.success_msg + out["actions"].append(ad) with open(path, "w") as f: json.dump(out, f, indent=2) return path diff --git a/profiles/PROFILE_SPEC.md b/profiles/PROFILE_SPEC.md index 285de0f..c374703 100644 --- a/profiles/PROFILE_SPEC.md +++ b/profiles/PROFILE_SPEC.md @@ -137,6 +137,47 @@ vehicle reports trims/O2). flags drive-disabling faults (shown bold red). Include generic `P0xxx` plus manufacturer-specific `P1xxx` you can source. +## 7b. `actions` — bi-directional / service functions (optional) + +Manufacturer service functions (actuator tests, service resets, module writes) +are UDS (ISO 14229) sequences, so they live in the profile as **data**. OBDash +runs ONLY the hex bytes you define — it never synthesizes commands. + +```jsonc +"actions": [ + { + "key": "ECU_RESET", + "name": "Reset ECU (soft reboot)", + "kind": "reset", // test | actuator | reset | write + "risk": "caution", // safe | caution | danger (caution/danger prompt to confirm) + "description": "shown in the list", + "warning": "shown in the confirmation for caution/danger actions", + "session": "03", // OPTIONAL Mode 10 subfunction hex (enter extended session) + "security": {"level":"01","algorithm":"xor-ff"}, // OPTIONAL seed->key unlock + "steps": [ {"send":"1101", "expect":"51"} ], // send hex; expect = hex the reply must contain + "success_msg": "ECU reset acknowledged." + } +] +``` + +Execution order: `session` (Mode 10) → `security` (Mode 27 seed→key) → each +`step` in order. A step succeeds if the reply contains `expect`, or (when +`expect` is omitted) the UDS positive-response byte (`send` SID + 0x40). Any +negative response (`7F …`) aborts. + +**Security access:** real per-vehicle seed→key algorithms are proprietary and are +**not** bundled. Only trivial/standard transforms are known (`xor-ff`, `invert`, +`add-ff`); an action naming any other `algorithm` is **blocked** (fails safe) — +don't put a real secret algorithm name and expect it to work. Most simple +functions need no security block. + +**Safety rules for authors:** +- Only include commands with **verified** bytes (service manual / bench-confirmed). + A wrong `2F`/`31`/`2E` command can mis-actuate or misconfigure a module. +- Mark anything that writes/actuates `caution` or `danger` and write a clear + `warning` (e.g. "engine off", "wheels chocked"). +- `kind:"write"` (module config / As-Built) is the highest-risk — reserve `danger`. + ## 8. Rules for authors / agents - **Standard Mode-01 PIDs are the reliable backbone** — include the ones this diff --git a/profiles/generic-obd2.json b/profiles/generic-obd2.json index c1a1259..de1fdea 100644 --- a/profiles/generic-obd2.json +++ b/profiles/generic-obd2.json @@ -27,5 +27,14 @@ {"key": "VPCM", "name": "Module Voltage", "mode": "01", "pid": "42", "nbytes": 2, "formula": "(A*256+B)/1000", "round": 2, "unit": "V", "group": "power", "vmin": 0, "vmax": 16, "confidence": "verified"}, {"key": "BATT", "name": "Battery (OBD port)", "mode": "atrv", "unit": "V", "group": "power", "vmin": 0, "vmax": 16, "confidence": "verified", "notes": "ELM327 ATRV pin voltage"} ], - "dtcs": [] + "dtcs": [], + "actions": [ + {"key": "TESTER_PRESENT", "name": "Tester Present (ping)", "kind": "test", "risk": "safe", + "description": "Sends a UDS keep-alive (3E 00). Confirms the ECU is responding on a CAN vehicle. No effect.", + "steps": [{"send": "3E00"}], "success_msg": "ECU responded — module is alive."}, + {"key": "ECU_RESET", "name": "Reset ECU (soft reboot)", "kind": "reset", "risk": "caution", + "description": "ISO 14229 ECUReset — reboots the engine control module (clears volatile adaptations).", + "warning": "Reboots the ECM. Do this with the ENGINE OFF, key in RUN. The engine would stall if running, and comms drop briefly. UDS/CAN vehicles only.", + "steps": [{"send": "1101"}], "success_msg": "ECU reset acknowledged."} + ] } diff --git a/tests/test_actions.py b/tests/test_actions.py new file mode 100644 index 0000000..7f51686 --- /dev/null +++ b/tests/test_actions.py @@ -0,0 +1,72 @@ +"""Bi-directional action framework tests (against MockLink, no hardware).""" +import os +import sys + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from obdcore import load_default, load_profile, profiles_dir +from obdcore.actions import Action, ActionStep, run_action, validate_action +from obdcore.mock import MockLink +import time + + +def _mock(): + return MockLink(clock=time.time) + + +def test_simple_action(): + a = Action("PING", "Tester Present", steps=[ActionStep("3E00")]) + r = run_action(a, _mock()) + assert r["ok"], r + print(" simple action (3E00): OK") + + +def test_session_and_reset(): + a = Action("RESET", "ECU Reset", session="03", steps=[ActionStep("1101")]) + r = run_action(a, _mock()) + assert r["ok"], r + print(" session + reset: OK") + + +def test_security_known_algo(): + a = Action("LOCKED", "Secured routine", security={"level": "01", "algorithm": "xor-ff"}, + steps=[ActionStep("31010203")]) # start routine + r = run_action(a, _mock()) + assert r["ok"], r + print(" security handshake (xor-ff seed->key): OK") + + +def test_security_unknown_algo_blocked(): + a = Action("SECRET", "Vendor routine", security={"level": "01", "algorithm": "ford-2005-secret"}, + steps=[ActionStep("31010203")]) + r = run_action(a, _mock()) + assert not r["ok"] and "not available" in r["message"], r + print(" unknown security algorithm blocked (fails safe): OK") + + +def test_validate_rejects_bad_hex(): + bad = Action("BAD", "bad", steps=[ActionStep("ZZ")]) + try: + validate_action(bad) + raise AssertionError("should reject non-hex send") + except ValueError: + pass + print(" malformed hex rejected at load: OK") + + +def test_profile_actions_load(): + prof = load_profile(os.path.join(profiles_dir(), "generic-obd2.json")) + keys = {a.key for a in (prof.actions or [])} + assert "TESTER_PRESENT" in keys and "ECU_RESET" in keys, keys + # the reset is risk-gated + reset = next(a for a in prof.actions if a.key == "ECU_RESET") + assert reset.risk == "caution" and reset.warning + print(f" generic profile loads {len(prof.actions)} actions (risk-tagged): OK") + + +if __name__ == "__main__": + for fn in [test_simple_action, test_session_and_reset, test_security_known_algo, + test_security_unknown_algo_blocked, test_validate_rejects_bad_hex, + test_profile_actions_load]: + fn() + print("\nALL ACTION TESTS PASS")