Files
justin d435384b58 #2 (framework): bi-directional / service-function engine
Profile-defined UDS action sequences, run safely -- the framework for #2 (real
per-vehicle actuator tests/resets are follow-on, added as verified profile data).

- obdcore/actions.py: Action model + run_action() executing session (Mode 10) ->
  security (Mode 27 seed->key) -> command steps (2F/31/11/3E/... any hex) with
  positive/negative response checks. Security KEY algorithms are per-vehicle
  secrets and NOT bundled -- only trivial transforms (xor-ff/invert/add-ff)
  known; an action naming an unknown algorithm is BLOCKED (fails safe). Never
  synthesizes bytes -- runs only what the profile defines. validate_action()
  rejects malformed hex at load.
- profile.py: load/save an actions[] block; ElmLink/MockLink read_raw(hex).
- GUI: Diagnostics -> Service & Bi-directional dialog -- lists the profile's
  actions with risk badges; caution/danger gated behind a warning confirmation.
- generic-obd2: two safe STANDARD actions (Tester-Present ping; ECU-Reset,
  caution + engine-off warning). PROFILE_SPEC.md documents the actions schema
  + safety rules.
- tests/test_actions.py: runner, session+reset, security handshake, unknown-algo
  block, hex validation, profile load. All 5 suites pass.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_016yT89n4zR4qbrySoSiEyZs
2026-07-01 16:33:51 -04:00

193 lines
6.5 KiB
Python

"""Vehicle profiles -- load/save/list the JSON files under profiles/.
A profile is pure data: vehicle metadata, PID definitions (with safe formula
strings), DTC meanings, and named presets (perspectives). Loading a profile
compiles each PID's formula into a decode callable; nothing in a profile can
execute arbitrary code (see formula.py).
JSON schema (schema=1):
{
"schema": 1,
"meta": {"name","make","model","years","engine","author","version",
"protocol","notes"},
"pids": [{"key","name","mode","pid","nbytes","formula","unit","group",
"vmin","vmax","confidence","round","deps","notes",
"warn_hi","redline_hi","warn_lo","redline_lo"}, ...],
"presets": {"crank":[keys...], ...},
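"dtcs": [{"code","desc","system","no_start","causes"}, ...],
"actions": [{"key","name","kind","risk","description","warning","session",
"security","steps":[{"send","expect"}, ...],"success_msg"}, ...]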
}
"""
import glob
import json
import os
import sys
from dataclasses import dataclass, field
from .formula import compile_formula
from .registry import Pid, Dtc
from .actions import Action, ActionStep, validate_action
# Profile schema version this module reads and writes.
SCHEMA = 1
# Names under which the raw response bytes are exposed to formulas: A..H.
BYTE_VARS = list("ABCDEFGH")
@dataclass
class Profile:
    """In-memory form of one vehicle profile.

    Holds the metadata dict, the PID and DTC lists, the named presets, the
    file path associated with the profile and its list of actions.
    """

    meta: dict
    pids: list
    dtcs: list
    presets: dict
    path: str = field(default=None)  # None when no file path is associated
    actions: list = field(default=None)  # None when no actions were supplied

    @property
    def name(self):
        """Display name taken from ``meta``, or a placeholder when absent."""
        placeholder = "Unnamed profile"
        return self.meta.get("name", placeholder)
def profiles_dir():
    """Return the directory that holds the profile JSON files."""
    if not getattr(sys, "frozen", False):
        # Running from source: profiles/ is the repo's directory next to
        # obdcore/, i.e. one level above this module's own directory.
        module_dir = os.path.dirname(os.path.abspath(__file__))
        return os.path.join(os.path.dirname(module_dir), "profiles")
    # PyInstaller bundle: profiles/ is added as data and unpacked under
    # sys._MEIPASS; the executable's directory is used if that is not set.
    bundle_root = getattr(sys, "_MEIPASS", os.path.dirname(sys.executable))
    return os.path.join(bundle_root, "profiles")
def _round(v, rnd):
if rnd is None:
return v
return int(round(v)) if rnd == 0 else round(v, rnd)
def _build_decode(d):
mode = d.get("mode", "22")
rnd = d.get("round")
if mode == "atrv":
return None
formula = d.get("formula", "")
if mode == "derived":
deps = tuple(d.get("deps", ()))
fn = compile_formula(formula, deps)
def dec(vals, fn=fn, deps=deps, rnd=rnd):
return _round(fn(dict(zip(deps, vals))), rnd)
return dec
fn = compile_formula(formula, BYTE_VARS)
def dec(raw, fn=fn, rnd=rnd):
names = {BYTE_VARS[i]: raw[i] for i in range(min(len(raw), 8))}
return _round(fn(names), rnd)
return dec
def _pid_from_dict(d):
    """Turn one PID definition dict from a profile into a ``Pid``.

    Only ``key`` is required (``KeyError`` when missing); every other field
    falls back to a default, and ``decode`` is built by ``_build_decode``.
    """
    key = d["key"]
    get = d.get
    return Pid(
        key=key,
        name=get("name", key),
        mode=get("mode", "22"),
        pid=get("pid", ""),
        nbytes=get("nbytes", 2),
        formula=get("formula", ""),
        decode=_build_decode(d),
        unit=get("unit", ""),
        group=get("group", "misc"),
        vmin=get("vmin", 0.0),
        vmax=get("vmax", 100.0),
        confidence=get("confidence", "verified"),
        round=get("round"),
        deps=tuple(get("deps", ())),
        notes=get("notes", ""),
        warn_hi=get("warn_hi"),
        redline_hi=get("redline_hi"),
        warn_lo=get("warn_lo"),
        redline_lo=get("redline_lo"),
    )
def _action_from_dict(a):
    """Build one ``Action`` from its profile dict and validate it.

    ``key`` and each step's ``send`` are required (``KeyError`` when
    missing); the remaining fields fall back to defaults.
    """
    act = Action(
        key=a["key"], name=a.get("name", a["key"]), kind=a.get("kind", "test"),
        risk=a.get("risk", "safe"), description=a.get("description", ""),
        warning=a.get("warning", ""), session=a.get("session"),
        security=a.get("security"),
        steps=[ActionStep(send=s["send"], expect=s.get("expect", ""))
               for s in a.get("steps", [])],
        success_msg=a.get("success_msg", "Done."))
    validate_action(act)  # rejects malformed hex
    return act


def load_profile(path):
    """Load the profile JSON file at ``path`` and return a ``Profile``.

    PIDs, DTCs and actions are converted from their dict form; missing
    top-level sections are treated as empty.

    Raises:
        ValueError: the file's ``schema`` is not the supported ``SCHEMA``
            (also raised by ``json`` for malformed or non-UTF-8 content).
        KeyError: a PID, DTC or action entry lacks a required key.
        OSError: the file cannot be opened.
    """
    # JSON text is UTF-8; do not depend on the platform default encoding
    # (which is not UTF-8 on Windows), or non-ASCII text in a profile would
    # be mis-decoded.
    with open(path, encoding="utf-8") as f:
        raw = json.load(f)
    if raw.get("schema", 1) != SCHEMA:
        raise ValueError(f"unsupported profile schema {raw.get('schema')} in {path}")
    pids = [_pid_from_dict(d) for d in raw.get("pids", [])]
    dtcs = [Dtc(code=x["code"], desc=x.get("desc", ""), system=x.get("system", "powertrain"),
                no_start=x.get("no_start", False), causes=x.get("causes", ""))
            for x in raw.get("dtcs", [])]
    actions = [_action_from_dict(a) for a in raw.get("actions", [])]
    return Profile(meta=raw.get("meta", {}), pids=pids, dtcs=dtcs,
                   presets=raw.get("presets", {}), path=path, actions=actions)
def _pid_to_dict(p):
d = {"key": p.key, "name": p.name, "mode": p.mode}
if p.pid:
d["pid"] = p.pid
if p.mode in ("01", "22"):
d["nbytes"] = p.nbytes
if p.formula:
d["formula"] = p.formula
if p.deps:
d["deps"] = list(p.deps)
d.update({"unit": p.unit, "group": p.group, "vmin": p.vmin, "vmax": p.vmax,
"confidence": p.confidence})
if p.round is not None:
d["round"] = p.round
for z in ("warn_hi", "redline_hi", "warn_lo", "redline_lo"):
if getattr(p, z) is not None:
d[z] = getattr(p, z)
if p.notes:
d["notes"] = p.notes
return d
def _action_to_dict(a):
    """Serialise one action; optional fields are written only when set."""
    entry = dict(key=a.key, name=a.name, kind=a.kind, risk=a.risk)
    for attr in ("description", "warning", "session", "security"):
        value = getattr(a, attr)
        if value:
            entry[attr] = value
    steps = []
    for s in a.steps:
        step = {"send": s.send}
        if s.expect:
            step["expect"] = s.expect
        steps.append(step)
    entry["steps"] = steps
    # "Done." is the load-time default, so it is not written back.
    if a.success_msg != "Done.":
        entry["success_msg"] = a.success_msg
    return entry


def save_profile(profile, path=None):
    """Write ``profile`` as indented JSON and return the path written.

    ``path`` defaults to ``profile.path``.  The ``actions`` section is only
    emitted when the profile has at least one action.
    """
    target = path or profile.path
    payload = {
        "schema": SCHEMA,
        "meta": profile.meta,
        "pids": list(map(_pid_to_dict, profile.pids)),
        "presets": profile.presets,
        "dtcs": [
            dict(code=d.code, desc=d.desc, system=d.system,
                 no_start=d.no_start, causes=d.causes)
            for d in profile.dtcs
        ],
    }
    if profile.actions:
        payload["actions"] = [_action_to_dict(a) for a in profile.actions]
    # The file is opened only after the whole payload has been built.
    with open(target, "w") as f:
        json.dump(payload, f, indent=2)
    return target
def list_profiles(directory=None):
    """Return [(path, meta_dict), ...] for every *.json profile in directory.

    ``directory`` defaults to ``profiles_dir()``.  Results are sorted by
    path.  Listing is best-effort: a file that cannot be read, is not valid
    UTF-8 JSON, or whose top-level value is not a JSON object is skipped
    rather than aborting the whole listing.  A profile without a ``meta``
    section is listed with an empty dict.
    """
    directory = directory or profiles_dir()
    out = []
    for p in sorted(glob.glob(os.path.join(directory, "*.json"))):
        try:
            # JSON text is UTF-8; relying on the platform default encoding
            # (not UTF-8 on Windows) could make a valid profile fail to
            # decode and silently vanish from the list.
            with open(p, encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError):
            # OSError: unreadable file.  ValueError covers both
            # json.JSONDecodeError and UnicodeDecodeError.
            continue
        if not isinstance(data, dict):
            # Valid JSON, but not a profile object.
            continue
        out.append((p, data.get("meta", {})))
    return out
# File name of the profile that is loaded by default.
DEFAULT_PROFILE = "ford-6.0-powerstroke.json"


def default_profile_path():
    """Return the path of the default profile inside ``profiles_dir()``."""
    folder = profiles_dir()
    return os.path.join(folder, DEFAULT_PROFILE)
def load_default():
    """Load and return the profile at ``default_profile_path()``."""
    default_path = default_profile_path()
    return load_profile(default_path)