diff --git a/obdcore/link.py b/obdcore/link.py index 2b9b8e8..b268db3 100644 --- a/obdcore/link.py +++ b/obdcore/link.py @@ -172,6 +172,41 @@ class ElmLink: data = self._bytes(lines) return 0x44 in data or ("OK" in "".join(lines).upper()) + # -- standard OBD services (Mode 09 / 01-01 / 02) -- + def read_vehicle_info(self, timeout=2.0): + """Mode 09: VIN + calibration IDs + ECU name. Returns a dict.""" + from . import obdservices as svc + vin = svc.decode_vin(self._bytes(self.cmd("0902", timeout=timeout))) + cal = svc.decode_ascii_block(self._bytes(self.cmd("0904", timeout=timeout)), 0x04) + ecu = svc.decode_ascii_block(self._bytes(self.cmd("090A", timeout=timeout)), 0x0A) + return {"vin": vin, "calibration": cal, "ecu_name": ecu} + + def read_readiness(self, timeout=1.0): + """Mode 01 PID 01: MIL, DTC count, and I-M readiness monitors.""" + from . import obdservices as svc + data = self.read_m01("01", 4, timeout=timeout) + return svc.decode_readiness(data) if data else None + + def read_freeze_frame(self, timeout=0.6): + """Mode 02: the DTC that set the freeze frame + the standard PID snapshot.""" + from . import obdservices as svc + out = {"dtc": None, "values": []} + d = self._bytes(self.cmd("0202", timeout=timeout)) + if 0x42 in d: + r = d[d.index(0x42) + 2:] # after '42 02' + if len(r) >= 2 and (r[0] or r[1]): + out["dtc"] = decode_dtc(r[0], r[1]) + for name, pid, nbytes, dec, unit in svc.FREEZE_PIDS: + dd = self._bytes(self.cmd(f"02{pid}00", timeout=timeout)) + if 0x42 in dd: + payload = dd[dd.index(0x42) + 3:dd.index(0x42) + 3 + nbytes] + if len(payload) == nbytes: + try: + out["values"].append((name, dec(payload), unit)) + except Exception: + pass + return out + def close(self): try: self.ser.close() diff --git a/obdcore/mock.py b/obdcore/mock.py index 87b26b2..60bb094 100644 --- a/obdcore/mock.py +++ b/obdcore/mock.py @@ -42,10 +42,18 @@ class MockLink: return None # everything else: no response def read_m01(self, pid, nbytes, timeout=0.6): - if pid == "0C": # RPM 0 at rest - return [0x00, 0x00] + if pid == "0C": # RPM ~750 idle + v = 750 * 4 + return [(v >> 8) & 0xFF, v & 0xFF] if pid == "05": # ECT 82C return [122] + if pid == "0D": # speed 48 km/h + return [48] + if pid == "10": # MAF 12.0 g/s + v = 1200 + return [(v >> 8) & 0xFF, v & 0xFF] + if pid == "01": # readiness: MIL off, 0 DTCs, mixed monitors + return [0x00, 0x07, 0x61, 0x20] return None def read_atrv(self, timeout=0.8): @@ -58,5 +66,18 @@ class MockLink: def clear_dtcs(self): return True + def read_vehicle_info(self, timeout=2.0): + return {"vin": "1FMZU73E12ZA12345", "calibration": "JR3A-12A650-BCD", + "ecu_name": "ECM-EngineControl"} + + def read_readiness(self, timeout=1.0): + from . import obdservices as svc + return svc.decode_readiness([0x00, 0x07, 0x61, 0x20]) + + def read_freeze_frame(self, timeout=0.6): + return {"dtc": "P0148", + "values": [("Engine RPM", 240, "rpm"), ("Coolant Temp", 33, "C"), + ("Engine Load", 18, "%"), ("Vehicle Speed", 0, "km/h")]} + def close(self): pass diff --git a/obdcore/obdservices.py b/obdcore/obdservices.py new file mode 100644 index 0000000..79d42c3 --- /dev/null +++ b/obdcore/obdservices.py @@ -0,0 +1,94 @@ +"""Standard OBD-II service decoders (vehicle-agnostic, no serial). + +Covers the generic SAE J1979 services OBDash exposes beyond live PIDs: + - Mode 09 vehicle info (VIN, calibration IDs, ECU name) + - Mode 01 PID 01 readiness / I-M monitors (+ MIL, DTC count) + - Mode 02 freeze-frame (the standard PID set captured when a DTC set) + +These are pure functions over raw response byte lists so they can be unit +tested without hardware; ElmLink wraps them with the actual reads. +""" + +_VIN_CHARS = set("ABCDEFGHJKLMNPRSTUVWXYZ0123456789") # no I, O, Q + + +def decode_vin(data): + """data: flattened response bytes for '0902' (49 02 <17 ASCII>). + Returns the 17-char VIN or None.""" + if 0x49 not in data: + return None + i = data.index(0x49) + rest = data[i + 1:] + # drop the service-PID (02) and number-of-data-items byte if present + if rest and rest[0] == 0x02: + rest = rest[2:] + chars = "".join(chr(b) for b in rest if chr(b).upper() in _VIN_CHARS) + return chars[-17:] if len(chars) >= 17 else (chars or None) + + +def decode_ascii_block(data, svc_pid): + """Generic mode-09 ASCII payload (calibration IDs / ECU name) for svc_pid + (e.g. 0x04, 0x0A). Returns a printable string or None.""" + if 0x49 not in data: + return None + i = data.index(0x49) + rest = data[i + 1:] + if rest and rest[0] == svc_pid: + rest = rest[2:] + s = "".join(chr(b) for b in rest if 0x20 <= b < 0x7F).strip() + return s or None + + +# Readiness monitor names by ignition type (SAE J1979 PID 01, bytes C/D bit map) +_SPARK = ["Catalyst", "Heated Catalyst", "Evap System", "Secondary Air System", + "A/C Refrigerant", "O2 Sensor", "O2 Sensor Heater", "EGR System"] +_COMPRESSION = ["NMHC Catalyst", "NOx/SCR Aftertreatment", "-", "Boost Pressure", + "-", "Exhaust Gas Sensor", "PM Filter", "EGR/VVT System"] +_CONTINUOUS = [("Misfire", 0), ("Fuel System", 1), ("Components", 2)] + + +def decode_readiness(data): + """data: 4 bytes [A,B,C,D] from Mode 01 PID 01. Returns a dict: + {mil, dtc_count, ignition, monitors:[{name, ready}], ready_count, total}.""" + if len(data) < 4: + return None + a, b, c, d = data[0], data[1], data[2], data[3] + compression = bool(b & 0x08) + monitors = [] + # continuous monitors: B bits 0-2 supported, bits 4-6 = incomplete + for name, i in _CONTINUOUS: + if b & (1 << i): + monitors.append({"name": name, "ready": not (b & (1 << (i + 4)))}) + # non-continuous: C bits = supported, D bits = incomplete + names = _COMPRESSION if compression else _SPARK + for i, name in enumerate(names): + if name == "-": + continue + if c & (1 << i): + monitors.append({"name": name, "ready": not (d & (1 << i))}) + ready = sum(1 for m in monitors if m["ready"]) + return { + "mil": bool(a & 0x80), + "dtc_count": a & 0x7F, + "ignition": "compression" if compression else "spark", + "monitors": monitors, + "ready_count": ready, + "total": len(monitors), + } + + +# Standard PID set read from the freeze frame (Mode 02, frame 0). +# (key, pid hex, nbytes, decoder(bytes)->value, unit) +FREEZE_PIDS = [ + ("Fuel System Status", "03", 2, lambda b: b[0], ""), + ("Engine Load", "04", 1, lambda b: round(b[0] * 100 / 255), "%"), + ("Coolant Temp", "05", 1, lambda b: b[0] - 40, "C"), + ("Short Term Fuel Trim", "06", 1, lambda b: round(b[0] * 100 / 128 - 100), "%"), + ("Long Term Fuel Trim", "07", 1, lambda b: round(b[0] * 100 / 128 - 100), "%"), + ("Intake MAP", "0B", 1, lambda b: b[0], "kPa"), + ("Engine RPM", "0C", 2, lambda b: round((b[0] * 256 + b[1]) / 4), "rpm"), + ("Vehicle Speed", "0D", 1, lambda b: b[0], "km/h"), + ("Intake Air Temp", "0F", 1, lambda b: b[0] - 40, "C"), + ("MAF", "10", 2, lambda b: round((b[0] * 256 + b[1]) / 100, 1), "g/s"), + ("Throttle Position", "11", 1, lambda b: round(b[0] * 100 / 255), "%"), +] diff --git a/obdcore/trip.py b/obdcore/trip.py new file mode 100644 index 0000000..750f6c3 --- /dev/null +++ b/obdcore/trip.py @@ -0,0 +1,119 @@ +"""Trip computer + performance timers (computed from live data, no serial). + +TripComputer estimates fuel economy and trip totals from vehicle speed (km/h) +and MAF (g/s) -- the same MAF-based estimate Torque/etc. use. PerformanceMeter +measures 0-60 mph and 1/4-mile times, auto-detecting a launch from a near-stop. + +Both are fed time-stamped samples and are unit tested without hardware. +""" + +KMH_PER_MPH = 1.609344 +GAS_AFR = 14.7 # stoichiometric air:fuel for gasoline +GAS_G_PER_GAL = 2835.0 # ~6.25 lb/gal + + +def _gph(maf_gps, afr=GAS_AFR, g_per_gal=GAS_G_PER_GAL): + return (maf_gps / afr) * 3600.0 / g_per_gal + + +class TripComputer: + def __init__(self, afr=GAS_AFR, g_per_gal=GAS_G_PER_GAL): + self.afr, self.g_per_gal = afr, g_per_gal + self.reset() + + def reset(self): + self.dist_km = 0.0 + self.fuel_gal = 0.0 + self.moving_s = 0.0 + self.elapsed_s = 0.0 + self._t = None + + def update(self, t, speed_kmh, maf_gps): + if self._t is None: + self._t = t + return + dt = t - self._t + self._t = t + if dt <= 0 or dt > 5: # ignore gaps / first sample + return + self.elapsed_s += dt + if speed_kmh: + self.dist_km += speed_kmh * dt / 3600.0 + if speed_kmh > 1.0: + self.moving_s += dt + if maf_gps: + self.fuel_gal += _gph(maf_gps, self.afr, self.g_per_gal) * dt / 3600.0 + + def instant_mpg(self, speed_kmh, maf_gps): + if not speed_kmh or not maf_gps: + return 0.0 + gph = _gph(maf_gps, self.afr, self.g_per_gal) + return (speed_kmh / KMH_PER_MPH) / gph if gph > 0 else 0.0 + + def avg_mpg(self): + miles = self.dist_km / KMH_PER_MPH + return miles / self.fuel_gal if self.fuel_gal > 0 else 0.0 + + def stats(self): + return { + "distance_mi": round(self.dist_km / KMH_PER_MPH, 2), + "fuel_gal": round(self.fuel_gal, 3), + "avg_mpg": round(self.avg_mpg(), 1), + "elapsed_s": round(self.elapsed_s, 1), + "moving_s": round(self.moving_s, 1), + } + + +class PerformanceMeter: + """0-60 mph and 1/4-mile timers. A run starts when the car pulls away from a + near-stop and ends at the 1/4 mile or when it stops; best times are kept.""" + + QUARTER_M = 402.336 + + def __init__(self): + self.best_0_60 = None + self.best_quarter = None + self.last_0_60 = None + self.last_quarter = None + self._reset_run() + + def _reset_run(self): + self._running = False + self._t0 = None + self._dist_m = 0.0 + self._last_t = None + self._stop_t = None + self._did60 = False + + def update(self, t, speed_kmh): + if speed_kmh is None: + return + mph = speed_kmh / KMH_PER_MPH + if not self._running: + if mph < 1.0: + self._stop_t = t # parked, ready to launch + elif self._stop_t is not None: + self._running = True # launch detected + self._t0 = self._stop_t + self._dist_m = 0.0 + self._last_t = t + self._did60 = False + return + dt = t - self._last_t + self._last_t = t + if dt <= 0 or dt > 5: + self._reset_run() + return + self._dist_m += (speed_kmh / 3.6) * dt + if mph >= 60 and not self._did60: + self._did60 = True + self.last_0_60 = round(t - self._t0, 2) + if self.best_0_60 is None or self.last_0_60 < self.best_0_60: + self.best_0_60 = self.last_0_60 + if self._dist_m >= self.QUARTER_M: + self.last_quarter = round(t - self._t0, 2) + if self.best_quarter is None or self.last_quarter < self.best_quarter: + self.best_quarter = self.last_quarter + self._reset_run() + elif mph < 1.0: + self._reset_run() diff --git a/tests/test_services.py b/tests/test_services.py new file mode 100644 index 0000000..643b899 --- /dev/null +++ b/tests/test_services.py @@ -0,0 +1,70 @@ +"""Tests for the standard OBD services + trip/performance (no hardware).""" +import os +import sys + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from obdcore import obdservices as svc +from obdcore.trip import TripComputer, PerformanceMeter, KMH_PER_MPH + + +def test_decode_vin(): + vin = "1FMZU73E12ZA12345" + data = [0x49, 0x02, 0x01] + [ord(c) for c in vin] + assert svc.decode_vin(data) == vin + assert svc.decode_vin([0x41, 0x00]) is None # not a mode-09 response + print(" VIN decode: OK") + + +def test_decode_readiness(): + # A=MIL off/0 DTCs, B=3 continuous supported+ready, C=Cat/O2/O2htr supported, + # D=O2 sensor incomplete + r = svc.decode_readiness([0x00, 0x07, 0x61, 0x20]) + assert r["mil"] is False and r["dtc_count"] == 0 and r["ignition"] == "spark" + by = {m["name"]: m["ready"] for m in r["monitors"]} + assert by["Misfire"] and by["Fuel System"] and by["Components"] + assert by["Catalyst"] is True and by["O2 Sensor"] is False + assert r["total"] == 6 and r["ready_count"] == 5 + # diesel flag + rc = svc.decode_readiness([0x80, 0x0F, 0x00, 0x00]) + assert rc["mil"] is True and rc["ignition"] == "compression" + print(f" readiness decode: {r['ready_count']}/{r['total']} ready, O2 not ready: OK") + + +def test_trip_computer(): + tc = TripComputer() + t = 0.0 + for _ in range(360): # 6 min at 1 Hz, 96.56 km/h (60 mph), 12 g/s + tc.update(t, 96.56, 12.0) + t += 1.0 + s = tc.stats() + assert 5.5 < s["distance_mi"] < 6.5, s # 60mph * 0.1h = 6 mi + assert s["avg_mpg"] > 5 and s["avg_mpg"] < 60, s + inst = tc.instant_mpg(96.56, 12.0) + assert inst > 0 + print(f" trip: {s['distance_mi']}mi, {s['avg_mpg']} avg mpg, " + f"{inst:.1f} inst: OK") + + +def test_performance_meter(): + pm = PerformanceMeter() + t = 0.0 + # parked + for _ in range(3): + pm.update(t, 0.0); t += 0.5 + # accelerate 0 -> 70 mph over 7s (mph), then cruise to cover 1/4 mile + for i in range(1, 200): + t = 1.5 + i * 0.1 + mph = min(70.0, (t - 1.5) * 10.0) # 10 mph/s + pm.update(t, mph * KMH_PER_MPH) + assert pm.best_0_60 is not None, "should have timed 0-60" + assert 5.0 < pm.best_0_60 < 8.0, pm.best_0_60 # ~6s at 10mph/s + assert pm.best_quarter is not None, "should have timed 1/4 mile" + print(f" performance: 0-60 {pm.best_0_60}s, 1/4mi {pm.best_quarter}s: OK") + + +if __name__ == "__main__": + for fn in [test_decode_vin, test_decode_readiness, test_trip_computer, + test_performance_meter]: + fn() + print("\nALL SERVICE TESTS PASS")