"""Standard OBD-II service decoders (vehicle-agnostic, no serial). Covers the generic SAE J1979 services OBDash exposes beyond live PIDs: - Mode 09 vehicle info (VIN, calibration IDs, ECU name) - Mode 01 PID 01 readiness / I-M monitors (+ MIL, DTC count) - Mode 02 freeze-frame (the standard PID set captured when a DTC set) These are pure functions over raw response byte lists so they can be unit tested without hardware; ElmLink wraps them with the actual reads. """ _VIN_CHARS = set("ABCDEFGHJKLMNPRSTUVWXYZ0123456789") # no I, O, Q def decode_vin(data): """data: flattened response bytes for '0902' (49 02 <17 ASCII>). Returns the 17-char VIN or None.""" if 0x49 not in data: return None i = data.index(0x49) rest = data[i + 1:] # drop the service-PID (02) and number-of-data-items byte if present if rest and rest[0] == 0x02: rest = rest[2:] chars = "".join(chr(b) for b in rest if chr(b).upper() in _VIN_CHARS) return chars[-17:] if len(chars) >= 17 else (chars or None) def decode_ascii_block(data, svc_pid): """Generic mode-09 ASCII payload (calibration IDs / ECU name) for svc_pid (e.g. 0x04, 0x0A). Returns a printable string or None.""" if 0x49 not in data: return None i = data.index(0x49) rest = data[i + 1:] if rest and rest[0] == svc_pid: rest = rest[2:] s = "".join(chr(b) for b in rest if 0x20 <= b < 0x7F).strip() return s or None # Readiness monitor names by ignition type (SAE J1979 PID 01, bytes C/D bit map) _SPARK = ["Catalyst", "Heated Catalyst", "Evap System", "Secondary Air System", "A/C Refrigerant", "O2 Sensor", "O2 Sensor Heater", "EGR System"] _COMPRESSION = ["NMHC Catalyst", "NOx/SCR Aftertreatment", "-", "Boost Pressure", "-", "Exhaust Gas Sensor", "PM Filter", "EGR/VVT System"] _CONTINUOUS = [("Misfire", 0), ("Fuel System", 1), ("Components", 2)] def decode_readiness(data): """data: 4 bytes [A,B,C,D] from Mode 01 PID 01. Returns a dict: {mil, dtc_count, ignition, monitors:[{name, ready}], ready_count, total}.""" if len(data) < 4: return None a, b, c, d = data[0], data[1], data[2], data[3] compression = bool(b & 0x08) monitors = [] # continuous monitors: B bits 0-2 supported, bits 4-6 = incomplete for name, i in _CONTINUOUS: if b & (1 << i): monitors.append({"name": name, "ready": not (b & (1 << (i + 4)))}) # non-continuous: C bits = supported, D bits = incomplete names = _COMPRESSION if compression else _SPARK for i, name in enumerate(names): if name == "-": continue if c & (1 << i): monitors.append({"name": name, "ready": not (d & (1 << i))}) ready = sum(1 for m in monitors if m["ready"]) return { "mil": bool(a & 0x80), "dtc_count": a & 0x7F, "ignition": "compression" if compression else "spark", "monitors": monitors, "ready_count": ready, "total": len(monitors), } # Standard PID set read from the freeze frame (Mode 02, frame 0). # (key, pid hex, nbytes, decoder(bytes)->value, unit) FREEZE_PIDS = [ ("Fuel System Status", "03", 2, lambda b: b[0], ""), ("Engine Load", "04", 1, lambda b: round(b[0] * 100 / 255), "%"), ("Coolant Temp", "05", 1, lambda b: b[0] - 40, "C"), ("Short Term Fuel Trim", "06", 1, lambda b: round(b[0] * 100 / 128 - 100), "%"), ("Long Term Fuel Trim", "07", 1, lambda b: round(b[0] * 100 / 128 - 100), "%"), ("Intake MAP", "0B", 1, lambda b: b[0], "kPa"), ("Engine RPM", "0C", 2, lambda b: round((b[0] * 256 + b[1]) / 4), "rpm"), ("Vehicle Speed", "0D", 1, lambda b: b[0], "km/h"), ("Intake Air Temp", "0F", 1, lambda b: b[0] - 40, "C"), ("MAF", "10", 2, lambda b: round((b[0] * 256 + b[1]) / 100, 1), "g/s"), ("Throttle Position", "11", 1, lambda b: round(b[0] * 100 / 255), "%"), ]