Files
obdash/obdcore/scheduler.py
T
justin 0fea0908c8 Add Tools/Diagnostics: thread-safe DTC read/clear + Diagnostics panel
The polling thread owns the ELM327, so reading/clearing trouble codes from
the GUI thread would race PID reads and corrupt the stream. Add a one-off
command path that serializes ad-hoc link work onto the polling thread.

obdcore/scheduler.py:
- PollScheduler.run_oneoff(fn, timeout) enqueues a callable (queue.Queue +
  threading.Event) and blocks for its result, re-raising the callable's
  exception. tick() drains queued one-offs at its very top, so they run on
  the same thread that does PID reads -- never concurrently. When the
  scheduler thread isn't running, the job is drained inline on the caller
  (still serialized vs tick(), safe because nothing else touches the link).

gui/controller.py:
- Controller.read_dtcs() -> {"stored","pending","permanent"} (modes 03/07/0A,
  svc 0x43/0x47/0x4A) and clear_dtcs() -> bool. Both route through the
  scheduler one-off when a scheduler exists, else call the link directly.

gui/main.py:
- Diagnostics menu (Read Codes / Clear Codes...) and a right-side QDockWidget
  listing codes grouped Stored/Pending/Permanent. Each row is code +
  description + system from DtcDatabase; no_start codes are flagged bold red.
- Clear is guarded by a confirmation warning (erases codes + freeze frame;
  honest "the code comes right back" / permanent-codes-won't-clear tone from
  run_clear in obd_reader.py). On confirm: clear, then re-read immediately and
  show whatever returned, reporting active faults that came straight back.

tests/test_diagnostics.py:
- one-off returns its value, re-raises exceptions, is drained before a tick's
  PID reads, and runs on a live background thread while polling continues.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_016yT89n4zR4qbrySoSiEyZs
2026-06-30 14:53:57 -04:00

185 lines
6.2 KiB
Python

"""PollScheduler -- the prioritized acquisition engine.
The ELM327 is a one-request-at-a-time straw (~7-15 reads/sec total). This
scheduler holds a subscription set (PID key -> target Hz) and, each tick,
reads the PIDs that are due, round-robin, pushing timestamped samples into the
TimeSeriesStore. Dead PIDs (4 consecutive no-responses) are parked and
periodically revived so they don't starve the sample rate.
Acquisition runs on a background thread; tick() is also public so tests can
drive it deterministically with a fake clock (no threads, no sleeps).
"""
import queue
import threading
import time
class _OneOff:
"""A single command to run once on the polling thread (DTC read/clear,
probe, etc). The submitter blocks on `done` until the thread has run it."""
__slots__ = ("fn", "done", "result", "error")
def __init__(self, fn):
self.fn = fn
self.done = threading.Event()
self.result = None
self.error = None
class _Sub:
__slots__ = ("key", "period", "next_due", "fails", "active")
def __init__(self, key, period):
self.key = key
self.period = period
self.next_due = 0.0
self.fails = 0
self.active = True
class PollScheduler:
    """Prioritized acquisition engine over a one-request-at-a-time link.

    Holds a subscription set (PID key -> polling period) and, on every
    tick(), reads the PIDs that are due and pushes timestamped samples into
    the store. A PID that returns None `dead_after` times in a row is parked;
    parked PIDs are re-activated every `revive_every` clock units.

    tick() can be driven directly (e.g. by tests with a fake clock) or by the
    background thread started with start(). Ad-hoc commands submitted through
    run_oneoff() are executed at the top of tick(), i.e. on the same thread
    that performs the PID reads.
    """

    def __init__(self, link, registry, store, clock=time.time, dead_after=4,
                 revive_every=5.0):
        """
        link         -- object providing read_atrv(), read_m01(pid, nbytes)
                        and read_m22(pid)
        registry     -- object whose get(key) returns a PID definition or None
        store        -- object providing push(key, timestamp, value) and
                        latest(key)
        clock        -- zero-argument callable returning the current time
        dead_after   -- consecutive None reads after which a PID is parked
        revive_every -- interval between revivals of parked PIDs
        """
        self.link = link
        self.reg = registry
        self.store = store
        self.clock = clock
        self.dead_after = dead_after
        self.revive_every = revive_every
        self._subs = {}
        self._lock = threading.Lock()      # guards the _subs mapping
        self._thread = None
        self._running = False
        self._last_revive = 0.0
        self._oneoffs = queue.Queue()      # pending one-off jobs

    # -- subscription management --
    @staticmethod
    def _new_sub(key, hz):
        """Build the record for `key` polled at `hz`; a non-positive rate
        falls back to a 0.5 period."""
        return _Sub(key, (1.0 / hz) if hz > 0 else 0.5)

    def set_subscriptions(self, specs):
        """specs: iterable of (key, hz). Replaces the whole set."""
        fresh = {k: self._new_sub(k, hz) for k, hz in specs}
        with self._lock:
            self._subs = fresh

    def subscribe(self, key, hz):
        """Add `key` at `hz`, replacing (and resetting) any existing entry."""
        with self._lock:
            self._subs[key] = self._new_sub(key, hz)

    def unsubscribe(self, key):
        """Drop `key` from the subscription set; unknown keys are ignored."""
        with self._lock:
            self._subs.pop(key, None)

    def subscriptions(self):
        """Return a snapshot list of the currently subscribed keys."""
        with self._lock:
            return list(self._subs.keys())

    # -- the core read of a single PID --
    def _read(self, p, frame_vals):
        """Obtain one value for PID definition `p`, or None on any failure.

        p.mode selects the source: "atrv" asks the link directly, "derived"
        decodes from its dependencies (preferring values read earlier in this
        tick, `frame_vals`, over the store's latest), "01" uses read_m01 and
        every other mode uses read_m22.
        """
        if p.mode == "atrv":
            return self.link.read_atrv()
        if p.mode == "derived":
            vals = [frame_vals.get(d, self.store.latest(d)) for d in p.deps]
            if any(v is None for v in vals):
                return None
            try:
                return p.decode(vals)
            except Exception:
                # best-effort: a decode failure is reported as "no value"
                return None
        raw = (self.link.read_m01(p.pid, p.nbytes) if p.mode == "01"
               else self.link.read_m22(p.pid))
        if not raw:
            return None
        try:
            return p.decode(raw)
        except Exception:
            # best-effort: a decode failure is reported as "no value"
            return None

    # -- one-off commands (thread-safe, serialized onto the polling thread) --
    def _drain_oneoffs(self):
        """Run every queued one-off on the *calling* thread. Invoked at the top
        of tick() so one-offs interleave with polling on the same thread that
        owns the serial link -- never concurrently with a PID read.

        A job whose `done` event is already set was abandoned by its
        submitter (run_oneoff timed out) and is skipped rather than executed
        after the caller has been told it failed.
        """
        while True:
            try:
                job = self._oneoffs.get_nowait()
            except queue.Empty:
                return
            if job.done.is_set():
                continue                    # submitter gave up; do not run late
            try:
                job.result = job.fn()
            except Exception as e:          # hand the failure back
                job.error = e
            finally:
                job.done.set()

    def run_oneoff(self, fn, timeout=8.0):
        """Enqueue `fn` to run once on the polling thread and block for its
        result (or re-raise its exception).

        The job is drained inline on the caller when the scheduler thread
        isn't running, and also when the caller *is* the polling thread
        (e.g. a one-off that submits another one-off): waiting there would
        block the only thread able to run the job.

        Raises TimeoutError if the job has not finished within `timeout`
        seconds. A timed-out job that has not started yet is marked abandoned
        and will not be executed later; one already executing runs to
        completion.
        """
        job = _OneOff(fn)
        self._oneoffs.put(job)
        if not self._running or threading.current_thread() is self._thread:
            self._drain_oneoffs()
        if not job.done.wait(timeout):
            job.done.set()                  # mark abandoned for _drain_oneoffs
            raise TimeoutError("one-off command timed out")
        if job.error is not None:
            raise job.error
        return job.result

    def tick(self, now=None):
        """Read all due PIDs once. Returns number of PIDs read.

        Queued one-offs run first. Parked PIDs are revived when
        `revive_every` has elapsed since the last revival. Due PIDs are read
        in next_due order, non-derived before derived so derived channels can
        use this tick's values. Every read (including None) is pushed to the
        store.

        A subscribed key missing from the registry is not read and not
        counted; it is backed off by its period so it cannot keep the
        scheduler permanently "busy".
        """
        self._drain_oneoffs()               # one-offs first, same thread
        now = self.clock() if now is None else now
        if now - self._last_revive >= self.revive_every:
            with self._lock:
                for s in self._subs.values():
                    if not s.active:
                        s.active, s.fails = True, 0
            self._last_revive = now
        with self._lock:
            due = [s for s in self._subs.values()
                   if s.active and now >= s.next_due]
        due.sort(key=lambda s: s.next_due)
        # Resolve each key once; stable sort keeps next_due order within the
        # non-derived and derived groups.
        resolved = [(s, self.reg.get(s.key)) for s in due]
        resolved.sort(
            key=lambda sp: 1 if (sp[1] and sp[1].mode == "derived") else 0)
        frame_vals = {}
        n_read = 0
        for s, p in resolved:
            if p is None:
                # Unknown key: retry after its period instead of every tick.
                s.next_due = now + s.period
                continue
            v = self._read(p, frame_vals)
            frame_vals[s.key] = v
            self.store.push(s.key, self.clock(), v)
            s.next_due = now + s.period
            n_read += 1
            if v is None:
                s.fails += 1
                if s.fails >= self.dead_after:
                    s.active = False        # park until the next revival
            else:
                s.fails = 0
        return n_read

    # -- background thread --
    def start(self):
        """Start the daemon polling thread; a no-op if already running."""
        if self._running:
            return
        self._running = True
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._thread.start()

    def _loop(self):
        """Thread body: tick until stopped, sleeping briefly when idle."""
        while self._running:
            n = self.tick()
            if n == 0:
                time.sleep(0.005)           # nothing due; yield

    def stop(self):
        """Ask the polling thread to exit and wait up to 2 s for it."""
        self._running = False
        if self._thread:
            self._thread.join(timeout=2.0)
            self._thread = None
def _is_derived(reg, key):
p = reg.get(key)
return bool(p and p.mode == "derived")