d435384b58
Profile-defined UDS action sequences, run safely -- the framework for #2 (real per-vehicle actuator tests/resets are follow-on, added as verified profile data). - obdcore/actions.py: Action model + run_action() executing session (Mode 10) -> security (Mode 27 seed->key) -> command steps (2F/31/11/3E/... any hex) with positive/negative response checks. Security KEY algorithms are per-vehicle secrets and NOT bundled -- only trivial transforms (xor-ff/invert/add-ff) known; an action naming an unknown algorithm is BLOCKED (fails safe). Never synthesizes bytes -- runs only what the profile defines. validate_action() rejects malformed hex at load. - profile.py: load/save an actions[] block; ElmLink/MockLink read_raw(hex). - GUI: Diagnostics -> Service & Bi-directional dialog -- lists the profile's actions with risk badges; caution/danger gated behind a warning confirmation. - generic-obd2: two safe STANDARD actions (Tester-Present ping; ECU-Reset, caution + engine-off warning). PROFILE_SPEC.md documents the actions schema + safety rules. - tests/test_actions.py: runner, session+reset, security handshake, unknown-algo block, hex validation, profile load. All 5 suites pass. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_016yT89n4zR4qbrySoSiEyZs
193 lines
6.5 KiB
Python
193 lines
6.5 KiB
Python
"""Vehicle profiles -- load/save/list the JSON files under profiles/.
|
|
|
|
A profile is pure data: vehicle metadata, PID definitions (with safe formula
strings), DTC meanings, named presets (perspectives), and an optional list of
action sequences. Loading a profile compiles each PID's formula into a decode
callable and validates each action; nothing in a profile can execute arbitrary
code (see formula.py).
|
|
|
|
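JSON schema (schema=1):
  {
    "schema": 1,
    "meta": {"name","make","model","years","engine","author","version",
             "protocol","notes"},
    "pids": [{"key","name","mode","pid","nbytes","formula","unit","group",
              "vmin","vmax","confidence","round","deps","notes",
              "warn_hi","redline_hi","warn_lo","redline_lo"}, ...],
    "presets": {"crank":[keys...], ...},
    "dtcs": [{"code","desc","system","no_start","causes"}, ...],
    "actions": [{"key","name","kind","risk","description","warning",
                 "session","security","steps":[{"send","expect"}, ...],
                 "success_msg"}, ...]
  }
The "actions" list is optional; it is only written when a profile has actions.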
|
|
"""
|
|
import glob
|
|
import json
|
|
import os
|
|
import sys
|
|
from dataclasses import dataclass, field
|
|
|
|
from .formula import compile_formula
|
|
from .registry import Pid, Dtc
|
|
from .actions import Action, ActionStep, validate_action
|
|
|
|
# Profile schema version this module reads and writes.
SCHEMA = 1
# Formula variable names bound to the raw response bytes, in order: A..H.
BYTE_VARS = list("ABCDEFGH")
|
|
|
|
|
|
@dataclass
class Profile:
    """In-memory form of one vehicle profile.

    Fields mirror the top-level blocks of the profile JSON: ``meta`` and
    ``presets`` are kept as plain dicts, while ``pids``, ``dtcs`` and
    ``actions`` hold the objects built from the corresponding JSON lists.
    ``path`` is the file the profile was loaded from (``None`` when the
    profile was built in memory).
    """

    meta: dict
    pids: list
    dtcs: list
    presets: dict
    path: str = None
    # Each instance gets its own empty list, so code that iterates
    # ``profile.actions`` never has to special-case a missing block.
    actions: list = field(default_factory=list)

    def __post_init__(self):
        # Callers written against the old ``actions=None`` default may still
        # pass None explicitly; normalise it so ``actions`` is always a list.
        if self.actions is None:
            self.actions = []

    @property
    def name(self):
        """Display name from ``meta``, or a placeholder when none is set."""
        return self.meta.get("name", "Unnamed profile")
|
|
|
|
|
|
def profiles_dir():
    """Return the directory that holds the profile JSON files."""
    if not getattr(sys, "frozen", False):
        # Source checkout: profiles/ sits next to the obdcore/ package.
        package_dir = os.path.dirname(os.path.abspath(__file__))
        return os.path.join(os.path.dirname(package_dir), "profiles")
    # PyInstaller bundle: profiles/ is shipped as data and unpacked under
    # sys._MEIPASS; fall back to the executable's folder if that is unset.
    bundle_root = getattr(sys, "_MEIPASS", os.path.dirname(sys.executable))
    return os.path.join(bundle_root, "profiles")
|
|
|
|
|
|
def _round(v, rnd):
    """Round ``v`` to ``rnd`` decimal places.

    ``rnd=None`` returns ``v`` untouched; ``rnd=0`` returns an ``int``;
    any other value is passed to ``round`` as the number of digits.
    """
    if rnd is None:
        return v
    if rnd == 0:
        return int(round(v))
    return round(v, rnd)
|
|
|
|
|
|
def _build_decode(d):
    """Return the decode callable for one PID definition dict, or None.

    * mode "atrv": no decoder is built (returns None).
    * mode "derived": the formula is compiled with the PID's ``deps`` as
      variable names; the decoder takes the dependency values in that order.
    * any other mode (default "22"): the formula is compiled with the byte
      variables A..H; the decoder takes the raw response bytes.

    Every decoder applies the PID's optional "round" setting to its result.
    """
    mode = d.get("mode", "22")
    if mode == "atrv":
        return None

    places = d.get("round")
    expression = d.get("formula", "")

    if mode != "derived":
        evaluate = compile_formula(expression, BYTE_VARS)

        def decode_bytes(raw):
            # Bind the leading bytes to A, B, C, ...; zip stops at whichever
            # of the two sequences is shorter.
            return _round(evaluate(dict(zip(BYTE_VARS, raw))), places)

        return decode_bytes

    dep_keys = tuple(d.get("deps", ()))
    evaluate = compile_formula(expression, dep_keys)

    def decode_derived(vals):
        # Pair each dependency key with the value supplied for it.
        return _round(evaluate(dict(zip(dep_keys, vals))), places)

    return decode_derived
|
|
|
|
|
|
def _pid_from_dict(d):
    """Build a ``Pid`` from one entry of a profile's "pids" list.

    Only "key" is required (a missing key raises ``KeyError``); every other
    field falls back to the default given here.
    """
    key = d["key"]
    opt = d.get
    fields = {
        "key": key,
        "name": opt("name", key),
        "mode": opt("mode", "22"),
        "pid": opt("pid", ""),
        "nbytes": opt("nbytes", 2),
        "formula": opt("formula", ""),
        "decode": _build_decode(d),
        "unit": opt("unit", ""),
        "group": opt("group", "misc"),
        "vmin": opt("vmin", 0.0),
        "vmax": opt("vmax", 100.0),
        "confidence": opt("confidence", "verified"),
        "round": opt("round"),
        "deps": tuple(opt("deps", ())),
        "notes": opt("notes", ""),
        # Optional alert thresholds; None when the profile does not set them.
        "warn_hi": opt("warn_hi"),
        "redline_hi": opt("redline_hi"),
        "warn_lo": opt("warn_lo"),
        "redline_lo": opt("redline_lo"),
    }
    return Pid(**fields)
|
|
|
|
|
|
def _action_from_dict(a):
    """Build and validate one ``Action`` from an entry of the "actions" list."""
    act = Action(
        key=a["key"], name=a.get("name", a["key"]), kind=a.get("kind", "test"),
        risk=a.get("risk", "safe"), description=a.get("description", ""),
        warning=a.get("warning", ""), session=a.get("session"),
        security=a.get("security"),
        steps=[ActionStep(send=s["send"], expect=s.get("expect", ""))
               for s in a.get("steps", [])],
        success_msg=a.get("success_msg", "Done."))
    validate_action(act)  # rejects malformed hex
    return act


def load_profile(path):
    """Read the profile JSON at ``path`` and return it as a ``Profile``.

    A file without a "schema" key is treated as schema 1. Missing "pids",
    "dtcs", "actions", "meta" and "presets" blocks default to empty.

    Raises:
        ValueError: the file declares a schema other than ``SCHEMA``.
        KeyError: an entry lacks a required key ("key" for PIDs and actions,
            "code" for DTCs, "send" for action steps).
        OSError / json.JSONDecodeError: the file cannot be read or parsed.
    Whatever ``validate_action`` raises for a malformed action also
    propagates to the caller.
    """
    # JSON is UTF-8 by specification; do not depend on the locale encoding,
    # which would mis-decode non-ASCII text on some platforms.
    with open(path, encoding="utf-8") as f:
        raw = json.load(f)
    if raw.get("schema", 1) != SCHEMA:
        raise ValueError(f"unsupported profile schema {raw.get('schema')} in {path}")
    pids = [_pid_from_dict(d) for d in raw.get("pids", [])]
    dtcs = [Dtc(code=x["code"], desc=x.get("desc", ""), system=x.get("system", "powertrain"),
                no_start=x.get("no_start", False), causes=x.get("causes", ""))
            for x in raw.get("dtcs", [])]
    # Actions are built and validated one at a time, in file order.
    actions = [_action_from_dict(a) for a in raw.get("actions", [])]
    return Profile(meta=raw.get("meta", {}), pids=pids, dtcs=dtcs,
                   presets=raw.get("presets", {}), path=path, actions=actions)
|
|
|
|
|
|
def _pid_to_dict(p):
    """Serialise a PID object to the dict stored in a profile's "pids" list.

    Optional fields are left out when empty or None, and "nbytes" is only
    written for modes "01" and "22". Key order is fixed so saved files are
    stable.
    """
    entry = {attr: getattr(p, attr) for attr in ("key", "name", "mode")}
    if p.pid:
        entry["pid"] = p.pid
    if p.mode in ("01", "22"):
        entry["nbytes"] = p.nbytes
    if p.formula:
        entry["formula"] = p.formula
    if p.deps:
        entry["deps"] = list(p.deps)

    # Always-present descriptive fields.
    for attr in ("unit", "group", "vmin", "vmax", "confidence"):
        entry[attr] = getattr(p, attr)

    if p.round is not None:
        entry["round"] = p.round

    # Alert thresholds are written only when set.
    limits = ((z, getattr(p, z))
              for z in ("warn_hi", "redline_hi", "warn_lo", "redline_lo"))
    entry.update((z, value) for z, value in limits if value is not None)

    if p.notes:
        entry["notes"] = p.notes
    return entry
|
|
|
|
|
|
def _action_to_dict(a):
    """Serialise one action; optional fields are omitted when empty."""
    ad = {"key": a.key, "name": a.name, "kind": a.kind, "risk": a.risk}
    for attr in ("description", "warning", "session", "security"):
        value = getattr(a, attr)
        if value:
            ad[attr] = value
    ad["steps"] = [{"send": s.send, **({"expect": s.expect} if s.expect else {})}
                   for s in a.steps]
    # "Done." is the loader's default, so only a custom message is stored.
    if a.success_msg != "Done.":
        ad["success_msg"] = a.success_msg
    return ad


def save_profile(profile, path=None):
    """Write ``profile`` as JSON and return the path written.

    Args:
        profile: object exposing ``meta``, ``pids``, ``presets``, ``dtcs``,
            ``actions`` and ``path``.
        path: destination file; defaults to ``profile.path``.

    The "actions" block is written only when the profile has actions.
    The document is serialised in memory before the file is opened, so a
    serialisation error (for example a non-JSON value in ``meta``) is raised
    without truncating an existing profile file.
    """
    path = path or profile.path
    out = {
        "schema": SCHEMA,
        "meta": profile.meta,
        "pids": [_pid_to_dict(p) for p in profile.pids],
        "presets": profile.presets,
        "dtcs": [{"code": d.code, "desc": d.desc, "system": d.system,
                  "no_start": d.no_start, "causes": d.causes} for d in profile.dtcs],
    }
    if profile.actions:
        out["actions"] = [_action_to_dict(a) for a in profile.actions]
    # Serialise first: opening with "w" truncates the target immediately.
    text = json.dumps(out, indent=2)
    with open(path, "w", encoding="utf-8") as f:
        f.write(text)
    return path
|
|
|
|
|
|
def list_profiles(directory=None):
    """Return [(path, meta_dict), ...] for every *.json profile in directory.

    ``directory`` defaults to ``profiles_dir()``. Results are sorted by path.
    A file with no "meta" block is listed with an empty dict. Files that
    cannot be read or parsed are skipped rather than reported.
    """
    directory = directory or profiles_dir()
    out = []
    for p in sorted(glob.glob(os.path.join(directory, "*.json"))):
        try:
            # JSON is UTF-8 by specification; do not depend on the locale
            # encoding when reading profile metadata.
            with open(p, encoding="utf-8") as f:
                meta = json.load(f).get("meta", {})
            out.append((p, meta))
        except Exception:
            # Deliberately best-effort: one unreadable or malformed file
            # must not hide the remaining profiles from the listing.
            continue
    return out
|
|
|
|
|
|
# File name, inside profiles_dir(), of the profile that load_default() loads.
DEFAULT_PROFILE = "ford-6.0-powerstroke.json"
|
|
|
|
|
|
def default_profile_path():
    """Return the full path of the default profile file."""
    directory = profiles_dir()
    return os.path.join(directory, DEFAULT_PROFILE)
|
|
|
|
|
|
def load_default():
    """Load and return the default profile."""
    default_path = default_profile_path()
    return load_profile(default_path)
|