From b5e0c96763e4bd6462657a36e22418c217fdcce3 Mon Sep 17 00:00:00 2001 From: Justin Paul Date: Wed, 1 Jul 2026 19:29:44 -0400 Subject: [PATCH] Fix #7: derive action risk from UDS SIDs; fix response parsing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Untrusted profiles could bypass the confirmation and responses were mis-parsed: - effective_risk(action): risk is now DERIVED from the actual service IDs the steps send — any write/actuator/reset/transfer SID (2F/31/11/14/2E/27/34-37/…) forces 'danger'; unknown SID / non-default session / security block force 'caution'. A profile can only RAISE risk, never label a reflash 'safe'. GUI gates the confirmation (and the risk badge) on this derived value. - Response checks use CONTIGUOUS subsequence matching + a hard '7F ' negative-response guard, so an NRC data byte (e.g. 0x7E) can't false-pass as a positive response; applied to session/security/step checks. - 0x78 (responsePending) is treated as in-progress, not terminal failure. - controller.run_action restores slow ELM timing for the run (0x78 window). - Tests: risk-cannot-be-downgraded, NRC false-positive rejected, pending handled. Closes #7 Co-Authored-By: Claude Opus 4.8 (1M context) Claude-Session: https://claude.ai/code/session_016yT89n4zR4qbrySoSiEyZs --- gui/controller.py | 18 ++++++++- gui/main.py | 15 +++++-- obdcore/actions.py | 93 ++++++++++++++++++++++++++++++++++--------- tests/test_actions.py | 48 +++++++++++++++++++++- 4 files changed, 149 insertions(+), 25 deletions(-) diff --git a/gui/controller.py b/gui/controller.py index 23a6e53..ff4e487 100644 --- a/gui/controller.py +++ b/gui/controller.py @@ -159,7 +159,23 @@ class Controller: def run_action(self, action): from obdcore.actions import run_action - return self._oneoff(lambda: run_action(action, self.link), timeout=20.0) + + def go(): + # actions/routines can take longer than the fast polling window and + # may reply 0x78 (pending) — restore slow ELM timing for the run + try: + self.link.fast_timing(False) + except Exception: + pass + try: + return run_action(action, self.link) + finally: + try: + self.link.fast_timing(True) + except Exception: + pass + + return self._oneoff(go, timeout=25.0) # -- trip / performance (fed from the live store each GUI tick) -- def update_trip(self): diff --git a/gui/main.py b/gui/main.py index 752ab18..ef1c673 100644 --- a/gui/main.py +++ b/gui/main.py @@ -506,13 +506,15 @@ class MainWindow(QtWidgets.QMainWindow): "Caution: these send commands to the vehicle. Read each warning.")) scroll = QtWidgets.QScrollArea(); scroll.setWidgetResizable(True) inner = QtWidgets.QWidget(); il = QtWidgets.QVBoxLayout(inner) + from obdcore.actions import effective_risk for a in acts: row = QtWidgets.QFrame() row.setStyleSheet("QFrame{border:1px solid #333;border-radius:6px;}") rl = QtWidgets.QHBoxLayout(row) + er = effective_risk(a) txt = QtWidgets.QLabel( f"{a.name} " - f"[{a.risk}]" + f"[{er}]" f"
{a.description}") txt.setWordWrap(True) rl.addWidget(txt, 1) @@ -527,11 +529,16 @@ class MainWindow(QtWidgets.QMainWindow): dlg.exec() def _run_action(self, action): - if action.risk in ("caution", "danger"): - msg = (action.warning or "This sends a command to the vehicle.") + \ + from obdcore.actions import effective_risk + risk = effective_risk(action) # derived from the actual UDS SIDs + if risk != "safe": + note = ("" if risk == action.risk else + f"\n\n(The profile labels this \"{action.risk}\", but its commands are " + f"{risk}-level — confirming anyway.)") + msg = (action.warning or "This sends a command to the vehicle.") + note + \ "\n\nProceed?" btn = QtWidgets.QMessageBox.warning( - self, f"Run: {action.name}", msg, + self, f"Run [{risk}]: {action.name}", msg, QtWidgets.QMessageBox.Yes | QtWidgets.QMessageBox.No, QtWidgets.QMessageBox.No) if btn != QtWidgets.QMessageBox.Yes: diff --git a/obdcore/actions.py b/obdcore/actions.py index d7331b6..fd1332a 100644 --- a/obdcore/actions.py +++ b/obdcore/actions.py @@ -11,8 +11,9 @@ Action is a small sequence the runner executes through the ELM link: ECU reset, Mode 3E tester-present, ...) with response checks SAFETY: -- Every action carries a `risk` (safe | caution | danger); the GUI gates - caution/danger behind an explicit confirmation. +- Effective risk is DERIVED from the actual UDS service IDs the action sends + (effective_risk): any write/actuator/reset/transfer SID forces 'danger', so a + profile can only RAISE the confirmation, never mislabel a reflash as "safe". - The runner sends ONLY the hex bytes defined in the profile -- nothing is synthesized. - Security-access KEY algorithms are per-vehicle secrets and are deliberately @@ -55,13 +56,68 @@ def _hex(byte_list): return "".join(f"{b:02X}" for b in byte_list) -def _positive(data, req_sid): - """UDS positive response = request service id + 0x40 present in the data.""" - return (req_sid + 0x40) in data +def _find(data, sub): + """True if `sub` (list/bytes) appears as a CONTIGUOUS run in `data`.""" + if not sub: + return False + return bytes(data).find(bytes(sub)) != -1 -def _is_negative(data): - return 0x7F in data +def _neg(data, sid): + """UDS negative response for this service = contiguous 7F .""" + return _find(data, [0x7F, sid]) + + +def _pending(data, sid): + """Negative response code 0x78 = requestCorrectlyReceived-ResponsePending.""" + return _find(data, [0x7F, sid, 0x78]) + + +def _ok(data, sid, expect): + """A step succeeds only if there's NO negative response AND either the + expected bytes or the UDS positive-response id (sid+0x40) appears.""" + if _neg(data, sid): + return False + if expect: + return _find(data, bytes.fromhex(expect)) + return _find(data, [(sid + 0x40) & 0xFF]) + + +# UDS/OBD service-id classification -- used to derive an action's real risk from +# what it actually SENDS, so a profile can't mislabel a write as "safe". +READ_ONLY_SIDS = {0x01, 0x02, 0x03, 0x07, 0x09, 0x0A, 0x19, 0x22, 0x2A, 0x3E} +WRITE_SIDS = {0x04, 0x11, 0x14, 0x28, 0x2E, 0x2F, 0x31, 0x34, 0x35, 0x36, 0x37, + 0x38, 0x3B, 0x3D, 0x85} +_RISK_ORDER = {"safe": 0, "caution": 1, "danger": 2} + + +def effective_risk(action): + """Risk = max(profile-declared, risk derived from the actual service IDs). + Write/actuator/reset/transfer SIDs force 'danger'; unknown SIDs, a non-default + session, or a security block force at least 'caution'. The profile can only + RAISE the risk, never lower it below what the commands warrant.""" + derived = "safe" + + def bump(r): + nonlocal derived + if _RISK_ORDER[r] > _RISK_ORDER[derived]: + derived = r + + if action.session and action.session.lower() not in ("01", ""): + bump("caution") + if action.security: + bump("caution") + for st in action.steps: + try: + sid = int(st.send[:2], 16) + except ValueError: + bump("danger"); continue + if sid in WRITE_SIDS: + bump("danger") + elif sid not in READ_ONLY_SIDS: + bump("caution") + declared = action.risk if action.risk in _RISK_ORDER else "danger" + return max((derived, declared), key=lambda r: _RISK_ORDER[r]) def validate_action(a): @@ -89,8 +145,9 @@ def run_action(action, link, log=None): # 1. diagnostic session if action.session: d = send("10" + action.session) - if not _positive(d, 0x10): - return {"ok": False, "message": "could not enter diagnostic session", "responses": [d]} + if not _ok(d, 0x10, "50"): + return {"ok": False, "message": "could not enter diagnostic session", + "responses": [d]} # 2. security access (seed -> key) if action.security: @@ -101,14 +158,14 @@ def run_action(action, link, log=None): "(per-vehicle secret) — action blocked for safety", "responses": []} level = action.security.get("level", "01") seed_resp = send("27" + level) - if not _positive(seed_resp, 0x27): + if not _ok(seed_resp, 0x27, ""): return {"ok": False, "message": "security seed request failed", "responses": [seed_resp]} - i = seed_resp.index(0x67) + i = bytes(seed_resp).find(bytes([0x67])) seed = seed_resp[i + 2:] key = fn(bytes(seed)) lvl2 = f"{int(level, 16) + 1:02X}" kr = send("27" + lvl2 + _hex(key)) - if not _positive(kr, 0x27): + if not _ok(kr, 0x27, ""): return {"ok": False, "message": "security unlock rejected", "responses": [kr]} note("security unlocked") @@ -118,12 +175,12 @@ def run_action(action, link, log=None): d = send(st.send) responses.append(d) req_sid = int(st.send[:2], 16) - if st.expect: - ok = bytes.fromhex(st.expect)[0] in d - else: - ok = _positive(d, req_sid) - if not ok: - reason = "ECU negative response" if _is_negative(d) else "no valid response" + if _pending(d, req_sid): + return {"ok": True, "responses": responses, + "message": "ECU accepted the request; the operation is still in " + "progress (response pending). Check the vehicle — do NOT retry."} + if not _ok(d, req_sid, st.expect): + reason = "ECU negative response" if _neg(d, req_sid) else "no valid response" return {"ok": False, "message": f"{reason} to {st.send} (got {_hex(d) or 'nothing'})", "responses": responses} note(f"{st.send} -> {_hex(d)}") diff --git a/tests/test_actions.py b/tests/test_actions.py index 7f51686..3c1942b 100644 --- a/tests/test_actions.py +++ b/tests/test_actions.py @@ -5,11 +5,21 @@ import sys sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from obdcore import load_default, load_profile, profiles_dir -from obdcore.actions import Action, ActionStep, run_action, validate_action +from obdcore.actions import (Action, ActionStep, run_action, validate_action, + effective_risk) from obdcore.mock import MockLink import time +class _FakeLink: + """Returns canned raw responses keyed by the sent hex (for parse tests).""" + def __init__(self, table): + self.table = table + + def read_raw(self, hexcmd, timeout=2.0): + return self.table.get(hexcmd.replace(" ", ""), []) + + def _mock(): return MockLink(clock=time.time) @@ -64,9 +74,43 @@ def test_profile_actions_load(): print(f" generic profile loads {len(prof.actions)} actions (risk-tagged): OK") +def test_effective_risk_cannot_be_downgraded(): + # a profile mislabels an ECU-reset (0x11 write SID) as "safe" + a = Action("SNEAKY", "totally safe", risk="safe", steps=[ActionStep("1101")]) + assert effective_risk(a) == "danger", "write SID must force danger regardless of label" + # a pure read is safe + assert effective_risk(Action("R", "read", steps=[ActionStep("22F190")])) == "safe" + # tester-present is safe; a security block bumps to at least caution + a2 = Action("S", "s", risk="safe", security={"level": "01", "algorithm": "xor-ff"}, + steps=[ActionStep("3E00")]) + assert effective_risk(a2) == "caution" + print(" effective_risk derives from SIDs (write can't be labeled safe): OK") + + +def test_response_parsing_rejects_false_positive(): + # NRC byte 0x7E in a NEGATIVE response must NOT be read as positive for 0x3E. + # 7F 3E 11 = negativeResponse to service 3E; the old membership test saw 0x7E + # (=3E+40) elsewhere and false-passed. + a = Action("PING", "ping", steps=[ActionStep("3E00")]) + r = run_action(a, _FakeLink({"3E00": [0x7F, 0x3E, 0x11]})) + assert not r["ok"] and "negative" in r["message"], r + # a genuine positive still passes + r2 = run_action(a, _FakeLink({"3E00": [0x7E, 0x00]})) + assert r2["ok"], r2 + print(" contiguous response check rejects NRC false-positive: OK") + + +def test_response_pending_not_terminal(): + a = Action("ROUT", "routine", risk="danger", steps=[ActionStep("31010203")]) + r = run_action(a, _FakeLink({"31010203": [0x7F, 0x31, 0x78]})) # responsePending + assert r["ok"] and "pending" in r["message"].lower(), r + print(" 0x78 responsePending treated as in-progress, not failure: OK") + + if __name__ == "__main__": for fn in [test_simple_action, test_session_and_reset, test_security_known_algo, test_security_unknown_algo_blocked, test_validate_rejects_bad_hex, - test_profile_actions_load]: + test_profile_actions_load, test_effective_risk_cannot_be_downgraded, + test_response_parsing_rejects_false_positive, test_response_pending_not_terminal]: fn() print("\nALL ACTION TESTS PASS")