Section 1 backend: VIN/Mode-09, readiness monitors, freeze-frame, trip/perf

obdcore additions (all standard SAE J1979, vehicle-agnostic, hardware-free
tested):
- obdservices.py: decode_vin (Mode 09), decode_readiness (Mode 01 PID 01 I-M
  monitors + MIL + DTC count, spark/diesel monitor sets), freeze-frame PID set.
- link.py: ElmLink.read_vehicle_info (VIN/cal/ECU), read_readiness, read_freeze_frame.
- trip.py: TripComputer (MAF-based MPG + trip totals) and PerformanceMeter
  (0-60 / 1/4-mile with launch detection).
- mock.py: speed/MAF/readiness + service stubs for GUI mock mode.
- tests/test_services.py: VIN, readiness bit decode, trip math, 0-60/quarter.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_016yT89n4zR4qbrySoSiEyZs
This commit is contained in:
2026-06-30 19:37:48 -04:00
parent 310d5a3497
commit 6c1ee0c81d
5 changed files with 341 additions and 2 deletions
+35
View File
@@ -172,6 +172,41 @@ class ElmLink:
data = self._bytes(lines)
return 0x44 in data or ("OK" in "".join(lines).upper())
# -- standard OBD services (Mode 09 / 01-01 / 02) --
def read_vehicle_info(self, timeout=2.0):
"""Mode 09: VIN + calibration IDs + ECU name. Returns a dict."""
from . import obdservices as svc
vin = svc.decode_vin(self._bytes(self.cmd("0902", timeout=timeout)))
cal = svc.decode_ascii_block(self._bytes(self.cmd("0904", timeout=timeout)), 0x04)
ecu = svc.decode_ascii_block(self._bytes(self.cmd("090A", timeout=timeout)), 0x0A)
return {"vin": vin, "calibration": cal, "ecu_name": ecu}
def read_readiness(self, timeout=1.0):
"""Mode 01 PID 01: MIL, DTC count, and I-M readiness monitors."""
from . import obdservices as svc
data = self.read_m01("01", 4, timeout=timeout)
return svc.decode_readiness(data) if data else None
def read_freeze_frame(self, timeout=0.6):
"""Mode 02: the DTC that set the freeze frame + the standard PID snapshot."""
from . import obdservices as svc
out = {"dtc": None, "values": []}
d = self._bytes(self.cmd("0202", timeout=timeout))
if 0x42 in d:
r = d[d.index(0x42) + 2:] # after '42 02'
if len(r) >= 2 and (r[0] or r[1]):
out["dtc"] = decode_dtc(r[0], r[1])
for name, pid, nbytes, dec, unit in svc.FREEZE_PIDS:
dd = self._bytes(self.cmd(f"02{pid}00", timeout=timeout))
if 0x42 in dd:
payload = dd[dd.index(0x42) + 3:dd.index(0x42) + 3 + nbytes]
if len(payload) == nbytes:
try:
out["values"].append((name, dec(payload), unit))
except Exception:
pass
return out
def close(self):
try:
self.ser.close()
+23 -2
View File
@@ -42,10 +42,18 @@ class MockLink:
return None # everything else: no response
def read_m01(self, pid, nbytes, timeout=0.6):
if pid == "0C": # RPM 0 at rest
return [0x00, 0x00]
if pid == "0C": # RPM ~750 idle
v = 750 * 4
return [(v >> 8) & 0xFF, v & 0xFF]
if pid == "05": # ECT 82C
return [122]
if pid == "0D": # speed 48 km/h
return [48]
if pid == "10": # MAF 12.0 g/s
v = 1200
return [(v >> 8) & 0xFF, v & 0xFF]
if pid == "01": # readiness: MIL off, 0 DTCs, mixed monitors
return [0x00, 0x07, 0x61, 0x20]
return None
def read_atrv(self, timeout=0.8):
@@ -58,5 +66,18 @@ class MockLink:
def clear_dtcs(self):
return True
def read_vehicle_info(self, timeout=2.0):
return {"vin": "1FMZU73E12ZA12345", "calibration": "JR3A-12A650-BCD",
"ecu_name": "ECM-EngineControl"}
def read_readiness(self, timeout=1.0):
from . import obdservices as svc
return svc.decode_readiness([0x00, 0x07, 0x61, 0x20])
def read_freeze_frame(self, timeout=0.6):
return {"dtc": "P0148",
"values": [("Engine RPM", 240, "rpm"), ("Coolant Temp", 33, "C"),
("Engine Load", 18, "%"), ("Vehicle Speed", 0, "km/h")]}
def close(self):
pass
+94
View File
@@ -0,0 +1,94 @@
"""Standard OBD-II service decoders (vehicle-agnostic, no serial).
Covers the generic SAE J1979 services OBDash exposes beyond live PIDs:
- Mode 09 vehicle info (VIN, calibration IDs, ECU name)
- Mode 01 PID 01 readiness / I-M monitors (+ MIL, DTC count)
- Mode 02 freeze-frame (the standard PID set captured when a DTC set)
These are pure functions over raw response byte lists so they can be unit
tested without hardware; ElmLink wraps them with the actual reads.
"""
_VIN_CHARS = set("ABCDEFGHJKLMNPRSTUVWXYZ0123456789") # no I, O, Q
def decode_vin(data):
"""data: flattened response bytes for '0902' (49 02 <NODI> <17 ASCII>).
Returns the 17-char VIN or None."""
if 0x49 not in data:
return None
i = data.index(0x49)
rest = data[i + 1:]
# drop the service-PID (02) and number-of-data-items byte if present
if rest and rest[0] == 0x02:
rest = rest[2:]
chars = "".join(chr(b) for b in rest if chr(b).upper() in _VIN_CHARS)
return chars[-17:] if len(chars) >= 17 else (chars or None)
def decode_ascii_block(data, svc_pid):
"""Generic mode-09 ASCII payload (calibration IDs / ECU name) for svc_pid
(e.g. 0x04, 0x0A). Returns a printable string or None."""
if 0x49 not in data:
return None
i = data.index(0x49)
rest = data[i + 1:]
if rest and rest[0] == svc_pid:
rest = rest[2:]
s = "".join(chr(b) for b in rest if 0x20 <= b < 0x7F).strip()
return s or None
# Readiness monitor names by ignition type (SAE J1979 PID 01, bytes C/D bit map)
_SPARK = ["Catalyst", "Heated Catalyst", "Evap System", "Secondary Air System",
"A/C Refrigerant", "O2 Sensor", "O2 Sensor Heater", "EGR System"]
_COMPRESSION = ["NMHC Catalyst", "NOx/SCR Aftertreatment", "-", "Boost Pressure",
"-", "Exhaust Gas Sensor", "PM Filter", "EGR/VVT System"]
_CONTINUOUS = [("Misfire", 0), ("Fuel System", 1), ("Components", 2)]
def decode_readiness(data):
"""data: 4 bytes [A,B,C,D] from Mode 01 PID 01. Returns a dict:
{mil, dtc_count, ignition, monitors:[{name, ready}], ready_count, total}."""
if len(data) < 4:
return None
a, b, c, d = data[0], data[1], data[2], data[3]
compression = bool(b & 0x08)
monitors = []
# continuous monitors: B bits 0-2 supported, bits 4-6 = incomplete
for name, i in _CONTINUOUS:
if b & (1 << i):
monitors.append({"name": name, "ready": not (b & (1 << (i + 4)))})
# non-continuous: C bits = supported, D bits = incomplete
names = _COMPRESSION if compression else _SPARK
for i, name in enumerate(names):
if name == "-":
continue
if c & (1 << i):
monitors.append({"name": name, "ready": not (d & (1 << i))})
ready = sum(1 for m in monitors if m["ready"])
return {
"mil": bool(a & 0x80),
"dtc_count": a & 0x7F,
"ignition": "compression" if compression else "spark",
"monitors": monitors,
"ready_count": ready,
"total": len(monitors),
}
# Standard PID set read from the freeze frame (Mode 02, frame 0).
# (key, pid hex, nbytes, decoder(bytes)->value, unit)
FREEZE_PIDS = [
("Fuel System Status", "03", 2, lambda b: b[0], ""),
("Engine Load", "04", 1, lambda b: round(b[0] * 100 / 255), "%"),
("Coolant Temp", "05", 1, lambda b: b[0] - 40, "C"),
("Short Term Fuel Trim", "06", 1, lambda b: round(b[0] * 100 / 128 - 100), "%"),
("Long Term Fuel Trim", "07", 1, lambda b: round(b[0] * 100 / 128 - 100), "%"),
("Intake MAP", "0B", 1, lambda b: b[0], "kPa"),
("Engine RPM", "0C", 2, lambda b: round((b[0] * 256 + b[1]) / 4), "rpm"),
("Vehicle Speed", "0D", 1, lambda b: b[0], "km/h"),
("Intake Air Temp", "0F", 1, lambda b: b[0] - 40, "C"),
("MAF", "10", 2, lambda b: round((b[0] * 256 + b[1]) / 100, 1), "g/s"),
("Throttle Position", "11", 1, lambda b: round(b[0] * 100 / 255), "%"),
]
+119
View File
@@ -0,0 +1,119 @@
"""Trip computer + performance timers (computed from live data, no serial).
TripComputer estimates fuel economy and trip totals from vehicle speed (km/h)
and MAF (g/s) -- the same MAF-based estimate Torque/etc. use. PerformanceMeter
measures 0-60 mph and 1/4-mile times, auto-detecting a launch from a near-stop.
Both are fed time-stamped samples and are unit tested without hardware.
"""
KMH_PER_MPH = 1.609344
GAS_AFR = 14.7 # stoichiometric air:fuel for gasoline
GAS_G_PER_GAL = 2835.0 # ~6.25 lb/gal
def _gph(maf_gps, afr=GAS_AFR, g_per_gal=GAS_G_PER_GAL):
return (maf_gps / afr) * 3600.0 / g_per_gal
class TripComputer:
def __init__(self, afr=GAS_AFR, g_per_gal=GAS_G_PER_GAL):
self.afr, self.g_per_gal = afr, g_per_gal
self.reset()
def reset(self):
self.dist_km = 0.0
self.fuel_gal = 0.0
self.moving_s = 0.0
self.elapsed_s = 0.0
self._t = None
def update(self, t, speed_kmh, maf_gps):
if self._t is None:
self._t = t
return
dt = t - self._t
self._t = t
if dt <= 0 or dt > 5: # ignore gaps / first sample
return
self.elapsed_s += dt
if speed_kmh:
self.dist_km += speed_kmh * dt / 3600.0
if speed_kmh > 1.0:
self.moving_s += dt
if maf_gps:
self.fuel_gal += _gph(maf_gps, self.afr, self.g_per_gal) * dt / 3600.0
def instant_mpg(self, speed_kmh, maf_gps):
if not speed_kmh or not maf_gps:
return 0.0
gph = _gph(maf_gps, self.afr, self.g_per_gal)
return (speed_kmh / KMH_PER_MPH) / gph if gph > 0 else 0.0
def avg_mpg(self):
miles = self.dist_km / KMH_PER_MPH
return miles / self.fuel_gal if self.fuel_gal > 0 else 0.0
def stats(self):
return {
"distance_mi": round(self.dist_km / KMH_PER_MPH, 2),
"fuel_gal": round(self.fuel_gal, 3),
"avg_mpg": round(self.avg_mpg(), 1),
"elapsed_s": round(self.elapsed_s, 1),
"moving_s": round(self.moving_s, 1),
}
class PerformanceMeter:
"""0-60 mph and 1/4-mile timers. A run starts when the car pulls away from a
near-stop and ends at the 1/4 mile or when it stops; best times are kept."""
QUARTER_M = 402.336
def __init__(self):
self.best_0_60 = None
self.best_quarter = None
self.last_0_60 = None
self.last_quarter = None
self._reset_run()
def _reset_run(self):
self._running = False
self._t0 = None
self._dist_m = 0.0
self._last_t = None
self._stop_t = None
self._did60 = False
def update(self, t, speed_kmh):
if speed_kmh is None:
return
mph = speed_kmh / KMH_PER_MPH
if not self._running:
if mph < 1.0:
self._stop_t = t # parked, ready to launch
elif self._stop_t is not None:
self._running = True # launch detected
self._t0 = self._stop_t
self._dist_m = 0.0
self._last_t = t
self._did60 = False
return
dt = t - self._last_t
self._last_t = t
if dt <= 0 or dt > 5:
self._reset_run()
return
self._dist_m += (speed_kmh / 3.6) * dt
if mph >= 60 and not self._did60:
self._did60 = True
self.last_0_60 = round(t - self._t0, 2)
if self.best_0_60 is None or self.last_0_60 < self.best_0_60:
self.best_0_60 = self.last_0_60
if self._dist_m >= self.QUARTER_M:
self.last_quarter = round(t - self._t0, 2)
if self.best_quarter is None or self.last_quarter < self.best_quarter:
self.best_quarter = self.last_quarter
self._reset_run()
elif mph < 1.0:
self._reset_run()