"""ElmLink -- ELM327 serial transport for obdcore. Ported from the validated obd_reader.py: the cmd/read loop, Mode-01/Mode-22 reads, ATRV, protocol negotiation, and DTC read/clear. Returns raw byte lists (decoding happens in the registry). No GUI, no scheduler. The terminal tool's ELM class will be migrated to import this (see ARCHITECTURE.md, P0) once the GUI work stabilises. """ import time try: import serial from serial.tools import list_ports except ImportError: # allow import on machines w/o pyserial serial = None list_ports = None _LETTER = {0: "P", 1: "C", 2: "B", 3: "U"} def decode_dtc(b1, b2): return f"{_LETTER[(b1 >> 6) & 3]}{(b1 >> 4) & 3}{b1 & 0xF:X}{b2:02X}" def _line_bytes(ln): ln = ln.replace(" ", "") if len(ln) >= 2 and ln[1] == ":" and ln[0] in "0123456789": ln = ln[2:] # drop CAN multiframe index "N:" if not ln or any(c not in "0123456789ABCDEFabcdef" for c in ln): return [] return [int(ln[i:i + 2], 16) for i in range(0, len(ln) - 1, 2)] def parse_dtcs(lines, svc, is_can): pairs = [] if is_can: data = [b for ln in lines for b in _line_bytes(ln)] if svc in data: data = data[data.index(svc) + 1:] data = data[1:] if data else data pairs = data else: for ln in lines: data = _line_bytes(ln) if svc in data: data = data[data.index(svc) + 1:] elif data and data[0] == svc: data = data[1:] else: continue pairs.extend(data) out, seen = [], set() for i in range(0, len(pairs) - 1, 2): b1, b2 = pairs[i], pairs[i + 1] if b1 == 0 and b2 == 0: continue d = decode_dtc(b1, b2) if d not in seen: seen.add(d) out.append(d) return out def find_ports(): if list_ports is None: return [] def score(p): s = (p.description + " " + (p.manufacturer or "")).lower() return 0 if ("ch340" in s or "1a86" in s) else 1 if ("serial" in s or "usb" in s) else 2 return sorted(list_ports.comports(), key=score) class ElmLink: PROMPT = b">" def __init__(self, port, baud=38400, verbose=False): if serial is None: raise RuntimeError("pyserial not installed (pip install pyserial)") self.verbose = verbose self.ser = serial.Serial(port, baud, timeout=0.2) self.protocol = "?" time.sleep(0.3) self.ser.reset_input_buffer() # -- low-level -- def cmd(self, s, settle=0.0, timeout=4.0): self.ser.reset_input_buffer() self.ser.write((s + "\r").encode()) if settle: time.sleep(settle) buf = bytearray() deadline = time.time() + timeout while time.time() < deadline: chunk = self.ser.read(256) if chunk: buf += chunk if self.PROMPT in buf: break elif self.PROMPT in buf: break raw = buf.decode("ascii", "replace") if self.verbose: print(f" [TX {s!r}] -> {raw!r}") return [ln.strip() for ln in raw.replace(">", "").split("\r") if ln.strip() and ln.strip() != s] def init(self): self.cmd("ATZ", settle=1.0) for c in ("ATE0", "ATL0", "ATS0", "ATH0", "ATAT1", "ATSP0"): self.cmd(c) def fast_timing(self, on=True): """Tighten ELM response wait for live polling (on) or restore (off).""" if on: self.cmd("ATAT2"); self.cmd("ATST19") else: self.cmd("ATAT1"); self.cmd("ATST32") def connect(self): for _ in range(3): resp = "".join(self.cmd("0100", timeout=8.0)).upper() if "41" in resp and "00" in resp: self.protocol = " ".join(self.cmd("ATDPN")) or "?" return True if any(x in resp for x in ("UNABLE", "NODATA", "NO DATA", "ERROR", "SEARCHING")): time.sleep(0.5) self.protocol = " ".join(self.cmd("ATDPN")) or "?" return False def is_can(self): d = self.protocol.replace("A", "").strip() return d[:1] in ("6", "7", "8", "9") # -- reads (return list[int] or None) -- def _bytes(self, lines): return [b for ln in lines for b in _line_bytes(ln)] def read_m01(self, pid, nbytes, timeout=0.6): data = self._bytes(self.cmd(f"01{pid}", timeout=timeout)) if 0x41 in data: i = data.index(0x41) payload = data[i + 2:i + 2 + nbytes] if len(payload) == nbytes: return payload return None def read_m22(self, pid, timeout=0.5): data = self._bytes(self.cmd(f"22{pid}", timeout=timeout)) # response: 62 if 0x62 in data: i = data.index(0x62) return data[i + 3:] or None return None def read_atrv(self, timeout=0.8): s = " ".join(self.cmd("ATRV", timeout=timeout)).replace("V", "").strip() try: return float(s) except ValueError: return None # -- DTCs -- def read_dtcs(self, mode, svc, timeout=5.0): lines = self.cmd(mode, timeout=timeout) if "NODATA" in "".join(lines).upper().replace(" ", ""): return [] return parse_dtcs(lines, svc, self.is_can()) def clear_dtcs(self): lines = self.cmd("04", timeout=6.0) data = self._bytes(lines) return 0x44 in data or ("OK" in "".join(lines).upper()) def close(self): try: self.ser.close() except Exception: pass