Files
justin fa7225d6dc Fix #9: DTC/freeze-frame parsing (phantom codes, Mode 02, hex frame index)
- parse_dtcs CAN branch is now message-aware: each ECU reply '<svc> <count>
  <pairs>' has its header stripped per-message, instead of flattening all lines
  and stripping svc+count once. With multiple ECUs the old code ate the second
  header as a DTC pair -> phantom codes. Critically, it does NOT blind-scan for
  svc (0x43 is a legal DTC first byte: C03xx) — a numbered ISO-TP continuation
  is distinguished by its 'N:' frame-index prefix, not by value.
- _line_bytes strips hex frame indices A:-F: (ISO-TP index cycles 0-F), not just
  0-9, so consecutive frames past the 10th aren't dropped.
- read_freeze_frame sends the correct '020200' (svc 02, PID 02, frame 00) and
  skips SID+PID+frame (+3), fixing the off-by-one that mis-read the freeze DTC.
- tests/test_dtc_parse.py: single-frame, multi-ECU (no phantom), numbered
  multiframe with a real C03xx continuation, hex index, non-CAN legacy.

Closes #9

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_016yT89n4zR4qbrySoSiEyZs
2026-07-01 19:36:35 -04:00

251 lines
8.9 KiB
Python

"""ElmLink -- ELM327 serial transport for obdcore.
Ported from the validated obd_reader.py: the cmd/read loop, Mode-01/Mode-22
reads, ATRV, protocol negotiation, and DTC read/clear. Returns raw byte
lists (decoding happens in the registry). No GUI, no scheduler.
The terminal tool's ELM class will be migrated to import this (see
ARCHITECTURE.md, P0) once the GUI work stabilises.
"""
import time
try:
import serial
from serial.tools import list_ports
except ImportError: # allow import on machines w/o pyserial
serial = None
list_ports = None
_LETTER = {0: "P", 1: "C", 2: "B", 3: "U"}
def decode_dtc(b1, b2):
return f"{_LETTER[(b1 >> 6) & 3]}{(b1 >> 4) & 3}{b1 & 0xF:X}{b2:02X}"
_HEX = "0123456789ABCDEFabcdef"
def _line_bytes(ln):
ln = ln.replace(" ", "")
if len(ln) >= 2 and ln[1] == ":" and ln[0] in _HEX:
ln = ln[2:] # drop CAN multiframe index "N:" (0-F, cycles)
if not ln or any(c not in _HEX for c in ln):
return []
return [int(ln[i:i + 2], 16) for i in range(0, len(ln) - 1, 2)]
def parse_dtcs(lines, svc, is_can):
pairs = []
if is_can:
# Message-aware: multiple ECUs each reply "<svc> <count> <pairs...>", and
# a DTC's own first byte can equal svc (0x43 == C03xx), so we must NOT
# blind-scan the flattened stream for svc. A line whose payload starts
# with svc begins a new ECU message (drop svc + count byte); an ISO-TP
# numbered continuation "N:" (N>=1) appends raw pairs to the current one.
started = False
for ln in lines:
raw = ln.replace(" ", "")
cont = len(raw) >= 2 and raw[1] == ":" and raw[0] in _HEX and raw[0] != "0"
b = _line_bytes(ln)
if not b:
continue
if not cont and b[0] == svc: # header of an ECU message: svc + count
pairs.extend(b[2:])
started = True
elif started: # continuation / pairs of current message
pairs.extend(b)
# else: bytes before any header (ISO-TP length line, stray) -> ignore
else:
for ln in lines:
data = _line_bytes(ln)
if svc in data:
data = data[data.index(svc) + 1:]
elif data and data[0] == svc:
data = data[1:]
else:
continue
pairs.extend(data)
out, seen = [], set()
for i in range(0, len(pairs) - 1, 2):
b1, b2 = pairs[i], pairs[i + 1]
if b1 == 0 and b2 == 0:
continue
d = decode_dtc(b1, b2)
if d not in seen:
seen.add(d)
out.append(d)
return out
def find_ports():
if list_ports is None:
return []
def score(p):
s = (p.description + " " + (p.manufacturer or "")).lower()
return 0 if ("ch340" in s or "1a86" in s) else 1 if ("serial" in s or "usb" in s) else 2
return sorted(list_ports.comports(), key=score)
class ElmLink:
PROMPT = b">"
def __init__(self, transport, verbose=False):
"""transport: any object with write/read/reset_input_buffer/close.
Use the .serial() / .tcp() / .ble() factory helpers to build one."""
self.io = transport
self.verbose = verbose
self.protocol = "?"
time.sleep(0.3)
self.io.reset_input_buffer()
@classmethod
def serial(cls, port, baud=38400, **kw):
from . import transport as tp
return cls(tp.SerialTransport(port, baud), **kw)
@classmethod
def tcp(cls, host, port=35000, **kw):
from . import transport as tp
return cls(tp.TcpTransport(host, port), **kw)
@classmethod
def ble(cls, address, **kw):
from . import transport as tp
return cls(tp.BleTransport(address), **kw)
# -- low-level --
def cmd(self, s, settle=0.0, timeout=4.0):
self.io.reset_input_buffer()
self.io.write((s + "\r").encode())
if settle:
time.sleep(settle)
buf = bytearray()
deadline = time.time() + timeout
while time.time() < deadline:
chunk = self.io.read(256)
if chunk:
buf += chunk
if self.PROMPT in buf:
break
elif self.PROMPT in buf:
break
raw = buf.decode("ascii", "replace")
if self.verbose:
print(f" [TX {s!r}] -> {raw!r}")
return [ln.strip() for ln in raw.replace(">", "").split("\r")
if ln.strip() and ln.strip() != s]
def init(self):
self.cmd("ATZ", settle=1.0)
for c in ("ATE0", "ATL0", "ATS0", "ATH0", "ATAT1", "ATSP0"):
self.cmd(c)
def fast_timing(self, on=True):
"""Tighten ELM response wait for live polling (on) or restore (off)."""
if on:
self.cmd("ATAT2"); self.cmd("ATST19")
else:
self.cmd("ATAT1"); self.cmd("ATST32")
def connect(self):
for _ in range(3):
resp = "".join(self.cmd("0100", timeout=8.0)).upper()
if "41" in resp and "00" in resp:
self.protocol = " ".join(self.cmd("ATDPN")) or "?"
return True
if any(x in resp for x in ("UNABLE", "NODATA", "NO DATA", "ERROR", "SEARCHING")):
time.sleep(0.5)
self.protocol = " ".join(self.cmd("ATDPN")) or "?"
return False
def is_can(self):
d = self.protocol.replace("A", "").strip()
return d[:1] in ("6", "7", "8", "9")
# -- reads (return list[int] or None) --
def _bytes(self, lines):
return [b for ln in lines for b in _line_bytes(ln)]
def read_m01(self, pid, nbytes, timeout=0.6):
data = self._bytes(self.cmd(f"01{pid}", timeout=timeout))
if 0x41 in data:
i = data.index(0x41)
payload = data[i + 2:i + 2 + nbytes]
if len(payload) == nbytes:
return payload
return None
def read_m22(self, pid, timeout=0.5):
data = self._bytes(self.cmd(f"22{pid}", timeout=timeout))
# response: 62 <pid hi> <pid lo> <data...>
if 0x62 in data:
i = data.index(0x62)
return data[i + 3:] or None
return None
def read_atrv(self, timeout=0.8):
s = " ".join(self.cmd("ATRV", timeout=timeout)).replace("V", "").strip()
try:
return float(s)
except ValueError:
return None
def read_raw(self, hexcmd, timeout=2.0):
"""Send an arbitrary hex command and return the flattened response
bytes (for bi-directional actions / service routines)."""
return self._bytes(self.cmd(hexcmd, timeout=timeout))
# -- DTCs --
def read_dtcs(self, mode, svc, timeout=5.0):
lines = self.cmd(mode, timeout=timeout)
if "NODATA" in "".join(lines).upper().replace(" ", ""):
return []
return parse_dtcs(lines, svc, self.is_can())
def clear_dtcs(self):
lines = self.cmd("04", timeout=6.0)
data = self._bytes(lines)
return 0x44 in data or ("OK" in "".join(lines).upper())
# -- standard OBD services (Mode 09 / 01-01 / 02) --
def read_vehicle_info(self, timeout=2.0):
"""Mode 09: VIN + calibration IDs + ECU name. Returns a dict."""
from . import obdservices as svc
vin = svc.decode_vin(self._bytes(self.cmd("0902", timeout=timeout)))
cal = svc.decode_ascii_block(self._bytes(self.cmd("0904", timeout=timeout)), 0x04)
ecu = svc.decode_ascii_block(self._bytes(self.cmd("090A", timeout=timeout)), 0x0A)
return {"vin": vin, "calibration": cal, "ecu_name": ecu}
def read_readiness(self, timeout=1.0):
"""Mode 01 PID 01: MIL, DTC count, and I-M readiness monitors."""
from . import obdservices as svc
data = self.read_m01("01", 4, timeout=timeout)
return svc.decode_readiness(data) if data else None
def read_freeze_frame(self, timeout=0.6):
"""Mode 02: the DTC that set the freeze frame + the standard PID snapshot."""
from . import obdservices as svc
out = {"dtc": None, "values": []}
d = self._bytes(self.cmd("020200", timeout=timeout)) # svc 02, PID 02, frame 00
if 0x42 in d:
r = d[d.index(0x42) + 3:] # skip 42 (SID) 02 (PID) 00 (frame)
if len(r) >= 2 and (r[0] or r[1]):
out["dtc"] = decode_dtc(r[0], r[1])
for name, pid, nbytes, dec, unit in svc.FREEZE_PIDS:
dd = self._bytes(self.cmd(f"02{pid}00", timeout=timeout))
if 0x42 in dd:
payload = dd[dd.index(0x42) + 3:dd.index(0x42) + 3 + nbytes]
if len(payload) == nbytes:
try:
out["values"].append((name, dec(payload), unit))
except Exception:
pass
return out
def close(self):
try:
self.io.close()
except Exception:
pass