From 23c92018c185651a659be1577bd8afd5a5d65530 Mon Sep 17 00:00:00 2001 From: Justin Paul Date: Wed, 1 Jul 2026 19:33:33 -0400 Subject: [PATCH] Fix #8: scheduler survives link death; timed-out one-offs cancelled - A transport exception in the poll loop killed the thread silently, leaving the GUI on a frozen 'Connected' dashboard and blocking run_oneoff callers for the full timeout. _loop now catches it -> stops, fails pending one-offs with the real error, and calls an on_error callback. Controller wires on_error to flag the connection dead; the GUI detects it in _tick and tears down with a 'Connection lost' dialog. - A run_oneoff that timed out left its job queued, so it executed LATER on the shared link -- a ghost/duplicate vehicle command. Jobs now carry cancelled/started flags under a lock; on timeout a not-yet-started job is cancelled (skipped by _drain_oneoffs), and a started one reports 'still running -- do NOT retry'. stop() also frees stranded submitters. - tests/test_scheduler.py: cancel-on-timeout, freed-on-death, loop-survives. Closes #8 Co-Authored-By: Claude Opus 4.8 (1M context) Claude-Session: https://claude.ai/code/session_016yT89n4zR4qbrySoSiEyZs --- gui/controller.py | 10 +++++- gui/main.py | 13 ++++++++ obdcore/scheduler.py | 71 ++++++++++++++++++++++++++++++++------- tests/test_scheduler.py | 73 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 154 insertions(+), 13 deletions(-) create mode 100644 tests/test_scheduler.py diff --git a/gui/controller.py b/gui/controller.py index ff4e487..c4d1100 100644 --- a/gui/controller.py +++ b/gui/controller.py @@ -49,6 +49,12 @@ class Controller: elif p.mode == "01" and p.pid.upper() == "10": self.maf_key = p.key + def _on_poll_error(self, exc): + """Called on the poll thread if it dies (transport failure). Flag the + connection dead so the GUI stops showing frozen 'Connected' data.""" + self.poll_error = exc + self.connected = False + def load_profile(self, path): """Switch the active vehicle profile (only allowed while disconnected).""" self.profile = load_profile(path) @@ -76,7 +82,9 @@ class Controller: self.link.fast_timing(True) except Exception: pass - self.sched = PollScheduler(self.link, self.reg, self.store, clock=time.time) + self.poll_error = None + self.sched = PollScheduler(self.link, self.reg, self.store, clock=time.time, + on_error=self._on_poll_error) self.t0 = time.time() self.connected = True self.trip.reset() diff --git a/gui/main.py b/gui/main.py index ef1c673..74c4f76 100644 --- a/gui/main.py +++ b/gui/main.py @@ -775,6 +775,17 @@ class MainWindow(QtWidgets.QMainWindow): b.setEnabled(False) self.status.showMessage("Disconnected.") + def _on_link_lost(self, exc): + """The polling thread died (transport failure). Tear down and tell the + user instead of leaving a frozen 'Connected' dashboard.""" + self.ctl.poll_error = None # one-shot + self._disconnect() + self.status.showMessage(f"Connection lost: {exc}") + QtWidgets.QMessageBox.warning( + self, "Connection lost", + f"The adapter connection failed and polling stopped:\n\n{exc}\n\n" + "Check the adapter/cable and reconnect.") + # ---------- PID selection ---------- def _apply_preset(self, name): if not self.ctl.connected: @@ -1001,6 +1012,8 @@ class MainWindow(QtWidgets.QMainWindow): def _tick(self): if not self.ctl.connected: + if getattr(self.ctl, "poll_error", None) is not None: + self._on_link_lost(self.ctl.poll_error) return self.tree.blockSignals(True) for key, it in self._items.items(): diff --git a/obdcore/scheduler.py b/obdcore/scheduler.py index 9de2ee2..ef557aa 100644 --- a/obdcore/scheduler.py +++ b/obdcore/scheduler.py @@ -16,14 +16,19 @@ import time class _OneOff: """A single command to run once on the polling thread (DTC read/clear, - probe, etc). The submitter blocks on `done` until the thread has run it.""" - __slots__ = ("fn", "done", "result", "error") + probe, etc). The submitter blocks on `done` until the thread has run it. + `cancelled`/`started` (guarded by `lock`) let a timed-out submitter cancel a + still-queued job so it never fires late on the vehicle.""" + __slots__ = ("fn", "done", "result", "error", "cancelled", "started", "lock") def __init__(self, fn): self.fn = fn self.done = threading.Event() self.result = None self.error = None + self.cancelled = False + self.started = False + self.lock = threading.Lock() class _Sub: @@ -39,13 +44,14 @@ class _Sub: class PollScheduler: def __init__(self, link, registry, store, clock=time.time, dead_after=4, - revive_every=5.0): + revive_every=5.0, on_error=None): self.link = link self.reg = registry self.store = store self.clock = clock self.dead_after = dead_after self.revive_every = revive_every + self.on_error = on_error # called(exc) if the poll thread dies self._subs = {} self._lock = threading.Lock() self._thread = None @@ -102,6 +108,10 @@ class PollScheduler: job = self._oneoffs.get_nowait() except queue.Empty: return + with job.lock: # a timed-out submitter may have cancelled + if job.cancelled: + continue + job.started = True try: job.result = job.fn() except Exception as e: # hand the failure back @@ -109,17 +119,40 @@ class PollScheduler: finally: job.done.set() + def _fail_pending_oneoffs(self, exc): + """Fail every still-queued (not yet started) one-off with `exc` so a + blocked submitter is freed immediately instead of hanging the full + timeout -- used when the poll thread dies or stops.""" + err = exc if isinstance(exc, BaseException) else RuntimeError(str(exc)) + while True: + try: + job = self._oneoffs.get_nowait() + except queue.Empty: + return + with job.lock: + if job.started or job.cancelled: + continue + job.cancelled = True + job.error = err + job.done.set() + def run_oneoff(self, fn, timeout=8.0): """Enqueue `fn` to run once on the polling thread and block for its - result (or re-raise its exception). When the scheduler thread isn't - running, the job is drained inline on the caller -- still serialized - against tick(), and safe because nothing else is touching the link.""" + result (or re-raise its exception). When no live polling thread is + servicing the queue, the job is drained inline on the caller -- still + serialized against tick(), and safe because nothing else touches the + link. On timeout a still-queued job is CANCELLED so it can never fire + late on the vehicle.""" job = _OneOff(fn) self._oneoffs.put(job) - if not self._running: + if not self._running or (self._thread is not None and not self._thread.is_alive()): self._drain_oneoffs() if not job.done.wait(timeout): - raise TimeoutError("one-off command timed out") + with job.lock: + if not job.started: + job.cancelled = True + raise TimeoutError("command timed out and was cancelled — it will not run") + raise TimeoutError("command is still running on the adapter — do NOT retry") if job.error is not None: raise job.error return job.result @@ -167,16 +200,30 @@ class PollScheduler: self._thread.start() def _loop(self): - while self._running: - n = self.tick() - if n == 0: - time.sleep(0.005) # nothing due; yield + try: + while self._running: + n = self.tick() + if n == 0: + time.sleep(0.005) # nothing due; yield + except Exception as e: + # a transport/link error would otherwise kill the thread silently, + # leaving the GUI showing "Connected" with frozen data and blocked + # one-off callers hung. Fail loudly instead. + self._running = False + self._fail_pending_oneoffs(e) + cb = self.on_error + if cb: + try: + cb(e) + except Exception: + pass def stop(self): self._running = False if self._thread: self._thread.join(timeout=2.0) self._thread = None + self._fail_pending_oneoffs(RuntimeError("scheduler stopped")) def _is_derived(reg, key): diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py new file mode 100644 index 0000000..c03ea87 --- /dev/null +++ b/tests/test_scheduler.py @@ -0,0 +1,73 @@ +"""PollScheduler robustness: one-off cancellation + surviving link death.""" +import os +import sys +import threading +import time + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from obdcore import PidRegistry, TimeSeriesStore, PollScheduler, load_default +from obdcore.mock import MockLink + + +def _sched(link=None): + prof = load_default() + return PollScheduler(link or MockLink(clock=time.time), PidRegistry(prof), + TimeSeriesStore(), clock=time.time) + + +class _RaisingLink(MockLink): + def read_m01(self, pid, nbytes, timeout=0.6): + raise OSError("device disconnected") + + +def test_oneoff_cancel_on_timeout(): + s = _sched() + s._running = True # pretend a poll thread is up but not servicing + ran = [] + try: + s.run_oneoff(lambda: ran.append(1), timeout=0.05) + raise AssertionError("should have timed out") + except TimeoutError as e: + assert "cancelled" in str(e), e + s._drain_oneoffs() # the cancelled job must NOT execute later + assert ran == [], "cancelled one-off fired late — ghost command" + print(" timed-out one-off is cancelled, never runs late: OK") + + +def test_oneoff_freed_when_thread_dies(): + s = _sched() + s._running = True + got = [] + + def submit(): + try: + s.run_oneoff(lambda: None, timeout=5.0) + except Exception as e: + got.append(e) + + t = threading.Thread(target=submit); t.start() + time.sleep(0.05) + s._fail_pending_oneoffs(RuntimeError("link died")) # simulate thread death + t.join(timeout=1.0) + assert got and "link died" in str(got[0]), got + print(" blocked one-off freed immediately on thread death: OK") + + +def test_loop_survives_link_death_and_reports(): + errs = [] + s = PollScheduler(_RaisingLink(clock=time.time), PidRegistry(load_default()), + TimeSeriesStore(), clock=time.time, on_error=lambda e: errs.append(e)) + s.subscribe("RPM", 5) + s._running = True + s._loop() # the raising read propagates -> caught, not fatal + assert not s._running, "thread should stop, not spin" + assert errs and isinstance(errs[0], OSError), errs + print(" poll loop catches transport death + fires on_error: OK") + + +if __name__ == "__main__": + test_oneoff_cancel_on_timeout() + test_oneoff_freed_when_thread_dies() + test_loop_survives_link_death_and_reports() + print("\nALL SCHEDULER TESTS PASS")