Fix #7: derive action risk from UDS SIDs; fix response parsing
Untrusted profiles could bypass the confirmation and responses were mis-parsed: - effective_risk(action): risk is now DERIVED from the actual service IDs the steps send — any write/actuator/reset/transfer SID (2F/31/11/14/2E/27/34-37/…) forces 'danger'; unknown SID / non-default session / security block force 'caution'. A profile can only RAISE risk, never label a reflash 'safe'. GUI gates the confirmation (and the risk badge) on this derived value. - Response checks use CONTIGUOUS subsequence matching + a hard '7F <sid>' negative-response guard, so an NRC data byte (e.g. 0x7E) can't false-pass as a positive response; applied to session/security/step checks. - 0x78 (responsePending) is treated as in-progress, not terminal failure. - controller.run_action restores slow ELM timing for the run (0x78 window). - Tests: risk-cannot-be-downgraded, NRC false-positive rejected, pending handled. Closes #7 Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_016yT89n4zR4qbrySoSiEyZs
This commit is contained in:
+17
-1
@@ -159,7 +159,23 @@ class Controller:
|
|||||||
|
|
||||||
def run_action(self, action):
|
def run_action(self, action):
|
||||||
from obdcore.actions import run_action
|
from obdcore.actions import run_action
|
||||||
return self._oneoff(lambda: run_action(action, self.link), timeout=20.0)
|
|
||||||
|
def go():
|
||||||
|
# actions/routines can take longer than the fast polling window and
|
||||||
|
# may reply 0x78 (pending) — restore slow ELM timing for the run
|
||||||
|
try:
|
||||||
|
self.link.fast_timing(False)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
try:
|
||||||
|
return run_action(action, self.link)
|
||||||
|
finally:
|
||||||
|
try:
|
||||||
|
self.link.fast_timing(True)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
return self._oneoff(go, timeout=25.0)
|
||||||
|
|
||||||
# -- trip / performance (fed from the live store each GUI tick) --
|
# -- trip / performance (fed from the live store each GUI tick) --
|
||||||
def update_trip(self):
|
def update_trip(self):
|
||||||
|
|||||||
+11
-4
@@ -506,13 +506,15 @@ class MainWindow(QtWidgets.QMainWindow):
|
|||||||
"<b>Caution:</b> these send commands to the vehicle. Read each warning."))
|
"<b>Caution:</b> these send commands to the vehicle. Read each warning."))
|
||||||
scroll = QtWidgets.QScrollArea(); scroll.setWidgetResizable(True)
|
scroll = QtWidgets.QScrollArea(); scroll.setWidgetResizable(True)
|
||||||
inner = QtWidgets.QWidget(); il = QtWidgets.QVBoxLayout(inner)
|
inner = QtWidgets.QWidget(); il = QtWidgets.QVBoxLayout(inner)
|
||||||
|
from obdcore.actions import effective_risk
|
||||||
for a in acts:
|
for a in acts:
|
||||||
row = QtWidgets.QFrame()
|
row = QtWidgets.QFrame()
|
||||||
row.setStyleSheet("QFrame{border:1px solid #333;border-radius:6px;}")
|
row.setStyleSheet("QFrame{border:1px solid #333;border-radius:6px;}")
|
||||||
rl = QtWidgets.QHBoxLayout(row)
|
rl = QtWidgets.QHBoxLayout(row)
|
||||||
|
er = effective_risk(a)
|
||||||
txt = QtWidgets.QLabel(
|
txt = QtWidgets.QLabel(
|
||||||
f"<b>{a.name}</b> "
|
f"<b>{a.name}</b> "
|
||||||
f"<span style='color:{self._RISK_COLOR.get(a.risk,'#999')}'>[{a.risk}]</span>"
|
f"<span style='color:{self._RISK_COLOR.get(er,'#999')}'>[{er}]</span>"
|
||||||
f"<br><span style='color:#999'>{a.description}</span>")
|
f"<br><span style='color:#999'>{a.description}</span>")
|
||||||
txt.setWordWrap(True)
|
txt.setWordWrap(True)
|
||||||
rl.addWidget(txt, 1)
|
rl.addWidget(txt, 1)
|
||||||
@@ -527,11 +529,16 @@ class MainWindow(QtWidgets.QMainWindow):
|
|||||||
dlg.exec()
|
dlg.exec()
|
||||||
|
|
||||||
def _run_action(self, action):
|
def _run_action(self, action):
|
||||||
if action.risk in ("caution", "danger"):
|
from obdcore.actions import effective_risk
|
||||||
msg = (action.warning or "This sends a command to the vehicle.") + \
|
risk = effective_risk(action) # derived from the actual UDS SIDs
|
||||||
|
if risk != "safe":
|
||||||
|
note = ("" if risk == action.risk else
|
||||||
|
f"\n\n(The profile labels this \"{action.risk}\", but its commands are "
|
||||||
|
f"{risk}-level — confirming anyway.)")
|
||||||
|
msg = (action.warning or "This sends a command to the vehicle.") + note + \
|
||||||
"\n\nProceed?"
|
"\n\nProceed?"
|
||||||
btn = QtWidgets.QMessageBox.warning(
|
btn = QtWidgets.QMessageBox.warning(
|
||||||
self, f"Run: {action.name}", msg,
|
self, f"Run [{risk}]: {action.name}", msg,
|
||||||
QtWidgets.QMessageBox.Yes | QtWidgets.QMessageBox.No,
|
QtWidgets.QMessageBox.Yes | QtWidgets.QMessageBox.No,
|
||||||
QtWidgets.QMessageBox.No)
|
QtWidgets.QMessageBox.No)
|
||||||
if btn != QtWidgets.QMessageBox.Yes:
|
if btn != QtWidgets.QMessageBox.Yes:
|
||||||
|
|||||||
+75
-18
@@ -11,8 +11,9 @@ Action is a small sequence the runner executes through the ELM link:
|
|||||||
ECU reset, Mode 3E tester-present, ...) with response checks
|
ECU reset, Mode 3E tester-present, ...) with response checks
|
||||||
|
|
||||||
SAFETY:
|
SAFETY:
|
||||||
- Every action carries a `risk` (safe | caution | danger); the GUI gates
|
- Effective risk is DERIVED from the actual UDS service IDs the action sends
|
||||||
caution/danger behind an explicit confirmation.
|
(effective_risk): any write/actuator/reset/transfer SID forces 'danger', so a
|
||||||
|
profile can only RAISE the confirmation, never mislabel a reflash as "safe".
|
||||||
- The runner sends ONLY the hex bytes defined in the profile -- nothing is
|
- The runner sends ONLY the hex bytes defined in the profile -- nothing is
|
||||||
synthesized.
|
synthesized.
|
||||||
- Security-access KEY algorithms are per-vehicle secrets and are deliberately
|
- Security-access KEY algorithms are per-vehicle secrets and are deliberately
|
||||||
@@ -55,13 +56,68 @@ def _hex(byte_list):
|
|||||||
return "".join(f"{b:02X}" for b in byte_list)
|
return "".join(f"{b:02X}" for b in byte_list)
|
||||||
|
|
||||||
|
|
||||||
def _positive(data, req_sid):
|
def _find(data, sub):
|
||||||
"""UDS positive response = request service id + 0x40 present in the data."""
|
"""True if `sub` (list/bytes) appears as a CONTIGUOUS run in `data`."""
|
||||||
return (req_sid + 0x40) in data
|
if not sub:
|
||||||
|
return False
|
||||||
|
return bytes(data).find(bytes(sub)) != -1
|
||||||
|
|
||||||
|
|
||||||
def _is_negative(data):
|
def _neg(data, sid):
|
||||||
return 0x7F in data
|
"""UDS negative response for this service = contiguous 7F <sid>."""
|
||||||
|
return _find(data, [0x7F, sid])
|
||||||
|
|
||||||
|
|
||||||
|
def _pending(data, sid):
|
||||||
|
"""Negative response code 0x78 = requestCorrectlyReceived-ResponsePending."""
|
||||||
|
return _find(data, [0x7F, sid, 0x78])
|
||||||
|
|
||||||
|
|
||||||
|
def _ok(data, sid, expect):
|
||||||
|
"""A step succeeds only if there's NO negative response AND either the
|
||||||
|
expected bytes or the UDS positive-response id (sid+0x40) appears."""
|
||||||
|
if _neg(data, sid):
|
||||||
|
return False
|
||||||
|
if expect:
|
||||||
|
return _find(data, bytes.fromhex(expect))
|
||||||
|
return _find(data, [(sid + 0x40) & 0xFF])
|
||||||
|
|
||||||
|
|
||||||
|
# UDS/OBD service-id classification -- used to derive an action's real risk from
|
||||||
|
# what it actually SENDS, so a profile can't mislabel a write as "safe".
|
||||||
|
READ_ONLY_SIDS = {0x01, 0x02, 0x03, 0x07, 0x09, 0x0A, 0x19, 0x22, 0x2A, 0x3E}
|
||||||
|
WRITE_SIDS = {0x04, 0x11, 0x14, 0x28, 0x2E, 0x2F, 0x31, 0x34, 0x35, 0x36, 0x37,
|
||||||
|
0x38, 0x3B, 0x3D, 0x85}
|
||||||
|
_RISK_ORDER = {"safe": 0, "caution": 1, "danger": 2}
|
||||||
|
|
||||||
|
|
||||||
|
def effective_risk(action):
|
||||||
|
"""Risk = max(profile-declared, risk derived from the actual service IDs).
|
||||||
|
Write/actuator/reset/transfer SIDs force 'danger'; unknown SIDs, a non-default
|
||||||
|
session, or a security block force at least 'caution'. The profile can only
|
||||||
|
RAISE the risk, never lower it below what the commands warrant."""
|
||||||
|
derived = "safe"
|
||||||
|
|
||||||
|
def bump(r):
|
||||||
|
nonlocal derived
|
||||||
|
if _RISK_ORDER[r] > _RISK_ORDER[derived]:
|
||||||
|
derived = r
|
||||||
|
|
||||||
|
if action.session and action.session.lower() not in ("01", ""):
|
||||||
|
bump("caution")
|
||||||
|
if action.security:
|
||||||
|
bump("caution")
|
||||||
|
for st in action.steps:
|
||||||
|
try:
|
||||||
|
sid = int(st.send[:2], 16)
|
||||||
|
except ValueError:
|
||||||
|
bump("danger"); continue
|
||||||
|
if sid in WRITE_SIDS:
|
||||||
|
bump("danger")
|
||||||
|
elif sid not in READ_ONLY_SIDS:
|
||||||
|
bump("caution")
|
||||||
|
declared = action.risk if action.risk in _RISK_ORDER else "danger"
|
||||||
|
return max((derived, declared), key=lambda r: _RISK_ORDER[r])
|
||||||
|
|
||||||
|
|
||||||
def validate_action(a):
|
def validate_action(a):
|
||||||
@@ -89,8 +145,9 @@ def run_action(action, link, log=None):
|
|||||||
# 1. diagnostic session
|
# 1. diagnostic session
|
||||||
if action.session:
|
if action.session:
|
||||||
d = send("10" + action.session)
|
d = send("10" + action.session)
|
||||||
if not _positive(d, 0x10):
|
if not _ok(d, 0x10, "50"):
|
||||||
return {"ok": False, "message": "could not enter diagnostic session", "responses": [d]}
|
return {"ok": False, "message": "could not enter diagnostic session",
|
||||||
|
"responses": [d]}
|
||||||
|
|
||||||
# 2. security access (seed -> key)
|
# 2. security access (seed -> key)
|
||||||
if action.security:
|
if action.security:
|
||||||
@@ -101,14 +158,14 @@ def run_action(action, link, log=None):
|
|||||||
"(per-vehicle secret) — action blocked for safety", "responses": []}
|
"(per-vehicle secret) — action blocked for safety", "responses": []}
|
||||||
level = action.security.get("level", "01")
|
level = action.security.get("level", "01")
|
||||||
seed_resp = send("27" + level)
|
seed_resp = send("27" + level)
|
||||||
if not _positive(seed_resp, 0x27):
|
if not _ok(seed_resp, 0x27, ""):
|
||||||
return {"ok": False, "message": "security seed request failed", "responses": [seed_resp]}
|
return {"ok": False, "message": "security seed request failed", "responses": [seed_resp]}
|
||||||
i = seed_resp.index(0x67)
|
i = bytes(seed_resp).find(bytes([0x67]))
|
||||||
seed = seed_resp[i + 2:]
|
seed = seed_resp[i + 2:]
|
||||||
key = fn(bytes(seed))
|
key = fn(bytes(seed))
|
||||||
lvl2 = f"{int(level, 16) + 1:02X}"
|
lvl2 = f"{int(level, 16) + 1:02X}"
|
||||||
kr = send("27" + lvl2 + _hex(key))
|
kr = send("27" + lvl2 + _hex(key))
|
||||||
if not _positive(kr, 0x27):
|
if not _ok(kr, 0x27, ""):
|
||||||
return {"ok": False, "message": "security unlock rejected", "responses": [kr]}
|
return {"ok": False, "message": "security unlock rejected", "responses": [kr]}
|
||||||
note("security unlocked")
|
note("security unlocked")
|
||||||
|
|
||||||
@@ -118,12 +175,12 @@ def run_action(action, link, log=None):
|
|||||||
d = send(st.send)
|
d = send(st.send)
|
||||||
responses.append(d)
|
responses.append(d)
|
||||||
req_sid = int(st.send[:2], 16)
|
req_sid = int(st.send[:2], 16)
|
||||||
if st.expect:
|
if _pending(d, req_sid):
|
||||||
ok = bytes.fromhex(st.expect)[0] in d
|
return {"ok": True, "responses": responses,
|
||||||
else:
|
"message": "ECU accepted the request; the operation is still in "
|
||||||
ok = _positive(d, req_sid)
|
"progress (response pending). Check the vehicle — do NOT retry."}
|
||||||
if not ok:
|
if not _ok(d, req_sid, st.expect):
|
||||||
reason = "ECU negative response" if _is_negative(d) else "no valid response"
|
reason = "ECU negative response" if _neg(d, req_sid) else "no valid response"
|
||||||
return {"ok": False, "message": f"{reason} to {st.send} (got {_hex(d) or 'nothing'})",
|
return {"ok": False, "message": f"{reason} to {st.send} (got {_hex(d) or 'nothing'})",
|
||||||
"responses": responses}
|
"responses": responses}
|
||||||
note(f"{st.send} -> {_hex(d)}")
|
note(f"{st.send} -> {_hex(d)}")
|
||||||
|
|||||||
+46
-2
@@ -5,11 +5,21 @@ import sys
|
|||||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||||
|
|
||||||
from obdcore import load_default, load_profile, profiles_dir
|
from obdcore import load_default, load_profile, profiles_dir
|
||||||
from obdcore.actions import Action, ActionStep, run_action, validate_action
|
from obdcore.actions import (Action, ActionStep, run_action, validate_action,
|
||||||
|
effective_risk)
|
||||||
from obdcore.mock import MockLink
|
from obdcore.mock import MockLink
|
||||||
import time
|
import time
|
||||||
|
|
||||||
|
|
||||||
|
class _FakeLink:
|
||||||
|
"""Returns canned raw responses keyed by the sent hex (for parse tests)."""
|
||||||
|
def __init__(self, table):
|
||||||
|
self.table = table
|
||||||
|
|
||||||
|
def read_raw(self, hexcmd, timeout=2.0):
|
||||||
|
return self.table.get(hexcmd.replace(" ", ""), [])
|
||||||
|
|
||||||
|
|
||||||
def _mock():
|
def _mock():
|
||||||
return MockLink(clock=time.time)
|
return MockLink(clock=time.time)
|
||||||
|
|
||||||
@@ -64,9 +74,43 @@ def test_profile_actions_load():
|
|||||||
print(f" generic profile loads {len(prof.actions)} actions (risk-tagged): OK")
|
print(f" generic profile loads {len(prof.actions)} actions (risk-tagged): OK")
|
||||||
|
|
||||||
|
|
||||||
|
def test_effective_risk_cannot_be_downgraded():
|
||||||
|
# a profile mislabels an ECU-reset (0x11 write SID) as "safe"
|
||||||
|
a = Action("SNEAKY", "totally safe", risk="safe", steps=[ActionStep("1101")])
|
||||||
|
assert effective_risk(a) == "danger", "write SID must force danger regardless of label"
|
||||||
|
# a pure read is safe
|
||||||
|
assert effective_risk(Action("R", "read", steps=[ActionStep("22F190")])) == "safe"
|
||||||
|
# tester-present is safe; a security block bumps to at least caution
|
||||||
|
a2 = Action("S", "s", risk="safe", security={"level": "01", "algorithm": "xor-ff"},
|
||||||
|
steps=[ActionStep("3E00")])
|
||||||
|
assert effective_risk(a2) == "caution"
|
||||||
|
print(" effective_risk derives from SIDs (write can't be labeled safe): OK")
|
||||||
|
|
||||||
|
|
||||||
|
def test_response_parsing_rejects_false_positive():
|
||||||
|
# NRC byte 0x7E in a NEGATIVE response must NOT be read as positive for 0x3E.
|
||||||
|
# 7F 3E 11 = negativeResponse to service 3E; the old membership test saw 0x7E
|
||||||
|
# (=3E+40) elsewhere and false-passed.
|
||||||
|
a = Action("PING", "ping", steps=[ActionStep("3E00")])
|
||||||
|
r = run_action(a, _FakeLink({"3E00": [0x7F, 0x3E, 0x11]}))
|
||||||
|
assert not r["ok"] and "negative" in r["message"], r
|
||||||
|
# a genuine positive still passes
|
||||||
|
r2 = run_action(a, _FakeLink({"3E00": [0x7E, 0x00]}))
|
||||||
|
assert r2["ok"], r2
|
||||||
|
print(" contiguous response check rejects NRC false-positive: OK")
|
||||||
|
|
||||||
|
|
||||||
|
def test_response_pending_not_terminal():
|
||||||
|
a = Action("ROUT", "routine", risk="danger", steps=[ActionStep("31010203")])
|
||||||
|
r = run_action(a, _FakeLink({"31010203": [0x7F, 0x31, 0x78]})) # responsePending
|
||||||
|
assert r["ok"] and "pending" in r["message"].lower(), r
|
||||||
|
print(" 0x78 responsePending treated as in-progress, not failure: OK")
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
for fn in [test_simple_action, test_session_and_reset, test_security_known_algo,
|
for fn in [test_simple_action, test_session_and_reset, test_security_known_algo,
|
||||||
test_security_unknown_algo_blocked, test_validate_rejects_bad_hex,
|
test_security_unknown_algo_blocked, test_validate_rejects_bad_hex,
|
||||||
test_profile_actions_load]:
|
test_profile_actions_load, test_effective_risk_cannot_be_downgraded,
|
||||||
|
test_response_parsing_rejects_false_positive, test_response_pending_not_terminal]:
|
||||||
fn()
|
fn()
|
||||||
print("\nALL ACTION TESTS PASS")
|
print("\nALL ACTION TESTS PASS")
|
||||||
|
|||||||
Reference in New Issue
Block a user