Files
obdash/obdcore/link.py
T
justin 6bee9c0d7f Scaffold obdcore (headless acquisition core) + ARCHITECTURE.md
Foundation for the PySide6 + pyqtgraph Windows GUI, shared with the terminal
tool. Pure data/IO -- no Qt, no curses.

obdcore/
  link.py      ElmLink   -- ELM327 serial (Mode-01/22, ATRV, DTC read/clear)
  mock.py      MockLink  -- synthetic crank for tests + GUI dev (no truck)
  registry.py  PidRegistry (verified Ford 6.0 PIDs + confidence) + DtcDatabase
  scheduler.py PollScheduler -- prioritized round-robin polling, dead-PID park,
               derived channels; tick() is fake-clock test-drivable
  store.py     TimeSeriesStore (ring buffers + min/max) + CsvRecorder/replay

Design centers on the ELM327 bandwidth limit (~7-15 reads/sec): the active
view subscribes PIDs at chosen rates; acquisition runs off the UI thread;
the GUI only reads the store. FICM_M (09D0) promoted to verified after the
2026-06-30 on-truck crank read (48.0V, intermittent).

tests/test_obdcore.py: decoders vs real truck bytes, crank ramp + peak,
derived BOOST, dead-PID park/revive, record/replay roundtrip -- all pass.

ARCHITECTURE.md: layers, data model, GUI plan, 6.0 stock-PID limits
(no EGT/oil-PSI), feature backlog, P0-P5 roadmap.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_016yT89n4zR4qbrySoSiEyZs
2026-06-30 13:41:24 -04:00

180 lines
5.7 KiB
Python

"""ElmLink -- ELM327 serial transport for obdcore.
Ported from the validated obd_reader.py: the cmd/read loop, Mode-01/Mode-22
reads, ATRV, protocol negotiation, and DTC read/clear. Returns raw byte
lists (decoding happens in the registry). No GUI, no scheduler.
The terminal tool's ELM class will be migrated to import this (see
ARCHITECTURE.md, P0) once the GUI work stabilises.
"""
import time
try:
import serial
from serial.tools import list_ports
except ImportError: # allow import on machines w/o pyserial
serial = None
list_ports = None
_LETTER = {0: "P", 1: "C", 2: "B", 3: "U"}
def decode_dtc(b1, b2):
return f"{_LETTER[(b1 >> 6) & 3]}{(b1 >> 4) & 3}{b1 & 0xF:X}{b2:02X}"
def _line_bytes(ln):
ln = ln.replace(" ", "")
if len(ln) >= 2 and ln[1] == ":" and ln[0] in "0123456789":
ln = ln[2:] # drop CAN multiframe index "N:"
if not ln or any(c not in "0123456789ABCDEFabcdef" for c in ln):
return []
return [int(ln[i:i + 2], 16) for i in range(0, len(ln) - 1, 2)]
def parse_dtcs(lines, svc, is_can):
pairs = []
if is_can:
data = [b for ln in lines for b in _line_bytes(ln)]
if svc in data:
data = data[data.index(svc) + 1:]
data = data[1:] if data else data
pairs = data
else:
for ln in lines:
data = _line_bytes(ln)
if svc in data:
data = data[data.index(svc) + 1:]
elif data and data[0] == svc:
data = data[1:]
else:
continue
pairs.extend(data)
out, seen = [], set()
for i in range(0, len(pairs) - 1, 2):
b1, b2 = pairs[i], pairs[i + 1]
if b1 == 0 and b2 == 0:
continue
d = decode_dtc(b1, b2)
if d not in seen:
seen.add(d)
out.append(d)
return out
def find_ports():
if list_ports is None:
return []
def score(p):
s = (p.description + " " + (p.manufacturer or "")).lower()
return 0 if ("ch340" in s or "1a86" in s) else 1 if ("serial" in s or "usb" in s) else 2
return sorted(list_ports.comports(), key=score)
class ElmLink:
PROMPT = b">"
def __init__(self, port, baud=38400, verbose=False):
if serial is None:
raise RuntimeError("pyserial not installed (pip install pyserial)")
self.verbose = verbose
self.ser = serial.Serial(port, baud, timeout=0.2)
self.protocol = "?"
time.sleep(0.3)
self.ser.reset_input_buffer()
# -- low-level --
def cmd(self, s, settle=0.0, timeout=4.0):
self.ser.reset_input_buffer()
self.ser.write((s + "\r").encode())
if settle:
time.sleep(settle)
buf = bytearray()
deadline = time.time() + timeout
while time.time() < deadline:
chunk = self.ser.read(256)
if chunk:
buf += chunk
if self.PROMPT in buf:
break
elif self.PROMPT in buf:
break
raw = buf.decode("ascii", "replace")
if self.verbose:
print(f" [TX {s!r}] -> {raw!r}")
return [ln.strip() for ln in raw.replace(">", "").split("\r")
if ln.strip() and ln.strip() != s]
def init(self):
self.cmd("ATZ", settle=1.0)
for c in ("ATE0", "ATL0", "ATS0", "ATH0", "ATAT1", "ATSP0"):
self.cmd(c)
def fast_timing(self, on=True):
"""Tighten ELM response wait for live polling (on) or restore (off)."""
if on:
self.cmd("ATAT2"); self.cmd("ATST19")
else:
self.cmd("ATAT1"); self.cmd("ATST32")
def connect(self):
for _ in range(3):
resp = "".join(self.cmd("0100", timeout=8.0)).upper()
if "41" in resp and "00" in resp:
self.protocol = " ".join(self.cmd("ATDPN")) or "?"
return True
if any(x in resp for x in ("UNABLE", "NODATA", "NO DATA", "ERROR", "SEARCHING")):
time.sleep(0.5)
self.protocol = " ".join(self.cmd("ATDPN")) or "?"
return False
def is_can(self):
d = self.protocol.replace("A", "").strip()
return d[:1] in ("6", "7", "8", "9")
# -- reads (return list[int] or None) --
def _bytes(self, lines):
return [b for ln in lines for b in _line_bytes(ln)]
def read_m01(self, pid, nbytes, timeout=0.6):
data = self._bytes(self.cmd(f"01{pid}", timeout=timeout))
if 0x41 in data:
i = data.index(0x41)
payload = data[i + 2:i + 2 + nbytes]
if len(payload) == nbytes:
return payload
return None
def read_m22(self, pid, timeout=0.5):
data = self._bytes(self.cmd(f"22{pid}", timeout=timeout))
# response: 62 <pid hi> <pid lo> <data...>
if 0x62 in data:
i = data.index(0x62)
return data[i + 3:] or None
return None
def read_atrv(self, timeout=0.8):
s = " ".join(self.cmd("ATRV", timeout=timeout)).replace("V", "").strip()
try:
return float(s)
except ValueError:
return None
# -- DTCs --
def read_dtcs(self, mode, svc, timeout=5.0):
lines = self.cmd(mode, timeout=timeout)
if "NODATA" in "".join(lines).upper().replace(" ", ""):
return []
return parse_dtcs(lines, svc, self.is_can())
def clear_dtcs(self):
lines = self.cmd("04", timeout=6.0)
data = self._bytes(lines)
return 0x44 in data or ("OK" in "".join(lines).upper())
def close(self):
try:
self.ser.close()
except Exception:
pass