b5e0c96763
Untrusted profiles could bypass the confirmation and responses were mis-parsed: - effective_risk(action): risk is now DERIVED from the actual service IDs the steps send — any write/actuator/reset/transfer SID (2F/31/11/14/2E/27/34-37/…) forces 'danger'; unknown SID / non-default session / security block force 'caution'. A profile can only RAISE risk, never label a reflash 'safe'. GUI gates the confirmation (and the risk badge) on this derived value. - Response checks use CONTIGUOUS subsequence matching + a hard '7F <sid>' negative-response guard, so an NRC data byte (e.g. 0x7E) can't false-pass as a positive response; applied to session/security/step checks. - 0x78 (responsePending) is treated as in-progress, not terminal failure. - controller.run_action restores slow ELM timing for the run (0x78 window). - Tests: risk-cannot-be-downgraded, NRC false-positive rejected, pending handled. Closes #7 Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_016yT89n4zR4qbrySoSiEyZs
117 lines
4.5 KiB
Python
117 lines
4.5 KiB
Python
"""Bi-directional action framework tests (against MockLink, no hardware)."""
|
|
import os
|
|
import sys
|
|
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
from obdcore import load_default, load_profile, profiles_dir
|
|
from obdcore.actions import (Action, ActionStep, run_action, validate_action,
|
|
effective_risk)
|
|
from obdcore.mock import MockLink
|
|
import time
|
|
|
|
|
|
class _FakeLink:
|
|
"""Returns canned raw responses keyed by the sent hex (for parse tests)."""
|
|
def __init__(self, table):
|
|
self.table = table
|
|
|
|
def read_raw(self, hexcmd, timeout=2.0):
|
|
return self.table.get(hexcmd.replace(" ", ""), [])
|
|
|
|
|
|
def _mock():
    """Create a fresh ``MockLink`` that is handed ``time.time`` as its clock."""
    link = MockLink(clock=time.time)
    return link
|
|
|
|
|
|
def test_simple_action():
    """A one-step action (send 3E00) run against the mock link reports ok."""
    ping_steps = [ActionStep("3E00")]
    ping = Action("PING", "Tester Present", steps=ping_steps)
    result = run_action(ping, _mock())
    assert result["ok"], result
    print(" simple action (3E00): OK")
|
|
|
|
|
|
def test_session_and_reset():
    """An action that requests session "03" and then sends 1101 reports ok."""
    reset_steps = [ActionStep("1101")]
    reset = Action("RESET", "ECU Reset", session="03", steps=reset_steps)
    result = run_action(reset, _mock())
    assert result["ok"], result
    print(" session + reset: OK")
|
|
|
|
|
|
def test_security_known_algo():
    """An action secured with the "xor-ff" algorithm runs to completion on the mock."""
    unlock = {"level": "01", "algorithm": "xor-ff"}
    routine = [ActionStep("31010203")]  # start routine
    secured = Action("LOCKED", "Secured routine", security=unlock, steps=routine)
    result = run_action(secured, _mock())
    assert result["ok"], result
    print(" security handshake (xor-ff seed->key): OK")
|
|
|
|
|
|
def test_security_unknown_algo_blocked():
    """An action naming an unknown security algorithm must fail, not run."""
    unlock = {"level": "01", "algorithm": "ford-2005-secret"}
    routine = [ActionStep("31010203")]
    vendor = Action("SECRET", "Vendor routine", security=unlock, steps=routine)
    result = run_action(vendor, _mock())
    # The run has to be refused and the message has to say why.
    assert not result["ok"] and "not available" in result["message"], result
    print(" unknown security algorithm blocked (fails safe): OK")
|
|
|
|
|
|
def test_validate_rejects_bad_hex():
    """validate_action must raise ValueError for a step whose send is not hex."""
    malformed = Action("BAD", "bad", steps=[ActionStep("ZZ")])
    try:
        validate_action(malformed)
    except ValueError:
        pass  # the rejection this test is looking for
    else:
        # Validation returned normally, so the bad send slipped through.
        raise AssertionError("should reject non-hex send")
    print(" malformed hex rejected at load: OK")
|
|
|
|
|
|
def test_profile_actions_load():
    """The generic-obd2.json profile loads with its expected, risk-tagged actions."""
    profile_path = os.path.join(profiles_dir(), "generic-obd2.json")
    prof = load_profile(profile_path)
    keys = set()
    for action in (prof.actions or []):
        keys.add(action.key)
    assert "TESTER_PRESENT" in keys and "ECU_RESET" in keys, keys
    # the reset is risk-gated
    reset = next(filter(lambda action: action.key == "ECU_RESET", prof.actions))
    assert reset.risk == "caution"
    assert reset.warning
    print(f" generic profile loads {len(prof.actions)} actions (risk-tagged): OK")
|
|
|
|
|
|
def test_effective_risk_cannot_be_downgraded():
    """effective_risk follows what the steps send, not the label the profile gives."""
    # a profile mislabels an ECU-reset (0x11 write SID) as "safe"
    mislabeled = Action("SNEAKY", "totally safe", risk="safe", steps=[ActionStep("1101")])
    assert effective_risk(mislabeled) == "danger", (
        "write SID must force danger regardless of label"
    )

    # a pure read is safe
    read_only = Action("R", "read", steps=[ActionStep("22F190")])
    assert effective_risk(read_only) == "safe"

    # tester-present is safe; a security block bumps to at least caution
    unlock = {"level": "01", "algorithm": "xor-ff"}
    secured_ping = Action("S", "s", risk="safe", security=unlock, steps=[ActionStep("3E00")])
    assert effective_risk(secured_ping) == "caution"

    print(" effective_risk derives from SIDs (write can't be labeled safe): OK")
|
|
|
|
|
|
def test_response_parsing_rejects_false_positive():
    """A negative response to 0x3E must never be accepted as a positive one.

    The positive-response SID for 0x3E is 0x7E (=3E+40). A negative response
    has the form ``7F 3E <nrc>``; when the NRC byte itself is 0x7E, a plain
    "is 0x7E anywhere in the reply" membership test false-passes. Both an
    ordinary NRC (0x11) and the colliding NRC (0x7E) are checked, so the test
    really exercises the collision rather than only a reply with no 0x7E in it.
    """
    a = Action("PING", "ping", steps=[ActionStep("3E00")])

    # 7F 3E <nrc> = negativeResponse to service 3E. 0x11 is the plain case;
    # 0x7E is the one whose data byte looks like the positive-response SID.
    for nrc in (0x11, 0x7E):
        r = run_action(a, _FakeLink({"3E00": [0x7F, 0x3E, nrc]}))
        assert not r["ok"] and "negative" in r["message"], (hex(nrc), r)

    # a genuine positive still passes
    r2 = run_action(a, _FakeLink({"3E00": [0x7E, 0x00]}))
    assert r2["ok"], r2
    print(" contiguous response check rejects NRC false-positive: OK")
|
|
|
|
|
|
def test_response_pending_not_terminal():
    """A ``7F 31 78`` reply is reported as ok with a "pending" message, not as a failure."""
    routine = Action("ROUT", "routine", risk="danger", steps=[ActionStep("31010203")])
    pending_reply = [0x7F, 0x31, 0x78]  # responsePending
    result = run_action(routine, _FakeLink({"31010203": pending_reply}))
    assert result["ok"] and "pending" in result["message"].lower(), result
    print(" 0x78 responsePending treated as in-progress, not failure: OK")
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: run every test in order; the first failure aborts
    # before the final summary line is printed.
    for run_test in (
        test_simple_action,
        test_session_and_reset,
        test_security_known_algo,
        test_security_unknown_algo_blocked,
        test_validate_rejects_bad_hex,
        test_profile_actions_load,
        test_effective_risk_cannot_be_downgraded,
        test_response_parsing_rejects_false_positive,
        test_response_pending_not_terminal,
    ):
        run_test()
    print("\nALL ACTION TESTS PASS")
|