From c94caefd50a4f97f3af1b70bd7b54f8219ba0356 Mon Sep 17 00:00:00 2001 From: Justin Paul Date: Mon, 29 Jun 2026 23:27:29 -0400 Subject: [PATCH] Add real-time live gauge dashboard (--dash) In-place updating CLI dashboard for watching data while cranking/running. Pure-ANSI (no new deps; works on Windows 10+ terminals). - Color-coded gauges (green/yellow/red) by no-start thresholds - Live min/max per gauge -> captures PEAK ICP during a crank - ASCII bars for ICP and FICM main voltage - Presets: crank (ICP/FICM/batt/RPM, fastest), vitals (default), full - Dead-PID auto-skip keeps refresh rate up when 09xx FICM PIDs no-respond - --dash-log PATH writes a CSV while you watch (streaming log preserved) - q=quit, r=reset min/max; cross-platform non-blocking key input Validated: render + decoders vs the truck's real scan bytes, and the full dashboard() loop via a mock ELM (ICP climb across the 500psi firing threshold, peak capture, battery-sag capture, CSV logging, clean exit). Co-Authored-By: Claude Opus 4.8 (1M context) Claude-Session: https://claude.ai/code/session_016yT89n4zR4qbrySoSiEyZs --- README.md | 21 ++++ obd_reader.py | 329 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 350 insertions(+) diff --git a/README.md b/README.md index 8604b95..e5489fe 100644 --- a/README.md +++ b/README.md @@ -32,6 +32,27 @@ python obd_reader.py COM5 --clear # read, then optionally clear (asks to confi python obd_reader.py COM5 -v # verbose: show raw ELM327 traffic ``` +### Live dashboard (real-time gauges) + +Updates in place as you crank or run the engine — color-coded, with live +min/max so a crank's **peak ICP** is captured. No extra dependencies (ANSI; +works on any Windows 10+ terminal). `q` quits, `r` resets min/max. + +``` +python obd_reader.py COM5 --dash # vitals preset (ICP, FICM, IPR, batt, RPM, temps) +python obd_reader.py COM5 --dash crank # cranking preset: ICP / FICM main / batt / RPM (fastest) +python obd_reader.py COM5 --dash full # every PID +python obd_reader.py COM5 --dash crank --dash-log crank.csv # + write a CSV while you watch +``` + +**No-start use:** run `--dash crank`, then crank. A healthy 6.0 builds +**~500+ psi ICP within 1–2 s**; if ICP stalls below 500 (red), that confirms +the high-pressure oil bleed-off. FICM Main should hold ~48V. The `--dash-log` +CSV is your streaming log — paste it back for analysis. + +Note: the FICM PIDs (`09xx`) are `[DOC]` (not yet confirmed on this truck); if +they read `--`, they auto-drop after a few frames so the refresh rate stays up. + Or just double-click **`RUN_OBD.bat`** on Windows (auto-installs `pyserial`). On the truck: plug into the OBD port under the dash, key to **RUN** (engine off is fine diff --git a/obd_reader.py b/obd_reader.py index 2023b22..af7392e 100644 --- a/obd_reader.py +++ b/obd_reader.py @@ -23,12 +23,16 @@ Usage (Windows): python obd_reader.py COM5 # force a port python obd_reader.py COM5 9600 # force port + baud python obd_reader.py --ford # + read Ford 6.0 Mode-22 PIDs + python obd_reader.py --dash # LIVE gauge dashboard (real-time) + python obd_reader.py --dash crank # live dash, cranking preset (ICP/FICM/batt/rpm) + python obd_reader.py --dash full --dash-log run.csv # all gauges + CSV log python obd_reader.py --pid 1446 # probe one Mode-22 PID (1446=ICP), show raw python obd_reader.py --clear # erase stored + pending DTCs Requires: pip install pyserial """ +import os import sys import time @@ -534,6 +538,301 @@ def watch_loop(elm, seconds=20, with_ford=False): print("\n (stopped)") +# --------------------------------------------------------------------------- +# Live dashboard -- real-time gauges that update as you crank / run +# --------------------------------------------------------------------------- +ESC = "\x1b[" +C = { + "reset": ESC + "0m", "bold": ESC + "1m", "dim": ESC + "2m", + "green": ESC + "32m", "yellow": ESC + "33m", "red": ESC + "31m", + "cyan": ESC + "36m", +} + + +def enable_ansi(): + """Enable ANSI/VT processing on Windows 10+ consoles (no-op elsewhere).""" + if os.name == "nt": + try: + import ctypes + k = ctypes.windll.kernel32 + k.SetConsoleMode(k.GetStdHandle(-11), 7) # ENABLE_VT_PROCESSING + except Exception: + pass + + +def _key_io(): + """(get_key, restore) for non-blocking single-key reads; no-ops if unsupported.""" + try: + import msvcrt + def get(): + if msvcrt.kbhit(): + try: + return msvcrt.getch().decode("ascii", "ignore").lower() + except Exception: + return None + return None + return get, (lambda: None) + except ImportError: + try: + import termios, tty, select + fd = sys.stdin.fileno() + old = termios.tcgetattr(fd) + tty.setcbreak(fd) + def get(): + if select.select([sys.stdin], [], [], 0)[0]: + return sys.stdin.read(1).lower() + return None + def restore(): + termios.tcsetattr(fd, termios.TCSADRAIN, old) + return get, restore + except Exception: + return (lambda: None), (lambda: None) + + +class Gauge: + def __init__(self, key, label, unit, kind, pid=None, nbytes=2, + decode=None, status=None, bar_max=None, doc=False): + self.key, self.label, self.unit = key, label, unit + self.kind, self.pid, self.nbytes = kind, pid, nbytes + self.decode, self.status, self.bar_max, self.doc = decode, status, bar_max, doc + self.cur = self.lo = self.hi = None + self.fails = 0 + self.active = True + + def feed(self, val): + self.cur = val + if val is not None: + self.fails = 0 + self.lo = val if self.lo is None else min(self.lo, val) + self.hi = val if self.hi is None else max(self.hi, val) + else: + self.fails += 1 + if self.fails >= 4: + self.active = False # stop polling a dead PID + + def reset_minmax(self): + self.lo = self.hi = self.cur + + +def _gauge_set(preset): + u16 = lambda b: (b[0] << 8) + b[1] + icp_st = lambda v: "ok" if v >= 500 else "crit" + fm_st = lambda v: "ok" if v >= 46 else "warn" if v >= 45 else "crit" + fl_st = lambda v: "ok" if v >= 11 else "warn" + bat_st = lambda v: "ok" if v >= 12.2 else "warn" if v >= 11.0 else "crit" + G = { + "ICP": Gauge("ICP", "ICP", "psi", "m22", "1446", + decode=lambda b: round(u16(b) * 0.57, 1), + status=icp_st, bar_max=600), + "FICM_M": Gauge("FICM_M", "FICM Main", "V", "m22", "09D0", + decode=lambda b: round(u16(b) / 256.0, 1), + status=fm_st, bar_max=50, doc=True), + "FICM_L": Gauge("FICM_L", "FICM Logic", "V", "m22", "09CF", + decode=lambda b: round(u16(b) / 256.0, 1), + status=fl_st, doc=True), + "IPR": Gauge("IPR", "IPR", "%", "m22", "1434", + decode=lambda b: round(b[0] * 13.53 / 35, 1), bar_max=100), + "BATT": Gauge("BATT", "Batt", "V", "atrv", status=bat_st), + "RPM": Gauge("RPM", "RPM", "rpm", "m01", "0C", nbytes=2, + decode=lambda b: round(u16(b) / 4)), + "ECT": Gauge("ECT", "ECT", "C", "m01", "05", nbytes=1, + decode=lambda b: b[0] - 40), + "EOT": Gauge("EOT", "EOT", "C", "m22", "1310", + decode=lambda b: round(u16(b) / 100.0 - 40, 1)), + "IAT": Gauge("IAT", "IAT", "C", "m01", "0F", nbytes=1, + decode=lambda b: b[0] - 40), + "VPCM": Gauge("VPCM", "VPCM", "V", "m01", "42", nbytes=2, + decode=lambda b: round(u16(b) / 1000.0, 2)), + "MAP": Gauge("MAP", "MAP", "psia", "m22", "1440", + decode=lambda b: round(u16(b) * 0.03625, 2)), + "BARO": Gauge("BARO", "BARO", "psia", "m22", "1442", + decode=lambda b: round(u16(b) * 0.03625, 2)), + "BOOST": Gauge("BOOST", "Boost", "psi", "derived", ("MAP", "BARO")), + "EBP": Gauge("EBP", "EBP", "psia", "m22", "1445", + decode=lambda b: round(u16(b) * 0.03625, 2)), + "GEAR": Gauge("GEAR", "Gear", "", "m22", "11B3", + decode=lambda b: b[0] // 2), + "TSS": Gauge("TSS", "TSS", "rpm", "m22", "11B4", + decode=lambda b: round(u16(b) / 4)), + } + presets = { + "crank": ["ICP", "FICM_M", "BATT", "RPM"], + "vitals": ["ICP", "FICM_M", "FICM_L", "IPR", "BATT", "RPM", + "ECT", "EOT", "IAT", "VPCM"], + "full": list(G.keys()), + } + keys = presets.get(preset, presets["vitals"]) + return [G[k] for k in keys] + + +def _poll_m01(elm, pid, nbytes): + data = hexbytes(elm.cmd(f"01{pid}", read_timeout=0.6)) + if 0x41 in data: + i = data.index(0x41) + payload = data[i + 2:i + 2 + nbytes] + if len(payload) == nbytes: + return payload + return None + + +def _poll(elm, g, frame_vals): + if g.kind == "derived": + a, b = frame_vals.get(g.pid[0]), frame_vals.get(g.pid[1]) + return None if (a is None or b is None) else round(a - b, 2) + if g.kind == "atrv": + s = " ".join(elm.cmd("ATRV", read_timeout=0.8)).replace("V", "").strip() + try: + return float(s) + except ValueError: + return None + raw = _poll_m01(elm, g.pid, g.nbytes) if g.kind == "m01" else elm.mode22(g.pid, timeout=0.5) + if not raw: + return None + try: + return g.decode(raw) + except Exception: + return None + + +def _color_for(g): + if g.cur is None: + return C["dim"] + if g.status: + try: + s = g.status(g.cur) + except Exception: + s = None + return {"ok": C["green"], "warn": C["yellow"], "crit": C["red"]}.get(s, C["cyan"]) + return C["cyan"] + + +def _bar(val, maxv, width=12): + if val is None or not maxv: + return " " * (width + 2) + n = int(round(width * max(0.0, min(1.0, val / maxv)))) + return "[" + "#" * n + "-" * (width - n) + "]" + + +def _fmt(g): + if g.cur is None: + return f"{'--':>8} {g.unit}" + if isinstance(g.cur, float): + return f"{g.cur:>8.1f} {g.unit}" + return f"{g.cur:>8} {g.unit}" + + +def _compact(g): + if g.cur is None: + return "--" + v = f"{g.cur:.1f}" if isinstance(g.cur, float) else f"{g.cur}" + return f"{v}{g.unit}" + + +def _render(gauges, preset, elapsed, last_dt, frame): + hz = (1.0 / last_dt) if last_dt > 0 else 0.0 + clock = f"{int(elapsed)//60:02d}:{int(elapsed)%60:02d}" + lines = [ + f"{C['bold']}== 6.0 LIVE DASH =={C['reset']} {clock} {hz:4.1f} Hz " + f"frame {frame} [{preset}]", + f"{C['dim']}q=quit r=reset min/max green=ok yellow=warn red=low/crit" + f" [DOC]=PID not yet confirmed on this truck{C['reset']}", + "", + ] + vitals = [g for g in gauges if g.status or g.bar_max] + others = [g for g in gauges if not (g.status or g.bar_max)] + + if vitals: + lines.append(f"{C['bold']} NO-START VITALS{C['reset']}") + for g in vitals: + col = _color_for(g) + val = f"{col}{_fmt(g)}{C['reset']}" + bar = f" {col}{_bar(g.cur, g.bar_max)}{C['reset']}" if g.bar_max else "" + mm = f" {C['dim']}min {g.lo:g} / max {g.hi:g}{C['reset']}" if g.lo is not None else "" + doc = f" {C['dim']}[DOC]{C['reset']}" if g.doc else "" + lines.append(f" {g.label:11}{val}{bar}{mm}{doc}") + lines.append("") + + if others: + lines.append(f"{C['bold']} ENGINE / DRIVELINE{C['reset']}") + cells = [] + for g in others: + plain = f"{g.label:6}{_compact(g)}" + cells.append(_color_for(g) + f"{plain:<22}" + C["reset"]) + for i in range(0, len(cells), 3): + lines.append(" " + "".join(cells[i:i + 3])) + + body = "\n".join(ln + ESC + "K" for ln in lines) + sys.stdout.write(ESC + "H" + body + ESC + "J") + sys.stdout.flush() + + +def dashboard(elm, preset="vitals", log_path=None): + """Real-time gauge dashboard. Polls a focused PID set in a loop and + redraws in place. min/max persist so a crank's PEAK ICP is captured.""" + gauges = _gauge_set(preset) + enable_ansi() + get_key, restore = _key_io() + logf = open(log_path, "w") if log_path else None + if logf: + logf.write("t," + ",".join(g.key for g in gauges) + "\n") + + elm.cmd("ATAT2") # max adaptive timing + elm.cmd("ATST19") # ~100ms ECU response wait -> snappier polling + + sys.stdout.write(ESC + "2J" + ESC + "H" + ESC + "?25l") # clear, home, hide cursor + t0 = time.time() + frame, last_dt, last_recheck = 0, 0.0, t0 + try: + while True: + k = get_key() + if k in ("q", "\x03"): + break + if k == "r": + for g in gauges: + g.reset_minmax() + + now = time.time() + if now - last_recheck > 5.0: # periodically revive dead PIDs + for g in gauges: + if not g.active: + g.active, g.fails = True, 0 + last_recheck = now + + fstart = time.time() + frame_vals = {} + for g in gauges: # non-derived first + if g.kind == "derived" or not g.active: + frame_vals[g.key] = g.cur if not g.active else None + continue + v = _poll(elm, g, frame_vals) + g.feed(v) + frame_vals[g.key] = v + for g in gauges: # then derived (needs others) + if g.kind == "derived": + v = _poll(elm, g, frame_vals) + g.feed(v) + last_dt = time.time() - fstart + frame += 1 + + if logf: + row = [f"{now - t0:.2f}"] + ["" if g.cur is None else str(g.cur) for g in gauges] + logf.write(",".join(row) + "\n") + logf.flush() + + _render(gauges, preset, now - t0, last_dt, frame) + except KeyboardInterrupt: + pass + finally: + restore() + if logf: + logf.close() + sys.stdout.write(ESC + "?25h" + C["reset"] + "\n") + sys.stdout.flush() + elm.cmd("ATAT1") + elm.cmd("ATST32") + print("Dashboard closed." + (f" CSV log: {log_path}" if log_path else "")) + + def scan_mode22(elm, start, end, log_path=None): """Brute-force scan Mode-22 PIDs over [start, end] (inclusive, ints). Logs every PID that returns data. Safe: Mode 22 is read-only by @@ -655,6 +954,19 @@ def main(): if i + 1 < len(raw_args): scan_log = raw_args[i + 1] + # --dash [crank|vitals|full] : real-time live gauge dashboard + do_dash = "--dash" in raw_args + dash_preset = "vitals" + if do_dash: + i = raw_args.index("--dash") + if i + 1 < len(raw_args) and raw_args[i + 1] in ("crank", "vitals", "full"): + dash_preset = raw_args[i + 1] + dash_log = None + if "--dash-log" in raw_args: + i = raw_args.index("--dash-log") + if i + 1 < len(raw_args): + dash_log = raw_args[i + 1] + # Positional args: [port] [baud] -- skip flags and their values pos = [] i = 0 @@ -666,6 +978,15 @@ def main(): if a == "--scan-log" and i + 1 < len(raw_args): i += 2 continue + if a == "--dash": + if i + 1 < len(raw_args) and raw_args[i + 1] in ("crank", "vitals", "full"): + i += 2 + else: + i += 1 + continue + if a == "--dash-log" and i + 1 < len(raw_args): + i += 2 + continue if a == "--watch": if i + 1 < len(raw_args) and raw_args[i + 1].isdigit(): i += 2 @@ -730,6 +1051,14 @@ def main(): is_can = elm.is_can() + # ---- Live dashboard: real-time gauges, skips the static report ---- + if do_dash: + print(f"\nEntering live dashboard (preset: {dash_preset}).") + print("Crank or run the engine and watch. q=quit, r=reset min/max.") + time.sleep(1.2) + dashboard(elm, preset=dash_preset, log_path=dash_log) + return + # ---- DTCs ---- print("\n" + "-" * 64) print(" TROUBLE CODES")