Files
obdash/tests/test_scheduler.py
justin 23c92018c1 Fix #8: scheduler survives link death; timed-out one-offs cancelled
- A transport exception in the poll loop killed the thread silently, leaving the
  GUI on a frozen 'Connected' dashboard and blocking run_oneoff callers for the
  full timeout. _loop now catches the exception, stops the poll loop, fails
  pending one-offs with the real error, and calls an on_error callback.
  Controller wires on_error to flag
  the connection dead; the GUI detects it in _tick and tears down with a
  'Connection lost' dialog.
- A run_oneoff that timed out left its job queued, so it executed LATER on the
  shared link -- a ghost/duplicate vehicle command. Jobs now carry
  cancelled/started flags under a lock; on timeout a not-yet-started job is
  cancelled (skipped by _drain_oneoffs), and a started one reports 'still
  running -- do NOT retry'. stop() also frees stranded submitters.
- tests/test_scheduler.py: cancel-on-timeout, freed-on-death, loop-survives.

Closes #8

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_016yT89n4zR4qbrySoSiEyZs
2026-07-01 19:33:33 -04:00

74 lines
2.4 KiB
Python

"""PollScheduler robustness: one-off cancellation + surviving link death."""
import os
import sys
import threading
import time
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from obdcore import PidRegistry, TimeSeriesStore, PollScheduler, load_default
from obdcore.mock import MockLink
def _sched(link=None):
    """Build a PollScheduler on the default profile for use in these tests.

    Args:
        link: Transport object handed to the scheduler. When ``None`` (the
            default) a fresh ``MockLink(clock=time.time)`` is used instead.

    Returns:
        A ``PollScheduler`` constructed from the link, a ``PidRegistry`` built
        from ``load_default()``, a new ``TimeSeriesStore`` and ``time.time``
        as its clock.
    """
    prof = load_default()
    # Test for None explicitly instead of relying on truthiness: a link the
    # caller passed in must never be swapped for a fresh MockLink just because
    # it happens to evaluate falsy (e.g. it defines __bool__ or __len__).
    if link is None:
        link = MockLink(clock=time.time)
    return PollScheduler(link, PidRegistry(prof), TimeSeriesStore(),
                         clock=time.time)
class _RaisingLink(MockLink):
    """MockLink variant whose ``read_m01`` always fails with ``OSError``."""

    def read_m01(self, pid, nbytes, timeout=0.6):
        """Raise ``OSError("device disconnected")``; all arguments are ignored."""
        failure = OSError("device disconnected")
        raise failure
def test_oneoff_cancel_on_timeout():
    """A one-off that times out unserviced is cancelled and never runs later.

    The scheduler is flagged as running without a poll thread being started,
    so the submitted job is never picked up before ``run_oneoff`` gives up.
    Draining the queue afterwards must not execute the job.
    """
    sched = _sched()
    sched._running = True  # pretend a poll thread is up but not servicing
    fired = []

    try:
        sched.run_oneoff(lambda: fired.append(1), timeout=0.05)
    except TimeoutError as exc:
        # The timeout message has to tell the caller the job was cancelled.
        assert "cancelled" in str(exc), exc
    else:
        raise AssertionError("should have timed out")

    # Servicing the queue now must skip the cancelled job rather than run it.
    sched._drain_oneoffs()
    assert not fired, "cancelled one-off fired late — ghost command"
    print(" timed-out one-off is cancelled, never runs late: OK")
def test_oneoff_freed_when_thread_dies():
    """A submitter blocked in ``run_oneoff`` is released when pending jobs fail.

    A helper thread submits a one-off with a 5 s timeout to a scheduler that
    is flagged as running but has no poll thread. The test then calls
    ``_fail_pending_oneoffs`` with a ``RuntimeError`` and checks that the
    helper returns within 1 s carrying that error.
    """
    s = _sched()
    s._running = True  # no poll thread is started, so the job is never serviced
    got = []

    def submit():
        # Deliberately broad: whatever run_oneoff raises is what gets asserted.
        try:
            s.run_oneoff(lambda: None, timeout=5.0)
        except Exception as e:
            got.append(e)

    # daemon=True so that, if the scheduler regresses and never frees the
    # submitter, the failing test does not keep the interpreter alive until
    # the 5 s run_oneoff timeout expires.
    t = threading.Thread(target=submit, daemon=True)
    t.start()
    # NOTE(review): sleep-based hand-off; assumes 50 ms is enough for the
    # helper to enqueue its job and block.
    time.sleep(0.05)
    s._fail_pending_oneoffs(RuntimeError("link died"))  # simulate thread death
    t.join(timeout=1.0)
    # Fail with an explicit reason instead of an opaque empty list when the
    # submitter is still stuck inside run_oneoff.
    assert not t.is_alive(), "submitter still blocked after one-offs were failed"
    assert got and "link died" in str(got[0]), got
    print(" blocked one-off freed immediately on thread death: OK")
def test_loop_survives_link_death_and_reports():
    """``_loop`` handles a raising link: it stops and reports via ``on_error``.

    The scheduler is built on ``_RaisingLink`` and ``_loop`` is invoked
    directly on the calling thread. Afterwards ``_running`` must be false and
    the ``on_error`` callback must have received an ``OSError``.
    """
    errs = []

    def record(e):
        errs.append(e)

    link = _RaisingLink(clock=time.time)
    registry = PidRegistry(load_default())
    store = TimeSeriesStore()
    s = PollScheduler(link, registry, store, clock=time.time, on_error=record)

    s.subscribe("RPM", 5)
    s._running = True
    # The raising read propagates into the loop, which must catch it rather
    # than let it escape to the caller.
    s._loop()

    assert not s._running, "thread should stop, not spin"
    assert errs and isinstance(errs[0], OSError), errs
    print(" poll loop catches transport death + fires on_error: OK")
if __name__ == "__main__":
    # Run each scenario in declaration order; the first failure aborts the
    # script before the summary banner is printed.
    for scenario in (
        test_oneoff_cancel_on_timeout,
        test_oneoff_freed_when_thread_dies,
        test_loop_survives_link_death_and_reports,
    ):
        scenario()
    print("\nALL SCHEDULER TESTS PASS")