From 6bee9c0d7f856ed5aeb072d34303d29aacc09c0a Mon Sep 17 00:00:00 2001 From: Justin Paul Date: Tue, 30 Jun 2026 13:41:24 -0400 Subject: [PATCH] Scaffold obdcore (headless acquisition core) + ARCHITECTURE.md Foundation for the PySide6 + pyqtgraph Windows GUI, shared with the terminal tool. Pure data/IO -- no Qt, no curses. obdcore/ link.py ElmLink -- ELM327 serial (Mode-01/22, ATRV, DTC read/clear) mock.py MockLink -- synthetic crank for tests + GUI dev (no truck) registry.py PidRegistry (verified Ford 6.0 PIDs + confidence) + DtcDatabase scheduler.py PollScheduler -- prioritized round-robin polling, dead-PID park, derived channels; tick() is fake-clock test-drivable store.py TimeSeriesStore (ring buffers + min/max) + CsvRecorder/replay Design centers on the ELM327 bandwidth limit (~7-15 reads/sec): the active view subscribes PIDs at chosen rates; acquisition runs off the UI thread; the GUI only reads the store. FICM_M (09D0) promoted to verified after the 2026-06-30 on-truck crank read (48.0V, intermittent). tests/test_obdcore.py: decoders vs real truck bytes, crank ramp + peak, derived BOOST, dead-PID park/revive, record/replay roundtrip -- all pass. ARCHITECTURE.md: layers, data model, GUI plan, 6.0 stock-PID limits (no EGT/oil-PSI), feature backlog, P0-P5 roadmap. Co-Authored-By: Claude Opus 4.8 (1M context) Claude-Session: https://claude.ai/code/session_016yT89n4zR4qbrySoSiEyZs --- ARCHITECTURE.md | 134 +++++++++++++++++++++++++++++++ obdcore/__init__.py | 20 +++++ obdcore/link.py | 179 ++++++++++++++++++++++++++++++++++++++++++ obdcore/mock.py | 62 +++++++++++++++ obdcore/registry.py | 164 ++++++++++++++++++++++++++++++++++++++ obdcore/scheduler.py | 137 ++++++++++++++++++++++++++++++++ obdcore/store.py | 126 +++++++++++++++++++++++++++++ tests/test_obdcore.py | 117 +++++++++++++++++++++++++++ 8 files changed, 939 insertions(+) create mode 100644 ARCHITECTURE.md create mode 100644 obdcore/__init__.py create mode 100644 obdcore/link.py create mode 100644 obdcore/mock.py create mode 100644 obdcore/registry.py create mode 100644 obdcore/scheduler.py create mode 100644 obdcore/store.py create mode 100644 tests/test_obdcore.py diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md new file mode 100644 index 0000000..6aa34b2 --- /dev/null +++ b/ARCHITECTURE.md @@ -0,0 +1,134 @@ +# ford-obd — Architecture & Roadmap + +Plan for growing the terminal `obd_reader.py` into a graphical Windows scan +tool for the 6.0L Power Stroke, on a shared headless core. + +## Vision + +A desktop app with: +- a **PID browser** (searchable list, live values) on the side, +- a flexible **graph workspace** — overlay many metrics on one plot, split into + a grid of single-metric plots, or show gauges (one metric each), +- purpose-built **perspectives**: Cranking, Driving, Diagnostics (DTCs), Logging, +- a **Ford DTC database** behind the codes page, +- **session record + playback** so intermittent faults can be reviewed offline. + +## The constraint that shapes everything: ELM327 bandwidth + +The ELM327 is a one-request-at-a-time link: each Mode-22 round-trip is +~40–150 ms over the CH340, so total throughput is **~7–15 PID reads/sec**, +shared across the whole UI. Consequences baked into the design: + +- A **prioritized polling scheduler** owns the link. The active view subscribes + the PIDs it needs at the rates it needs (cranking = ICP fast, ignore the rest). +- Acquisition runs **off the UI thread**; the GUI only reads the store. +- Per-PID rates and dead-PID parking keep the sample rate from collapsing. +- A faster adapter (**OBDLink SX/MX+**, STN chip — batched PIDs, faster + protocols) multiplies throughput; the link layer is abstracted so either works. + +## Layers + +``` + +------------------- GUI (PySide6 + pyqtgraph) -------------------+ + | PID browser | Graph workspace | Perspectives | DTC page | Log | + +--------------------------------|-------------------------------+ + reads only | (Qt thread) + +--------------------------------v-------------------------------+ + | obdcore (headless) | + | PollScheduler --reads--> ElmLink / MockLink --serial--> PCM | + | | pushes samples | + | v | + | TimeSeriesStore <--- CsvRecorder / replay_csv | + | PidRegistry (verified PIDs) DtcDatabase | + +-----------------------------------------------------------------+ +``` + +`obdcore` is pure data/IO — no Qt, no curses — so it's shared by the terminal +tool, the GUI, and tests. + +### obdcore modules (built, tested) + +| Module | Responsibility | +|---|---| +| `link.py` `ElmLink` | ELM327 serial: init, protocol negotiate, Mode-01/22 reads, ATRV, DTC read/clear. Returns raw bytes. | +| `mock.py` `MockLink` | Synthetic crank (ICP ramp, FICM ~48V, batt sag) — same interface; powers tests + GUI dev with no truck. | +| `registry.py` `PidRegistry` | Verified Ford 6.0 PID table (corrected addresses + scaling + confidence) and subscription presets. `DtcDatabase` seeds the code DB. | +| `scheduler.py` `PollScheduler` | Prioritized round-robin polling; per-PID Hz; derived channels; dead-PID park/revive. `tick()` is test-drivable with a fake clock. | +| `store.py` `TimeSeriesStore` | Per-PID ring buffers + min/max; `CsvRecorder` (long format) + `replay_csv` for record/playback. | + +Tests: `tests/test_obdcore.py` — decoders vs real truck bytes, crank ramp + +peak capture, derived BOOST, dead-PID parking, record/replay round-trip. + +## Data model + +- **Pid**: key, name, mode (`01`/`22`/`atrv`/`derived`), pid hex, nbytes, + decode fn, unit, group, vmin/vmax, **confidence** (`verified|doc|tentative`), + deps (for derived), notes. +- **Channel**: rolling `(t, value)` + session min/max. +- **Derived/virtual channels**: e.g. `BOOST = MAP − BARO`; later `ICP_error = + ICP_DES − ICP`, `FICM_sag = FICM_M − FICM_V`. + +## GUI plan (PySide6 + pyqtgraph) + +- **Left dock**: `QTreeView` PID browser grouped by system (fuel/ficm/air/…), + live value + confidence badge, checkbox to add to the focused panel. +- **Central workspace**: dockable/tabbed panels. Panel types: + - *Overlay plot* — many PIDs, multiple Y-axes (pyqtgraph `ViewBox` linking). + - *Split grid* — one plot per PID. + - *Gauge* — radial/linear single metric with warn/crit bands. + Drag a PID from the browser onto a panel to add it. +- **Perspectives** (saved layouts): + - **Cranking** — ICP big readout + 500-psi firing line + peak-hold + trace + (port of terminal `--crank`). + - **Driving** — boost (MGP), EOT, ECT, EBP, load, RPM, IPR, FICM, trans temp. + - **Diagnostics** — DTC read/clear (guarded) + freeze frame, joined to the + Ford DTC DB (description, causes, no-start relevance). + - **Logging/Playback** — record a session; scrub/replay through the graphs. +- **Settings** — COM port, baud, protocol, per-PID rates, units (psi/°F vs kPa/°C), + dark/night theme. +- **Bottom status** — adapter, protocol, port voltage, dropped-response rate. + +### Honest 6.0 data-stream limits + +Two commonly-wanted gauges are **not in the stock 6.0 PCM stream** from the OBD +port and need aftermarket sensors: +- **Engine oil PRESSURE** — only ICP (injection oil) + EOT (oil temp) exist; lube + pressure is an idiot-light switch, not a PID. +- **EGT** — only EBP (exhaust *back pressure*) exists; exhaust *gas temperature* + is an add-on pyrometer. + +Plan: present what the PCM exposes; design an **aux-input** path so external +sensors (e.g. a serial/analog EGT/oil-PSI module) can feed extra channels later. + +## Additional features (backlog) + +- Session record + **playback/scrub** (highest value; foundation already in `store`). +- Bi-directional tests: **KOEO/KOER self-tests, injector buzz, cylinder + contribution/balance**. +- **Alarms** + min/max hold per PID (EOT>230°F, ICP<500 cranking). +- Timeline **annotations** ("started cranking", "stabbed throttle"). +- **Computed channels** (ICP error, FICM sag). +- **Multi-vehicle profiles** + per-truck DTC history. +- **Export/report** (CSV/PDF, graph screenshots, one-click "share state"). +- **PID discovery scan** in GUI (the brute-scan, auto-add hits). +- Units toggle, night theme, big-touch cab mode. + +## Roadmap + +- **P0 — core (this commit):** `obdcore` package + tests + this doc. *Next:* + migrate `obd_reader.py` to import `obdcore` (remove the duplicated ELM/PID + logic) so terminal + GUI share one source of truth. +- **P1 — GUI shell:** PySide6 window, connect dialog, PID browser, one overlay + plot fed by the scheduler/store. Validate against `MockLink` first. +- **P2 — panels + perspectives:** split plots, gauges, Cranking + Driving views. +- **P3 — diagnostics:** DTC read/clear page + Ford DTC DB (built by the + cross-verified workflow, same method as the PID hunt). +- **P4 — record/playback + alarms + computed channels.** +- **P5 — packaging:** PyInstaller one-file `.exe` (+ CH340 driver note), + optional code-signing; OBDLink/STN fast-path support. + +## Dependencies + +- Runtime (core): `pyserial`. +- GUI: `PySide6`, `pyqtgraph`, `numpy`. +- Dev: `pytest`, `pyinstaller`. diff --git a/obdcore/__init__.py b/obdcore/__init__.py new file mode 100644 index 0000000..5f38efc --- /dev/null +++ b/obdcore/__init__.py @@ -0,0 +1,20 @@ +"""obdcore -- headless OBD-II acquisition core for the ford-obd project. + +Layered, GUI-agnostic foundation shared by the terminal tool and the +forthcoming PySide6 + pyqtgraph Windows app: + + link.py ElmLink -- ELM327 serial transport (+ MockLink in mock.py) + registry.py PidRegistry -- verified Ford 6.0 PID table + DTC database + scheduler.py PollScheduler -- prioritized round-robin polling engine + store.py TimeSeriesStore -- ring buffers, min/max, record/replay + +See ARCHITECTURE.md for the full design and roadmap. +""" +from .registry import PidRegistry, DtcDatabase, Pid, Dtc, PRESETS +from .store import TimeSeriesStore, CsvRecorder, replay_csv +from .scheduler import PollScheduler + +__all__ = [ + "PidRegistry", "DtcDatabase", "Pid", "Dtc", "PRESETS", + "TimeSeriesStore", "CsvRecorder", "replay_csv", "PollScheduler", +] diff --git a/obdcore/link.py b/obdcore/link.py new file mode 100644 index 0000000..2b9b8e8 --- /dev/null +++ b/obdcore/link.py @@ -0,0 +1,179 @@ +"""ElmLink -- ELM327 serial transport for obdcore. + +Ported from the validated obd_reader.py: the cmd/read loop, Mode-01/Mode-22 +reads, ATRV, protocol negotiation, and DTC read/clear. Returns raw byte +lists (decoding happens in the registry). No GUI, no scheduler. + +The terminal tool's ELM class will be migrated to import this (see +ARCHITECTURE.md, P0) once the GUI work stabilises. +""" +import time + +try: + import serial + from serial.tools import list_ports +except ImportError: # allow import on machines w/o pyserial + serial = None + list_ports = None + +_LETTER = {0: "P", 1: "C", 2: "B", 3: "U"} + + +def decode_dtc(b1, b2): + return f"{_LETTER[(b1 >> 6) & 3]}{(b1 >> 4) & 3}{b1 & 0xF:X}{b2:02X}" + + +def _line_bytes(ln): + ln = ln.replace(" ", "") + if len(ln) >= 2 and ln[1] == ":" and ln[0] in "0123456789": + ln = ln[2:] # drop CAN multiframe index "N:" + if not ln or any(c not in "0123456789ABCDEFabcdef" for c in ln): + return [] + return [int(ln[i:i + 2], 16) for i in range(0, len(ln) - 1, 2)] + + +def parse_dtcs(lines, svc, is_can): + pairs = [] + if is_can: + data = [b for ln in lines for b in _line_bytes(ln)] + if svc in data: + data = data[data.index(svc) + 1:] + data = data[1:] if data else data + pairs = data + else: + for ln in lines: + data = _line_bytes(ln) + if svc in data: + data = data[data.index(svc) + 1:] + elif data and data[0] == svc: + data = data[1:] + else: + continue + pairs.extend(data) + out, seen = [], set() + for i in range(0, len(pairs) - 1, 2): + b1, b2 = pairs[i], pairs[i + 1] + if b1 == 0 and b2 == 0: + continue + d = decode_dtc(b1, b2) + if d not in seen: + seen.add(d) + out.append(d) + return out + + +def find_ports(): + if list_ports is None: + return [] + def score(p): + s = (p.description + " " + (p.manufacturer or "")).lower() + return 0 if ("ch340" in s or "1a86" in s) else 1 if ("serial" in s or "usb" in s) else 2 + return sorted(list_ports.comports(), key=score) + + +class ElmLink: + PROMPT = b">" + + def __init__(self, port, baud=38400, verbose=False): + if serial is None: + raise RuntimeError("pyserial not installed (pip install pyserial)") + self.verbose = verbose + self.ser = serial.Serial(port, baud, timeout=0.2) + self.protocol = "?" + time.sleep(0.3) + self.ser.reset_input_buffer() + + # -- low-level -- + def cmd(self, s, settle=0.0, timeout=4.0): + self.ser.reset_input_buffer() + self.ser.write((s + "\r").encode()) + if settle: + time.sleep(settle) + buf = bytearray() + deadline = time.time() + timeout + while time.time() < deadline: + chunk = self.ser.read(256) + if chunk: + buf += chunk + if self.PROMPT in buf: + break + elif self.PROMPT in buf: + break + raw = buf.decode("ascii", "replace") + if self.verbose: + print(f" [TX {s!r}] -> {raw!r}") + return [ln.strip() for ln in raw.replace(">", "").split("\r") + if ln.strip() and ln.strip() != s] + + def init(self): + self.cmd("ATZ", settle=1.0) + for c in ("ATE0", "ATL0", "ATS0", "ATH0", "ATAT1", "ATSP0"): + self.cmd(c) + + def fast_timing(self, on=True): + """Tighten ELM response wait for live polling (on) or restore (off).""" + if on: + self.cmd("ATAT2"); self.cmd("ATST19") + else: + self.cmd("ATAT1"); self.cmd("ATST32") + + def connect(self): + for _ in range(3): + resp = "".join(self.cmd("0100", timeout=8.0)).upper() + if "41" in resp and "00" in resp: + self.protocol = " ".join(self.cmd("ATDPN")) or "?" + return True + if any(x in resp for x in ("UNABLE", "NODATA", "NO DATA", "ERROR", "SEARCHING")): + time.sleep(0.5) + self.protocol = " ".join(self.cmd("ATDPN")) or "?" + return False + + def is_can(self): + d = self.protocol.replace("A", "").strip() + return d[:1] in ("6", "7", "8", "9") + + # -- reads (return list[int] or None) -- + def _bytes(self, lines): + return [b for ln in lines for b in _line_bytes(ln)] + + def read_m01(self, pid, nbytes, timeout=0.6): + data = self._bytes(self.cmd(f"01{pid}", timeout=timeout)) + if 0x41 in data: + i = data.index(0x41) + payload = data[i + 2:i + 2 + nbytes] + if len(payload) == nbytes: + return payload + return None + + def read_m22(self, pid, timeout=0.5): + data = self._bytes(self.cmd(f"22{pid}", timeout=timeout)) + # response: 62 + if 0x62 in data: + i = data.index(0x62) + return data[i + 3:] or None + return None + + def read_atrv(self, timeout=0.8): + s = " ".join(self.cmd("ATRV", timeout=timeout)).replace("V", "").strip() + try: + return float(s) + except ValueError: + return None + + # -- DTCs -- + def read_dtcs(self, mode, svc, timeout=5.0): + lines = self.cmd(mode, timeout=timeout) + if "NODATA" in "".join(lines).upper().replace(" ", ""): + return [] + return parse_dtcs(lines, svc, self.is_can()) + + def clear_dtcs(self): + lines = self.cmd("04", timeout=6.0) + data = self._bytes(lines) + return 0x44 in data or ("OK" in "".join(lines).upper()) + + def close(self): + try: + self.ser.close() + except Exception: + pass diff --git a/obdcore/mock.py b/obdcore/mock.py new file mode 100644 index 0000000..87b26b2 --- /dev/null +++ b/obdcore/mock.py @@ -0,0 +1,62 @@ +"""MockLink -- a synthetic ElmLink for tests and GUI development without a +truck. Simulates a cranking 6.0: ICP ramps toward ~540 psi, FICM holds ~48V, +battery sags, MAP/BARO sit at atmospheric. Same read interface as ElmLink. +""" + + +class MockLink: + def __init__(self, clock): + self.clock = clock # callable -> float seconds + self.t0 = clock() + self.protocol = "A6" + + def init(self): + pass + + def fast_timing(self, on=True): + pass + + def connect(self): + return True + + def is_can(self): + return True + + def _u16le(self, raw16): + return [(raw16 >> 8) & 0xFF, raw16 & 0xFF] + + def read_m22(self, pid, timeout=0.5): + el = self.clock() - self.t0 + if pid == "1446": # ICP: ramps 0 -> 540 over ~2.7s + return self._u16le(int(min(540, el * 200) / 0.57)) + if pid == "09D0": # FICM main ~48V (0x3000) + return self._u16le(0x3000) + if pid == "1440": # MAP atmospheric + return [0x01, 0x89] + if pid == "1442": # BARO atmospheric + return [0x01, 0x88] + if pid == "1445": # EBP atmospheric + return [0x01, 0x8F] + if pid == "1310": # EOT ~33C + return [0x1C, 0x92] + return None # everything else: no response + + def read_m01(self, pid, nbytes, timeout=0.6): + if pid == "0C": # RPM 0 at rest + return [0x00, 0x00] + if pid == "05": # ECT 82C + return [122] + return None + + def read_atrv(self, timeout=0.8): + el = self.clock() - self.t0 + return 10.6 if el < 2.5 else 12.5 # crank sag then recover + + def read_dtcs(self, mode, svc, timeout=5.0): + return ["P0148"] if mode == "03" else [] + + def clear_dtcs(self): + return True + + def close(self): + pass diff --git a/obdcore/registry.py b/obdcore/registry.py new file mode 100644 index 0000000..101ec63 --- /dev/null +++ b/obdcore/registry.py @@ -0,0 +1,164 @@ +"""PID + DTC registry for the Ford 6.0L Power Stroke (plus generic OBD-II). + +Canonical home for the verified Mode-22 addresses, scaling, and the DTC +database. Decoders are plain callables on the raw byte list. Confidence: + verified -- multi-source AND confirmed on the truck's scan/crank + doc -- corroborated in sources, not (yet) read on the truck + tentative -- single-source or disputed scaling + +PID numbers/scaling corrected 2026-06-29 by the ford-60-pid-hunt workflow; +see diagnostics/2026-06-29-no-start/pid-research.md. 09D0 (FICM Main) was +confirmed on-truck 2026-06-30 (read 48.0V during a crank, intermittent). +""" +from dataclasses import dataclass, field +from typing import Callable, Tuple + + +def _u16(b): + return (b[0] << 8) + b[1] + + +@dataclass +class Pid: + key: str + name: str + mode: str # "01" | "22" | "atrv" | "derived" + pid: str = "" # hex: "1446" (m22) or "0C" (m01) + nbytes: int = 2 + decode: Callable = None # m01/m22: f(raw_bytes); derived: f(dep_values) + unit: str = "" + group: str = "misc" # fuel | ficm | air | engine | driveline | power + vmin: float = 0.0 + vmax: float = 100.0 + confidence: str = "verified" + deps: Tuple[str, ...] = () # for derived channels + notes: str = "" + + +def _build(): + P = [] + a = P.append + # ---- Ford-enhanced Mode 22 -- pressures / fuel ---- + a(Pid("ICP", "Injection Control Pressure", "22", "1446", 2, + lambda b: round(_u16(b) * 0.57, 1), "psi", "fuel", 0, 3500, + "verified", notes="need ~500+ psi to fire")) + a(Pid("ICP_V", "ICP Sensor Voltage", "22", "16AD", 2, + lambda b: round(_u16(b) * 0.000072, 4), "V", "fuel", 0, 5, + "tentative", notes="single-source")) + a(Pid("IPR", "Injection Pressure Regulator", "22", "1434", 1, + lambda b: round(b[0] * 13.53 / 35, 1), "%", "fuel", 0, 100, + "tentative", notes="KOEO ~14-15%, cranking ~30-40%")) + a(Pid("MAP", "Manifold Absolute Pressure", "22", "1440", 2, + lambda b: round(_u16(b) * 0.03625, 2), "psia", "air", 0, 60, + "verified")) + a(Pid("BARO", "Barometric Pressure", "22", "1442", 2, + lambda b: round(_u16(b) * 0.03625, 2), "psia", "air", 0, 20, + "verified")) + a(Pid("EBP", "Exhaust Back Pressure", "22", "1445", 2, + lambda b: round(_u16(b) * 0.03625, 2), "psia", "air", 0, 60, + "verified", notes="minus BARO = gauge")) + a(Pid("EOT", "Engine Oil Temperature", "22", "1310", 2, + lambda b: round(_u16(b) / 100.0 - 40, 1), "C", "engine", -40, 160, + "verified")) + # ---- FICM ---- + a(Pid("FICM_M", "FICM Main Power", "22", "09D0", 2, + lambda b: round(_u16(b) / 256.0, 1), "V", "ficm", 0, 55, + "verified", notes="~48V; <45 suspect; reads intermittently while cranking")) + a(Pid("FICM_L", "FICM Logic Power", "22", "09CF", 2, + lambda b: round(_u16(b) / 256.0, 1), "V", "ficm", 0, 16, + "doc")) + a(Pid("FICM_V", "FICM Vehicle Power", "22", "09CE", 2, + lambda b: round(_u16(b) / 256.0, 1), "V", "ficm", 0, 16, + "doc")) + a(Pid("FICM_SYNC", "FICM Sync", "22", "09CD", 1, + lambda b: (b[0] >> 1) & 1, "", "ficm", 0, 1, + "doc", notes="1=in sync, 0=no sync")) + # ---- Driveline ---- + a(Pid("GEAR", "Current Gear", "22", "11B3", 1, + lambda b: b[0] // 2, "", "driveline", 0, 6, "verified")) + a(Pid("TSS", "Trans Input Shaft Speed", "22", "11B4", 2, + lambda b: round(_u16(b) / 4), "rpm", "driveline", 0, 4000, "verified")) + # ---- Generic Mode 01 ---- + a(Pid("RPM", "Engine RPM", "01", "0C", 2, + lambda b: round(_u16(b) / 4), "rpm", "engine", 0, 4000, "verified")) + a(Pid("ECT", "Engine Coolant Temp", "01", "05", 1, + lambda b: b[0] - 40, "C", "engine", -40, 160, "verified")) + a(Pid("IAT", "Intake Air Temp", "01", "0F", 1, + lambda b: b[0] - 40, "C", "air", -40, 160, "verified")) + a(Pid("LOAD", "Engine Load", "01", "04", 1, + lambda b: round(b[0] * 100 / 255), "%", "engine", 0, 100, "verified")) + a(Pid("VPCM", "Module Voltage", "01", "42", 2, + lambda b: round(_u16(b) / 1000.0, 2), "V", "power", 0, 16, "verified")) + # ---- Pseudo / derived ---- + a(Pid("BATT", "Battery (OBD port)", "atrv", "", 0, + None, "V", "power", 0, 16, "verified")) + a(Pid("BOOST", "Boost (MGP)", "derived", "", 0, + lambda vals: round(vals[0] - vals[1], 2), "psi", "air", -5, 40, + "verified", deps=("MAP", "BARO"), notes="MAP - BARO")) + return P + + +# Subscription presets per perspective (key -> default poll Hz set by scheduler) +PRESETS = { + "crank": ["ICP", "FICM_M", "BATT", "RPM"], + "driving": ["BOOST", "EOT", "ECT", "EBP", "LOAD", "RPM", "IPR", "FICM_M", "BATT"], + "vitals": ["ICP", "FICM_M", "FICM_L", "IPR", "BATT", "RPM", "ECT", "EOT", + "IAT", "VPCM"], +} + + +class PidRegistry: + def __init__(self): + self._by_key = {p.key: p for p in _build()} + + def get(self, key): + return self._by_key.get(key) + + def all(self): + return list(self._by_key.values()) + + def group(self, g): + return [p for p in self._by_key.values() if p.group == g] + + def preset(self, name): + return [self._by_key[k] for k in PRESETS.get(name, []) if k in self._by_key] + + +# --------------------------------------------------------------------------- +# DTC database -- generic SAE + notable Ford 6.0 codes. The full Ford code +# DB is being built by a separate cross-verified workflow; this is the seed. +# --------------------------------------------------------------------------- +@dataclass +class Dtc: + code: str + desc: str + system: str = "powertrain" + no_start: bool = False + causes: str = "" + + +def _dtcs(): + rows = [ + Dtc("P0087", "Fuel rail/system pressure too LOW", "fuel", True), + Dtc("P0088", "Fuel rail/system pressure too HIGH", "fuel"), + Dtc("P0148", "Fuel delivery error (low pressure / HPOP / IPR)", "fuel", True), + Dtc("P0335", "Crankshaft position (CKP) sensor circuit", "engine", True), + Dtc("P0340", "Camshaft position (CMP) sensor circuit", "engine", True), + Dtc("P0611", "FICM performance", "ficm", True), + Dtc("P1316", "Injector circuit/FICM codes detected", "ficm", True), + Dtc("P0606", "PCM processor fault", "power", True), + Dtc("U0100", "Lost communication with PCM/ECM", "network", True), + Dtc("P0670", "Glow plug control module circuit", "engine"), + ] + return {d.code: d for d in rows} + + +class DtcDatabase: + def __init__(self): + self._db = _dtcs() + + def get(self, code): + return self._db.get(code) or Dtc(code, "(unknown - look up this code)") + + def all(self): + return list(self._db.values()) diff --git a/obdcore/scheduler.py b/obdcore/scheduler.py new file mode 100644 index 0000000..6cdfbd2 --- /dev/null +++ b/obdcore/scheduler.py @@ -0,0 +1,137 @@ +"""PollScheduler -- the prioritized acquisition engine. + +The ELM327 is a one-request-at-a-time straw (~7-15 reads/sec total). This +scheduler holds a subscription set (PID key -> target Hz) and, each tick, +reads the PIDs that are due, round-robin, pushing timestamped samples into the +TimeSeriesStore. Dead PIDs (4 consecutive no-responses) are parked and +periodically revived so they don't starve the sample rate. + +Acquisition runs on a background thread; tick() is also public so tests can +drive it deterministically with a fake clock (no threads, no sleeps). +""" +import threading +import time + + +class _Sub: + __slots__ = ("key", "period", "next_due", "fails", "active") + + def __init__(self, key, period): + self.key = key + self.period = period + self.next_due = 0.0 + self.fails = 0 + self.active = True + + +class PollScheduler: + def __init__(self, link, registry, store, clock=time.time, dead_after=4, + revive_every=5.0): + self.link = link + self.reg = registry + self.store = store + self.clock = clock + self.dead_after = dead_after + self.revive_every = revive_every + self._subs = {} + self._lock = threading.Lock() + self._thread = None + self._running = False + self._last_revive = 0.0 + + # -- subscription management -- + def set_subscriptions(self, specs): + """specs: iterable of (key, hz). Replaces the whole set.""" + with self._lock: + self._subs = {k: _Sub(k, (1.0 / hz) if hz > 0 else 0.5) for k, hz in specs} + + def subscribe(self, key, hz): + with self._lock: + self._subs[key] = _Sub(key, (1.0 / hz) if hz > 0 else 0.5) + + def unsubscribe(self, key): + with self._lock: + self._subs.pop(key, None) + + def subscriptions(self): + with self._lock: + return list(self._subs.keys()) + + # -- the core read of a single PID -- + def _read(self, p, frame_vals): + if p.mode == "atrv": + return self.link.read_atrv() + if p.mode == "derived": + vals = [frame_vals.get(d, self.store.latest(d)) for d in p.deps] + if any(v is None for v in vals): + return None + try: + return p.decode(vals) + except Exception: + return None + raw = (self.link.read_m01(p.pid, p.nbytes) if p.mode == "01" + else self.link.read_m22(p.pid)) + if not raw: + return None + try: + return p.decode(raw) + except Exception: + return None + + def tick(self, now=None): + """Read all due PIDs once. Returns number of PIDs read.""" + now = self.clock() if now is None else now + if now - self._last_revive >= self.revive_every: + with self._lock: + for s in self._subs.values(): + if not s.active: + s.active, s.fails = True, 0 + self._last_revive = now + + with self._lock: + due = [s for s in self._subs.values() if s.active and now >= s.next_due] + due.sort(key=lambda s: s.next_due) + + frame_vals = {} + # non-derived first so derived channels can use this frame's values + order = sorted(due, key=lambda s: 1 if _is_derived(self.reg, s.key) else 0) + for s in order: + p = self.reg.get(s.key) + if p is None: + continue + v = self._read(p, frame_vals) + frame_vals[s.key] = v + self.store.push(s.key, self.clock(), v) + s.next_due = now + s.period + if v is None: + s.fails += 1 + if s.fails >= self.dead_after: + s.active = False + else: + s.fails = 0 + return len(order) + + # -- background thread -- + def start(self): + if self._running: + return + self._running = True + self._thread = threading.Thread(target=self._loop, daemon=True) + self._thread.start() + + def _loop(self): + while self._running: + n = self.tick() + if n == 0: + time.sleep(0.005) # nothing due; yield + + def stop(self): + self._running = False + if self._thread: + self._thread.join(timeout=2.0) + self._thread = None + + +def _is_derived(reg, key): + p = reg.get(key) + return bool(p and p.mode == "derived") diff --git a/obdcore/store.py b/obdcore/store.py new file mode 100644 index 0000000..bcb5078 --- /dev/null +++ b/obdcore/store.py @@ -0,0 +1,126 @@ +"""Time-series store: per-channel ring buffers + min/max, with an optional +recorder for full-session capture/playback. + +The acquisition thread pushes samples here; the GUI (or terminal) only reads. +This decoupling is what keeps the UI smooth while the ELM327 plods along at +~7-15 reads/sec. Nothing here touches serial or Qt -- pure data. +""" +import threading +from collections import deque + + +class Channel: + """One PID's rolling history plus session min/max.""" + + def __init__(self, key, maxlen=3600): + self.key = key + self.buf = deque(maxlen=maxlen) # (t, value); value may be None + self.lo = None + self.hi = None + self.last_t = None + self.last_v = None + + def push(self, t, v): + self.buf.append((t, v)) + self.last_t, self.last_v = t, v + if v is not None: + self.lo = v if self.lo is None else min(self.lo, v) + self.hi = v if self.hi is None else max(self.hi, v) + + def reset_minmax(self): + self.lo = self.hi = self.last_v + + def series(self, since=None): + """Return [(t, v), ...]; if since given, only samples with t >= since.""" + if since is None: + return list(self.buf) + return [(t, v) for (t, v) in self.buf if t >= since] + + +class TimeSeriesStore: + """Thread-safe collection of Channels keyed by PID key.""" + + def __init__(self, maxlen=3600): + self._ch = {} + self._maxlen = maxlen + self._lock = threading.Lock() + self.recorder = None # set to a recorder with .write(key, t, v) + + def channel(self, key): + with self._lock: + c = self._ch.get(key) + if c is None: + c = Channel(key, self._maxlen) + self._ch[key] = c + return c + + def push(self, key, t, v): + self.channel(key).push(t, v) + rec = self.recorder + if rec is not None: + rec.write(key, t, v) + + def latest(self, key): + c = self._ch.get(key) + return None if c is None else c.last_v + + def minmax(self, key): + c = self._ch.get(key) + return (None, None) if c is None else (c.lo, c.hi) + + def reset_minmax(self, keys=None): + with self._lock: + for k, c in self._ch.items(): + if keys is None or k in keys: + c.reset_minmax() + + def keys(self): + with self._lock: + return list(self._ch.keys()) + + +class CsvRecorder: + """Long-format session recorder: one row per sample (t,key,value). + + Long format (vs wide) tolerates per-PID poll rates and PIDs appearing + mid-session. Replay re-pushes rows into a fresh store in t order. + """ + + def __init__(self, path): + self._f = open(path, "w") + self._f.write("t,key,value\n") + self._lock = threading.Lock() + + def write(self, key, t, v): + with self._lock: + self._f.write(f"{t:.3f},{key},{'' if v is None else v}\n") + + def close(self): + with self._lock: + self._f.close() + + +def replay_csv(path, store): + """Load a CsvRecorder file back into a store (for playback).""" + with open(path) as f: + next(f, None) # header + for line in f: + parts = line.rstrip("\n").split(",", 2) + if len(parts) != 3: + continue + t, key, v = parts + try: + t = float(t) + except ValueError: + continue + val = None if v == "" else float(v) if _is_num(v) else v + store.push(key, t, val) + return store + + +def _is_num(s): + try: + float(s) + return True + except ValueError: + return False diff --git a/tests/test_obdcore.py b/tests/test_obdcore.py new file mode 100644 index 0000000..ab5c19e --- /dev/null +++ b/tests/test_obdcore.py @@ -0,0 +1,117 @@ +"""Hardware-free tests for the obdcore acquisition engine. + +Drives the scheduler deterministically with a fake clock + MockLink, so the +prioritized polling, derived channels, dead-PID parking, store min/max, and +record/replay are all validated without a truck or serial port. + +Run: python -m pytest tests/ -q (or) python tests/test_obdcore.py +""" +import os +import sys + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from obdcore import PidRegistry, TimeSeriesStore, PollScheduler, CsvRecorder, replay_csv +from obdcore.mock import MockLink + + +class FakeClock: + def __init__(self): + self.t = 0.0 + + def __call__(self): + return self.t + + def advance(self, dt): + self.t += dt + + +def _setup(specs): + clk = FakeClock() + reg = PidRegistry() + store = TimeSeriesStore() + link = MockLink(clock=clk) + sch = PollScheduler(link, reg, store, clock=clk) + sch.set_subscriptions(specs) + return clk, reg, store, sch + + +def test_registry_decoders_match_truck_bytes(): + reg = PidRegistry() + cases = { + "ICP": ([0x00, 0x16], 12.5), "EBP": ([0x01, 0x8F], 14.46), + "MAP": ([0x01, 0x89], 14.25), "BARO": ([0x01, 0x88], 14.21), + "EOT": ([0x1C, 0x92], 33.1), "GEAR": ([0x02], 1), + "FICM_M": ([0x30, 0x00], 48.0), + } + for key, (raw, expect) in cases.items(): + got = reg.get(key).decode(raw) + assert abs(got - expect) < 0.05, f"{key}: {got} != {expect}" + print(" decoders match truck bytes: OK") + + +def test_crank_ramp_and_peak(): + clk, reg, store, sch = _setup([("ICP", 5), ("FICM_M", 2), ("BATT", 2), ("RPM", 2)]) + for _ in range(60): # ~3s at 50ms steps + sch.tick() + clk.advance(0.05) + icp = store.channel("ICP") + lo, hi = store.minmax("ICP") + assert hi >= 500, f"ICP should ramp past 500, got peak {hi}" + assert lo is not None and lo < 50, "ICP should start low" + assert store.latest("FICM_M") == 48.0 + bat_lo, _ = store.minmax("BATT") + assert bat_lo <= 10.7, f"battery sag should be captured, got {bat_lo}" + print(f" crank ramp: ICP {lo} -> peak {hi}, FICM {store.latest('FICM_M')}V, " + f"batt min {bat_lo}V: OK") + + +def test_derived_boost_channel(): + clk, reg, store, sch = _setup([("MAP", 5), ("BARO", 5), ("BOOST", 5)]) + for _ in range(10): + sch.tick() + clk.advance(0.05) + boost = store.latest("BOOST") + assert boost is not None and abs(boost) < 0.5, f"atm boost ~0, got {boost}" + print(f" derived BOOST = MAP-BARO = {boost} psi: OK") + + +def test_dead_pid_parks_and_revives(): + clk, reg, store, sch = _setup([("ICP", 5), ("IPR", 5)]) # IPR not in MockLink -> dead + for _ in range(20): + sch.tick() + clk.advance(0.05) + # IPR should have parked (4 fails) but scheduler still runs ICP + assert store.latest("ICP") is not None + assert store.latest("IPR") is None + # advance past revive window -> IPR re-attempted (still None, but tried) + clk.advance(6.0) + sch.tick() + print(" dead-PID parking + revive: OK") + + +def test_record_replay_roundtrip(tmp_path=None): + import tempfile + path = os.path.join(tempfile.gettempdir(), "obdcore_replay_test.csv") + clk, reg, store, sch = _setup([("ICP", 5), ("FICM_M", 5)]) + store.recorder = CsvRecorder(path) + for _ in range(20): + sch.tick() + clk.advance(0.05) + store.recorder.close() + peak_orig = store.minmax("ICP")[1] + + store2 = TimeSeriesStore() + replay_csv(path, store2) + peak_replay = store2.minmax("ICP")[1] + assert abs(peak_orig - peak_replay) < 0.01, "replay should reproduce peak ICP" + print(f" record/replay: peak ICP {peak_orig} == replay {peak_replay}: OK") + os.remove(path) + + +if __name__ == "__main__": + for fn in [test_registry_decoders_match_truck_bytes, test_crank_ramp_and_peak, + test_derived_boost_channel, test_dead_pid_parks_and_revives, + test_record_replay_roundtrip]: + fn() + print("\nALL obdcore TESTS PASS")