23c92018c1
- A transport exception in the poll loop killed the thread silently, leaving the GUI on a frozen 'Connected' dashboard and blocking run_oneoff callers for the full timeout. _loop now catches it -> stops, fails pending one-offs with the real error, and calls an on_error callback. Controller wires on_error to flag the connection dead; the GUI detects it in _tick and tears down with a 'Connection lost' dialog. - A run_oneoff that timed out left its job queued, so it executed LATER on the shared link -- a ghost/duplicate vehicle command. Jobs now carry cancelled/started flags under a lock; on timeout a not-yet-started job is cancelled (skipped by _drain_oneoffs), and a started one reports 'still running -- do NOT retry'. stop() also frees stranded submitters. - tests/test_scheduler.py: cancel-on-timeout, freed-on-death, loop-survives. Closes #8 Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_016yT89n4zR4qbrySoSiEyZs
232 lines
8.3 KiB
Python
232 lines
8.3 KiB
Python
"""PollScheduler -- the prioritized acquisition engine.
|
|
|
|
The ELM327 is a one-request-at-a-time straw (~7-15 reads/sec total). This
|
|
scheduler holds a subscription set (PID key -> target Hz) and, each tick,
|
|
reads the PIDs that are due, round-robin, pushing timestamped samples into the
|
|
TimeSeriesStore. Dead PIDs (4 consecutive no-responses) are parked and
|
|
periodically revived so they don't starve the sample rate.
|
|
|
|
Acquisition runs on a background thread; tick() is also public so tests can
|
|
drive it deterministically with a fake clock (no threads, no sleeps).
|
|
"""
|
|
import queue
|
|
import threading
|
|
import time
|
|
|
|
|
|
class _OneOff:
|
|
"""A single command to run once on the polling thread (DTC read/clear,
|
|
probe, etc). The submitter blocks on `done` until the thread has run it.
|
|
`cancelled`/`started` (guarded by `lock`) let a timed-out submitter cancel a
|
|
still-queued job so it never fires late on the vehicle."""
|
|
__slots__ = ("fn", "done", "result", "error", "cancelled", "started", "lock")
|
|
|
|
def __init__(self, fn):
|
|
self.fn = fn
|
|
self.done = threading.Event()
|
|
self.result = None
|
|
self.error = None
|
|
self.cancelled = False
|
|
self.started = False
|
|
self.lock = threading.Lock()
|
|
|
|
|
|
class _Sub:
|
|
__slots__ = ("key", "period", "next_due", "fails", "active")
|
|
|
|
def __init__(self, key, period):
|
|
self.key = key
|
|
self.period = period
|
|
self.next_due = 0.0
|
|
self.fails = 0
|
|
self.active = True
|
|
|
|
|
|
class PollScheduler:
    """Prioritized acquisition engine driving one shared adapter link.

    Each tick() first runs any queued one-off commands, then reads every
    subscription that is due and pushes (key, timestamp, value) into `store`.
    Subscriptions that yield None `dead_after` times in a row are parked and
    re-activated every `revive_every` clock units.

    All traffic on `link` -- whole polling ticks and one-offs drained inline
    by run_oneoff() -- is serialized by `_link_lock`, so two commands are
    never in flight on the adapter at the same time.
    """

    def __init__(self, link, registry, store, clock=time.time, dead_after=4,
                 revive_every=5.0, on_error=None):
        self.link = link
        self.reg = registry
        self.store = store
        self.clock = clock
        self.dead_after = dead_after      # consecutive None reads before parking
        self.revive_every = revive_every  # clock units between revive sweeps
        self.on_error = on_error          # called(exc) if the poll thread dies
        self._subs = {}                   # key -> _Sub, guarded by _lock
        self._lock = threading.Lock()
        # Serializes every use of the link (a whole tick, or an inline drain
        # of one-offs). Re-entrant so a one-off may call tick()/run_oneoff()
        # from the thread that already holds it.
        self._link_lock = threading.RLock()
        self._thread = None
        self._running = False
        self._last_revive = 0.0
        self._oneoffs = queue.Queue()

    # -- subscription management --

    @staticmethod
    def _period(hz):
        """Seconds between reads for a target rate; hz <= 0 falls back to 0.5 s."""
        return (1.0 / hz) if hz > 0 else 0.5

    def set_subscriptions(self, specs):
        """specs: iterable of (key, hz). Replaces the whole set (scheduling
        state such as fail counts starts fresh for every key)."""
        with self._lock:
            self._subs = {k: _Sub(k, self._period(hz)) for k, hz in specs}

    def subscribe(self, key, hz):
        """Add or replace a single subscription at `hz`."""
        with self._lock:
            self._subs[key] = _Sub(key, self._period(hz))

    def unsubscribe(self, key):
        """Remove `key` if subscribed; unknown keys are ignored."""
        with self._lock:
            self._subs.pop(key, None)

    def subscriptions(self):
        """Snapshot list of the currently subscribed keys."""
        with self._lock:
            return list(self._subs.keys())

    # -- the core read of a single PID --

    def _read(self, p, frame_vals):
        """Read (or compute) one value for registry entry `p`.

        Dispatches on `p.mode`: "atrv" -> link.read_atrv(); "derived" ->
        p.decode() over its deps, preferring values read earlier in this frame
        and falling back to store.latest(); "01" -> link.read_m01(); anything
        else -> link.read_m22(). Returns None when there is no response, a
        dependency is missing, or decoding raises. Exceptions from the link
        itself propagate (they end the poll loop).
        """
        if p.mode == "atrv":
            return self.link.read_atrv()
        if p.mode == "derived":
            vals = [frame_vals.get(d, self.store.latest(d)) for d in p.deps]
            if any(v is None for v in vals):
                return None
            try:
                return p.decode(vals)
            except Exception:
                return None
        raw = (self.link.read_m01(p.pid, p.nbytes) if p.mode == "01"
               else self.link.read_m22(p.pid))
        if not raw:
            return None
        try:
            return p.decode(raw)
        except Exception:
            return None

    # -- one-off commands (thread-safe, serialized onto the polling thread) --

    def _drain_oneoffs(self):
        """Run every queued, non-cancelled one-off on the *calling* thread.

        Holds `_link_lock` while doing so, so one-offs never overlap a PID
        read or another drain. Invoked at the top of tick(), and inline by
        run_oneoff() when no live polling thread is servicing the queue."""
        with self._link_lock:
            while True:
                try:
                    job = self._oneoffs.get_nowait()
                except queue.Empty:
                    return
                with job.lock:  # a timed-out submitter may have cancelled
                    if job.cancelled:
                        continue
                    job.started = True
                try:
                    job.result = job.fn()
                except Exception as e:  # hand the failure back
                    job.error = e
                finally:
                    job.done.set()

    def _fail_pending_oneoffs(self, exc):
        """Fail every still-queued (not yet started) one-off with `exc` so a
        blocked submitter is freed immediately instead of hanging the full
        timeout -- used when the poll thread dies or stops."""
        err = exc if isinstance(exc, BaseException) else RuntimeError(str(exc))
        while True:
            try:
                job = self._oneoffs.get_nowait()
            except queue.Empty:
                return
            with job.lock:
                if job.started or job.cancelled:
                    continue
                job.cancelled = True
                job.error = err
                job.done.set()

    def _drain_inline(self, timeout):
        """Drain the queue on the caller's thread once the link is free.

        Gives up after `timeout` seconds (None = wait indefinitely) so a
        wedged in-flight read cannot hold the caller past its deadline; the
        queued job is then handled by run_oneoff's timeout path."""
        if timeout is None:
            acquired = self._link_lock.acquire()
        else:
            acquired = self._link_lock.acquire(timeout=max(0.0, timeout))
        if not acquired:
            return
        try:
            self._drain_oneoffs()
        finally:
            self._link_lock.release()

    def run_oneoff(self, fn, timeout=8.0):
        """Enqueue `fn` to run once on the link-owning thread and block for its
        result (or re-raise its exception).

        When no live polling thread is servicing the queue, the job is drained
        inline on the caller under `_link_lock`, so it is still serialized
        against tick() and against other inline callers. On timeout a
        still-queued job is CANCELLED so it can never fire late on the
        vehicle; a job that already started is reported as still running.

        Raises:
            TimeoutError: the job did not finish within `timeout` seconds.
            Exception: whatever `fn` raised, or the error that stopped the
                scheduler before the job could start.
        """
        job = _OneOff(fn)
        deadline = None if timeout is None else time.monotonic() + timeout
        self._oneoffs.put(job)
        thread = self._thread  # single read: stop() may clear it concurrently
        if not self._running or (thread is not None and not thread.is_alive()):
            self._drain_inline(timeout)
        remaining = None if deadline is None else max(0.0, deadline - time.monotonic())
        if not job.done.wait(remaining):
            with job.lock:
                # The job may have finished, or been failed by
                # _fail_pending_oneoffs, between the wait expiring and taking
                # the lock; in that case report its real outcome below.
                if not job.done.is_set():
                    if not job.started:
                        job.cancelled = True
                        raise TimeoutError("command timed out and was cancelled — it will not run")
                    raise TimeoutError("command is still running on the adapter — do NOT retry")
        if job.error is not None:
            raise job.error
        return job.result

    def tick(self, now=None):
        """Run queued one-offs, then read every PID that is due.

        Holds `_link_lock` for the whole tick so nothing else uses the link
        mid-tick.

        Args:
            now: time used for scheduling decisions; defaults to self.clock().
                Samples are stamped with self.clock() taken after each read.

        Returns:
            Number of subscriptions actually read or computed. Keys missing
            from the registry are skipped and not counted, so the loop idles
            instead of spinning when only such keys are due.
        """
        with self._link_lock:
            self._drain_oneoffs()  # one-offs first, same thread
            now = self.clock() if now is None else now
            if now - self._last_revive >= self.revive_every:
                with self._lock:
                    for s in self._subs.values():
                        if not s.active:
                            s.active, s.fails = True, 0
                self._last_revive = now

            with self._lock:
                due = [s for s in self._subs.values() if s.active and now >= s.next_due]
            due.sort(key=lambda s: s.next_due)

            frame_vals = {}
            # non-derived first so derived channels can use this frame's values
            # (sorted() is stable, so due-order is kept within each group)
            order = sorted(due, key=lambda s: 1 if _is_derived(self.reg, s.key) else 0)
            reads = 0
            for s in order:
                p = self.reg.get(s.key)
                if p is None:
                    continue  # not in the registry: nothing to read
                v = self._read(p, frame_vals)
                reads += 1
                frame_vals[s.key] = v
                self.store.push(s.key, self.clock(), v)
                s.next_due = now + s.period
                if v is None:
                    s.fails += 1
                    if s.fails >= self.dead_after:
                        s.active = False  # parked until the next revive sweep
                else:
                    s.fails = 0
            return reads

    # -- background thread --

    def start(self):
        """Start the background polling thread (no-op if already running)."""
        if self._running:
            return
        self._running = True
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._thread.start()

    def _loop(self):
        """Poll until stopped; on a link error, stop and report it."""
        try:
            while self._running:
                n = self.tick()
                if n == 0:
                    time.sleep(0.005)  # nothing due; yield
        except Exception as e:
            # a transport/link error would otherwise kill the thread silently,
            # leaving the GUI showing "Connected" with frozen data and blocked
            # one-off callers hung. Fail loudly instead.
            self._running = False
            self._fail_pending_oneoffs(e)
            cb = self.on_error
            if cb:
                try:
                    cb(e)
                except Exception:
                    # Deliberate best effort: a faulty callback must not mask
                    # the original failure, and this thread has no caller.
                    pass

    def stop(self):
        """Stop polling, wait briefly for the thread, and free any submitters
        still waiting on queued one-offs."""
        self._running = False
        thread = self._thread
        if thread:
            # Joining the current thread raises RuntimeError; when stop() is
            # called from the poll thread itself (on_error, or a one-off) the
            # loop simply exits once the current tick returns.
            if thread is not threading.current_thread():
                thread.join(timeout=2.0)
            self._thread = None
        self._fail_pending_oneoffs(RuntimeError("scheduler stopped"))
|
|
|
|
|
|
def _is_derived(reg, key):
|
|
p = reg.get(key)
|
|
return bool(p and p.mode == "derived")
|