"""Bi-directional action framework tests (against MockLink, no hardware).""" import os import sys sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from obdcore import load_default, load_profile, profiles_dir from obdcore.actions import (Action, ActionStep, run_action, validate_action, effective_risk) from obdcore.mock import MockLink import time class _FakeLink: """Returns canned raw responses keyed by the sent hex (for parse tests).""" def __init__(self, table): self.table = table def read_raw(self, hexcmd, timeout=2.0): return self.table.get(hexcmd.replace(" ", ""), []) def _mock(): return MockLink(clock=time.time) def test_simple_action(): a = Action("PING", "Tester Present", steps=[ActionStep("3E00")]) r = run_action(a, _mock()) assert r["ok"], r print(" simple action (3E00): OK") def test_session_and_reset(): a = Action("RESET", "ECU Reset", session="03", steps=[ActionStep("1101")]) r = run_action(a, _mock()) assert r["ok"], r print(" session + reset: OK") def test_security_known_algo(): a = Action("LOCKED", "Secured routine", security={"level": "01", "algorithm": "xor-ff"}, steps=[ActionStep("31010203")]) # start routine r = run_action(a, _mock()) assert r["ok"], r print(" security handshake (xor-ff seed->key): OK") def test_security_unknown_algo_blocked(): a = Action("SECRET", "Vendor routine", security={"level": "01", "algorithm": "ford-2005-secret"}, steps=[ActionStep("31010203")]) r = run_action(a, _mock()) assert not r["ok"] and "not available" in r["message"], r print(" unknown security algorithm blocked (fails safe): OK") def test_validate_rejects_bad_hex(): bad = Action("BAD", "bad", steps=[ActionStep("ZZ")]) try: validate_action(bad) raise AssertionError("should reject non-hex send") except ValueError: pass print(" malformed hex rejected at load: OK") def test_profile_actions_load(): prof = load_profile(os.path.join(profiles_dir(), "generic-obd2.json")) keys = {a.key for a in (prof.actions or [])} assert "TESTER_PRESENT" in keys and "ECU_RESET" in keys, keys # the reset is risk-gated reset = next(a for a in prof.actions if a.key == "ECU_RESET") assert reset.risk == "caution" and reset.warning print(f" generic profile loads {len(prof.actions)} actions (risk-tagged): OK") def test_effective_risk_cannot_be_downgraded(): # a profile mislabels an ECU-reset (0x11 write SID) as "safe" a = Action("SNEAKY", "totally safe", risk="safe", steps=[ActionStep("1101")]) assert effective_risk(a) == "danger", "write SID must force danger regardless of label" # a pure read is safe assert effective_risk(Action("R", "read", steps=[ActionStep("22F190")])) == "safe" # tester-present is safe; a security block bumps to at least caution a2 = Action("S", "s", risk="safe", security={"level": "01", "algorithm": "xor-ff"}, steps=[ActionStep("3E00")]) assert effective_risk(a2) == "caution" print(" effective_risk derives from SIDs (write can't be labeled safe): OK") def test_response_parsing_rejects_false_positive(): # NRC byte 0x7E in a NEGATIVE response must NOT be read as positive for 0x3E. # 7F 3E 11 = negativeResponse to service 3E; the old membership test saw 0x7E # (=3E+40) elsewhere and false-passed. a = Action("PING", "ping", steps=[ActionStep("3E00")]) r = run_action(a, _FakeLink({"3E00": [0x7F, 0x3E, 0x11]})) assert not r["ok"] and "negative" in r["message"], r # a genuine positive still passes r2 = run_action(a, _FakeLink({"3E00": [0x7E, 0x00]})) assert r2["ok"], r2 print(" contiguous response check rejects NRC false-positive: OK") def test_response_pending_not_terminal(): a = Action("ROUT", "routine", risk="danger", steps=[ActionStep("31010203")]) r = run_action(a, _FakeLink({"31010203": [0x7F, 0x31, 0x78]})) # responsePending assert r["ok"] and "pending" in r["message"].lower(), r print(" 0x78 responsePending treated as in-progress, not failure: OK") if __name__ == "__main__": for fn in [test_simple_action, test_session_and_reset, test_security_known_algo, test_security_unknown_algo_blocked, test_validate_rejects_bad_hex, test_profile_actions_load, test_effective_risk_cannot_be_downgraded, test_response_parsing_rejects_false_positive, test_response_pending_not_terminal]: fn() print("\nALL ACTION TESTS PASS")