Add Tools/Diagnostics: thread-safe DTC read/clear + Diagnostics panel
The polling thread owns the ELM327, so reading/clearing trouble codes from
the GUI thread would race PID reads and corrupt the stream. Add a one-off
command path that serializes ad-hoc link work onto the polling thread.
obdcore/scheduler.py:
- PollScheduler.run_oneoff(fn, timeout) enqueues a callable (queue.Queue +
threading.Event) and blocks for its result, re-raising the callable's
exception. tick() drains queued one-offs at its very top, so they run on
the same thread that does PID reads -- never concurrently. When the
scheduler thread isn't running, the job is drained inline on the caller
(still serialized vs tick(), safe because nothing else touches the link).
gui/controller.py:
- Controller.read_dtcs() -> {"stored","pending","permanent"} (modes 03/07/0A,
svc 0x43/0x47/0x4A) and clear_dtcs() -> bool. Both route through the
scheduler one-off when a scheduler exists, else call the link directly.
gui/main.py:
- Diagnostics menu (Read Codes / Clear Codes...) and a right-side QDockWidget
listing codes grouped Stored/Pending/Permanent. Each row is code +
description + system from DtcDatabase; no_start codes are flagged bold red.
- Clear is guarded by a confirmation warning (erases codes + freeze frame;
honest "the code comes right back" / permanent-codes-won't-clear tone from
run_clear in obd_reader.py). On confirm: clear, then re-read immediately and
show whatever returned, reporting active faults that came straight back.
tests/test_diagnostics.py:
- one-off returns its value, re-raises exceptions, is drained before a tick's
PID reads, and runs on a live background thread while polling continues.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_016yT89n4zR4qbrySoSiEyZs
This commit is contained in:
@@ -15,6 +15,14 @@ FAST = {"ICP", "FICM_M", "RPM"}
|
|||||||
DEFAULT_HZ = 2
|
DEFAULT_HZ = 2
|
||||||
FAST_HZ = 5
|
FAST_HZ = 5
|
||||||
|
|
||||||
|
# DTC services: result-bucket label, request mode string, response service byte
|
||||||
|
# (mode "03"->0x43 stored, "07"->0x47 pending, "0A"->0x4A permanent)
|
||||||
|
DTC_SERVICES = (
|
||||||
|
("stored", "03", 0x43),
|
||||||
|
("pending", "07", 0x47),
|
||||||
|
("permanent", "0A", 0x4A),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class Controller:
|
class Controller:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
@@ -79,6 +87,31 @@ class Controller:
|
|||||||
def now(self):
|
def now(self):
|
||||||
return (time.time() - self.t0) if self.t0 else 0.0
|
return (time.time() - self.t0) if self.t0 else 0.0
|
||||||
|
|
||||||
|
# -- diagnostics (DTCs) --------------------------------------------------
|
||||||
|
# All link access goes through the scheduler's one-off path when a
|
||||||
|
# scheduler exists, so a DTC read/clear never races the polling thread for
|
||||||
|
# the serial link. When disconnected (no scheduler), call the link direct.
|
||||||
|
def _oneoff(self, fn, timeout=8.0):
|
||||||
|
if self.sched is not None:
|
||||||
|
return self.sched.run_oneoff(fn, timeout=timeout)
|
||||||
|
if self.link is not None:
|
||||||
|
return fn()
|
||||||
|
raise RuntimeError("not connected")
|
||||||
|
|
||||||
|
def read_dtcs(self):
|
||||||
|
"""Read stored (03), pending (07) and permanent (0A) DTCs.
|
||||||
|
Returns {"stored": [...], "pending": [...], "permanent": [...]}."""
|
||||||
|
out = {}
|
||||||
|
for label, mode, svc in DTC_SERVICES:
|
||||||
|
out[label] = self._oneoff(
|
||||||
|
lambda m=mode, s=svc: self.link.read_dtcs(m, s)) or []
|
||||||
|
return out
|
||||||
|
|
||||||
|
def clear_dtcs(self):
|
||||||
|
"""Mode 04: clear stored+pending codes and freeze frame.
|
||||||
|
Returns True if the ECU acknowledged."""
|
||||||
|
return bool(self._oneoff(lambda: self.link.clear_dtcs()))
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
if self.sched:
|
if self.sched:
|
||||||
self.sched.stop()
|
self.sched.stop()
|
||||||
|
|||||||
+146
@@ -45,6 +45,7 @@ class MainWindow(QtWidgets.QMainWindow):
|
|||||||
self._build_menubar()
|
self._build_menubar()
|
||||||
self._build_connection_bar()
|
self._build_connection_bar()
|
||||||
self._build_pid_browser()
|
self._build_pid_browser()
|
||||||
|
self._build_diag_dock()
|
||||||
self._build_center()
|
self._build_center()
|
||||||
self._build_statusbar()
|
self._build_statusbar()
|
||||||
self._refresh_title()
|
self._refresh_title()
|
||||||
@@ -73,6 +74,12 @@ class MainWindow(QtWidgets.QMainWindow):
|
|||||||
self.profm = mb.addMenu("&Profile")
|
self.profm = mb.addMenu("&Profile")
|
||||||
self._rebuild_profile_menu()
|
self._rebuild_profile_menu()
|
||||||
|
|
||||||
|
diagm = mb.addMenu("&Diagnostics")
|
||||||
|
self.read_dtc_act = self._act(diagm, "Read Codes", self._read_codes,
|
||||||
|
"Read stored / pending / permanent trouble codes")
|
||||||
|
self.clear_dtc_act = self._act(diagm, "Clear Codes…", self._clear_codes,
|
||||||
|
"Erase stored codes + freeze frame (mode 04)")
|
||||||
|
|
||||||
viewm = mb.addMenu("&View")
|
viewm = mb.addMenu("&View")
|
||||||
self.view_graph = self._act(viewm, "Graph View", lambda: self._set_view(0),
|
self.view_graph = self._act(viewm, "Graph View", lambda: self._set_view(0),
|
||||||
checkable=True)
|
checkable=True)
|
||||||
@@ -85,6 +92,9 @@ class MainWindow(QtWidgets.QMainWindow):
|
|||||||
self.show_pids = self._act(viewm, "Show PID Panel", self._toggle_pid_dock,
|
self.show_pids = self._act(viewm, "Show PID Panel", self._toggle_pid_dock,
|
||||||
checkable=True)
|
checkable=True)
|
||||||
self.show_pids.setChecked(True)
|
self.show_pids.setChecked(True)
|
||||||
|
self.show_diag = self._act(viewm, "Show Diagnostics Panel", self._toggle_diag_dock,
|
||||||
|
checkable=True)
|
||||||
|
self.show_diag.setChecked(True)
|
||||||
self.norm_act = self._act(viewm, "Normalize Graph (% of range)",
|
self.norm_act = self._act(viewm, "Normalize Graph (% of range)",
|
||||||
self._sync_norm_from_menu, checkable=True)
|
self._sync_norm_from_menu, checkable=True)
|
||||||
viewm.addSeparator()
|
viewm.addSeparator()
|
||||||
@@ -202,6 +212,142 @@ class MainWindow(QtWidgets.QMainWindow):
|
|||||||
self.tree.resizeColumnToContents(0)
|
self.tree.resizeColumnToContents(0)
|
||||||
self.tree.blockSignals(False)
|
self.tree.blockSignals(False)
|
||||||
|
|
||||||
|
# ---------- diagnostics dock (DTCs) ----------
|
||||||
|
def _build_diag_dock(self):
|
||||||
|
self.diag_dock = QtWidgets.QDockWidget("Diagnostics", self)
|
||||||
|
wrap = QtWidgets.QWidget()
|
||||||
|
lay = QtWidgets.QVBoxLayout(wrap)
|
||||||
|
lay.setContentsMargins(4, 4, 4, 4)
|
||||||
|
bar = QtWidgets.QHBoxLayout()
|
||||||
|
self.diag_read_btn = QtWidgets.QPushButton("Read Codes")
|
||||||
|
self.diag_read_btn.clicked.connect(self._read_codes)
|
||||||
|
self.diag_clear_btn = QtWidgets.QPushButton("Clear Codes…")
|
||||||
|
self.diag_clear_btn.clicked.connect(self._clear_codes)
|
||||||
|
bar.addWidget(self.diag_read_btn)
|
||||||
|
bar.addWidget(self.diag_clear_btn)
|
||||||
|
bar.addStretch(1)
|
||||||
|
lay.addLayout(bar)
|
||||||
|
self.diag_tree = QtWidgets.QTreeWidget()
|
||||||
|
self.diag_tree.setColumnCount(3)
|
||||||
|
self.diag_tree.setHeaderLabels(["Code", "Description", "System"])
|
||||||
|
self.diag_tree.setRootIsDecorated(True)
|
||||||
|
self.diag_tree.header().setStretchLastSection(False)
|
||||||
|
self.diag_tree.header().setSectionResizeMode(
|
||||||
|
1, QtWidgets.QHeaderView.Stretch)
|
||||||
|
lay.addWidget(self.diag_tree)
|
||||||
|
self.diag_hint = QtWidgets.QLabel(
|
||||||
|
"Connect, then Read Codes. Bold red = no-start / drive-disabling.")
|
||||||
|
self.diag_hint.setWordWrap(True)
|
||||||
|
lay.addWidget(self.diag_hint)
|
||||||
|
self.diag_dock.setWidget(wrap)
|
||||||
|
self.diag_dock.visibilityChanged.connect(
|
||||||
|
lambda vis: self.show_diag.setChecked(vis))
|
||||||
|
self.addDockWidget(QtCore.Qt.RightDockWidgetArea, self.diag_dock)
|
||||||
|
|
||||||
|
_DIAG_GROUPS = [("stored", "Stored (mode 03)"),
|
||||||
|
("pending", "Pending (mode 07)"),
|
||||||
|
("permanent", "Permanent (mode 0A)")]
|
||||||
|
|
||||||
|
def _populate_diag(self, codes):
|
||||||
|
"""codes: {'stored':[...], 'pending':[...], 'permanent':[...]}"""
|
||||||
|
self.diag_tree.clear()
|
||||||
|
total = 0
|
||||||
|
for bucket, label in self._DIAG_GROUPS:
|
||||||
|
lst = codes.get(bucket, [])
|
||||||
|
total += len(lst)
|
||||||
|
top = QtWidgets.QTreeWidgetItem([f"{label} ({len(lst)})", "", ""])
|
||||||
|
f = top.font(0); f.setBold(True); top.setFont(0, f)
|
||||||
|
top.setFirstColumnSpanned(False)
|
||||||
|
self.diag_tree.addTopLevelItem(top)
|
||||||
|
if not lst:
|
||||||
|
none = QtWidgets.QTreeWidgetItem(["—", "(no codes)", ""])
|
||||||
|
none.setForeground(1, QtGui.QBrush(QtGui.QColor("#888")))
|
||||||
|
top.addChild(none)
|
||||||
|
for code in lst:
|
||||||
|
d = self.ctl.dtcdb.get(code)
|
||||||
|
it = QtWidgets.QTreeWidgetItem([code, d.desc, d.system])
|
||||||
|
it.setData(0, QtCore.Qt.UserRole, code)
|
||||||
|
if getattr(d, "no_start", False):
|
||||||
|
red = QtGui.QBrush(QtGui.QColor("#e6194B"))
|
||||||
|
bf = it.font(0); bf.setBold(True)
|
||||||
|
for c in range(3):
|
||||||
|
it.setFont(c, bf)
|
||||||
|
it.setForeground(c, red)
|
||||||
|
it.setToolTip(0, "No-start / drive-disabling fault")
|
||||||
|
top.addChild(it)
|
||||||
|
top.setExpanded(True)
|
||||||
|
self.diag_tree.resizeColumnToContents(0)
|
||||||
|
self.diag_tree.resizeColumnToContents(2)
|
||||||
|
return total
|
||||||
|
|
||||||
|
def _read_codes(self):
|
||||||
|
if not self.ctl.connected:
|
||||||
|
QtWidgets.QMessageBox.information(
|
||||||
|
self, "Not connected", "Connect (or tick Mock) before reading codes.")
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
codes = self.ctl.read_dtcs()
|
||||||
|
except Exception as e:
|
||||||
|
QtWidgets.QMessageBox.critical(self, "Read failed", str(e))
|
||||||
|
return
|
||||||
|
total = self._populate_diag(codes)
|
||||||
|
self.diag_dock.setVisible(True)
|
||||||
|
self.show_diag.setChecked(True)
|
||||||
|
self.status.showMessage(
|
||||||
|
f"Read codes: {total} found "
|
||||||
|
f"(stored {len(codes.get('stored', []))}, "
|
||||||
|
f"pending {len(codes.get('pending', []))}, "
|
||||||
|
f"permanent {len(codes.get('permanent', []))})."
|
||||||
|
if total else "Read codes: none stored.")
|
||||||
|
|
||||||
|
def _clear_codes(self):
|
||||||
|
if not self.ctl.connected:
|
||||||
|
QtWidgets.QMessageBox.information(
|
||||||
|
self, "Not connected", "Connect (or tick Mock) before clearing codes.")
|
||||||
|
return
|
||||||
|
btn = QtWidgets.QMessageBox.warning(
|
||||||
|
self, "Clear codes?",
|
||||||
|
"This erases stored + pending codes AND freeze-frame data, and "
|
||||||
|
"resets emissions monitors.\n\n"
|
||||||
|
"Write the codes down first — and read them on a no-start before "
|
||||||
|
"clearing. If the fault is still present the code comes right back.\n"
|
||||||
|
"Permanent codes (mode 0A) will NOT clear until the fault is fixed "
|
||||||
|
"and the vehicle self-clears them over several drive cycles.\n\n"
|
||||||
|
"Clear codes now?",
|
||||||
|
QtWidgets.QMessageBox.Yes | QtWidgets.QMessageBox.No,
|
||||||
|
QtWidgets.QMessageBox.No)
|
||||||
|
if btn != QtWidgets.QMessageBox.Yes:
|
||||||
|
self.status.showMessage("Cancelled. No codes cleared.")
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
ok = self.ctl.clear_dtcs()
|
||||||
|
except Exception as e:
|
||||||
|
QtWidgets.QMessageBox.critical(self, "Clear failed", str(e))
|
||||||
|
return
|
||||||
|
if not ok:
|
||||||
|
QtWidgets.QMessageBox.warning(
|
||||||
|
self, "No acknowledgement",
|
||||||
|
"The ECU did not acknowledge the clear.\n"
|
||||||
|
"Make sure the key is in RUN and the vehicle is connected, then "
|
||||||
|
"try again.")
|
||||||
|
self.status.showMessage("Clear not acknowledged by ECU.")
|
||||||
|
return
|
||||||
|
# Re-read immediately so anything that came straight back is shown.
|
||||||
|
try:
|
||||||
|
codes = self.ctl.read_dtcs()
|
||||||
|
except Exception:
|
||||||
|
codes = {}
|
||||||
|
returned = self._populate_diag(codes)
|
||||||
|
if returned:
|
||||||
|
self.status.showMessage(
|
||||||
|
f"Cleared — but {returned} code(s) returned immediately "
|
||||||
|
"(active fault present).")
|
||||||
|
else:
|
||||||
|
self.status.showMessage("Cleared. No codes on re-read.")
|
||||||
|
|
||||||
|
def _toggle_diag_dock(self):
|
||||||
|
self.diag_dock.setVisible(self.show_diag.isChecked())
|
||||||
|
|
||||||
# ---------- center (graph + table stack) ----------
|
# ---------- center (graph + table stack) ----------
|
||||||
def _build_center(self):
|
def _build_center(self):
|
||||||
self.stack = QtWidgets.QStackedWidget()
|
self.stack = QtWidgets.QStackedWidget()
|
||||||
|
|||||||
@@ -9,10 +9,23 @@ periodically revived so they don't starve the sample rate.
|
|||||||
Acquisition runs on a background thread; tick() is also public so tests can
|
Acquisition runs on a background thread; tick() is also public so tests can
|
||||||
drive it deterministically with a fake clock (no threads, no sleeps).
|
drive it deterministically with a fake clock (no threads, no sleeps).
|
||||||
"""
|
"""
|
||||||
|
import queue
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
|
|
||||||
|
|
||||||
|
class _OneOff:
|
||||||
|
"""A single command to run once on the polling thread (DTC read/clear,
|
||||||
|
probe, etc). The submitter blocks on `done` until the thread has run it."""
|
||||||
|
__slots__ = ("fn", "done", "result", "error")
|
||||||
|
|
||||||
|
def __init__(self, fn):
|
||||||
|
self.fn = fn
|
||||||
|
self.done = threading.Event()
|
||||||
|
self.result = None
|
||||||
|
self.error = None
|
||||||
|
|
||||||
|
|
||||||
class _Sub:
|
class _Sub:
|
||||||
__slots__ = ("key", "period", "next_due", "fails", "active")
|
__slots__ = ("key", "period", "next_due", "fails", "active")
|
||||||
|
|
||||||
@@ -38,6 +51,7 @@ class PollScheduler:
|
|||||||
self._thread = None
|
self._thread = None
|
||||||
self._running = False
|
self._running = False
|
||||||
self._last_revive = 0.0
|
self._last_revive = 0.0
|
||||||
|
self._oneoffs = queue.Queue()
|
||||||
|
|
||||||
# -- subscription management --
|
# -- subscription management --
|
||||||
def set_subscriptions(self, specs):
|
def set_subscriptions(self, specs):
|
||||||
@@ -78,8 +92,41 @@ class PollScheduler:
|
|||||||
except Exception:
|
except Exception:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
# -- one-off commands (thread-safe, serialized onto the polling thread) --
|
||||||
|
def _drain_oneoffs(self):
|
||||||
|
"""Run every queued one-off on the *calling* thread. Invoked at the top
|
||||||
|
of tick() so one-offs interleave with polling on the same thread that
|
||||||
|
owns the serial link -- never concurrently with a PID read."""
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
job = self._oneoffs.get_nowait()
|
||||||
|
except queue.Empty:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
job.result = job.fn()
|
||||||
|
except Exception as e: # hand the failure back
|
||||||
|
job.error = e
|
||||||
|
finally:
|
||||||
|
job.done.set()
|
||||||
|
|
||||||
|
def run_oneoff(self, fn, timeout=8.0):
|
||||||
|
"""Enqueue `fn` to run once on the polling thread and block for its
|
||||||
|
result (or re-raise its exception). When the scheduler thread isn't
|
||||||
|
running, the job is drained inline on the caller -- still serialized
|
||||||
|
against tick(), and safe because nothing else is touching the link."""
|
||||||
|
job = _OneOff(fn)
|
||||||
|
self._oneoffs.put(job)
|
||||||
|
if not self._running:
|
||||||
|
self._drain_oneoffs()
|
||||||
|
if not job.done.wait(timeout):
|
||||||
|
raise TimeoutError("one-off command timed out")
|
||||||
|
if job.error is not None:
|
||||||
|
raise job.error
|
||||||
|
return job.result
|
||||||
|
|
||||||
def tick(self, now=None):
|
def tick(self, now=None):
|
||||||
"""Read all due PIDs once. Returns number of PIDs read."""
|
"""Read all due PIDs once. Returns number of PIDs read."""
|
||||||
|
self._drain_oneoffs() # one-offs first, same thread
|
||||||
now = self.clock() if now is None else now
|
now = self.clock() if now is None else now
|
||||||
if now - self._last_revive >= self.revive_every:
|
if now - self._last_revive >= self.revive_every:
|
||||||
with self._lock:
|
with self._lock:
|
||||||
|
|||||||
@@ -0,0 +1,148 @@
|
|||||||
|
"""Hardware-free tests for the Diagnostics (DTC) one-off command path.
|
||||||
|
|
||||||
|
The polling thread owns the serial link, so DTC reads/clears can't hit the link
|
||||||
|
directly -- they go through PollScheduler.run_oneoff, which serializes them onto
|
||||||
|
the same thread that does PID reads. These tests prove (a) a one-off returns
|
||||||
|
its callable's value (and re-raises its exceptions), (b) one-offs are drained at
|
||||||
|
the top of tick() without disturbing normal polling, and (c) the same path runs
|
||||||
|
on a live background thread while polling continues.
|
||||||
|
|
||||||
|
Run: python tests/test_diagnostics.py
|
||||||
|
"""
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
|
||||||
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||||
|
|
||||||
|
from obdcore import (PidRegistry, TimeSeriesStore, PollScheduler,
|
||||||
|
load_default)
|
||||||
|
from obdcore.mock import MockLink
|
||||||
|
|
||||||
|
|
||||||
|
class FakeClock:
|
||||||
|
def __init__(self):
|
||||||
|
self.t = 0.0
|
||||||
|
|
||||||
|
def __call__(self):
|
||||||
|
return self.t
|
||||||
|
|
||||||
|
def advance(self, dt):
|
||||||
|
self.t += dt
|
||||||
|
|
||||||
|
|
||||||
|
def _setup(specs):
|
||||||
|
clk = FakeClock()
|
||||||
|
reg = PidRegistry(load_default())
|
||||||
|
store = TimeSeriesStore()
|
||||||
|
link = MockLink(clock=clk)
|
||||||
|
sch = PollScheduler(link, reg, store, clock=clk)
|
||||||
|
sch.set_subscriptions(specs)
|
||||||
|
return clk, reg, store, link, sch
|
||||||
|
|
||||||
|
|
||||||
|
def test_oneoff_returns_value_and_polling_continues():
|
||||||
|
"""Single-threaded (disconnected-style) path: run_oneoff drains inline when
|
||||||
|
no background thread is running, and normal tick() polling is untouched."""
|
||||||
|
clk, reg, store, link, sch = _setup([("ICP", 5), ("FICM_M", 2)])
|
||||||
|
|
||||||
|
# A one-off returns exactly what its callable returns -- here, the MockLink's
|
||||||
|
# stored DTCs (mode 03 -> ["P0148"]).
|
||||||
|
stored = sch.run_oneoff(lambda: link.read_dtcs("03", 0x43))
|
||||||
|
assert stored == ["P0148"], stored
|
||||||
|
# Pending / permanent are empty on the mock.
|
||||||
|
assert sch.run_oneoff(lambda: link.read_dtcs("07", 0x47)) == []
|
||||||
|
assert sch.run_oneoff(lambda: link.read_dtcs("0A", 0x4A)) == []
|
||||||
|
# A clear one-off returns the link's ack.
|
||||||
|
assert sch.run_oneoff(lambda: link.clear_dtcs()) is True
|
||||||
|
|
||||||
|
# Normal polling still happens through tick().
|
||||||
|
for _ in range(20):
|
||||||
|
sch.tick()
|
||||||
|
clk.advance(0.05)
|
||||||
|
assert store.latest("ICP") is not None, "ICP should still be polled"
|
||||||
|
assert store.latest("FICM_M") == 48.0
|
||||||
|
print(" one-off returns value + tick() polling intact: OK")
|
||||||
|
|
||||||
|
|
||||||
|
def test_oneoff_exception_propagates():
|
||||||
|
clk, reg, store, link, sch = _setup([("ICP", 5)])
|
||||||
|
|
||||||
|
def boom():
|
||||||
|
raise ValueError("link blew up")
|
||||||
|
|
||||||
|
try:
|
||||||
|
sch.run_oneoff(boom)
|
||||||
|
raise AssertionError("expected ValueError to propagate")
|
||||||
|
except ValueError as e:
|
||||||
|
assert "blew up" in str(e)
|
||||||
|
print(" one-off re-raises callable's exception: OK")
|
||||||
|
|
||||||
|
|
||||||
|
def test_oneoff_drained_at_top_of_tick():
|
||||||
|
"""Enqueue a job without inline-draining (simulate the running scheduler),
|
||||||
|
then prove a single tick() drains+runs it before doing its PID reads."""
|
||||||
|
clk, reg, store, link, sch = _setup([("ICP", 5)])
|
||||||
|
sch._running = True # pretend the thread is alive
|
||||||
|
box = {"ran": False, "ticks_at_run": None}
|
||||||
|
|
||||||
|
def job():
|
||||||
|
box["ran"] = True
|
||||||
|
# store is still empty -> the one-off ran before any PID push this tick
|
||||||
|
box["ticks_at_run"] = store.latest("ICP")
|
||||||
|
return "done"
|
||||||
|
|
||||||
|
import threading
|
||||||
|
# Submit via the public API on a helper thread (it blocks waiting for the
|
||||||
|
# job), so the main thread can drive tick() to drain it.
|
||||||
|
result = {}
|
||||||
|
|
||||||
|
def submit():
|
||||||
|
result["val"] = sch.run_oneoff(job, timeout=2.0)
|
||||||
|
|
||||||
|
t = threading.Thread(target=submit)
|
||||||
|
t.start()
|
||||||
|
# wait until the job is queued, then drain it with one tick()
|
||||||
|
for _ in range(200):
|
||||||
|
if not sch._oneoffs.empty():
|
||||||
|
break
|
||||||
|
time.sleep(0.001)
|
||||||
|
sch.tick() # drains the one-off, then polls ICP
|
||||||
|
t.join(timeout=2.0)
|
||||||
|
|
||||||
|
assert box["ran"] is True
|
||||||
|
assert result.get("val") == "done"
|
||||||
|
assert box["ticks_at_run"] is None, "one-off must run before this tick's reads"
|
||||||
|
assert store.latest("ICP") is not None, "ICP polled in the same tick"
|
||||||
|
print(" one-off drained at top of tick(), before PID reads: OK")
|
||||||
|
|
||||||
|
|
||||||
|
def test_oneoff_on_live_polling_thread():
|
||||||
|
"""Realistic path: scheduler running on its own thread (real clock) while a
|
||||||
|
one-off DTC read is submitted from the 'GUI' thread and returns its value."""
|
||||||
|
clk = time.time
|
||||||
|
reg = PidRegistry(load_default())
|
||||||
|
store = TimeSeriesStore()
|
||||||
|
link = MockLink(clock=clk)
|
||||||
|
sch = PollScheduler(link, reg, store, clock=clk)
|
||||||
|
sch.set_subscriptions([("ICP", 50), ("FICM_M", 20)])
|
||||||
|
sch.start()
|
||||||
|
try:
|
||||||
|
time.sleep(0.05) # let it poll a few frames
|
||||||
|
stored = sch.run_oneoff(lambda: link.read_dtcs("03", 0x43), timeout=3.0)
|
||||||
|
assert stored == ["P0148"], stored
|
||||||
|
assert sch.run_oneoff(lambda: link.clear_dtcs(), timeout=3.0) is True
|
||||||
|
time.sleep(0.05)
|
||||||
|
finally:
|
||||||
|
sch.stop()
|
||||||
|
assert store.latest("ICP") is not None, "polling ran alongside the one-off"
|
||||||
|
print(" one-off on live polling thread, polling continued: OK")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
for fn in [test_oneoff_returns_value_and_polling_continues,
|
||||||
|
test_oneoff_exception_propagates,
|
||||||
|
test_oneoff_drained_at_top_of_tick,
|
||||||
|
test_oneoff_on_live_polling_thread]:
|
||||||
|
fn()
|
||||||
|
print("\nALL diagnostics TESTS PASS")
|
||||||
Reference in New Issue
Block a user