d435384b58
Profile-defined UDS action sequences, run safely -- the framework for #2 (real per-vehicle actuator tests/resets are follow-on, added as verified profile data). - obdcore/actions.py: Action model + run_action() executing session (Mode 10) -> security (Mode 27 seed->key) -> command steps (2F/31/11/3E/... any hex) with positive/negative response checks. Security KEY algorithms are per-vehicle secrets and NOT bundled -- only trivial transforms (xor-ff/invert/add-ff) known; an action naming an unknown algorithm is BLOCKED (fails safe). Never synthesizes bytes -- runs only what the profile defines. validate_action() rejects malformed hex at load. - profile.py: load/save an actions[] block; ElmLink/MockLink read_raw(hex). - GUI: Diagnostics -> Service & Bi-directional dialog -- lists the profile's actions with risk badges; caution/danger gated behind a warning confirmation. - generic-obd2: two safe STANDARD actions (Tester-Present ping; ECU-Reset, caution + engine-off warning). PROFILE_SPEC.md documents the actions schema + safety rules. - tests/test_actions.py: runner, session+reset, security handshake, unknown-algo block, hex validation, profile load. All 5 suites pass. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_016yT89n4zR4qbrySoSiEyZs
93 lines
3.3 KiB
Python
93 lines
3.3 KiB
Python
"""MockLink -- a synthetic ElmLink for tests and GUI development without a
truck. Simulates a cranking 6.0: ICP ramps toward ~540 psi, FICM holds ~48V,
battery sags, MAP/BARO sit at atmospheric. Same read interface as ElmLink.
"""
|
|
|
|
|
|
class MockLink:
    """Synthetic link object that answers reads with canned/simulated data.

    Time-dependent values (ICP ramp, battery sag) are computed from the time
    elapsed since construction, as measured by the injected ``clock``.

    Args:
        clock: zero-argument callable returning the current time in float
            seconds. It is sampled once at construction to fix ``t0``.
    """

    # Mode 01 PID 01 payload: MIL off, 0 DTCs, mixed monitors. Kept in one
    # place so read_m01("01") and read_readiness() cannot drift apart.
    _READINESS_BYTES = (0x00, 0x07, 0x61, 0x20)

    def __init__(self, clock):
        self.clock = clock  # callable -> float seconds
        self.t0 = clock()
        self.protocol = "A6"

    def init(self):
        """No-op: the mock has no adapter to initialize."""
        pass

    def fast_timing(self, on=True):
        """No-op: accepted for interface compatibility, has no effect."""
        pass

    def connect(self):
        """Always report a successful connection."""
        return True

    def is_can(self):
        """Always report a CAN protocol."""
        return True

    def _u16le(self, raw16):
        """Split a 16-bit value into two bytes as ``[high, low]``.

        NOTE: despite the ``le`` in the name, the HIGH byte comes first.
        The name is kept unchanged so existing callers keep working.
        """
        return [(raw16 >> 8) & 0xFF, raw16 & 0xFF]

    def read_m22(self, pid, timeout=0.5):
        """Return simulated Mode 22 data bytes for ``pid``.

        Args:
            pid: PID as a 4-character hex string (e.g. ``"1446"``).
            timeout: accepted for interface compatibility; ignored.

        Returns:
            A list of two byte values for a known PID, or ``None`` (no
            response) for any other PID.
        """
        el = self.clock() - self.t0
        if pid == "1446":  # ICP: ramps 0 -> 540 over ~2.7s
            return self._u16le(int(min(540, el * 200) / 0.57))
        if pid == "09D0":  # FICM main ~48V (0x3000)
            return self._u16le(0x3000)
        if pid == "1440":  # MAP atmospheric
            return [0x01, 0x89]
        if pid == "1442":  # BARO atmospheric
            return [0x01, 0x88]
        if pid == "1445":  # EBP atmospheric
            return [0x01, 0x8F]
        if pid == "1310":  # EOT ~33C
            return [0x1C, 0x92]
        return None  # everything else: no response

    def read_m01(self, pid, nbytes, timeout=0.6):
        """Return simulated Mode 01 data bytes for ``pid``.

        Args:
            pid: PID as a 2-character hex string (e.g. ``"0C"``).
            nbytes: accepted for interface compatibility; ignored.
            timeout: accepted for interface compatibility; ignored.

        Returns:
            A list of byte values for a known PID, or ``None`` otherwise.
        """
        if pid == "0C":  # RPM ~750 idle
            v = 750 * 4
            return [(v >> 8) & 0xFF, v & 0xFF]
        if pid == "05":  # ECT 82C
            return [122]
        if pid == "0D":  # speed 48 km/h
            return [48]
        if pid == "10":  # MAF 12.0 g/s
            v = 1200
            return [(v >> 8) & 0xFF, v & 0xFF]
        if pid == "01":  # readiness: MIL off, 0 DTCs, mixed monitors
            return list(self._READINESS_BYTES)  # fresh list per call
        return None

    def read_atrv(self, timeout=0.8):
        """Return the simulated battery voltage.

        Reads 10.6 during the first 2.5 s after construction, 12.5 after.
        """
        el = self.clock() - self.t0
        return 10.6 if el < 2.5 else 12.5  # crank sag then recover

    def read_dtcs(self, mode, svc, timeout=5.0):
        """Return ``["P0148"]`` for mode ``"03"``, otherwise an empty list."""
        return ["P0148"] if mode == "03" else []

    def clear_dtcs(self):
        """Always report that codes were cleared."""
        return True

    def read_vehicle_info(self, timeout=2.0):
        """Return a fixed dict with ``vin``, ``calibration``, ``ecu_name``."""
        return {"vin": "1FMZU73E12ZA12345", "calibration": "JR3A-12A650-BCD",
                "ecu_name": "ECM-EngineControl"}

    def read_readiness(self, timeout=1.0):
        """Decode the canned readiness bytes via ``obdservices``.

        The import is local, so the package-relative module is only needed
        when this method is actually called.
        """
        from . import obdservices as svc
        return svc.decode_readiness(list(self._READINESS_BYTES))

    def read_freeze_frame(self, timeout=0.6):
        """Return a fixed freeze frame for DTC P0148."""
        return {"dtc": "P0148",
                "values": [("Engine RPM", 240, "rpm"), ("Coolant Temp", 33, "C"),
                           ("Engine Load", 18, "%"), ("Vehicle Speed", 0, "km/h")]}

    def read_raw(self, hexcmd, timeout=2.0):
        """Answer a raw hex command with a UDS-style positive response.

        Args:
            hexcmd: command bytes as a hex string; any whitespace between
                characters is ignored.
            timeout: accepted for interface compatibility; ignored.

        Returns:
            ``None`` (no response) when the command contains no hex digits.
            For service 0x27 with exactly one following byte (a seed
            request): ``[0x67, <that byte>, 0x11, 0x22, 0x33, 0x44]``.
            Otherwise ``[sid + 0x40] + <remaining request bytes>``.

        Raises:
            ValueError: if the command contains non-hex characters.
        """
        # Strip every kind of whitespace (spaces, tabs, newlines), not just
        # spaces, so formatted commands parse the same way.
        h = "".join(hexcmd.split())
        if not h:
            # Nothing to send: report "no response" instead of crashing on
            # int("", 16).
            return None
        sid = int(h[:2], 16)
        # Stop at len(h) - 1 so a dangling trailing nibble is ignored rather
        # than parsed as a one-digit byte.
        rest = [int(h[i:i + 2], 16) for i in range(2, len(h) - 1, 2)]
        if sid == 0x27 and len(rest) == 1:  # security seed request -> return a seed
            return [0x67, rest[0], 0x11, 0x22, 0x33, 0x44]
        # Return a UDS positive response (sid+0x40) so actions succeed in mock.
        return [(sid + 0x40) & 0xFF] + rest

    def close(self):
        """No-op: the mock holds no resources."""
        pass
|