b5e0c96763
Untrusted profiles could bypass the confirmation and responses were mis-parsed: - effective_risk(action): risk is now DERIVED from the actual service IDs the steps send — any write/actuator/reset/transfer SID (2F/31/11/14/2E/27/34-37/…) forces 'danger'; unknown SID / non-default session / security block force 'caution'. A profile can only RAISE risk, never label a reflash 'safe'. GUI gates the confirmation (and the risk badge) on this derived value. - Response checks use CONTIGUOUS subsequence matching + a hard '7F <sid>' negative-response guard, so an NRC data byte (e.g. 0x7E) can't false-pass as a positive response; applied to session/security/step checks. - 0x78 (responsePending) is treated as in-progress, not terminal failure. - controller.run_action restores slow ELM timing for the run (0x78 window). - Tests: risk-cannot-be-downgraded, NRC false-positive rejected, pending handled. Closes #7 Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_016yT89n4zR4qbrySoSiEyZs
189 lines
7.3 KiB
Python
189 lines
7.3 KiB
Python
"""Bi-directional / service functions -- profile-defined command sequences.
|
|
|
|
FORScan-class functions (actuator tests, service resets, module writes) are
|
|
manufacturer-specific UDS (ISO 14229) sequences, so OBDash models them as DATA
|
|
in the vehicle profile (an `actions` block) rather than hardcoded logic. An
|
|
Action is a small sequence the runner executes through the ELM link:
|
|
|
|
optional Mode 10 diagnostic session
|
|
optional Mode 27 security access (seed -> key)
|
|
one+ raw command(s) (Mode 2F I/O control, Mode 31 routine, Mode 11
|
|
ECU reset, Mode 3E tester-present, ...) with response checks
|
|
|
|
SAFETY:
|
|
- Effective risk is DERIVED from the actual UDS service IDs the action sends
|
|
(effective_risk): any write/actuator/reset/transfer SID forces 'danger', so a
|
|
profile can only RAISE the confirmation, never mislabel a reflash as "safe".
|
|
- The runner sends ONLY the hex bytes defined in the profile -- nothing is
|
|
synthesized.
|
|
- Security-access KEY algorithms are per-vehicle secrets and are deliberately
|
|
NOT bundled; only a few trivial/standard transforms are known. An action that
|
|
needs an unknown algorithm is BLOCKED (fails safe) rather than guessed.
|
|
"""
|
|
from dataclasses import dataclass, field
|
|
|
|
|
|
@dataclass
class ActionStep:
    """A single raw command of an action and the reply that counts as success."""

    # Hex bytes to transmit, e.g. "1101".
    send: str
    # Hex the response must contain; "" = accept the UDS positive response.
    expect: str = ""
|
|
|
|
|
|
@dataclass
class Action:
    """A profile-defined service function: metadata plus the command sequence.

    `session` and `security` are optional preambles; `steps` holds the
    ActionStep commands that make up the action itself.
    """

    # --- identity -----------------------------------------------------------
    key: str
    name: str

    # --- classification shown to the user ------------------------------------
    # One of: test | actuator | reset | write
    kind: str = "test"
    # Profile-declared risk, one of: safe | caution | danger
    risk: str = "safe"
    description: str = ""
    # Text shown in the confirmation for caution/danger actions.
    warning: str = ""

    # --- optional preambles ---------------------------------------------------
    # Mode 10 subfunction hex (e.g. "03" extended); None = no session change.
    session: str = None
    # {"level": "01", "algorithm": "<name>"}; None = no security access.
    security: dict = None

    # --- payload --------------------------------------------------------------
    # list[ActionStep]; default_factory gives every Action its own list.
    steps: list = field(default_factory=list)
    success_msg: str = "Done."
|
|
|
|
|
|
# Seed -> key transforms that are publicly known and NOT secret. Genuine
# per-vehicle algorithms are proprietary and deliberately not shipped here, so
# an action naming an algorithm missing from this table gets blocked.
SECURITY_ALGOS = {
    # Flip every bit of each seed byte via XOR with 0xFF.
    "xor-ff": lambda seed: bytes((octet ^ 0xFF) & 0xFF for octet in seed),
    # Bitwise complement of each seed byte, masked back to 8 bits.
    "invert": lambda seed: bytes((~octet) & 0xFF for octet in seed),
    # Add 0xFF to each seed byte, wrapping modulo 256.
    "add-ff": lambda seed: bytes((octet + 0xFF) & 0xFF for octet in seed),
}
|
|
|
|
|
|
def _hex(byte_list):
|
|
return "".join(f"{b:02X}" for b in byte_list)
|
|
|
|
|
|
def _find(data, sub):
|
|
"""True if `sub` (list/bytes) appears as a CONTIGUOUS run in `data`."""
|
|
if not sub:
|
|
return False
|
|
return bytes(data).find(bytes(sub)) != -1
|
|
|
|
|
|
def _neg(data, sid):
    """True when `data` holds a UDS negative response for `sid`.

    A negative response is recognised as the contiguous byte pair 7F <sid>.
    """
    negative_marker = [0x7F, sid]
    return _find(data, negative_marker)
|
|
|
|
|
|
def _pending(data, sid):
    """True when `data` holds 7F <sid> 78 for this service.

    Negative response code 0x78 is requestCorrectlyReceived-ResponsePending.
    """
    pending_marker = [0x7F, sid, 0x78]
    return _find(data, pending_marker)
|
|
|
|
|
|
def _ok(data, sid, expect):
    """Decide whether a response counts as success for service `sid`.

    Any negative response (7F <sid>) fails outright. Otherwise the response
    must contain the `expect` hex bytes when given, or else the UDS
    positive-response id (sid + 0x40).
    """
    if _neg(data, sid):
        return False
    # Either the profile's explicit expectation or the generic positive SID.
    wanted = bytes.fromhex(expect) if expect else [(sid + 0x40) & 0xFF]
    return _find(data, wanted)
|
|
|
|
|
|
# UDS/OBD service-id classification -- used to derive an action's real risk from
|
|
# what it actually SENDS, so a profile can't mislabel a write as "safe".
|
|
READ_ONLY_SIDS = {0x01, 0x02, 0x03, 0x07, 0x09, 0x0A, 0x19, 0x22, 0x2A, 0x3E}
|
|
WRITE_SIDS = {0x04, 0x11, 0x14, 0x28, 0x2E, 0x2F, 0x31, 0x34, 0x35, 0x36, 0x37,
|
|
0x38, 0x3B, 0x3D, 0x85}
|
|
_RISK_ORDER = {"safe": 0, "caution": 1, "danger": 2}
|
|
|
|
|
|
def effective_risk(action):
    """Risk = max(profile-declared, risk derived from the actual service IDs).

    Write/actuator/reset/transfer SIDs force 'danger'; unknown SIDs, a
    non-default session, or a security block force at least 'caution'. The
    profile can only RAISE the risk, never lower it below what the commands
    warrant.

    The service id of each step is the FIRST BYTE of the decoded hex payload
    (``bytes.fromhex``), i.e. the same parsing ``validate_action`` accepts.
    Slicing the text (``send[:2]``) is not equivalent: leading or embedded
    whitespace would make e.g. " 2EF190" look like read-only service 0x02.
    A step whose payload is empty or not valid hex is rated 'danger'.

    Returns one of "safe", "caution", "danger". An unrecognised
    ``action.risk`` label is treated as 'danger'.
    """
    derived = "safe"

    def bump(r):
        # Raise `derived` to `r` if `r` ranks higher; never lowers it.
        nonlocal derived
        if _RISK_ORDER[r] > _RISK_ORDER[derived]:
            derived = r

    if action.session and action.session.lower() not in ("01", ""):
        bump("caution")
    if action.security:
        bump("caution")
    for st in action.steps:
        try:
            payload = bytes.fromhex(st.send)
        except (TypeError, ValueError):
            payload = b""
        if not payload:
            # Cannot tell what would be sent -> assume the worst.
            bump("danger")
            continue
        sid = payload[0]
        if sid in WRITE_SIDS:
            bump("danger")
        elif sid not in READ_ONLY_SIDS:
            bump("caution")
    declared = action.risk if action.risk in _RISK_ORDER else "danger"
    return max((derived, declared), key=lambda r: _RISK_ORDER[r])
|
|
|
|
|
|
def validate_action(a):
    """Raise ValueError if an action has malformed hex / structure.

    Checks performed:
      - every step's `send` is valid hex AND decodes to at least one byte
        (an empty command has no service id and cannot be executed);
      - every non-empty step `expect` is valid hex;
      - `session`, when not None, is valid hex;
      - a `security` block names an 'algorithm', and its 'level' (default
        "01") is exactly one hex byte, because the runner computes the
        send-key subfunction from it as level + 1.

    Returns None when the action is well-formed.
    """
    for st in a.steps:
        # bytes.fromhex raises ValueError on invalid hex; b"" means no command.
        if not bytes.fromhex(st.send):
            raise ValueError(f"action {a.key}: step has an empty 'send'")
        if st.expect:
            bytes.fromhex(st.expect)
    if a.session is not None:
        bytes.fromhex(a.session)
    if a.security is not None:
        if "algorithm" not in a.security:
            raise ValueError(f"action {a.key}: security block needs an 'algorithm'")
        level = a.security.get("level", "01")
        try:
            level_ok = len(bytes.fromhex(level)) == 1
        except (TypeError, ValueError):
            level_ok = False
        if not level_ok:
            raise ValueError(f"action {a.key}: security 'level' must be one hex byte")
|
|
|
|
|
|
def run_action(action, link, log=None):
    """Execute an Action through `link` (must expose read_raw(hex)->list[int]).

    Sequence: pre-flight checks, optional Mode 10 session, optional Mode 27
    security access (seed -> key), then each step in order. Execution stops at
    the first failing step.

    Pre-flight happens BEFORE anything is transmitted, so a blocked or
    malformed action never leaves the ECU in a half-entered state:
      - an unknown security algorithm blocks the action;
      - the security level must parse as hex;
      - every step must decode (bytes.fromhex) to at least one byte; that
        first byte is the service id used for the response checks.

    `log`, when given, is called with short progress strings.

    Returns {"ok": bool, "message": str, "responses": [list[int], ...]}.
    A 7F <sid> 78 (response pending) reply ends the run with ok=True and a
    message telling the user the operation is still in progress.
    """
    def note(m):
        if log:
            log(m)

    def send(hexstr):
        return link.read_raw(hexstr)

    # 0. pre-flight -- nothing has been sent yet
    fn = None
    level = None
    lvl2 = None
    if action.security:
        algo = action.security.get("algorithm")
        fn = SECURITY_ALGOS.get(algo)
        if fn is None:
            return {"ok": False, "message": f"security algorithm '{algo}' not available "
                    "(per-vehicle secret) — action blocked for safety", "responses": []}
        level = action.security.get("level", "01")
        try:
            # send-key subfunction = request-seed subfunction + 1
            lvl2 = f"{int(level, 16) + 1:02X}"
        except (TypeError, ValueError):
            return {"ok": False, "message": f"invalid security level {level!r} "
                    "— nothing was sent", "responses": []}

    step_sids = []
    for st in action.steps:
        try:
            # First decoded byte is the real service id; text slicing would
            # misread payloads that contain whitespace.
            step_sids.append(bytes.fromhex(st.send)[0])
        except (TypeError, ValueError, IndexError):
            return {"ok": False, "message": f"malformed step {st.send!r} "
                    "— nothing was sent", "responses": []}

    # 1. diagnostic session
    if action.session:
        d = send("10" + action.session)
        if not _ok(d, 0x10, "50"):
            return {"ok": False, "message": "could not enter diagnostic session",
                    "responses": [d]}

    # 2. security access (seed -> key)
    if fn is not None:
        seed_resp = send("27" + level)
        if not _ok(seed_resp, 0x27, ""):
            return {"ok": False, "message": "security seed request failed", "responses": [seed_resp]}
        # Seed = everything after the positive-response id 0x67 and the byte
        # that follows it.
        # NOTE(review): presumably that byte is the echoed level and an
        # all-zero seed means "already unlocked" -- confirm against the ECU.
        i = bytes(seed_resp).find(bytes([0x67]))
        seed = seed_resp[i + 2:]
        key = fn(bytes(seed))
        kr = send("27" + lvl2 + _hex(key))
        if not _ok(kr, 0x27, ""):
            return {"ok": False, "message": "security unlock rejected", "responses": [kr]}
        note("security unlocked")

    # 3. the command steps
    responses = []
    for st, req_sid in zip(action.steps, step_sids):
        d = send(st.send)
        responses.append(d)
        if _pending(d, req_sid):
            # 0x78 is "still working", not a failure; remaining steps are not sent.
            return {"ok": True, "responses": responses,
                    "message": "ECU accepted the request; the operation is still in "
                               "progress (response pending). Check the vehicle — do NOT retry."}
        if not _ok(d, req_sid, st.expect):
            reason = "ECU negative response" if _neg(d, req_sid) else "no valid response"
            return {"ok": False, "message": f"{reason} to {st.send} (got {_hex(d) or 'nothing'})",
                    "responses": responses}
        note(f"{st.send} -> {_hex(d)}")

    return {"ok": True, "message": action.success_msg, "responses": responses}
|