#2 (framework): bi-directional / service-function engine

Profile-defined UDS action sequences, run safely -- the framework for #2 (real
per-vehicle actuator tests/resets are follow-on, added as verified profile data).

- obdcore/actions.py: Action model + run_action() executing session (Mode 10) ->
  security (Mode 27 seed->key) -> command steps (2F/31/11/3E/... any hex) with
  positive/negative response checks. Security KEY algorithms are per-vehicle
  secrets and NOT bundled -- only trivial transforms (xor-ff/invert/add-ff)
  known; an action naming an unknown algorithm is BLOCKED (fails safe). Never
  synthesizes bytes -- runs only what the profile defines. validate_action()
  rejects malformed hex at load.
- profile.py: load/save an actions[] block; ElmLink/MockLink read_raw(hex).
- GUI: Diagnostics -> Service & Bi-directional dialog -- lists the profile's
  actions with risk badges; caution/danger gated behind a warning confirmation.
- generic-obd2: two safe STANDARD actions (Tester-Present ping; ECU-Reset,
  caution + engine-off warning). PROFILE_SPEC.md documents the actions schema
  + safety rules.
- tests/test_actions.py: runner, session+reset, security handshake, unknown-algo
  block, hex validation, profile load. All 5 suites pass.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_016yT89n4zR4qbrySoSiEyZs
This commit is contained in:
2026-07-01 16:33:51 -04:00
parent 74bfa2e146
commit d435384b58
10 changed files with 371 additions and 2 deletions
+131
View File
@@ -0,0 +1,131 @@
"""Bi-directional / service functions -- profile-defined command sequences.
FORScan-class functions (actuator tests, service resets, module writes) are
manufacturer-specific UDS (ISO 14229) sequences, so OBDash models them as DATA
in the vehicle profile (an `actions` block) rather than hardcoded logic. An
Action is a small sequence the runner executes through the ELM link:
optional Mode 10 diagnostic session
optional Mode 27 security access (seed -> key)
one+ raw command(s) (Mode 2F I/O control, Mode 31 routine, Mode 11
ECU reset, Mode 3E tester-present, ...) with response checks
SAFETY:
- Every action carries a `risk` (safe | caution | danger); the GUI gates
caution/danger behind an explicit confirmation.
- The runner sends ONLY the hex bytes defined in the profile -- nothing is
synthesized.
- Security-access KEY algorithms are per-vehicle secrets and are deliberately
NOT bundled; only a few trivial/standard transforms are known. An action that
needs an unknown algorithm is BLOCKED (fails safe) rather than guessed.
"""
from dataclasses import dataclass, field
@dataclass
class ActionStep:
send: str # hex bytes to send, e.g. "1101"
expect: str = "" # hex the response must contain; "" = accept UDS positive response
@dataclass
class Action:
key: str
name: str
kind: str = "test" # test | actuator | reset | write
risk: str = "safe" # safe | caution | danger
description: str = ""
warning: str = "" # shown in the confirmation for caution/danger
session: str = None # Mode 10 subfunction hex (e.g. "03" extended)
security: dict = None # {"level": "01", "algorithm": "<name>"}
steps: list = field(default_factory=list) # list[ActionStep]
success_msg: str = "Done."
# Known, NON-secret security transforms. Real per-vehicle seed->key algorithms
# are proprietary and intentionally absent -- unknown algorithm => action blocked.
SECURITY_ALGOS = {
"xor-ff": lambda seed: bytes((b ^ 0xFF) & 0xFF for b in seed),
"invert": lambda seed: bytes((~b) & 0xFF for b in seed),
"add-ff": lambda seed: bytes((b + 0xFF) & 0xFF for b in seed),
}
def _hex(byte_list):
return "".join(f"{b:02X}" for b in byte_list)
def _positive(data, req_sid):
"""UDS positive response = request service id + 0x40 present in the data."""
return (req_sid + 0x40) in data
def _is_negative(data):
return 0x7F in data
def validate_action(a):
"""Raise ValueError if an action has malformed hex / structure."""
for st in a.steps:
bytes.fromhex(st.send) # raises if not valid hex
if st.expect:
bytes.fromhex(st.expect)
if a.session is not None:
bytes.fromhex(a.session)
if a.security is not None and "algorithm" not in a.security:
raise ValueError(f"action {a.key}: security block needs an 'algorithm'")
def run_action(action, link, log=None):
"""Execute an Action through `link` (must expose read_raw(hex)->list[int]).
Returns {"ok": bool, "message": str, "responses": [list[int], ...]}."""
def note(m):
if log:
log(m)
def send(hexstr):
return link.read_raw(hexstr)
# 1. diagnostic session
if action.session:
d = send("10" + action.session)
if not _positive(d, 0x10):
return {"ok": False, "message": "could not enter diagnostic session", "responses": [d]}
# 2. security access (seed -> key)
if action.security:
algo = action.security.get("algorithm")
fn = SECURITY_ALGOS.get(algo)
if fn is None:
return {"ok": False, "message": f"security algorithm '{algo}' not available "
"(per-vehicle secret) — action blocked for safety", "responses": []}
level = action.security.get("level", "01")
seed_resp = send("27" + level)
if not _positive(seed_resp, 0x27):
return {"ok": False, "message": "security seed request failed", "responses": [seed_resp]}
i = seed_resp.index(0x67)
seed = seed_resp[i + 2:]
key = fn(bytes(seed))
lvl2 = f"{int(level, 16) + 1:02X}"
kr = send("27" + lvl2 + _hex(key))
if not _positive(kr, 0x27):
return {"ok": False, "message": "security unlock rejected", "responses": [kr]}
note("security unlocked")
# 3. the command steps
responses = []
for st in action.steps:
d = send(st.send)
responses.append(d)
req_sid = int(st.send[:2], 16)
if st.expect:
ok = bytes.fromhex(st.expect)[0] in d
else:
ok = _positive(d, req_sid)
if not ok:
reason = "ECU negative response" if _is_negative(d) else "no valid response"
return {"ok": False, "message": f"{reason} to {st.send} (got {_hex(d) or 'nothing'})",
"responses": responses}
note(f"{st.send} -> {_hex(d)}")
return {"ok": True, "message": action.success_msg, "responses": responses}