Add real-time live gauge dashboard (--dash)
In-place updating CLI dashboard for watching data while cranking/running. Pure-ANSI (no new deps; works on Windows 10+ terminals). - Color-coded gauges (green/yellow/red) by no-start thresholds - Live min/max per gauge -> captures PEAK ICP during a crank - ASCII bars for ICP and FICM main voltage - Presets: crank (ICP/FICM/batt/RPM, fastest), vitals (default), full - Dead-PID auto-skip keeps refresh rate up when 09xx FICM PIDs no-respond - --dash-log PATH writes a CSV while you watch (streaming log preserved) - q=quit, r=reset min/max; cross-platform non-blocking key input Validated: render + decoders vs the truck's real scan bytes, and the full dashboard() loop via a mock ELM (ICP climb across the 500psi firing threshold, peak capture, battery-sag capture, CSV logging, clean exit). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_016yT89n4zR4qbrySoSiEyZs
This commit is contained in:
@@ -32,6 +32,27 @@ python obd_reader.py COM5 --clear # read, then optionally clear (asks to confi
|
||||
python obd_reader.py COM5 -v # verbose: show raw ELM327 traffic
|
||||
```
|
||||
|
||||
### Live dashboard (real-time gauges)
|
||||
|
||||
Updates in place as you crank or run the engine — color-coded, with live
|
||||
min/max so a crank's **peak ICP** is captured. No extra dependencies (ANSI;
|
||||
works on any Windows 10+ terminal). `q` quits, `r` resets min/max.
|
||||
|
||||
```
|
||||
python obd_reader.py COM5 --dash # vitals preset (ICP, FICM, IPR, batt, RPM, temps)
|
||||
python obd_reader.py COM5 --dash crank # cranking preset: ICP / FICM main / batt / RPM (fastest)
|
||||
python obd_reader.py COM5 --dash full # every PID
|
||||
python obd_reader.py COM5 --dash crank --dash-log crank.csv # + write a CSV while you watch
|
||||
```
|
||||
|
||||
**No-start use:** run `--dash crank`, then crank. A healthy 6.0 builds
|
||||
**~500+ psi ICP within 1–2 s**; if ICP stalls below 500 (red), that confirms
|
||||
the high-pressure oil bleed-off. FICM Main should hold ~48V. The `--dash-log`
|
||||
CSV is your streaming log — paste it back for analysis.
|
||||
|
||||
Note: the FICM PIDs (`09xx`) are `[DOC]` (not yet confirmed on this truck); if
|
||||
they read `--`, they auto-drop after a few frames so the refresh rate stays up.
|
||||
|
||||
Or just double-click **`RUN_OBD.bat`** on Windows (auto-installs `pyserial`).
|
||||
|
||||
On the truck: plug into the OBD port under the dash, key to **RUN** (engine off is fine
|
||||
|
||||
+329
@@ -23,12 +23,16 @@ Usage (Windows):
|
||||
python obd_reader.py COM5 # force a port
|
||||
python obd_reader.py COM5 9600 # force port + baud
|
||||
python obd_reader.py --ford # + read Ford 6.0 Mode-22 PIDs
|
||||
python obd_reader.py --dash # LIVE gauge dashboard (real-time)
|
||||
python obd_reader.py --dash crank # live dash, cranking preset (ICP/FICM/batt/rpm)
|
||||
python obd_reader.py --dash full --dash-log run.csv # all gauges + CSV log
|
||||
python obd_reader.py --pid 1446 # probe one Mode-22 PID (1446=ICP), show raw
|
||||
python obd_reader.py --clear # erase stored + pending DTCs
|
||||
|
||||
Requires: pip install pyserial
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
|
||||
@@ -534,6 +538,301 @@ def watch_loop(elm, seconds=20, with_ford=False):
|
||||
print("\n (stopped)")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Live dashboard -- real-time gauges that update as you crank / run
|
||||
# ---------------------------------------------------------------------------
|
||||
ESC = "\x1b["
|
||||
C = {
|
||||
"reset": ESC + "0m", "bold": ESC + "1m", "dim": ESC + "2m",
|
||||
"green": ESC + "32m", "yellow": ESC + "33m", "red": ESC + "31m",
|
||||
"cyan": ESC + "36m",
|
||||
}
|
||||
|
||||
|
||||
def enable_ansi():
|
||||
"""Enable ANSI/VT processing on Windows 10+ consoles (no-op elsewhere)."""
|
||||
if os.name == "nt":
|
||||
try:
|
||||
import ctypes
|
||||
k = ctypes.windll.kernel32
|
||||
k.SetConsoleMode(k.GetStdHandle(-11), 7) # ENABLE_VT_PROCESSING
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def _key_io():
|
||||
"""(get_key, restore) for non-blocking single-key reads; no-ops if unsupported."""
|
||||
try:
|
||||
import msvcrt
|
||||
def get():
|
||||
if msvcrt.kbhit():
|
||||
try:
|
||||
return msvcrt.getch().decode("ascii", "ignore").lower()
|
||||
except Exception:
|
||||
return None
|
||||
return None
|
||||
return get, (lambda: None)
|
||||
except ImportError:
|
||||
try:
|
||||
import termios, tty, select
|
||||
fd = sys.stdin.fileno()
|
||||
old = termios.tcgetattr(fd)
|
||||
tty.setcbreak(fd)
|
||||
def get():
|
||||
if select.select([sys.stdin], [], [], 0)[0]:
|
||||
return sys.stdin.read(1).lower()
|
||||
return None
|
||||
def restore():
|
||||
termios.tcsetattr(fd, termios.TCSADRAIN, old)
|
||||
return get, restore
|
||||
except Exception:
|
||||
return (lambda: None), (lambda: None)
|
||||
|
||||
|
||||
class Gauge:
|
||||
def __init__(self, key, label, unit, kind, pid=None, nbytes=2,
|
||||
decode=None, status=None, bar_max=None, doc=False):
|
||||
self.key, self.label, self.unit = key, label, unit
|
||||
self.kind, self.pid, self.nbytes = kind, pid, nbytes
|
||||
self.decode, self.status, self.bar_max, self.doc = decode, status, bar_max, doc
|
||||
self.cur = self.lo = self.hi = None
|
||||
self.fails = 0
|
||||
self.active = True
|
||||
|
||||
def feed(self, val):
|
||||
self.cur = val
|
||||
if val is not None:
|
||||
self.fails = 0
|
||||
self.lo = val if self.lo is None else min(self.lo, val)
|
||||
self.hi = val if self.hi is None else max(self.hi, val)
|
||||
else:
|
||||
self.fails += 1
|
||||
if self.fails >= 4:
|
||||
self.active = False # stop polling a dead PID
|
||||
|
||||
def reset_minmax(self):
|
||||
self.lo = self.hi = self.cur
|
||||
|
||||
|
||||
def _gauge_set(preset):
|
||||
u16 = lambda b: (b[0] << 8) + b[1]
|
||||
icp_st = lambda v: "ok" if v >= 500 else "crit"
|
||||
fm_st = lambda v: "ok" if v >= 46 else "warn" if v >= 45 else "crit"
|
||||
fl_st = lambda v: "ok" if v >= 11 else "warn"
|
||||
bat_st = lambda v: "ok" if v >= 12.2 else "warn" if v >= 11.0 else "crit"
|
||||
G = {
|
||||
"ICP": Gauge("ICP", "ICP", "psi", "m22", "1446",
|
||||
decode=lambda b: round(u16(b) * 0.57, 1),
|
||||
status=icp_st, bar_max=600),
|
||||
"FICM_M": Gauge("FICM_M", "FICM Main", "V", "m22", "09D0",
|
||||
decode=lambda b: round(u16(b) / 256.0, 1),
|
||||
status=fm_st, bar_max=50, doc=True),
|
||||
"FICM_L": Gauge("FICM_L", "FICM Logic", "V", "m22", "09CF",
|
||||
decode=lambda b: round(u16(b) / 256.0, 1),
|
||||
status=fl_st, doc=True),
|
||||
"IPR": Gauge("IPR", "IPR", "%", "m22", "1434",
|
||||
decode=lambda b: round(b[0] * 13.53 / 35, 1), bar_max=100),
|
||||
"BATT": Gauge("BATT", "Batt", "V", "atrv", status=bat_st),
|
||||
"RPM": Gauge("RPM", "RPM", "rpm", "m01", "0C", nbytes=2,
|
||||
decode=lambda b: round(u16(b) / 4)),
|
||||
"ECT": Gauge("ECT", "ECT", "C", "m01", "05", nbytes=1,
|
||||
decode=lambda b: b[0] - 40),
|
||||
"EOT": Gauge("EOT", "EOT", "C", "m22", "1310",
|
||||
decode=lambda b: round(u16(b) / 100.0 - 40, 1)),
|
||||
"IAT": Gauge("IAT", "IAT", "C", "m01", "0F", nbytes=1,
|
||||
decode=lambda b: b[0] - 40),
|
||||
"VPCM": Gauge("VPCM", "VPCM", "V", "m01", "42", nbytes=2,
|
||||
decode=lambda b: round(u16(b) / 1000.0, 2)),
|
||||
"MAP": Gauge("MAP", "MAP", "psia", "m22", "1440",
|
||||
decode=lambda b: round(u16(b) * 0.03625, 2)),
|
||||
"BARO": Gauge("BARO", "BARO", "psia", "m22", "1442",
|
||||
decode=lambda b: round(u16(b) * 0.03625, 2)),
|
||||
"BOOST": Gauge("BOOST", "Boost", "psi", "derived", ("MAP", "BARO")),
|
||||
"EBP": Gauge("EBP", "EBP", "psia", "m22", "1445",
|
||||
decode=lambda b: round(u16(b) * 0.03625, 2)),
|
||||
"GEAR": Gauge("GEAR", "Gear", "", "m22", "11B3",
|
||||
decode=lambda b: b[0] // 2),
|
||||
"TSS": Gauge("TSS", "TSS", "rpm", "m22", "11B4",
|
||||
decode=lambda b: round(u16(b) / 4)),
|
||||
}
|
||||
presets = {
|
||||
"crank": ["ICP", "FICM_M", "BATT", "RPM"],
|
||||
"vitals": ["ICP", "FICM_M", "FICM_L", "IPR", "BATT", "RPM",
|
||||
"ECT", "EOT", "IAT", "VPCM"],
|
||||
"full": list(G.keys()),
|
||||
}
|
||||
keys = presets.get(preset, presets["vitals"])
|
||||
return [G[k] for k in keys]
|
||||
|
||||
|
||||
def _poll_m01(elm, pid, nbytes):
|
||||
data = hexbytes(elm.cmd(f"01{pid}", read_timeout=0.6))
|
||||
if 0x41 in data:
|
||||
i = data.index(0x41)
|
||||
payload = data[i + 2:i + 2 + nbytes]
|
||||
if len(payload) == nbytes:
|
||||
return payload
|
||||
return None
|
||||
|
||||
|
||||
def _poll(elm, g, frame_vals):
|
||||
if g.kind == "derived":
|
||||
a, b = frame_vals.get(g.pid[0]), frame_vals.get(g.pid[1])
|
||||
return None if (a is None or b is None) else round(a - b, 2)
|
||||
if g.kind == "atrv":
|
||||
s = " ".join(elm.cmd("ATRV", read_timeout=0.8)).replace("V", "").strip()
|
||||
try:
|
||||
return float(s)
|
||||
except ValueError:
|
||||
return None
|
||||
raw = _poll_m01(elm, g.pid, g.nbytes) if g.kind == "m01" else elm.mode22(g.pid, timeout=0.5)
|
||||
if not raw:
|
||||
return None
|
||||
try:
|
||||
return g.decode(raw)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
def _color_for(g):
|
||||
if g.cur is None:
|
||||
return C["dim"]
|
||||
if g.status:
|
||||
try:
|
||||
s = g.status(g.cur)
|
||||
except Exception:
|
||||
s = None
|
||||
return {"ok": C["green"], "warn": C["yellow"], "crit": C["red"]}.get(s, C["cyan"])
|
||||
return C["cyan"]
|
||||
|
||||
|
||||
def _bar(val, maxv, width=12):
|
||||
if val is None or not maxv:
|
||||
return " " * (width + 2)
|
||||
n = int(round(width * max(0.0, min(1.0, val / maxv))))
|
||||
return "[" + "#" * n + "-" * (width - n) + "]"
|
||||
|
||||
|
||||
def _fmt(g):
|
||||
if g.cur is None:
|
||||
return f"{'--':>8} {g.unit}"
|
||||
if isinstance(g.cur, float):
|
||||
return f"{g.cur:>8.1f} {g.unit}"
|
||||
return f"{g.cur:>8} {g.unit}"
|
||||
|
||||
|
||||
def _compact(g):
|
||||
if g.cur is None:
|
||||
return "--"
|
||||
v = f"{g.cur:.1f}" if isinstance(g.cur, float) else f"{g.cur}"
|
||||
return f"{v}{g.unit}"
|
||||
|
||||
|
||||
def _render(gauges, preset, elapsed, last_dt, frame):
|
||||
hz = (1.0 / last_dt) if last_dt > 0 else 0.0
|
||||
clock = f"{int(elapsed)//60:02d}:{int(elapsed)%60:02d}"
|
||||
lines = [
|
||||
f"{C['bold']}== 6.0 LIVE DASH =={C['reset']} {clock} {hz:4.1f} Hz "
|
||||
f"frame {frame} [{preset}]",
|
||||
f"{C['dim']}q=quit r=reset min/max green=ok yellow=warn red=low/crit"
|
||||
f" [DOC]=PID not yet confirmed on this truck{C['reset']}",
|
||||
"",
|
||||
]
|
||||
vitals = [g for g in gauges if g.status or g.bar_max]
|
||||
others = [g for g in gauges if not (g.status or g.bar_max)]
|
||||
|
||||
if vitals:
|
||||
lines.append(f"{C['bold']} NO-START VITALS{C['reset']}")
|
||||
for g in vitals:
|
||||
col = _color_for(g)
|
||||
val = f"{col}{_fmt(g)}{C['reset']}"
|
||||
bar = f" {col}{_bar(g.cur, g.bar_max)}{C['reset']}" if g.bar_max else ""
|
||||
mm = f" {C['dim']}min {g.lo:g} / max {g.hi:g}{C['reset']}" if g.lo is not None else ""
|
||||
doc = f" {C['dim']}[DOC]{C['reset']}" if g.doc else ""
|
||||
lines.append(f" {g.label:11}{val}{bar}{mm}{doc}")
|
||||
lines.append("")
|
||||
|
||||
if others:
|
||||
lines.append(f"{C['bold']} ENGINE / DRIVELINE{C['reset']}")
|
||||
cells = []
|
||||
for g in others:
|
||||
plain = f"{g.label:6}{_compact(g)}"
|
||||
cells.append(_color_for(g) + f"{plain:<22}" + C["reset"])
|
||||
for i in range(0, len(cells), 3):
|
||||
lines.append(" " + "".join(cells[i:i + 3]))
|
||||
|
||||
body = "\n".join(ln + ESC + "K" for ln in lines)
|
||||
sys.stdout.write(ESC + "H" + body + ESC + "J")
|
||||
sys.stdout.flush()
|
||||
|
||||
|
||||
def dashboard(elm, preset="vitals", log_path=None):
|
||||
"""Real-time gauge dashboard. Polls a focused PID set in a loop and
|
||||
redraws in place. min/max persist so a crank's PEAK ICP is captured."""
|
||||
gauges = _gauge_set(preset)
|
||||
enable_ansi()
|
||||
get_key, restore = _key_io()
|
||||
logf = open(log_path, "w") if log_path else None
|
||||
if logf:
|
||||
logf.write("t," + ",".join(g.key for g in gauges) + "\n")
|
||||
|
||||
elm.cmd("ATAT2") # max adaptive timing
|
||||
elm.cmd("ATST19") # ~100ms ECU response wait -> snappier polling
|
||||
|
||||
sys.stdout.write(ESC + "2J" + ESC + "H" + ESC + "?25l") # clear, home, hide cursor
|
||||
t0 = time.time()
|
||||
frame, last_dt, last_recheck = 0, 0.0, t0
|
||||
try:
|
||||
while True:
|
||||
k = get_key()
|
||||
if k in ("q", "\x03"):
|
||||
break
|
||||
if k == "r":
|
||||
for g in gauges:
|
||||
g.reset_minmax()
|
||||
|
||||
now = time.time()
|
||||
if now - last_recheck > 5.0: # periodically revive dead PIDs
|
||||
for g in gauges:
|
||||
if not g.active:
|
||||
g.active, g.fails = True, 0
|
||||
last_recheck = now
|
||||
|
||||
fstart = time.time()
|
||||
frame_vals = {}
|
||||
for g in gauges: # non-derived first
|
||||
if g.kind == "derived" or not g.active:
|
||||
frame_vals[g.key] = g.cur if not g.active else None
|
||||
continue
|
||||
v = _poll(elm, g, frame_vals)
|
||||
g.feed(v)
|
||||
frame_vals[g.key] = v
|
||||
for g in gauges: # then derived (needs others)
|
||||
if g.kind == "derived":
|
||||
v = _poll(elm, g, frame_vals)
|
||||
g.feed(v)
|
||||
last_dt = time.time() - fstart
|
||||
frame += 1
|
||||
|
||||
if logf:
|
||||
row = [f"{now - t0:.2f}"] + ["" if g.cur is None else str(g.cur) for g in gauges]
|
||||
logf.write(",".join(row) + "\n")
|
||||
logf.flush()
|
||||
|
||||
_render(gauges, preset, now - t0, last_dt, frame)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
restore()
|
||||
if logf:
|
||||
logf.close()
|
||||
sys.stdout.write(ESC + "?25h" + C["reset"] + "\n")
|
||||
sys.stdout.flush()
|
||||
elm.cmd("ATAT1")
|
||||
elm.cmd("ATST32")
|
||||
print("Dashboard closed." + (f" CSV log: {log_path}" if log_path else ""))
|
||||
|
||||
|
||||
def scan_mode22(elm, start, end, log_path=None):
|
||||
"""Brute-force scan Mode-22 PIDs over [start, end] (inclusive, ints).
|
||||
Logs every PID that returns data. Safe: Mode 22 is read-only by
|
||||
@@ -655,6 +954,19 @@ def main():
|
||||
if i + 1 < len(raw_args):
|
||||
scan_log = raw_args[i + 1]
|
||||
|
||||
# --dash [crank|vitals|full] : real-time live gauge dashboard
|
||||
do_dash = "--dash" in raw_args
|
||||
dash_preset = "vitals"
|
||||
if do_dash:
|
||||
i = raw_args.index("--dash")
|
||||
if i + 1 < len(raw_args) and raw_args[i + 1] in ("crank", "vitals", "full"):
|
||||
dash_preset = raw_args[i + 1]
|
||||
dash_log = None
|
||||
if "--dash-log" in raw_args:
|
||||
i = raw_args.index("--dash-log")
|
||||
if i + 1 < len(raw_args):
|
||||
dash_log = raw_args[i + 1]
|
||||
|
||||
# Positional args: [port] [baud] -- skip flags and their values
|
||||
pos = []
|
||||
i = 0
|
||||
@@ -666,6 +978,15 @@ def main():
|
||||
if a == "--scan-log" and i + 1 < len(raw_args):
|
||||
i += 2
|
||||
continue
|
||||
if a == "--dash":
|
||||
if i + 1 < len(raw_args) and raw_args[i + 1] in ("crank", "vitals", "full"):
|
||||
i += 2
|
||||
else:
|
||||
i += 1
|
||||
continue
|
||||
if a == "--dash-log" and i + 1 < len(raw_args):
|
||||
i += 2
|
||||
continue
|
||||
if a == "--watch":
|
||||
if i + 1 < len(raw_args) and raw_args[i + 1].isdigit():
|
||||
i += 2
|
||||
@@ -730,6 +1051,14 @@ def main():
|
||||
|
||||
is_can = elm.is_can()
|
||||
|
||||
# ---- Live dashboard: real-time gauges, skips the static report ----
|
||||
if do_dash:
|
||||
print(f"\nEntering live dashboard (preset: {dash_preset}).")
|
||||
print("Crank or run the engine and watch. q=quit, r=reset min/max.")
|
||||
time.sleep(1.2)
|
||||
dashboard(elm, preset=dash_preset, log_path=dash_log)
|
||||
return
|
||||
|
||||
# ---- DTCs ----
|
||||
print("\n" + "-" * 64)
|
||||
print(" TROUBLE CODES")
|
||||
|
||||
Reference in New Issue
Block a user