diff --git a/gui/controller.py b/gui/controller.py
index 23a6e53..ff4e487 100644
--- a/gui/controller.py
+++ b/gui/controller.py
@@ -159,7 +159,23 @@ class Controller:
def run_action(self, action):
from obdcore.actions import run_action
- return self._oneoff(lambda: run_action(action, self.link), timeout=20.0)
+
+ def go():
+ # actions/routines can take longer than the fast polling window and
+ # may reply 0x78 (pending) — restore slow ELM timing for the run
+ try:
+ self.link.fast_timing(False)
+ except Exception:
+ pass
+ try:
+ return run_action(action, self.link)
+ finally:
+ try:
+ self.link.fast_timing(True)
+ except Exception:
+ pass
+
+ return self._oneoff(go, timeout=25.0)
# -- trip / performance (fed from the live store each GUI tick) --
def update_trip(self):
diff --git a/gui/main.py b/gui/main.py
index 752ab18..ef1c673 100644
--- a/gui/main.py
+++ b/gui/main.py
@@ -506,13 +506,15 @@ class MainWindow(QtWidgets.QMainWindow):
"Caution: these send commands to the vehicle. Read each warning."))
scroll = QtWidgets.QScrollArea(); scroll.setWidgetResizable(True)
inner = QtWidgets.QWidget(); il = QtWidgets.QVBoxLayout(inner)
+ from obdcore.actions import effective_risk
for a in acts:
row = QtWidgets.QFrame()
row.setStyleSheet("QFrame{border:1px solid #333;border-radius:6px;}")
rl = QtWidgets.QHBoxLayout(row)
+ er = effective_risk(a)
txt = QtWidgets.QLabel(
f"{a.name} "
- f"[{a.risk}]"
+ f"[{er}]"
f"
{a.description}")
txt.setWordWrap(True)
rl.addWidget(txt, 1)
@@ -527,11 +529,16 @@ class MainWindow(QtWidgets.QMainWindow):
dlg.exec()
def _run_action(self, action):
- if action.risk in ("caution", "danger"):
- msg = (action.warning or "This sends a command to the vehicle.") + \
+ from obdcore.actions import effective_risk
+ risk = effective_risk(action) # derived from the actual UDS SIDs
+ if risk != "safe":
+ note = ("" if risk == action.risk else
+ f"\n\n(The profile labels this \"{action.risk}\", but its commands are "
+ f"{risk}-level — confirming anyway.)")
+ msg = (action.warning or "This sends a command to the vehicle.") + note + \
"\n\nProceed?"
btn = QtWidgets.QMessageBox.warning(
- self, f"Run: {action.name}", msg,
+ self, f"Run [{risk}]: {action.name}", msg,
QtWidgets.QMessageBox.Yes | QtWidgets.QMessageBox.No,
QtWidgets.QMessageBox.No)
if btn != QtWidgets.QMessageBox.Yes:
diff --git a/obdcore/actions.py b/obdcore/actions.py
index d7331b6..fd1332a 100644
--- a/obdcore/actions.py
+++ b/obdcore/actions.py
@@ -11,8 +11,9 @@ Action is a small sequence the runner executes through the ELM link:
ECU reset, Mode 3E tester-present, ...) with response checks
SAFETY:
-- Every action carries a `risk` (safe | caution | danger); the GUI gates
- caution/danger behind an explicit confirmation.
+- Effective risk is DERIVED from the actual UDS service IDs the action sends
+ (effective_risk): any write/actuator/reset/transfer SID forces 'danger', so a
+ profile can only RAISE the confirmation, never mislabel a reflash as "safe".
- The runner sends ONLY the hex bytes defined in the profile -- nothing is
synthesized.
- Security-access KEY algorithms are per-vehicle secrets and are deliberately
@@ -55,13 +56,68 @@ def _hex(byte_list):
return "".join(f"{b:02X}" for b in byte_list)
-def _positive(data, req_sid):
- """UDS positive response = request service id + 0x40 present in the data."""
- return (req_sid + 0x40) in data
+def _find(data, sub):
+ """True if `sub` (list/bytes) appears as a CONTIGUOUS run in `data`."""
+ if not sub:
+ return False
+ return bytes(data).find(bytes(sub)) != -1
-def _is_negative(data):
- return 0x7F in data
+def _neg(data, sid):
+ """UDS negative response for this service = contiguous 7F ."""
+ return _find(data, [0x7F, sid])
+
+
+def _pending(data, sid):
+ """Negative response code 0x78 = requestCorrectlyReceived-ResponsePending."""
+ return _find(data, [0x7F, sid, 0x78])
+
+
+def _ok(data, sid, expect):
+ """A step succeeds only if there's NO negative response AND either the
+ expected bytes or the UDS positive-response id (sid+0x40) appears."""
+ if _neg(data, sid):
+ return False
+ if expect:
+ return _find(data, bytes.fromhex(expect))
+ return _find(data, [(sid + 0x40) & 0xFF])
+
+
+# UDS/OBD service-id classification -- used to derive an action's real risk from
+# what it actually SENDS, so a profile can't mislabel a write as "safe".
+READ_ONLY_SIDS = {0x01, 0x02, 0x03, 0x07, 0x09, 0x0A, 0x19, 0x22, 0x2A, 0x3E}
+WRITE_SIDS = {0x04, 0x11, 0x14, 0x28, 0x2E, 0x2F, 0x31, 0x34, 0x35, 0x36, 0x37,
+ 0x38, 0x3B, 0x3D, 0x85}
+_RISK_ORDER = {"safe": 0, "caution": 1, "danger": 2}
+
+
+def effective_risk(action):
+ """Risk = max(profile-declared, risk derived from the actual service IDs).
+ Write/actuator/reset/transfer SIDs force 'danger'; unknown SIDs, a non-default
+ session, or a security block force at least 'caution'. The profile can only
+ RAISE the risk, never lower it below what the commands warrant."""
+ derived = "safe"
+
+ def bump(r):
+ nonlocal derived
+ if _RISK_ORDER[r] > _RISK_ORDER[derived]:
+ derived = r
+
+ if action.session and action.session.lower() not in ("01", ""):
+ bump("caution")
+ if action.security:
+ bump("caution")
+ for st in action.steps:
+ try:
+ sid = int(st.send[:2], 16)
+ except ValueError:
+ bump("danger"); continue
+ if sid in WRITE_SIDS:
+ bump("danger")
+ elif sid not in READ_ONLY_SIDS:
+ bump("caution")
+ declared = action.risk if action.risk in _RISK_ORDER else "danger"
+ return max((derived, declared), key=lambda r: _RISK_ORDER[r])
def validate_action(a):
@@ -89,8 +145,9 @@ def run_action(action, link, log=None):
# 1. diagnostic session
if action.session:
d = send("10" + action.session)
- if not _positive(d, 0x10):
- return {"ok": False, "message": "could not enter diagnostic session", "responses": [d]}
+ if not _ok(d, 0x10, "50"):
+ return {"ok": False, "message": "could not enter diagnostic session",
+ "responses": [d]}
# 2. security access (seed -> key)
if action.security:
@@ -101,14 +158,14 @@ def run_action(action, link, log=None):
"(per-vehicle secret) — action blocked for safety", "responses": []}
level = action.security.get("level", "01")
seed_resp = send("27" + level)
- if not _positive(seed_resp, 0x27):
+ if not _ok(seed_resp, 0x27, ""):
return {"ok": False, "message": "security seed request failed", "responses": [seed_resp]}
- i = seed_resp.index(0x67)
+ i = bytes(seed_resp).find(bytes([0x67]))
seed = seed_resp[i + 2:]
key = fn(bytes(seed))
lvl2 = f"{int(level, 16) + 1:02X}"
kr = send("27" + lvl2 + _hex(key))
- if not _positive(kr, 0x27):
+ if not _ok(kr, 0x27, ""):
return {"ok": False, "message": "security unlock rejected", "responses": [kr]}
note("security unlocked")
@@ -118,12 +175,12 @@ def run_action(action, link, log=None):
d = send(st.send)
responses.append(d)
req_sid = int(st.send[:2], 16)
- if st.expect:
- ok = bytes.fromhex(st.expect)[0] in d
- else:
- ok = _positive(d, req_sid)
- if not ok:
- reason = "ECU negative response" if _is_negative(d) else "no valid response"
+ if _pending(d, req_sid):
+ return {"ok": True, "responses": responses,
+ "message": "ECU accepted the request; the operation is still in "
+ "progress (response pending). Check the vehicle — do NOT retry."}
+ if not _ok(d, req_sid, st.expect):
+ reason = "ECU negative response" if _neg(d, req_sid) else "no valid response"
return {"ok": False, "message": f"{reason} to {st.send} (got {_hex(d) or 'nothing'})",
"responses": responses}
note(f"{st.send} -> {_hex(d)}")
diff --git a/tests/test_actions.py b/tests/test_actions.py
index 7f51686..3c1942b 100644
--- a/tests/test_actions.py
+++ b/tests/test_actions.py
@@ -5,11 +5,21 @@ import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from obdcore import load_default, load_profile, profiles_dir
-from obdcore.actions import Action, ActionStep, run_action, validate_action
+from obdcore.actions import (Action, ActionStep, run_action, validate_action,
+ effective_risk)
from obdcore.mock import MockLink
import time
+class _FakeLink:
+ """Returns canned raw responses keyed by the sent hex (for parse tests)."""
+ def __init__(self, table):
+ self.table = table
+
+ def read_raw(self, hexcmd, timeout=2.0):
+ return self.table.get(hexcmd.replace(" ", ""), [])
+
+
def _mock():
return MockLink(clock=time.time)
@@ -64,9 +74,43 @@ def test_profile_actions_load():
print(f" generic profile loads {len(prof.actions)} actions (risk-tagged): OK")
+def test_effective_risk_cannot_be_downgraded():
+ # a profile mislabels an ECU-reset (0x11 write SID) as "safe"
+ a = Action("SNEAKY", "totally safe", risk="safe", steps=[ActionStep("1101")])
+ assert effective_risk(a) == "danger", "write SID must force danger regardless of label"
+ # a pure read is safe
+ assert effective_risk(Action("R", "read", steps=[ActionStep("22F190")])) == "safe"
+ # tester-present is safe; a security block bumps to at least caution
+ a2 = Action("S", "s", risk="safe", security={"level": "01", "algorithm": "xor-ff"},
+ steps=[ActionStep("3E00")])
+ assert effective_risk(a2) == "caution"
+ print(" effective_risk derives from SIDs (write can't be labeled safe): OK")
+
+
+def test_response_parsing_rejects_false_positive():
+ # NRC byte 0x7E in a NEGATIVE response must NOT be read as positive for 0x3E.
+ # 7F 3E 11 = negativeResponse to service 3E; the old membership test saw 0x7E
+ # (=3E+40) elsewhere and false-passed.
+ a = Action("PING", "ping", steps=[ActionStep("3E00")])
+ r = run_action(a, _FakeLink({"3E00": [0x7F, 0x3E, 0x11]}))
+ assert not r["ok"] and "negative" in r["message"], r
+ # a genuine positive still passes
+ r2 = run_action(a, _FakeLink({"3E00": [0x7E, 0x00]}))
+ assert r2["ok"], r2
+ print(" contiguous response check rejects NRC false-positive: OK")
+
+
+def test_response_pending_not_terminal():
+ a = Action("ROUT", "routine", risk="danger", steps=[ActionStep("31010203")])
+ r = run_action(a, _FakeLink({"31010203": [0x7F, 0x31, 0x78]})) # responsePending
+ assert r["ok"] and "pending" in r["message"].lower(), r
+ print(" 0x78 responsePending treated as in-progress, not failure: OK")
+
+
if __name__ == "__main__":
for fn in [test_simple_action, test_session_and_reset, test_security_known_algo,
test_security_unknown_algo_blocked, test_validate_rejects_bad_hex,
- test_profile_actions_load]:
+ test_profile_actions_load, test_effective_risk_cannot_be_downgraded,
+ test_response_parsing_rejects_false_positive, test_response_pending_not_terminal]:
fn()
print("\nALL ACTION TESTS PASS")