Add dedicated --crank monitor for no-start diagnosis

Big ICP readout focused on the cranking scenario:
- Wide ICP bar with the 500-psi firing threshold marked (|)
- Rolling ASCII trace chart of the ICP build-up (10 rows; renders anywhere,
  no unicode) -- clearly shows ICP climbing above/below the 500 firing line
- Peak-hold (the crank's max ICP, the money number) + pass/fail verdict
- FICM main / battery / RPM secondaries with sag (min) tracking
- --dash-log writes a CSV (t,icp,ficm,batt,rpm) while you watch
- On exit prints peak ICP + verdict (reached 500 / suspect oil bleed-off)

Validated end-to-end via a mock crank: ICP ramp past 500, peak capture,
battery-sag capture, trace resolution, CSV logging, clean terminal restore.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_016yT89n4zR4qbrySoSiEyZs
This commit is contained in:
2026-06-30 09:15:14 -04:00
parent c94caefd50
commit a9b2e133c0
2 changed files with 221 additions and 0 deletions
+191
View File
@@ -23,6 +23,7 @@ Usage (Windows):
python obd_reader.py COM5 # force a port
python obd_reader.py COM5 9600 # force port + baud
python obd_reader.py --ford # + read Ford 6.0 Mode-22 PIDs
python obd_reader.py --crank # dedicated CRANK monitor (big ICP + trace)
python obd_reader.py --dash # LIVE gauge dashboard (real-time)
python obd_reader.py --dash crank # live dash, cranking preset (ICP/FICM/batt/rpm)
python obd_reader.py --dash full --dash-log run.csv # all gauges + CSV log
@@ -833,6 +834,185 @@ def dashboard(elm, preset="vitals", log_path=None):
print("Dashboard closed." + (f" CSV log: {log_path}" if log_path else ""))
# ---------------------------------------------------------------------------
# Crank monitor -- dedicated big-ICP view for diagnosing a no-start crank
# ---------------------------------------------------------------------------
def _icp_color(v):
if v is None:
return C["dim"]
return C["green"] if v >= 500 else C["yellow"] if v >= 350 else C["red"]
def _wide_bar(val, maxv, width, mark=None):
n = 0 if val is None else int(round(width * max(0.0, min(1.0, val / maxv))))
cells = ["#" if i < n else "-" for i in range(width)]
if mark is not None:
mi = int(width * mark / maxv)
if 0 <= mi < width:
cells[mi] = "|" # firing-threshold marker
return "".join(cells)
def _trace_rows(hist, width=50, height=10, vmax=600, thresh=500):
"""Filled-area ASCII chart of ICP over recent samples (climbs as it builds)."""
data = hist[-width:]
data = [None] * (width - len(data)) + data
thr_level = int(round(thresh / vmax * height))
rows = []
for r in range(height): # r=0 = top (high psi)
level = height - r # column filled here if fill_h >= level
cells = []
for v in data:
if v is None:
cells.append(" ")
else:
fh = int(round(min(1.0, v / vmax) * height))
cells.append("#" if fh >= level else " ")
line = "".join(cells)
is_thr = (level == thr_level)
if is_thr: # draw the 500-psi firing line
line = "".join(c if c == "#" else "-" for c in line)
label = " 500"
elif r == 0:
label = f"{vmax:4d}"
else:
label = " "
rows.append((label, line, is_thr))
return rows
def _render_crank(elapsed, last_dt, frame, icp, peak, ficm, ficm_min,
batt, batt_min, rpm, hist):
hz = (1.0 / last_dt) if last_dt > 0 else 0.0
clock = f"{int(elapsed)//60:02d}:{int(elapsed)%60:02d}"
icpc = _icp_color(icp)
L = [
f"{C['bold']}===== 6.0 CRANK MONITOR ====={C['reset']} {clock} "
f"{hz:4.1f} Hz frame {frame}",
f"{C['dim']}crank the engine q=quit r=reset firing threshold = 500 psi{C['reset']}",
"",
]
icpval = f"{icp:6.1f}" if icp is not None else " -- "
L.append(f" {C['bold']}ICP {C['reset']} {icpc}[{_wide_bar(icp, 600, 40, mark=500)}]"
f"{C['reset']} {icpc}{C['bold']}{icpval} psi{C['reset']}")
fired = peak >= 500
pc = C["green"] if fired else C["red"]
verdict = ("FIRING PRESSURE REACHED" if fired
else "BELOW 500 - keep cranking" if peak > 0 else "waiting for crank...")
L.append(f" {C['bold']}PEAK{C['reset']} {pc}{peak:6.0f} psi {verdict}{C['reset']}")
L.append("")
fc = (C["dim"] if ficm is None else
C["green"] if ficm >= 46 else C["yellow"] if ficm >= 45 else C["red"])
fmv = f"{ficm:.1f}V" if ficm is not None else "--"
fmm = f" (min {ficm_min:.1f})" if ficm_min is not None else ""
bc = (C["dim"] if batt is None else
C["green"] if batt >= 12.2 else C["yellow"] if batt >= 11 else C["red"])
bv = f"{batt:.1f}V" if batt is not None else "--"
bm = f" (min {batt_min:.1f})" if batt_min is not None else ""
rv = f"{rpm}" if rpm is not None else "--"
L.append(f" FICM Main {fc}{fmv}{C['reset']}{C['dim']}{fmm} [DOC]{C['reset']} "
f"Batt {bc}{bv}{C['reset']}{C['dim']}{bm}{C['reset']} RPM {rv}")
L.append("")
L.append(f" {C['dim']}ICP trace (psi vs time, last {min(len(hist), 50)} samples){C['reset']}")
for label, line, is_thr in _trace_rows(hist):
color = C["yellow"] if is_thr else icpc
L.append(f" {C['dim']}{label}{C['reset']} |{color}{line}{C['reset']}")
L.append(f" +{'-' * 50}")
body = "\n".join(ln + ESC + "K" for ln in L)
sys.stdout.write(ESC + "H" + body + ESC + "J")
sys.stdout.flush()
def crank_monitor(elm, log_path=None):
"""Dedicated cranking view: big ICP readout + firing-line bar + rolling
trace, with peak-hold and a pass/fail on the 500-psi threshold. Start it,
then crank -- watch whether ICP builds past 500 psi or stalls (oil leak)."""
enable_ansi()
get_key, restore = _key_io()
logf = open(log_path, "w") if log_path else None
if logf:
logf.write("t,icp_psi,ficm_v,batt_v,rpm\n")
elm.cmd("ATAT2")
elm.cmd("ATST19")
u16 = lambda b: (b[0] << 8) + b[1]
hist = []
peak, batt_min, ficm_min = 0.0, None, None
icp_dead, ficm_dead = 0, 0
sys.stdout.write(ESC + "2J" + ESC + "H" + ESC + "?25l")
t0 = time.time()
frame, last_dt = 0, 0.0
try:
while True:
k = get_key()
if k in ("q", "\x03"):
break
if k == "r":
hist.clear()
peak, batt_min, ficm_min = 0.0, None, None
fstart = time.time()
icp = ficm = batt = rpm = None
if icp_dead < 4:
raw = elm.mode22("1446", timeout=0.5)
if raw:
icp = round(u16(raw) * 0.57, 1); icp_dead = 0
else:
icp_dead += 1
if ficm_dead < 4:
raw = elm.mode22("09D0", timeout=0.4)
if raw:
ficm = round(u16(raw) / 256.0, 1); ficm_dead = 0
else:
ficm_dead += 1
s = " ".join(elm.cmd("ATRV", read_timeout=0.8)).replace("V", "").strip()
try:
batt = float(s)
except ValueError:
batt = None
raw = _poll_m01(elm, "0C", 2)
if raw:
rpm = round(u16(raw) / 4)
now = time.time()
last_dt = now - fstart
frame += 1
if icp is not None:
hist.append(icp)
del hist[:-120]
peak = max(peak, icp)
if batt is not None:
batt_min = batt if batt_min is None else min(batt_min, batt)
if ficm is not None:
ficm_min = ficm if ficm_min is None else min(ficm_min, ficm)
if logf:
logf.write(f"{now - t0:.2f},{'' if icp is None else icp},"
f"{'' if ficm is None else ficm},"
f"{'' if batt is None else batt},"
f"{'' if rpm is None else rpm}\n")
logf.flush()
_render_crank(now - t0, last_dt, frame, icp, peak, ficm, ficm_min,
batt, batt_min, rpm, hist)
except KeyboardInterrupt:
pass
finally:
restore()
if logf:
logf.close()
sys.stdout.write(ESC + "?25h" + C["reset"] + "\n")
sys.stdout.flush()
elm.cmd("ATAT1")
elm.cmd("ATST32")
verdict = "REACHED firing pressure (>=500 psi)" if peak >= 500 else \
"did NOT reach 500 psi -- suspect high-pressure oil bleed-off"
print(f"Crank monitor closed. Peak ICP {peak:.0f} psi -- {verdict}."
+ (f" CSV: {log_path}" if log_path else ""))
def scan_mode22(elm, start, end, log_path=None):
"""Brute-force scan Mode-22 PIDs over [start, end] (inclusive, ints).
Logs every PID that returns data. Safe: Mode 22 is read-only by
@@ -967,6 +1147,9 @@ def main():
if i + 1 < len(raw_args):
dash_log = raw_args[i + 1]
# --crank : dedicated cranking monitor (big ICP + trace). --dash-log = CSV.
do_crank = "--crank" in raw_args
# Positional args: [port] [baud] -- skip flags and their values
pos = []
i = 0
@@ -1051,6 +1234,14 @@ def main():
is_can = elm.is_can()
# ---- Crank monitor: dedicated big-ICP cranking view ----
if do_crank:
print("\nEntering CRANK MONITOR. Start it, then crank the engine.")
print("Watch ICP build toward 500 psi (firing line). q=quit, r=reset.")
time.sleep(1.2)
crank_monitor(elm, log_path=dash_log)
return
# ---- Live dashboard: real-time gauges, skips the static report ----
if do_dash:
print(f"\nEntering live dashboard (preset: {dash_preset}).")