"""Bi-directional / service functions -- profile-defined command sequences.

FORScan-class functions (actuator tests, service resets, module writes) are
manufacturer-specific UDS (ISO 14229) sequences, so OBDash models them as DATA
in the vehicle profile (an `actions` block) rather than hardcoded logic.

An Action is a small sequence the runner executes through the ELM link:
    optional Mode 10 diagnostic session
    optional Mode 27 security access (seed -> key)
    one+ raw command(s) (Mode 2F I/O control, Mode 31 routine, Mode 11 ECU
    reset, Mode 3E tester-present, ...) with response checks

SAFETY:
  - Effective risk is DERIVED from the actual UDS service IDs the action sends
    (effective_risk): any write/actuator/reset/transfer SID forces 'danger', so
    a profile can only RAISE the confirmation, never mislabel a reflash as
    "safe".
  - The runner sends ONLY the hex bytes defined in the profile -- nothing is
    synthesized.
  - Security-access KEY algorithms are per-vehicle secrets and are deliberately
    NOT bundled; only a few trivial/standard transforms are known. An action
    that needs an unknown algorithm is BLOCKED (fails safe) rather than guessed.
"""
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ActionStep:
    """One raw request of an action plus the check applied to its response."""

    send: str  # hex bytes to send, e.g. "1101"
    expect: str = ""  # hex the response must contain; "" = accept UDS positive response


@dataclass
class Action:
    """A profile-defined command sequence: optional session, optional security
    access, then one or more raw steps."""

    key: str
    name: str
    kind: str = "test"  # test | actuator | reset | write
    risk: str = "safe"  # safe | caution | danger
    description: str = ""
    warning: str = ""  # shown in the confirmation for caution/danger
    # None means "do not request a diagnostic session".
    session: Optional[str] = None  # Mode 10 subfunction hex (e.g. "03" extended)
    # None means "no security access needed".
    security: Optional[dict] = None  # {"level": "01", "algorithm": ""}
    steps: list = field(default_factory=list)  # list[ActionStep]
    success_msg: str = "Done."


# Known, NON-secret security transforms. Real per-vehicle seed->key algorithms
# are proprietary and intentionally absent -- unknown algorithm => action blocked.
SECURITY_ALGOS = {
    "xor-ff": lambda seed: bytes((b ^ 0xFF) & 0xFF for b in seed),
    "invert": lambda seed: bytes((~b) & 0xFF for b in seed),
    "add-ff": lambda seed: bytes((b + 0xFF) & 0xFF for b in seed),
}

# Negative response code 0x78 = requestCorrectlyReceived-ResponsePending.
_NRC_RESPONSE_PENDING = 0x78


def _hex(byte_list):
    """Render byte values as upper-case hex with no separators ("" for none)."""
    return "".join(f"{b:02X}" for b in (byte_list or ()))


def _sid(hexstr):
    """Return the service id (first byte) of a hex request string.

    The string is parsed with bytes.fromhex -- the same parser validate_action
    uses -- so the SID always matches the bytes the request really contains.
    Returns None when the string is not valid hex or holds no bytes.
    """
    try:
        raw = bytes.fromhex(hexstr)
    except (TypeError, ValueError):
        return None
    return raw[0] if raw else None


def _find(data, sub):
    """True if `sub` (list/bytes) appears as a CONTIGUOUS run in `data`."""
    if not data or not sub:
        return False
    return bytes(sub) in bytes(data)


def _without_pending(data, sid):
    """Return `data` as bytes with every `7F <sid> 78` frame removed.

    Response-pending frames only announce that a final response follows, so
    they must not decide whether the request succeeded.
    """
    marker = bytes([0x7F, sid, _NRC_RESPONSE_PENDING])
    return bytes(data or b"").replace(marker, b"")


def _neg(data, sid):
    """UDS negative response for this service = contiguous 7F <sid>."""
    return _find(data, [0x7F, sid])


def _pending(data, sid):
    """Negative response code 0x78 = requestCorrectlyReceived-ResponsePending."""
    return _find(data, [0x7F, sid, _NRC_RESPONSE_PENDING])


def _ok(data, sid, expect):
    """A step succeeds only if, ignoring response-pending frames, there is NO
    negative response AND either the expected bytes or the UDS
    positive-response id (sid+0x40) appears."""
    final = _without_pending(data, sid)
    if _neg(final, sid):
        return False
    if expect:
        return _find(final, bytes.fromhex(expect))
    return _find(final, [(sid + 0x40) & 0xFF])


# UDS/OBD service-id classification -- used to derive an action's real risk from
# what it actually SENDS, so a profile can't mislabel a write as "safe".
READ_ONLY_SIDS = {0x01, 0x02, 0x03, 0x07, 0x09, 0x0A, 0x19, 0x22, 0x2A, 0x3E}
WRITE_SIDS = {0x04, 0x11, 0x14, 0x28, 0x2E, 0x2F, 0x31, 0x34, 0x35, 0x36,
              0x37, 0x38, 0x3B, 0x3D, 0x85}
_RISK_ORDER = {"safe": 0, "caution": 1, "danger": 2}


def effective_risk(action):
    """Risk = max(profile-declared, risk derived from the actual service IDs).

    Write/actuator/reset/transfer SIDs and unparsable steps force 'danger';
    unknown SIDs, a non-default session, or a security block force at least
    'caution'. An unrecognised declared risk counts as 'danger'. The profile
    can only RAISE the risk, never lower it below what the commands warrant.
    """
    levels = [action.risk if action.risk in _RISK_ORDER else "danger"]
    if action.session and action.session.lower() != "01":
        levels.append("caution")
    if action.security:
        levels.append("caution")
    for st in action.steps:
        # Classify the byte that is really sent: int(send[:2], 16) would
        # tolerate leading whitespace and read " 2F.." as SID 0x02.
        sid = _sid(st.send)
        if sid is None or sid in WRITE_SIDS:
            levels.append("danger")
        elif sid not in READ_ONLY_SIDS:
            levels.append("caution")
    return max(levels, key=_RISK_ORDER.__getitem__)


def validate_action(a):
    """Raise ValueError if an action has malformed hex / structure.

    Checks every step's `send` (valid, non-empty hex) and `expect`, the session
    subfunction, and the security block (must name an 'algorithm'; its level,
    default "01", must be exactly one hex byte).
    """
    for st in a.steps:
        if not bytes.fromhex(st.send):  # raises if not valid hex
            raise ValueError(f"action {a.key}: step has no bytes to send")
        if st.expect:
            bytes.fromhex(st.expect)
    if a.session is not None:
        bytes.fromhex(a.session)
    if a.security is not None:
        if "algorithm" not in a.security:
            raise ValueError(f"action {a.key}: security block needs an 'algorithm'")
        # run_action does int(level, 16) + 1, so the level must be one byte.
        if len(bytes.fromhex(a.security.get("level", "01"))) != 1:
            raise ValueError(f"action {a.key}: security level must be one hex byte")


def run_action(action, link, log=None):
    """Execute an Action through `link` (must expose read_raw(hex)->list[int]).

    Nothing is transmitted unless every step parses as hex and any required
    security algorithm is available. `log`, if given, is called with progress
    strings.

    Returns {"ok": bool, "message": str, "responses": [list[int], ...]}.
    A step that is answered only with response-pending returns ok=True with a
    "still in progress" message and stops the sequence.
    """
    def note(m):
        if log:
            log(m)

    def send(hexstr):
        return link.read_raw(hexstr)

    # 0. fail safe BEFORE touching the ECU: malformed step or unknown algorithm
    step_sids = [_sid(st.send) for st in action.steps]
    for st, sid in zip(action.steps, step_sids):
        if sid is None:
            return {"ok": False,
                    "message": f"malformed command '{st.send}' — action blocked for safety",
                    "responses": []}
    unlock = None
    if action.security:
        algo = action.security.get("algorithm")
        unlock = SECURITY_ALGOS.get(algo)
        if unlock is None:
            return {"ok": False,
                    "message": f"security algorithm '{algo}' not available "
                               "(per-vehicle secret) — action blocked for safety",
                    "responses": []}

    # 1. diagnostic session
    if action.session:
        d = send("10" + action.session)
        if not _ok(d, 0x10, "50"):
            return {"ok": False, "message": "could not enter diagnostic session",
                    "responses": [d]}

    # 2. security access (seed -> key)
    if action.security:
        level = action.security.get("level", "01")
        seed_resp = send("27" + level)
        if not _ok(seed_resp, 0x27, ""):
            return {"ok": False, "message": "security seed request failed",
                    "responses": [seed_resp]}
        final = _without_pending(seed_resp, 0x27)
        # Seed = everything after the positive-response id 0x67 and the level.
        seed = final[final.find(b"\x67") + 2:]
        if not seed:
            return {"ok": False, "message": "security seed request failed",
                    "responses": [seed_resp]}
        if any(seed):
            key_level = f"{int(level, 16) + 1:02X}"
            kr = send("27" + key_level + _hex(unlock(seed)))
            if not _ok(kr, 0x27, ""):
                return {"ok": False, "message": "security unlock rejected",
                        "responses": [kr]}
            note("security unlocked")
        else:
            # ISO 14229-1: an all-zero seed means the level is already
            # unlocked, so no key must be sent.
            note("security already unlocked")

    # 3. the command steps
    responses = []
    for st, sid in zip(action.steps, step_sids):
        d = send(st.send)
        responses.append(d)
        if not _ok(d, sid, st.expect):
            if _neg(_without_pending(d, sid), sid):
                # A final negative response wins even if a pending frame
                # preceded it.
                reason = "ECU negative response"
            elif _pending(d, sid):
                return {"ok": True, "responses": responses,
                        "message": "ECU accepted the request; the operation is still in "
                                   "progress (response pending). Check the vehicle — do NOT retry."}
            else:
                reason = "no valid response"
            return {"ok": False,
                    "message": f"{reason} to {st.send} (got {_hex(d) or 'nothing'})",
                    "responses": responses}
        note(f"{st.send} -> {_hex(d)}")
    return {"ok": True, "message": action.success_msg, "responses": responses}