Fix #8: scheduler survives link death; timed-out one-offs cancelled

- A transport exception in the poll loop killed the thread silently, leaving the
  GUI on a frozen 'Connected' dashboard and blocking run_oneoff callers for the
  full timeout. _loop now catches it -> stops, fails pending one-offs with the
  real error, and calls an on_error callback. Controller wires on_error to flag
  the connection dead; the GUI detects it in _tick and tears down with a
  'Connection lost' dialog.
- A run_oneoff that timed out left its job queued, so it executed LATER on the
  shared link -- a ghost/duplicate vehicle command. Jobs now carry
  cancelled/started flags under a lock; on timeout a not-yet-started job is
  cancelled (skipped by _drain_oneoffs), and a started one reports 'still
  running -- do NOT retry'. stop() also frees stranded submitters.
- tests/test_scheduler.py: cancel-on-timeout, freed-on-death, loop-survives.

Closes #8

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_016yT89n4zR4qbrySoSiEyZs
This commit is contained in:
2026-07-01 19:33:33 -04:00
parent b5e0c96763
commit 23c92018c1
4 changed files with 154 additions and 13 deletions
+9 -1
View File
@@ -49,6 +49,12 @@ class Controller:
elif p.mode == "01" and p.pid.upper() == "10":
self.maf_key = p.key
def _on_poll_error(self, exc):
"""Called on the poll thread if it dies (transport failure). Flag the
connection dead so the GUI stops showing frozen 'Connected' data."""
self.poll_error = exc
self.connected = False
def load_profile(self, path):
"""Switch the active vehicle profile (only allowed while disconnected)."""
self.profile = load_profile(path)
@@ -76,7 +82,9 @@ class Controller:
self.link.fast_timing(True)
except Exception:
pass
self.sched = PollScheduler(self.link, self.reg, self.store, clock=time.time)
self.poll_error = None
self.sched = PollScheduler(self.link, self.reg, self.store, clock=time.time,
on_error=self._on_poll_error)
self.t0 = time.time()
self.connected = True
self.trip.reset()
+13
View File
@@ -775,6 +775,17 @@ class MainWindow(QtWidgets.QMainWindow):
b.setEnabled(False)
self.status.showMessage("Disconnected.")
def _on_link_lost(self, exc):
"""The polling thread died (transport failure). Tear down and tell the
user instead of leaving a frozen 'Connected' dashboard."""
self.ctl.poll_error = None # one-shot
self._disconnect()
self.status.showMessage(f"Connection lost: {exc}")
QtWidgets.QMessageBox.warning(
self, "Connection lost",
f"The adapter connection failed and polling stopped:\n\n{exc}\n\n"
"Check the adapter/cable and reconnect.")
# ---------- PID selection ----------
def _apply_preset(self, name):
if not self.ctl.connected:
@@ -1001,6 +1012,8 @@ class MainWindow(QtWidgets.QMainWindow):
def _tick(self):
if not self.ctl.connected:
if getattr(self.ctl, "poll_error", None) is not None:
self._on_link_lost(self.ctl.poll_error)
return
self.tree.blockSignals(True)
for key, it in self._items.items():
+59 -12
View File
@@ -16,14 +16,19 @@ import time
class _OneOff:
"""A single command to run once on the polling thread (DTC read/clear,
probe, etc). The submitter blocks on `done` until the thread has run it."""
__slots__ = ("fn", "done", "result", "error")
probe, etc). The submitter blocks on `done` until the thread has run it.
`cancelled`/`started` (guarded by `lock`) let a timed-out submitter cancel a
still-queued job so it never fires late on the vehicle."""
__slots__ = ("fn", "done", "result", "error", "cancelled", "started", "lock")
def __init__(self, fn):
self.fn = fn
self.done = threading.Event()
self.result = None
self.error = None
self.cancelled = False
self.started = False
self.lock = threading.Lock()
class _Sub:
@@ -39,13 +44,14 @@ class _Sub:
class PollScheduler:
def __init__(self, link, registry, store, clock=time.time, dead_after=4,
revive_every=5.0):
revive_every=5.0, on_error=None):
self.link = link
self.reg = registry
self.store = store
self.clock = clock
self.dead_after = dead_after
self.revive_every = revive_every
self.on_error = on_error # called(exc) if the poll thread dies
self._subs = {}
self._lock = threading.Lock()
self._thread = None
@@ -102,6 +108,10 @@ class PollScheduler:
job = self._oneoffs.get_nowait()
except queue.Empty:
return
with job.lock: # a timed-out submitter may have cancelled
if job.cancelled:
continue
job.started = True
try:
job.result = job.fn()
except Exception as e: # hand the failure back
@@ -109,17 +119,40 @@ class PollScheduler:
finally:
job.done.set()
def _fail_pending_oneoffs(self, exc):
"""Fail every still-queued (not yet started) one-off with `exc` so a
blocked submitter is freed immediately instead of hanging the full
timeout -- used when the poll thread dies or stops."""
err = exc if isinstance(exc, BaseException) else RuntimeError(str(exc))
while True:
try:
job = self._oneoffs.get_nowait()
except queue.Empty:
return
with job.lock:
if job.started or job.cancelled:
continue
job.cancelled = True
job.error = err
job.done.set()
def run_oneoff(self, fn, timeout=8.0):
"""Enqueue `fn` to run once on the polling thread and block for its
result (or re-raise its exception). When the scheduler thread isn't
running, the job is drained inline on the caller -- still serialized
against tick(), and safe because nothing else is touching the link."""
result (or re-raise its exception). When no live polling thread is
servicing the queue, the job is drained inline on the caller -- still
serialized against tick(), and safe because nothing else touches the
link. On timeout a still-queued job is CANCELLED so it can never fire
late on the vehicle."""
job = _OneOff(fn)
self._oneoffs.put(job)
if not self._running:
if not self._running or (self._thread is not None and not self._thread.is_alive()):
self._drain_oneoffs()
if not job.done.wait(timeout):
raise TimeoutError("one-off command timed out")
with job.lock:
if not job.started:
job.cancelled = True
raise TimeoutError("command timed out and was cancelled — it will not run")
raise TimeoutError("command is still running on the adapter — do NOT retry")
if job.error is not None:
raise job.error
return job.result
@@ -167,16 +200,30 @@ class PollScheduler:
self._thread.start()
def _loop(self):
while self._running:
n = self.tick()
if n == 0:
time.sleep(0.005) # nothing due; yield
try:
while self._running:
n = self.tick()
if n == 0:
time.sleep(0.005) # nothing due; yield
except Exception as e:
# a transport/link error would otherwise kill the thread silently,
# leaving the GUI showing "Connected" with frozen data and blocked
# one-off callers hung. Fail loudly instead.
self._running = False
self._fail_pending_oneoffs(e)
cb = self.on_error
if cb:
try:
cb(e)
except Exception:
pass
def stop(self):
self._running = False
if self._thread:
self._thread.join(timeout=2.0)
self._thread = None
self._fail_pending_oneoffs(RuntimeError("scheduler stopped"))
def _is_derived(reg, key):
+73
View File
@@ -0,0 +1,73 @@
"""PollScheduler robustness: one-off cancellation + surviving link death."""
import os
import sys
import threading
import time
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from obdcore import PidRegistry, TimeSeriesStore, PollScheduler, load_default
from obdcore.mock import MockLink
def _sched(link=None):
prof = load_default()
return PollScheduler(link or MockLink(clock=time.time), PidRegistry(prof),
TimeSeriesStore(), clock=time.time)
class _RaisingLink(MockLink):
def read_m01(self, pid, nbytes, timeout=0.6):
raise OSError("device disconnected")
def test_oneoff_cancel_on_timeout():
s = _sched()
s._running = True # pretend a poll thread is up but not servicing
ran = []
try:
s.run_oneoff(lambda: ran.append(1), timeout=0.05)
raise AssertionError("should have timed out")
except TimeoutError as e:
assert "cancelled" in str(e), e
s._drain_oneoffs() # the cancelled job must NOT execute later
assert ran == [], "cancelled one-off fired late — ghost command"
print(" timed-out one-off is cancelled, never runs late: OK")
def test_oneoff_freed_when_thread_dies():
s = _sched()
s._running = True
got = []
def submit():
try:
s.run_oneoff(lambda: None, timeout=5.0)
except Exception as e:
got.append(e)
t = threading.Thread(target=submit); t.start()
time.sleep(0.05)
s._fail_pending_oneoffs(RuntimeError("link died")) # simulate thread death
t.join(timeout=1.0)
assert got and "link died" in str(got[0]), got
print(" blocked one-off freed immediately on thread death: OK")
def test_loop_survives_link_death_and_reports():
errs = []
s = PollScheduler(_RaisingLink(clock=time.time), PidRegistry(load_default()),
TimeSeriesStore(), clock=time.time, on_error=lambda e: errs.append(e))
s.subscribe("RPM", 5)
s._running = True
s._loop() # the raising read propagates -> caught, not fatal
assert not s._running, "thread should stop, not spin"
assert errs and isinstance(errs[0], OSError), errs
print(" poll loop catches transport death + fires on_error: OK")
if __name__ == "__main__":
test_oneoff_cancel_on_timeout()
test_oneoff_freed_when_thread_dies()
test_loop_survives_link_death_and_reports()
print("\nALL SCHEDULER TESTS PASS")